Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
✨ Add adjustable prefetch limit to listeners and endpoints
Browse files Browse the repository at this point in the history
This uses the 'global' flag to achieve per-channel behaviour in RabbitMQ v3.3.0+.
We already use 3.5.0+ for priority queues, so we're safe here.
  • Loading branch information
jpwilliams committed Oct 12, 2017
1 parent 70ea11d commit 01cba9b
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
7 changes: 5 additions & 2 deletions lib/Endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class Endpoint {
}
}

async _setup ({ queue, event }) {
async _setup ({ queue, event, prefetch = 48 }) {
try {
const worker = await this._remit._workers.acquire()

Expand All @@ -142,7 +142,10 @@ class Endpoint {
this._consumer.on('close', () => {
throwAsException(new Error('Consumer died - this is most likely due to the RabbitMQ connection dying'))
})
this._consumer.prefetch(48)

if (prefetch > 0) {
this._consumer.prefetch(prefetch, true)
}

await this._consumer.bindQueue(
queue,
Expand Down
7 changes: 5 additions & 2 deletions lib/Listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class Listener {
this._consumer.ack(message)
}

async _setup ({ queue, event }) {
async _setup ({ queue, event, prefetch = 48 }) {
try {
const worker = await this._remit._workers.acquire()

Expand All @@ -118,7 +118,10 @@ class Listener {
this._consumer.on('close', () => {
throwAsException(new Error('Consumer died - this is most likely due to the RabbitMQ connection dying'))
})
this._consumer.prefetch(48)

if (prefetch > 0) {
this._consumer.prefetch(prefetch, true)
}

await this._consumer.bindQueue(
queue,
Expand Down

0 comments on commit 01cba9b

Please sign in to comment.