diff --git a/packages/transport-tcp/src/socket-to-conn.ts b/packages/transport-tcp/src/socket-to-conn.ts index ca0908c675..2e4365697d 100644 --- a/packages/transport-tcp/src/socket-to-conn.ts +++ b/packages/transport-tcp/src/socket-to-conn.ts @@ -23,6 +23,7 @@ interface ToConnectionOptions { * https://github.com/libp2p/interface-transport#multiaddrconnection */ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptions): MultiaddrConnection => { + let status: 'open' | 'closing' | 'closed' = 'open' const log = options.logger.forComponent('libp2p:tcp:socket') const metrics = options.metrics const metricPrefix = options.metricPrefix ?? '' @@ -126,10 +127,11 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio timeline: { open: Date.now() }, async close (options: AbortOptions = {}) { - if (socket.destroyed) { - log('%s socket was already destroyed when trying to close', lOptsStr) + if (status === 'closed' || status === 'closing' || socket.destroyed) { + log('The %s socket is either closed, closing, or already destroyed', lOptsStr) return } + status = 'closing' if (options.signal == null) { const signal = AbortSignal.timeout(closeTimeout) @@ -153,6 +155,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio // socket completely closed log('%s socket closed', lOptsStr) + status = 'closed' resolve() }) socket.once('error', (err: Error) => { @@ -163,6 +166,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio maConn.timeline.close = Date.now() } + status = 'closed' reject(err) }) @@ -195,7 +199,10 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio abort: (err: Error) => { log('%s socket abort due to error', lOptsStr, err) - socket.destroy(err) + // the abortSignalListener may already destroyed the socket with an error + if (!socket.destroyed) { + socket.destroy(err) + } if (maConn.timeline.close == null) { maConn.timeline.close = Date.now() diff --git a/packages/transport-tcp/test/socket-to-conn.spec.ts b/packages/transport-tcp/test/socket-to-conn.spec.ts index bd9664d5d8..017d5d454a 100644 --- a/packages/transport-tcp/test/socket-to-conn.spec.ts +++ b/packages/transport-tcp/test/socket-to-conn.spec.ts @@ -3,6 +3,7 @@ import os from 'os' import { defaultLogger } from '@libp2p/logger' import { expect } from 'aegir/chai' import defer from 'p-defer' +import Sinon from 'sinon' import { toMultiaddrConnection } from '../src/socket-to-conn.js' async function setup (opts?: { server?: ServerOpts, client?: SocketConstructorOpts }): Promise<{ server: Server, serverSocket: Socket, clientSocket: Socket }> { @@ -287,6 +288,66 @@ describe('socket-to-conn', () => { expect(serverSocket.destroyed).to.be.true() }) + it('should not close MultiaddrConnection twice', async () => { + ({ server, clientSocket, serverSocket } = await setup()) + // proxyServerSocket.writableLength returns 100 which cause socket cannot be destroyed immediately + const proxyServerSocket = new Proxy(serverSocket, { + get (target, prop, receiver) { + if (prop === 'writableLength') { + return 100 + } + return Reflect.get(target, prop, receiver) + } + }) + + // spy on `.destroy()` invocations + const serverSocketDestroySpy = Sinon.spy(serverSocket, 'destroy') + // promise that is resolved when our outgoing socket is closed + const serverClosed = defer() + const socketCloseTimeout = 10 + + const inboundMaConn = toMultiaddrConnection(proxyServerSocket, { + socketInactivityTimeout: 100, + socketCloseTimeout, + logger: defaultLogger() + }) + expect(inboundMaConn.timeline.open).to.be.ok() + expect(inboundMaConn.timeline.close).to.not.be.ok() + + clientSocket.once('error', () => {}) + + serverSocket.once('close', () => { + serverClosed.resolve(true) + }) + + // send some data between the client and server + clientSocket.write('hello') + serverSocket.write('goodbye') + + const signal = AbortSignal.timeout(socketCloseTimeout) + const addEventListenerSpy = Sinon.spy(signal, 'addEventListener') + + // the 2nd and 3rd call should return immediately + await Promise.all([ + inboundMaConn.close({ signal }), + inboundMaConn.close({ signal }), + inboundMaConn.close({ signal }) + ]) + + // server socket was closed for reading and writing + await expect(serverClosed.promise).to.eventually.be.true() + + // the connection closing was recorded + expect(inboundMaConn.timeline.close).to.be.a('number') + + // server socket is destroyed + expect(serverSocket.destroyed).to.be.true() + + // the server socket was only closed once + expect(serverSocketDestroySpy.callCount).to.equal(1) + expect(addEventListenerSpy.callCount).to.equal(1) + }) + it('should destroy a socket by timeout when containing MultiaddrConnection is closed', async () => { ({ server, clientSocket, serverSocket } = await setup({ server: {