From 50afde24178fe092d116a24b3a2261c3df31764a Mon Sep 17 00:00:00 2001 From: wangruiyuan Date: Mon, 5 Nov 2018 17:44:30 +0800 Subject: [PATCH 1/2] fix bug channel create too much --- src/brokers/amqp.ts | 9 +++++++-- src/brokers/interface.ts | 1 + 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/brokers/amqp.ts b/src/brokers/amqp.ts index 693cf55..e6bfda6 100644 --- a/src/brokers/amqp.ts +++ b/src/brokers/amqp.ts @@ -89,7 +89,7 @@ export class AMQPBroker extends Broker { }, options); this.options.queueOptions = _.assign({ - durable: true, + durable: true, }, options.queueOptions); } @@ -184,6 +184,7 @@ export class AMQPBroker extends Broker { this.emit('close', 'channel'); eventLog('Channel close'); register.channel = null; + register.channeling = false; }); await channel.assertExchange( @@ -234,7 +235,11 @@ export class AMQPBroker extends Broker { } private async drainQueue(): Promise { - const unregisteredTasks = _.filter(_.values(this.taskRegisterMap), registerTask => !registerTask.channel); + const unregisteredTasks = _.filter(_.values(this.taskRegisterMap), registerTask => { + if (registerTask.channeling) return false + registerTask.channeling = true; + return true + }) log('TaskRegister Queue size %d', unregisteredTasks.length); await Promise.map(unregisteredTasks, async (taskRegister) => { await this.createChannelAndConsume(taskRegister); diff --git a/src/brokers/interface.ts b/src/brokers/interface.ts index 5bc271f..1f66334 100644 --- a/src/brokers/interface.ts +++ b/src/brokers/interface.ts @@ -12,6 +12,7 @@ export interface TaskRegister { queueName: string; consumerTag?: string; channel?: amqp.Channel; + channeling?: boolean; } export interface TaskRegisterMap { From 7fb52097bbba6dcf4a500bafd1de177ee6538472 Mon Sep 17 00:00:00 2001 From: wangruiyuan Date: Wed, 7 Nov 2018 14:03:54 +0800 Subject: [PATCH 2/2] add semicolon --- src/brokers/amqp.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/brokers/amqp.ts b/src/brokers/amqp.ts index e6bfda6..280200c 100644 --- a/src/brokers/amqp.ts +++ b/src/brokers/amqp.ts @@ -236,10 +236,10 @@ export class AMQPBroker extends Broker { private async drainQueue(): Promise { const unregisteredTasks = _.filter(_.values(this.taskRegisterMap), registerTask => { - if (registerTask.channeling) return false + if (registerTask.channeling) return false; registerTask.channeling = true; - return true - }) + return true; + }); log('TaskRegister Queue size %d', unregisteredTasks.length); await Promise.map(unregisteredTasks, async (taskRegister) => { await this.createChannelAndConsume(taskRegister);