From 01cba9ba44d2df09faeb4cb4a39aae0233da3ee9 Mon Sep 17 00:00:00 2001 From: Jack Williams Date: Thu, 12 Oct 2017 10:06:53 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Add=20adjustable=20prefetch=20limit?= =?UTF-8?q?=20to=20listeners=20and=20endpoints?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This uses the 'global' flag to achieve per-channel behaviour in RabbitMQ v3.3.0+. We already use 3.5.0+ for priority queues, so we're safe here. --- lib/Endpoint.js | 7 +++++-- lib/Listener.js | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/Endpoint.js b/lib/Endpoint.js index a2a7606..d82fa79 100644 --- a/lib/Endpoint.js +++ b/lib/Endpoint.js @@ -118,7 +118,7 @@ class Endpoint { } } - async _setup ({ queue, event }) { + async _setup ({ queue, event, prefetch = 48 }) { try { const worker = await this._remit._workers.acquire() @@ -142,7 +142,10 @@ class Endpoint { this._consumer.on('close', () => { throwAsException(new Error('Consumer died - this is most likely due to the RabbitMQ connection dying')) }) - this._consumer.prefetch(48) + + if (prefetch > 0) { + this._consumer.prefetch(prefetch, true) + } await this._consumer.bindQueue( queue, diff --git a/lib/Listener.js b/lib/Listener.js index b2a2194..696730e 100644 --- a/lib/Listener.js +++ b/lib/Listener.js @@ -94,7 +94,7 @@ class Listener { this._consumer.ack(message) } - async _setup ({ queue, event }) { + async _setup ({ queue, event, prefetch = 48 }) { try { const worker = await this._remit._workers.acquire() @@ -118,7 +118,10 @@ class Listener { this._consumer.on('close', () => { throwAsException(new Error('Consumer died - this is most likely due to the RabbitMQ connection dying')) }) - this._consumer.prefetch(48) + + if (prefetch > 0) { + this._consumer.prefetch(prefetch, true) + } await this._consumer.bindQueue( queue,