Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: close MultiaddrConnection once #2478

Merged
merged 5 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 10 additions & 3 deletions packages/transport-tcp/src/socket-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ interface ToConnectionOptions {
* https://github.com/libp2p/interface-transport#multiaddrconnection
*/
export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptions): MultiaddrConnection => {
let status: 'open' | 'closing' | 'closed' = 'open'
const log = options.logger.forComponent('libp2p:tcp:socket')
const metrics = options.metrics
const metricPrefix = options.metricPrefix ?? ''
Expand Down Expand Up @@ -126,10 +127,11 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
timeline: { open: Date.now() },

async close (options: AbortOptions = {}) {
if (socket.destroyed) {
log('%s socket was already destroyed when trying to close', lOptsStr)
if (status === 'closed' || status === 'closing' || socket.destroyed) {
log('The %s socket is either closed, closing, or already destroyed', lOptsStr)
return
}
status = 'closing'

if (options.signal == null) {
const signal = AbortSignal.timeout(closeTimeout)
Expand All @@ -153,6 +155,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
// socket completely closed
log('%s socket closed', lOptsStr)

status = 'closed'
resolve()
})
socket.once('error', (err: Error) => {
Expand All @@ -163,6 +166,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
maConn.timeline.close = Date.now()
}

status = 'closed'
reject(err)
})

Expand Down Expand Up @@ -195,7 +199,10 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
abort: (err: Error) => {
log('%s socket abort due to error', lOptsStr, err)

socket.destroy(err)
// the abortSignalListener may already destroyed the socket with an error
if (!socket.destroyed) {
socket.destroy(err)
}

if (maConn.timeline.close == null) {
maConn.timeline.close = Date.now()
Expand Down
61 changes: 61 additions & 0 deletions packages/transport-tcp/test/socket-to-conn.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import os from 'os'
import { defaultLogger } from '@libp2p/logger'
import { expect } from 'aegir/chai'
import defer from 'p-defer'
import Sinon from 'sinon'
import { toMultiaddrConnection } from '../src/socket-to-conn.js'

async function setup (opts?: { server?: ServerOpts, client?: SocketConstructorOpts }): Promise<{ server: Server, serverSocket: Socket, clientSocket: Socket }> {
Expand Down Expand Up @@ -287,6 +288,66 @@ describe('socket-to-conn', () => {
expect(serverSocket.destroyed).to.be.true()
})

it('should not close MultiaddrConnection twice', async () => {
({ server, clientSocket, serverSocket } = await setup())
// proxyServerSocket.writableLength returns 100 which cause socket cannot be destroyed immediately
const proxyServerSocket = new Proxy(serverSocket, {
get (target, prop, receiver) {
if (prop === 'writableLength') {
return 100
}
return Reflect.get(target, prop, receiver)
}
})

// spy on `.destroy()` invocations
const serverSocketDestroySpy = Sinon.spy(serverSocket, 'destroy')
// promise that is resolved when our outgoing socket is closed
const serverClosed = defer<boolean>()
const socketCloseTimeout = 10

const inboundMaConn = toMultiaddrConnection(proxyServerSocket, {
socketInactivityTimeout: 100,
socketCloseTimeout,
logger: defaultLogger()
})
expect(inboundMaConn.timeline.open).to.be.ok()
expect(inboundMaConn.timeline.close).to.not.be.ok()

clientSocket.once('error', () => {})

serverSocket.once('close', () => {
serverClosed.resolve(true)
})

// send some data between the client and server
clientSocket.write('hello')
serverSocket.write('goodbye')

const signal = AbortSignal.timeout(socketCloseTimeout)
const addEventListenerSpy = Sinon.spy(signal, 'addEventListener')

// the 2nd and 3rd call should return immediately
await Promise.all([
inboundMaConn.close({ signal }),
inboundMaConn.close({ signal }),
inboundMaConn.close({ signal })
])

// server socket was closed for reading and writing
await expect(serverClosed.promise).to.eventually.be.true()

// the connection closing was recorded
expect(inboundMaConn.timeline.close).to.be.a('number')

// server socket is destroyed
expect(serverSocket.destroyed).to.be.true()
twoeths marked this conversation as resolved.
Show resolved Hide resolved

// the server socket was only closed once
expect(serverSocketDestroySpy.callCount).to.equal(1)
expect(addEventListenerSpy.callCount).to.equal(1)
})

it('should destroy a socket by timeout when containing MultiaddrConnection is closed', async () => {
({ server, clientSocket, serverSocket } = await setup({
server: {
Expand Down