diff --git a/lib/Endpoint.js b/lib/Endpoint.js index a2a7606..d82fa79 100644 --- a/lib/Endpoint.js +++ b/lib/Endpoint.js @@ -118,7 +118,7 @@ class Endpoint { } } - async _setup ({ queue, event }) { + async _setup ({ queue, event, prefetch = 48 }) { try { const worker = await this._remit._workers.acquire() @@ -142,7 +142,10 @@ class Endpoint { this._consumer.on('close', () => { throwAsException(new Error('Consumer died - this is most likely due to the RabbitMQ connection dying')) }) - this._consumer.prefetch(48) + + if (prefetch > 0) { + this._consumer.prefetch(prefetch, true) + } await this._consumer.bindQueue( queue, diff --git a/lib/Listener.js b/lib/Listener.js index b2a2194..696730e 100644 --- a/lib/Listener.js +++ b/lib/Listener.js @@ -94,7 +94,7 @@ class Listener { this._consumer.ack(message) } - async _setup ({ queue, event }) { + async _setup ({ queue, event, prefetch = 48 }) { try { const worker = await this._remit._workers.acquire() @@ -118,7 +118,10 @@ class Listener { this._consumer.on('close', () => { throwAsException(new Error('Consumer died - this is most likely due to the RabbitMQ connection dying')) }) - this._consumer.prefetch(48) + + if (prefetch > 0) { + this._consumer.prefetch(prefetch, true) + } await this._consumer.bindQueue( queue,