diff --git a/src/upgrader.ts b/src/upgrader.ts index f83365ac5d..23f29619d4 100644 --- a/src/upgrader.ts +++ b/src/upgrader.ts @@ -301,6 +301,10 @@ export class DefaultUpgrader extends EventEmitter implements Upg }) .catch(err => { log.error(err) + + if (muxedStream.timeline.close == null) { + muxedStream.close() + } }) }, // Run anytime a stream closes @@ -330,6 +334,10 @@ export class DefaultUpgrader extends EventEmitter implements Upg } catch (err: any) { log.error('could not create new stream', err) + if (muxedStream.timeline.close == null) { + muxedStream.close() + } + if (err.code != null) { throw err } diff --git a/test/upgrading/upgrader.spec.ts b/test/upgrading/upgrader.spec.ts index 6a6772988c..26e3169959 100644 --- a/test/upgrading/upgrader.spec.ts +++ b/test/upgrading/upgrader.spec.ts @@ -405,6 +405,29 @@ describe('Upgrader', () => { })) .to.eventually.be.rejected.with.property('code', 'ABORT_ERR') }) + + it('should close streams when protocol negotiation fails', async () => { + await remoteComponents.getRegistrar().unhandle('/echo/1.0.0') + + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + const connections = await Promise.all([ + localUpgrader.upgradeOutbound(outbound), + remoteUpgrader.upgradeInbound(inbound) + ]) + + expect(connections[0].streams).to.have.lengthOf(0) + expect(connections[1].streams).to.have.lengthOf(0) + + await expect(connections[0].newStream('/echo/1.0.0')) + .to.eventually.be.rejected.with.property('code', 'ERR_UNSUPPORTED_PROTOCOL') + + // wait for remote to close + await delay(100) + + expect(connections[0].streams).to.have.lengthOf(0) + expect(connections[1].streams).to.have.lengthOf(0) + }) }) describe('libp2p.upgrader', () => {