Skip to content

Commit

Permalink
[FIX] remove flusher timers if we do an inline flush of the outbound
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Jun 1, 2023
1 parent ace0fec commit 5fb53d6
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions nats-base-client/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
servers: Servers;
server!: ServerImpl;
features: Features;
flusher?: number;

constructor(options: ConnectionOptions, publisher: Publisher) {
this._closed = false;
Expand Down Expand Up @@ -843,10 +844,17 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
this.outbound.fill(buf, ...payloads);

if (len === 0) {
setTimeout(() => {
this.flusher = setTimeout(() => {
this.flushPending();
});
} else if (this.outbound.size() >= this.pendingLimit) {
// if we have a flusher, clear it - otherwise in a bench
// type scenario where the main loop is dominated by a publisher
// we create many timers.
if (this.flusher) {
clearTimeout(this.flusher);
this.flusher = undefined;
}
this.flushPending();
}
}
Expand Down Expand Up @@ -890,7 +898,7 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
let proto: string;
if (options.headers) {
if (options.reply) {
proto = `HPUB ${subject} ${options.reply} ${hlen} ${len}${CR_LF}`;
proto = `HPUB ${subject} ${options.reply} ${hlen} ${len}\r\n`;
} else {
proto = `HPUB ${subject} ${hlen} ${len}\r\n`;
}
Expand Down Expand Up @@ -945,9 +953,9 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
return;
}
if (max) {
this.sendCommand(`UNSUB ${s.sid} ${max}${CR_LF}`);
this.sendCommand(`UNSUB ${s.sid} ${max}\r\n`);
} else {
this.sendCommand(`UNSUB ${s.sid}${CR_LF}`);
this.sendCommand(`UNSUB ${s.sid}\r\n`);
}
s.max = max;
}
Expand Down

0 comments on commit 5fb53d6

Please sign in to comment.