Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
🚩(listener): add 'subscribe' option, allowing pubsub behaviour
Browse files Browse the repository at this point in the history
If the 'subscribe' option is truthy, create a non-permanent queue to subscribe to emitted events as long as the consumer is alive.
Great for places where you just want usual pubsub behaviour without fucking with service names.
  • Loading branch information
jpwilliams committed Apr 16, 2018
1 parent 9416e04 commit 3675c3c
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions lib/Listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class Listener {

try {
consumeResult = await this._consumer.consume(
this._options.queue,
this._consumerQueueName,
bubble.bind(this._incoming.bind(this)),
{
noAck: false,
Expand Down Expand Up @@ -174,17 +174,18 @@ class Listener {
this._consumer.ack(message)
}

async _setup ({ queue, event, prefetch = 48 }) {
async _setup ({ queue, event, prefetch = 48, subscribe }) {
this._starting = true

try {
const worker = await this._remit._workers.acquire()
let ok

try {
await worker.assertQueue(queue, {
exclusive: false,
durable: true,
autoDelete: false,
ok = await worker.assertQueue(subscribe ? '' : queue, {
exclusive: subscribe,
durable: !subscribe,
autoDelete: subscribe,
maxPriority: 10
})

Expand All @@ -195,6 +196,7 @@ class Listener {
throw e
}

this._consumerQueueName = ok.queue
const connection = await this._remit._connection
this._consumer = await connection.createChannel()
this._consumer.on('error', console.error)
Expand All @@ -207,7 +209,7 @@ class Listener {
}

await this._consumer.bindQueue(
queue,
this._consumerQueueName,
this._remit._exchange,
event
)
Expand Down

0 comments on commit 3675c3c

Please sign in to comment.