From 8ba68c0d68d0770b4270b43e040328e0ae3f79a4 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Thu, 11 Apr 2024 15:32:02 +0700 Subject: [PATCH 1/5] fix: close MultiaddrConnection once --- packages/transport-tcp/src/socket-to-conn.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/transport-tcp/src/socket-to-conn.ts b/packages/transport-tcp/src/socket-to-conn.ts index ca0908c675..24abf99ef9 100644 --- a/packages/transport-tcp/src/socket-to-conn.ts +++ b/packages/transport-tcp/src/socket-to-conn.ts @@ -23,6 +23,7 @@ interface ToConnectionOptions { * https://github.com/libp2p/interface-transport#multiaddrconnection */ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptions): MultiaddrConnection => { + let status: 'open' | 'closing' | 'closed' = 'open' const log = options.logger.forComponent('libp2p:tcp:socket') const metrics = options.metrics const metricPrefix = options.metricPrefix ?? '' @@ -126,10 +127,11 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio timeline: { open: Date.now() }, async close (options: AbortOptions = {}) { - if (socket.destroyed) { - log('%s socket was already destroyed when trying to close', lOptsStr) + if (status === 'closed' || status === 'closing' || socket.destroyed) { + log('%s socket was already destroyed or closed or closing', lOptsStr) return } + status = 'closing' if (options.signal == null) { const signal = AbortSignal.timeout(closeTimeout) @@ -153,6 +155,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio // socket completely closed log('%s socket closed', lOptsStr) + status = 'closed' resolve() }) socket.once('error', (err: Error) => { @@ -163,6 +166,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio maConn.timeline.close = Date.now() } + status = 'closed' reject(err) }) From e51028215eb6f2bc1cab8102e8e0900cfdc1813b Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Fri, 12 Apr 2024 10:49:57 +0700 Subject: [PATCH 2/5] fix: log message and unit test --- packages/transport-tcp/src/socket-to-conn.ts | 2 +- .../transport-tcp/test/socket-to-conn.spec.ts | 46 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/packages/transport-tcp/src/socket-to-conn.ts b/packages/transport-tcp/src/socket-to-conn.ts index 24abf99ef9..2475fa2f0b 100644 --- a/packages/transport-tcp/src/socket-to-conn.ts +++ b/packages/transport-tcp/src/socket-to-conn.ts @@ -128,7 +128,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio async close (options: AbortOptions = {}) { if (status === 'closed' || status === 'closing' || socket.destroyed) { - log('%s socket was already destroyed or closed or closing', lOptsStr) + log('The %s socket is either closed, closing, or already destroyed', lOptsStr) return } status = 'closing' diff --git a/packages/transport-tcp/test/socket-to-conn.spec.ts b/packages/transport-tcp/test/socket-to-conn.spec.ts index bd9664d5d8..ed6eb1c0a2 100644 --- a/packages/transport-tcp/test/socket-to-conn.spec.ts +++ b/packages/transport-tcp/test/socket-to-conn.spec.ts @@ -1,7 +1,9 @@ import { createServer, Socket, type Server, type ServerOpts, type SocketConstructorOpts } from 'net' import os from 'os' import { defaultLogger } from '@libp2p/logger' +import Sinon from 'sinon' import { expect } from 'aegir/chai' +import { type ComponentLogger, type Logger } from '@libp2p/logger' import defer from 'p-defer' import { toMultiaddrConnection } from '../src/socket-to-conn.js' @@ -287,6 +289,50 @@ describe('socket-to-conn', () => { expect(serverSocket.destroyed).to.be.true() }) + it('should not close MultiaddrConnection twice', async () => { + const loggerStub = Sinon.stub(); + const logger: ComponentLogger = { + forComponent: () => loggerStub as unknown as Logger + }; + ({ server, clientSocket, serverSocket } = await setup()) + + // promise that is resolved when our outgoing socket is closed + const serverClosed = defer() + + const inboundMaConn = toMultiaddrConnection(serverSocket, { + socketInactivityTimeout: 100, + socketCloseTimeout: 10, + logger: logger + }) + expect(inboundMaConn.timeline.open).to.be.ok() + expect(inboundMaConn.timeline.close).to.not.be.ok() + + clientSocket.once('error', () => {}) + + serverSocket.once('close', () => { + serverClosed.resolve(true) + }) + + // send some data between the client and server + clientSocket.write('hello') + serverSocket.write('goodbye') + + // the 2nd call should return immediately + inboundMaConn.close() + inboundMaConn.close() + + expect(loggerStub.calledWithMatch("socket is either closed, closing, or already destroyed")).to.be.true() + + // server socket was closed for reading and writing + await expect(serverClosed.promise).to.eventually.be.true() + + // the connection closing was recorded + expect(inboundMaConn.timeline.close).to.be.a('number') + + // server socket is destroyed + expect(serverSocket.destroyed).to.be.true() + }) + it('should destroy a socket by timeout when containing MultiaddrConnection is closed', async () => { ({ server, clientSocket, serverSocket } = await setup({ server: { From a8eb82b681b3838852b3a5c644e4fd2f0bdb96a7 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Fri, 12 Apr 2024 16:42:05 +0700 Subject: [PATCH 3/5] chore: fix lint --- .../transport-tcp/test/socket-to-conn.spec.ts | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/packages/transport-tcp/test/socket-to-conn.spec.ts b/packages/transport-tcp/test/socket-to-conn.spec.ts index ed6eb1c0a2..9a949b2122 100644 --- a/packages/transport-tcp/test/socket-to-conn.spec.ts +++ b/packages/transport-tcp/test/socket-to-conn.spec.ts @@ -1,10 +1,10 @@ import { createServer, Socket, type Server, type ServerOpts, type SocketConstructorOpts } from 'net' import os from 'os' import { defaultLogger } from '@libp2p/logger' -import Sinon from 'sinon' -import { expect } from 'aegir/chai' import { type ComponentLogger, type Logger } from '@libp2p/logger' +import { expect } from 'aegir/chai' import defer from 'p-defer' +import Sinon from 'sinon' import { toMultiaddrConnection } from '../src/socket-to-conn.js' async function setup (opts?: { server?: ServerOpts, client?: SocketConstructorOpts }): Promise<{ server: Server, serverSocket: Socket, clientSocket: Socket }> { @@ -290,7 +290,7 @@ describe('socket-to-conn', () => { }) it('should not close MultiaddrConnection twice', async () => { - const loggerStub = Sinon.stub(); + const loggerStub = Sinon.stub() const logger: ComponentLogger = { forComponent: () => loggerStub as unknown as Logger }; @@ -302,7 +302,7 @@ describe('socket-to-conn', () => { const inboundMaConn = toMultiaddrConnection(serverSocket, { socketInactivityTimeout: 100, socketCloseTimeout: 10, - logger: logger + logger }) expect(inboundMaConn.timeline.open).to.be.ok() expect(inboundMaConn.timeline.close).to.not.be.ok() @@ -318,10 +318,12 @@ describe('socket-to-conn', () => { serverSocket.write('goodbye') // the 2nd call should return immediately - inboundMaConn.close() - inboundMaConn.close() + await Promise.all([ + inboundMaConn.close(), + inboundMaConn.close() + ]) - expect(loggerStub.calledWithMatch("socket is either closed, closing, or already destroyed")).to.be.true() + expect(loggerStub.calledWithMatch('socket is either closed, closing, or already destroyed')).to.be.true() // server socket was closed for reading and writing await expect(serverClosed.promise).to.eventually.be.true() From a17556175d75e19dbd5c25e080ff160c0251d963 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Mon, 15 Apr 2024 09:08:01 +0700 Subject: [PATCH 4/5] chore: confirm test by socket.destroy and AbortSignal listeners --- packages/transport-tcp/src/socket-to-conn.ts | 5 ++- .../transport-tcp/test/socket-to-conn.spec.ts | 39 ++++++++++++------- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/packages/transport-tcp/src/socket-to-conn.ts b/packages/transport-tcp/src/socket-to-conn.ts index 2475fa2f0b..2e4365697d 100644 --- a/packages/transport-tcp/src/socket-to-conn.ts +++ b/packages/transport-tcp/src/socket-to-conn.ts @@ -199,7 +199,10 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio abort: (err: Error) => { log('%s socket abort due to error', lOptsStr, err) - socket.destroy(err) + // the abortSignalListener may already destroyed the socket with an error + if (!socket.destroyed) { + socket.destroy(err) + } if (maConn.timeline.close == null) { maConn.timeline.close = Date.now() diff --git a/packages/transport-tcp/test/socket-to-conn.spec.ts b/packages/transport-tcp/test/socket-to-conn.spec.ts index 9a949b2122..5a1a833726 100644 --- a/packages/transport-tcp/test/socket-to-conn.spec.ts +++ b/packages/transport-tcp/test/socket-to-conn.spec.ts @@ -1,7 +1,6 @@ import { createServer, Socket, type Server, type ServerOpts, type SocketConstructorOpts } from 'net' import os from 'os' import { defaultLogger } from '@libp2p/logger' -import { type ComponentLogger, type Logger } from '@libp2p/logger' import { expect } from 'aegir/chai' import defer from 'p-defer' import Sinon from 'sinon' @@ -290,19 +289,27 @@ describe('socket-to-conn', () => { }) it('should not close MultiaddrConnection twice', async () => { - const loggerStub = Sinon.stub() - const logger: ComponentLogger = { - forComponent: () => loggerStub as unknown as Logger - }; ({ server, clientSocket, serverSocket } = await setup()) + // proxyServerSocket.writableLength returns 100 which cause socket cannot be destroyed immediately + const proxyServerSocket = new Proxy(serverSocket, { + get(target, prop, receiver) { + if (prop === 'writableLength') { + return 100 + } + return Reflect.get(target, prop, receiver) + } + }) + // spy on `.destroy()` invocations + const serverSocketDestroySpy = Sinon.spy(serverSocket, 'destroy') // promise that is resolved when our outgoing socket is closed const serverClosed = defer() + const socketCloseTimeout = 10; - const inboundMaConn = toMultiaddrConnection(serverSocket, { + const inboundMaConn = toMultiaddrConnection(proxyServerSocket, { socketInactivityTimeout: 100, - socketCloseTimeout: 10, - logger + socketCloseTimeout, + logger: defaultLogger() }) expect(inboundMaConn.timeline.open).to.be.ok() expect(inboundMaConn.timeline.close).to.not.be.ok() @@ -317,14 +324,16 @@ describe('socket-to-conn', () => { clientSocket.write('hello') serverSocket.write('goodbye') - // the 2nd call should return immediately + const signal = AbortSignal.timeout(socketCloseTimeout); + const addEventListenerSpy = Sinon.spy(signal, 'addEventListener') + + // the 2nd and 3rd call should return immediately await Promise.all([ - inboundMaConn.close(), - inboundMaConn.close() + inboundMaConn.close({ signal }), + inboundMaConn.close({ signal }), + inboundMaConn.close({ signal }) ]) - expect(loggerStub.calledWithMatch('socket is either closed, closing, or already destroyed')).to.be.true() - // server socket was closed for reading and writing await expect(serverClosed.promise).to.eventually.be.true() @@ -333,6 +342,10 @@ describe('socket-to-conn', () => { // server socket is destroyed expect(serverSocket.destroyed).to.be.true() + + // the server socket was only closed once + expect(serverSocketDestroySpy.callCount).to.equal(1) + expect(addEventListenerSpy.callCount).to.equal(1) }) it('should destroy a socket by timeout when containing MultiaddrConnection is closed', async () => { From 1c123f0d1139c7c63aeb1be896132553aec65ae0 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Mon, 15 Apr 2024 09:24:22 +0700 Subject: [PATCH 5/5] chore: fix lint --- packages/transport-tcp/test/socket-to-conn.spec.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/transport-tcp/test/socket-to-conn.spec.ts b/packages/transport-tcp/test/socket-to-conn.spec.ts index 5a1a833726..017d5d454a 100644 --- a/packages/transport-tcp/test/socket-to-conn.spec.ts +++ b/packages/transport-tcp/test/socket-to-conn.spec.ts @@ -292,7 +292,7 @@ describe('socket-to-conn', () => { ({ server, clientSocket, serverSocket } = await setup()) // proxyServerSocket.writableLength returns 100 which cause socket cannot be destroyed immediately const proxyServerSocket = new Proxy(serverSocket, { - get(target, prop, receiver) { + get (target, prop, receiver) { if (prop === 'writableLength') { return 100 } @@ -304,7 +304,7 @@ describe('socket-to-conn', () => { const serverSocketDestroySpy = Sinon.spy(serverSocket, 'destroy') // promise that is resolved when our outgoing socket is closed const serverClosed = defer() - const socketCloseTimeout = 10; + const socketCloseTimeout = 10 const inboundMaConn = toMultiaddrConnection(proxyServerSocket, { socketInactivityTimeout: 100, @@ -324,7 +324,7 @@ describe('socket-to-conn', () => { clientSocket.write('hello') serverSocket.write('goodbye') - const signal = AbortSignal.timeout(socketCloseTimeout); + const signal = AbortSignal.timeout(socketCloseTimeout) const addEventListenerSpy = Sinon.spy(signal, 'addEventListener') // the 2nd and 3rd call should return immediately @@ -343,9 +343,9 @@ describe('socket-to-conn', () => { // server socket is destroyed expect(serverSocket.destroyed).to.be.true() - // the server socket was only closed once - expect(serverSocketDestroySpy.callCount).to.equal(1) - expect(addEventListenerSpy.callCount).to.equal(1) + // the server socket was only closed once + expect(serverSocketDestroySpy.callCount).to.equal(1) + expect(addEventListenerSpy.callCount).to.equal(1) }) it('should destroy a socket by timeout when containing MultiaddrConnection is closed', async () => {