diff --git a/package.json b/package.json index b518360d9b..700801655f 100644 --- a/package.json +++ b/package.json @@ -172,13 +172,13 @@ "release": "aegir release" }, "dependencies": { - "@libp2p/components": "^1.0.0", + "@libp2p/components": "^2.0.0", "@libp2p/crypto": "^1.0.0", "@libp2p/interfaces": "^3.0.2", - "@libp2p/logger": "^1.1.0", + "@libp2p/logger": "^2.0.0", "@libp2p/peer-collections": "^1.0.0", "@libp2p/peer-id": "^1.1.0", - "@libp2p/topology": "^2.0.0", + "@libp2p/topology": "^3.0.0", "@multiformats/multiaddr": "^10.2.0", "abortable-iterator": "^4.0.2", "err-code": "^3.0.1", @@ -191,10 +191,10 @@ "uint8arrays": "^3.0.0" }, "devDependencies": { - "@libp2p/interface-connection": "^1.0.1", + "@libp2p/interface-connection": "^2.0.0", "@libp2p/interface-peer-id": "^1.0.2", "@libp2p/interface-pubsub": "^1.0.1", - "@libp2p/interface-registrar": "^1.0.0", + "@libp2p/interface-registrar": "^2.0.0", "@libp2p/peer-id-factory": "^1.0.0", "aegir": "^37.2.0", "delay": "^5.0.0", diff --git a/src/index.ts b/src/index.ts index 66058841b1..a8d0f521ed 100644 --- a/src/index.ts +++ b/src/index.ts @@ -165,9 +165,15 @@ export abstract class PubSubBaseProtocol extends EventEmi * On an inbound stream opened */ protected _onIncomingStream (data: IncomingStreamData) { - const { protocol, stream, connection } = data + const { stream, connection } = data const peerId = connection.remotePeer - const peer = this.addPeer(peerId, protocol) + + if (stream.stat.protocol == null) { + stream.abort(new Error('Stream was not multiplexed')) + return + } + + const peer = this.addPeer(peerId, stream.stat.protocol) const inboundStream = peer.attachInboundStream(stream) this.processMessages(peerId, inboundStream, peer) @@ -182,8 +188,14 @@ export abstract class PubSubBaseProtocol extends EventEmi void Promise.resolve().then(async () => { try { - const { stream, protocol } = await conn.newStream(this.multicodecs) - const peer = this.addPeer(peerId, protocol) + const stream = await conn.newStream(this.multicodecs) + + if (stream.stat.protocol == null) { + stream.abort(new Error('Stream was not multiplexed')) + return + } + + const peer = this.addPeer(peerId, stream.stat.protocol) await peer.attachOutboundStream(stream) } catch (err: any) { log.error(err) diff --git a/test/utils/index.ts b/test/utils/index.ts index 8110d56d95..abeb01b4ac 100644 --- a/test/utils/index.ts +++ b/test/utils/index.ts @@ -132,15 +132,19 @@ export const ConnectionPair = (): [Connection, Connection] => { { // @ts-expect-error incomplete implementation newStream: async (protocol: string[]) => await Promise.resolve({ - protocol: protocol[0], - stream: d0 + ...d0, + stat: { + protocol: protocol[0] + } }) }, { // @ts-expect-error incomplete implementation newStream: async (protocol: string[]) => await Promise.resolve({ - protocol: protocol[0], - stream: d1 + ...d1, + stat: { + protocol: protocol[0] + } }) } ] @@ -148,7 +152,7 @@ export const ConnectionPair = (): [Connection, Connection] => { export async function mockIncomingStreamEvent (protocol: string, conn: Connection, remotePeer: PeerId): Promise { return { - ...await conn.newStream([protocol]), + stream: await conn.newStream([protocol]), // @ts-expect-error incomplete implementation connection: { remotePeer