Skip to content

Commit

Permalink
feat!: muxed streams as web streams
Browse files Browse the repository at this point in the history
Refactors streams from duplex async iterables:

```js
{
  source: Duplex<AsyncGenerator<Uint8Array, void, unknown>, Source<Uint8Array | Uint8ArrayList>, Promise<void>
  sink: (Source<Uint8Array | Uint8ArrayList>) => Promise<void>
}
```

to `ReadableWriteablePair<Uint8Array>`s:

```js
{
  readable: ReadableStream<Uint8Array>
  writable: WritableStream<Uint8Array>
}
```

Since the close methods for web streams are asynchronous, this lets
us close streams cleanly - that is, wait for any buffered data to
be sent/consumed before closing the stream.

We still need to be able abort a stream in an emergency, so streams
have the following methods for graceful closing:

```js
stream.readable.cancel(reason?: any): Promise<void>
stream.writable.close(): Promise<void>

// or

stream.close(): Promise<void>
```

..and for emergency closing:

```js
stream.abort(err: Error): void
```

Connections and multiaddr connections have the same `close`/`abort`
semantics, but are still Duplexes since making them web streams
would mean we need to convert things like node streams (for tcp) to
web streams which would just make things slower.

Transports such as WebTransport and WebRTC already deal in web
streams when multiplexing so these no longer need to be converted to
Duplex streams so it's win-win.

Fixes #1793
  • Loading branch information
achingbrain committed Jun 23, 2023
1 parent 6fdaa7d commit 96187aa
Show file tree
Hide file tree
Showing 148 changed files with 6,305 additions and 2,745 deletions.
4 changes: 2 additions & 2 deletions examples/protocol-and-stream-muxing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ There is still one last feature, you can provide multiple protocols for the same

```JavaScript
node2.handle(['/another-protocol/1.0.0', '/another-protocol/2.0.0'], ({ stream }) => {
if (stream.stat.protocol === '/another-protocol/2.0.0') {
if (stream.protocol === '/another-protocol/2.0.0') {
// handle backwards compatibility
}

Expand Down Expand Up @@ -136,7 +136,7 @@ node2.handle(['/a', '/b'], ({ stream }) => {
stream,
async function (source) {
for await (const msg of source) {
console.log(`from: ${stream.stat.protocol}, msg: ${uint8ArrayToString(msg.subarray())}`)
console.log(`from: ${stream.protocol}, msg: ${uint8ArrayToString(msg.subarray())}`)
}
}
)
Expand Down
9 changes: 6 additions & 3 deletions packages/interface-compliance-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@
"test:firefox": "aegir test -t browser -- --browser firefox",
"test:firefox-webworker": "aegir test -t webworker -- --browser firefox",
"test:node": "aegir test -t node --cov",
"test:electron-main": "aegir test -t electron-main"
"test:electron-main": "aegir test -t electron-main",
"generate": "protons src/stream-muxer/fixtures/pb/*.proto"
},
"dependencies": {
"@libp2p/interface": "~0.0.1",
Expand All @@ -104,9 +105,9 @@
"@libp2p/peer-collections": "^3.0.0",
"@libp2p/peer-id": "^2.0.0",
"@libp2p/peer-id-factory": "^2.0.0",
"@libp2p/utils": "^3.0.12",
"@multiformats/multiaddr": "^12.1.3",
"abortable-iterator": "^5.0.1",
"any-signal": "^4.1.1",
"delay": "^6.0.0",
"it-all": "^3.0.2",
"it-drain": "^3.0.2",
Expand All @@ -121,13 +122,15 @@
"p-defer": "^4.0.0",
"p-limit": "^4.0.0",
"p-wait-for": "^5.0.2",
"protons-runtime": "^5.0.0",
"sinon": "^15.1.2",
"ts-sinon": "^2.0.2",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.4"
},
"devDependencies": {
"aegir": "^39.0.10"
"aegir": "^39.0.10",
"protons": "^7.0.2"
},
"typedoc": {
"entryPoint": "./src/index.ts"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export function createMaConnPair (): [MultiaddrConnection, MultiaddrConnection]
const output: MultiaddrConnection = {
...duplex,
close: async () => {},
abort: () => {},
remoteAddr: multiaddr('/ip4/127.0.0.1/tcp/4001'),
timeline: {
open: Date.now()
Expand Down
50 changes: 24 additions & 26 deletions packages/interface-compliance-tests/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,19 @@ export default (test: TestSetup<Connection>): void => {
expect(connection.id).to.exist()
expect(connection.remotePeer).to.exist()
expect(connection.remoteAddr).to.exist()
expect(connection.stat.status).to.equal('OPEN')
expect(connection.stat.timeline.open).to.exist()
expect(connection.stat.timeline.close).to.not.exist()
expect(connection.stat.direction).to.exist()
expect(connection.status).to.equal('OPEN')
expect(connection.timeline.open).to.exist()
expect(connection.timeline.close).to.not.exist()
expect(connection.direction).to.exist()
expect(connection.streams).to.eql([])
expect(connection.tags).to.eql([])
})

it('should get the metadata of an open connection', () => {
const stat = connection.stat

expect(stat.status).to.equal('OPEN')
expect(stat.direction).to.exist()
expect(stat.timeline.open).to.exist()
expect(stat.timeline.close).to.not.exist()
expect(connection.status).to.equal('OPEN')
expect(connection.direction).to.exist()
expect(connection.timeline.open).to.exist()
expect(connection.timeline.close).to.not.exist()
})

it('should return an empty array of streams', () => {
Expand All @@ -51,7 +49,7 @@ export default (test: TestSetup<Connection>): void => {
const protocolToUse = '/echo/0.0.1'
const stream = await connection.newStream([protocolToUse])

expect(stream).to.have.nested.property('stat.protocol', protocolToUse)
expect(stream).to.have.property('protocol', protocolToUse)

const connStreams = connection.streams

Expand Down Expand Up @@ -79,19 +77,19 @@ export default (test: TestSetup<Connection>): void => {
}, proxyHandler)

connection = await test.setup()
connection.stat.timeline = timelineProxy
connection.timeline = timelineProxy
})

afterEach(async () => {
await test.teardown()
})

it('should be able to close the connection after being created', async () => {
expect(connection.stat.timeline.close).to.not.exist()
expect(connection.timeline.close).to.not.exist()
await connection.close()

expect(connection.stat.timeline.close).to.exist()
expect(connection.stat.status).to.equal('CLOSED')
expect(connection.timeline.close).to.exist()
expect(connection.status).to.equal('CLOSED')
})

it('should be able to close the connection after opening a stream', async () => {
Expand All @@ -100,21 +98,21 @@ export default (test: TestSetup<Connection>): void => {
await connection.newStream([protocol])

// Close connection
expect(connection.stat.timeline.close).to.not.exist()
expect(connection.timeline.close).to.not.exist()
await connection.close()

expect(connection.stat.timeline.close).to.exist()
expect(connection.stat.status).to.equal('CLOSED')
expect(connection.timeline.close).to.exist()
expect(connection.status).to.equal('CLOSED')
})

it('should properly track streams', async () => {
// Open stream
const protocol = '/echo/0.0.1'
const stream = await connection.newStream([protocol])
expect(stream).to.have.nested.property('stat.protocol', protocol)
expect(stream).to.have.property('protocol', protocol)

// Close stream
stream.close()
await stream.close()

expect(connection.streams.filter(s => s.id === stream.id)).to.be.empty()
})
Expand All @@ -123,7 +121,7 @@ export default (test: TestSetup<Connection>): void => {
// Open stream
const protocol = '/echo/0.0.1'
const stream = await connection.newStream(protocol)
expect(stream).to.have.nested.property('stat.direction', 'outbound')
expect(stream).to.have.property('direction', 'outbound')
})

it.skip('should track inbound streams', async () => {
Expand All @@ -135,20 +133,20 @@ export default (test: TestSetup<Connection>): void => {

it('should support a proxy on the timeline', async () => {
sinon.spy(proxyHandler, 'set')
expect(connection.stat.timeline.close).to.not.exist()
expect(connection.timeline.close).to.not.exist()

await connection.close()
// @ts-expect-error - fails to infer callCount
expect(proxyHandler.set.callCount).to.equal(1)
// @ts-expect-error - fails to infer getCall
const [obj, key, value] = proxyHandler.set.getCall(0).args
expect(obj).to.eql(connection.stat.timeline)
expect(obj).to.eql(connection.timeline)
expect(key).to.equal('close')
expect(value).to.be.a('number').that.equals(connection.stat.timeline.close)
expect(value).to.be.a('number').that.equals(connection.timeline.close)
})

it('should fail to create a new stream if the connection is closing', async () => {
expect(connection.stat.timeline.close).to.not.exist()
expect(connection.timeline.close).to.not.exist()
const p = connection.close()

try {
Expand All @@ -165,7 +163,7 @@ export default (test: TestSetup<Connection>): void => {
})

it('should fail to create a new stream if the connection is closed', async () => {
expect(connection.stat.timeline.close).to.not.exist()
expect(connection.timeline.close).to.not.exist()
await connection.close()

try {
Expand Down
87 changes: 39 additions & 48 deletions packages/interface-compliance-tests/src/mocks/connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as STATUS from '@libp2p/interface/connection/status'
import * as Status from '@libp2p/interface/connection/status'
import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import * as mss from '@libp2p/multistream-select'
Expand All @@ -9,13 +9,11 @@ import { mockMultiaddrConnection } from './multiaddr-connection.js'
import { mockMuxer } from './muxer.js'
import { mockRegistrar } from './registrar.js'
import type { AbortOptions } from '@libp2p/interface'
import type { MultiaddrConnection, Connection, Stream, ConnectionStat, Direction } from '@libp2p/interface/connection'
import type { MultiaddrConnection, Connection, Stream, Direction, ByteStream, ConnectionTimeline } from '@libp2p/interface/connection'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { StreamMuxer, StreamMuxerFactory } from '@libp2p/interface/stream-muxer'
import type { Registrar } from '@libp2p/interface-internal/registrar'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Duplex, Source } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

const log = logger('libp2p:mock-connection')

Expand All @@ -38,7 +36,10 @@ class MockConnection implements Connection {
public remoteAddr: Multiaddr
public remotePeer: PeerId
public direction: Direction
public stat: ConnectionStat
public timeline: ConnectionTimeline
public multiplexer?: string
public encryption?: string
public status: keyof typeof Status
public streams: Stream[]
public tags: string[]

Expand All @@ -52,13 +53,10 @@ class MockConnection implements Connection {
this.remoteAddr = remoteAddr
this.remotePeer = remotePeer
this.direction = direction
this.stat = {
status: STATUS.OPEN,
direction,
timeline: maConn.timeline,
multiplexer: 'test-multiplexer',
encryption: 'yes-yes-very-secure'
}
this.status = Status.OPEN
this.timeline = maConn.timeline
this.multiplexer = 'test-multiplexer'
this.encryption = 'yes-yes-very-secure'
this.streams = []
this.tags = []
this.muxer = muxer
Expand All @@ -74,30 +72,20 @@ class MockConnection implements Connection {
throw new Error('protocols must have a length')
}

if (this.stat.status !== STATUS.OPEN) {
if (this.status !== Status.OPEN) {
throw new CodeError('connection must be open to create streams', 'ERR_CONNECTION_CLOSED')
}

const id = `${Math.random()}`
const stream = await this.muxer.newStream(id)
const result = await mss.select(stream, protocols, options)

const streamWithProtocol: Stream = {
...stream,
...result.stream,
stat: {
...stream.stat,
direction: 'outbound',
protocol: result.protocol
}
}
const protocolStream = await mss.select(stream, protocols, options)

this.streams.push(streamWithProtocol)
this.streams.push(protocolStream)

return streamWithProtocol
return protocolStream
}

addStream (stream: Stream): void {
addStream (stream: any): void {
this.streams.push(stream)
}

Expand All @@ -106,13 +94,23 @@ class MockConnection implements Connection {
}

async close (): Promise<void> {
this.stat.status = STATUS.CLOSING
this.status = Status.CLOSING
await Promise.all(
this.streams.map(async s => s.close())
)
await this.maConn.close()
this.status = Status.CLOSED
this.timeline.close = Date.now()
}

abort (err: Error): void {
this.status = Status.CLOSING
this.streams.forEach(s => {
s.close()
s.abort(err)
})
this.stat.status = STATUS.CLOSED
this.stat.timeline.close = Date.now()
this.maConn.abort(err)
this.status = Status.CLOSED
this.timeline.close = Date.now()
}
}

Expand All @@ -134,15 +132,13 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
onIncomingStream: (muxedStream) => {
try {
mss.handle(muxedStream, registrar.getProtocols())
.then(({ stream, protocol }) => {
log('%s: incoming stream opened on %s', direction, protocol)
muxedStream = { ...muxedStream, ...stream }
muxedStream.stat.protocol = protocol
.then(stream => {
log('%s: incoming stream opened on %s', stream.direction, stream.protocol)

connection.addStream(muxedStream)
const { handler } = registrar.getHandler(protocol)
const { handler } = registrar.getHandler(stream.protocol)

handler({ connection, stream: muxedStream })
handler({ connection, stream })
}).catch(err => {
log.error(err)
})
Expand Down Expand Up @@ -170,20 +166,15 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
return connection
}

export function mockStream (stream: Duplex<AsyncGenerator<Uint8ArrayList>, Source<Uint8ArrayList | Uint8Array>, Promise<void>>): Stream {
export function mockStream (stream: ByteStream): Stream {
return {
...stream,
close: () => {},
closeRead: () => {},
closeWrite: () => {},
close: async () => {},
abort: () => {},
reset: () => {},
stat: {
direction: 'outbound',
protocol: '/foo/1.0.0',
timeline: {
open: Date.now()
}
direction: 'outbound',
protocol: '/foo/1.0.0',
timeline: {
open: Date.now()
},
metadata: {},
id: `stream-${Date.now()}`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export function mockMultiaddrConnection (source: Duplex<AsyncGenerator<Uint8Arra
async close () {

},
abort: () => {},
timeline: {
open: Date.now()
},
Expand Down Expand Up @@ -44,6 +45,10 @@ export function mockMultiaddrConnPair (opts: MockMultiaddrConnPairOptions): { in
close: async () => {
outbound.timeline.close = Date.now()
controller.abort()
},
abort: () => {
outbound.timeline.close = Date.now()
controller.abort()
}
}

Expand All @@ -56,6 +61,10 @@ export function mockMultiaddrConnPair (opts: MockMultiaddrConnPairOptions): { in
close: async () => {
inbound.timeline.close = Date.now()
controller.abort()
},
abort: () => {
inbound.timeline.close = Date.now()
controller.abort()
}
}

Expand Down
Loading

0 comments on commit 96187aa

Please sign in to comment.