diff --git a/lib/socket.ts b/lib/socket.ts index 18a5364d..61d6514b 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -560,10 +560,18 @@ export class Socket extends EventEmitter { this.readyState = "closing"; if (this.writeBuffer.length) { - this.once("drain", this.closeTransport.bind(this, discard)); + debug( + "there are %d remaining packets in the buffer, waiting for the 'drain' event", + this.writeBuffer.length + ); + this.once("drain", () => { + debug("all packets have been sent, closing the transport"); + this.closeTransport(discard); + }); return; } + debug("the buffer is empty, closing the transport right away", discard); this.closeTransport(discard); } @@ -574,6 +582,7 @@ export class Socket extends EventEmitter { * @api private */ private closeTransport(discard) { + debug("closing the transport (discard? %s)", discard); if (discard) this.transport.discard(); this.transport.close(this.onClose.bind(this, "forced close")); } diff --git a/lib/transports-uws/websocket.ts b/lib/transports-uws/websocket.ts index 65c4ef32..9fc43ce3 100644 --- a/lib/transports-uws/websocket.ts +++ b/lib/transports-uws/websocket.ts @@ -53,36 +53,32 @@ export class WebSocket extends Transport { * @api private */ send(packets) { - const packet = packets.shift(); - if (typeof packet === "undefined") { - this.writable = true; - this.emit("drain"); - return; - } + this.writable = false; - // always creates a new object since ws modifies it - const opts: { compress?: boolean } = {}; - if (packet.options) { - opts.compress = packet.options.compress; - } + for (let i = 0; i < packets.length; i++) { + const packet = packets[i]; + const isLast = i + 1 === packets.length; - const send = (data) => { - const isBinary = typeof data !== "string"; - const compress = - this.perMessageDeflate && - Buffer.byteLength(data) > this.perMessageDeflate.threshold; + const send = (data) => { + const isBinary = typeof data !== "string"; + const compress = + this.perMessageDeflate && + Buffer.byteLength(data) > this.perMessageDeflate.threshold; - debug('writing "%s"', data); - this.writable = false; + debug('writing "%s"', data); + this.socket.send(data, isBinary, compress); - this.socket.send(data, isBinary, compress); - this.send(packets); - }; + if (isLast) { + this.writable = true; + this.emit("drain"); + } + }; - if (packet.options && typeof packet.options.wsPreEncoded === "string") { - send(packet.options.wsPreEncoded); - } else { - this.parser.encodePacket(packet, this.supportsBinary, send); + if (packet.options && typeof packet.options.wsPreEncoded === "string") { + send(packet.options.wsPreEncoded); + } else { + this.parser.encodePacket(packet, this.supportsBinary, send); + } } } @@ -94,7 +90,7 @@ export class WebSocket extends Transport { doClose(fn) { debug("closing"); fn && fn(); - // call fn first since socket.close() immediately emits a "close" event - this.socket.close(); + // call fn first since socket.end() immediately emits a "close" event + this.socket.end(); } } diff --git a/lib/transports/websocket.ts b/lib/transports/websocket.ts index 6f4f551a..57c4a7b3 100644 --- a/lib/transports/websocket.ts +++ b/lib/transports/websocket.ts @@ -61,47 +61,48 @@ export class WebSocket extends Transport { * @api private */ send(packets) { - const packet = packets.shift(); - if (typeof packet === "undefined") { - this.writable = true; - this.emit("drain"); - return; - } + this.writable = false; - // always creates a new object since ws modifies it - const opts: { compress?: boolean } = {}; - if (packet.options) { - opts.compress = packet.options.compress; - } + for (let i = 0; i < packets.length; i++) { + const packet = packets[i]; + const isLast = i + 1 === packets.length; - const send = (data) => { - if (this.perMessageDeflate) { - const len = - "string" === typeof data ? Buffer.byteLength(data) : data.length; - if (len < this.perMessageDeflate.threshold) { - opts.compress = false; - } + // always creates a new object since ws modifies it + const opts: { compress?: boolean } = {}; + if (packet.options) { + opts.compress = packet.options.compress; } - debug('writing "%s"', data); - this.writable = false; - this.socket.send(data, opts, (err) => { - if (err) return this.onError("write error", err.stack); - this.send(packets); - }); - }; + const onSent = (err) => { + if (err) { + return this.onError("write error", err.stack); + } else if (isLast) { + this.writable = true; + this.emit("drain"); + } + }; - if (packet.options && typeof packet.options.wsPreEncoded === "string") { - send(packet.options.wsPreEncoded); - } else if (this._canSendPreEncodedFrame(packet)) { - // the WebSocket frame was computed with WebSocket.Sender.frame() - // see https://github.com/websockets/ws/issues/617#issuecomment-283002469 - this.socket._sender.sendFrame(packet.options.wsPreEncodedFrame, (err) => { - if (err) return this.onError("write error", err.stack); - this.send(packets); - }); - } else { - this.parser.encodePacket(packet, this.supportsBinary, send); + const send = (data) => { + if (this.perMessageDeflate) { + const len = + "string" === typeof data ? Buffer.byteLength(data) : data.length; + if (len < this.perMessageDeflate.threshold) { + opts.compress = false; + } + } + debug('writing "%s"', data); + this.socket.send(data, opts, onSent); + }; + + if (packet.options && typeof packet.options.wsPreEncoded === "string") { + send(packet.options.wsPreEncoded); + } else if (this._canSendPreEncodedFrame(packet)) { + // the WebSocket frame was computed with WebSocket.Sender.frame() + // see https://github.com/websockets/ws/issues/617#issuecomment-283002469 + this.socket._sender.sendFrame(packet.options.wsPreEncodedFrame, onSent); + } else { + this.parser.encodePacket(packet, this.supportsBinary, send); + } } } diff --git a/test/server.js b/test/server.js index c65f176e..19dd3fbe 100644 --- a/test/server.js +++ b/test/server.js @@ -1797,9 +1797,7 @@ describe("server", () => { conn.send("a"); conn.send("b"); conn.send("c"); - setTimeout(() => { - conn.close(); - }, 50); + conn.close(); }); socket.on("open", () => {