From 5fb53d68380fc2bebae4cbac1ceaa31c4cc309eb Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Thu, 1 Jun 2023 08:51:30 -0500 Subject: [PATCH] [FIX] remove flusher timers if we do an inline flush of the outbound --- nats-base-client/protocol.ts | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/nats-base-client/protocol.ts b/nats-base-client/protocol.ts index 90c0a050..35edf338 100644 --- a/nats-base-client/protocol.ts +++ b/nats-base-client/protocol.ts @@ -395,6 +395,7 @@ export class ProtocolHandler implements Dispatcher { servers: Servers; server!: ServerImpl; features: Features; + flusher?: number; constructor(options: ConnectionOptions, publisher: Publisher) { this._closed = false; @@ -843,10 +844,17 @@ export class ProtocolHandler implements Dispatcher { this.outbound.fill(buf, ...payloads); if (len === 0) { - setTimeout(() => { + this.flusher = setTimeout(() => { this.flushPending(); }); } else if (this.outbound.size() >= this.pendingLimit) { + // if we have a flusher, clear it - otherwise in a bench + // type scenario where the main loop is dominated by a publisher + // we create many timers. + if (this.flusher) { + clearTimeout(this.flusher); + this.flusher = undefined; + } this.flushPending(); } } @@ -890,7 +898,7 @@ export class ProtocolHandler implements Dispatcher { let proto: string; if (options.headers) { if (options.reply) { - proto = `HPUB ${subject} ${options.reply} ${hlen} ${len}${CR_LF}`; + proto = `HPUB ${subject} ${options.reply} ${hlen} ${len}\r\n`; } else { proto = `HPUB ${subject} ${hlen} ${len}\r\n`; } @@ -945,9 +953,9 @@ export class ProtocolHandler implements Dispatcher { return; } if (max) { - this.sendCommand(`UNSUB ${s.sid} ${max}${CR_LF}`); + this.sendCommand(`UNSUB ${s.sid} ${max}\r\n`); } else { - this.sendCommand(`UNSUB ${s.sid}${CR_LF}`); + this.sendCommand(`UNSUB ${s.sid}\r\n`); } s.max = max; }