Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
fix: allow stream methods to be async (#404)
Browse files Browse the repository at this point in the history
If a stream implementation needs to return a promise let them do so,
this allows them to implement internal backpressure by slowing down
reads from the source passed to the sink function.
  • Loading branch information
achingbrain committed May 17, 2023
1 parent 311a587 commit cfcd6d7
Showing 1 changed file with 53 additions and 13 deletions.
66 changes: 53 additions & 13 deletions packages/interface-stream-muxer/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export interface AbstractStreamInit {
onEnd?: (err?: Error | undefined) => void
}

function isPromise (res?: any): res is Promise<void> {
return res != null && typeof res.then === 'function'
}

export abstract class AbstractStream implements Stream {
public id: string
public stat: StreamStat
Expand Down Expand Up @@ -82,7 +86,13 @@ export abstract class AbstractStream implements Stream {
onEnd: () => {
// already sent a reset message
if (this.stat.timeline.reset !== null) {
this.sendCloseRead()
const res = this.sendCloseRead()

if (isPromise(res)) {
res.catch(err => {
log.error('error while sending close read', err)
})
}
}

this.onSourceEnd()
Expand Down Expand Up @@ -169,7 +179,13 @@ export abstract class AbstractStream implements Stream {
try {
// need to call this here as the sink method returns in the catch block
// when the close controller is aborted
this.sendCloseWrite()
const res = this.sendCloseWrite()

if (isPromise(res)) {
res.catch(err => {
log.error('error while sending close write', err)
})
}
} catch (err) {
log.trace('%s stream %s error sending close', this.stat.direction, this.id, err)
}
Expand Down Expand Up @@ -215,17 +231,31 @@ export abstract class AbstractStream implements Stream {
source = abortableSource(source, signal)

if (this.stat.direction === 'outbound') { // If initiator, open a new stream
this.sendNewStream()
const res = this.sendNewStream()

if (isPromise(res)) {
await res
}
}

for await (let data of source) {
while (data.length > 0) {
if (data.length <= this.maxDataSize) {
this.sendData(data instanceof Uint8Array ? new Uint8ArrayList(data) : data)
const res = this.sendData(data instanceof Uint8Array ? new Uint8ArrayList(data) : data)

if (isPromise(res)) { // eslint-disable-line max-depth
await res
}

break
}
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data
this.sendData(data.sublist(0, this.maxDataSize))
const res = this.sendData(data.sublist(0, this.maxDataSize))

if (isPromise(res)) {
await res
}

data.consume(this.maxDataSize)
}
}
Expand All @@ -252,7 +282,12 @@ export abstract class AbstractStream implements Stream {
} else {
log.trace('%s stream %s error', this.stat.direction, this.id, err)
try {
this.sendReset()
const res = this.sendReset()

if (isPromise(res)) {
await res
}

this.stat.timeline.reset = Date.now()
} catch (err) {
log.trace('%s stream %s error sending reset', this.stat.direction, this.id, err)
Expand All @@ -261,13 +296,18 @@ export abstract class AbstractStream implements Stream {

this.streamSource.end(err)
this.onSinkEnd(err)
return

throw err
} finally {
signal.clear()
}

try {
this.sendCloseWrite()
const res = this.sendCloseWrite()

if (isPromise(res)) {
await res
}
} catch (err) {
log.trace('%s stream %s error sending close', this.stat.direction, this.id, err)
}
Expand Down Expand Up @@ -295,27 +335,27 @@ export abstract class AbstractStream implements Stream {
* Send a message to the remote muxer informing them a new stream is being
* opened
*/
abstract sendNewStream (): void
abstract sendNewStream (): void | Promise<void>

/**
* Send a data message to the remote muxer
*/
abstract sendData (buf: Uint8ArrayList): void
abstract sendData (buf: Uint8ArrayList): void | Promise<void>

/**
* Send a reset message to the remote muxer
*/
abstract sendReset (): void
abstract sendReset (): void | Promise<void>

/**
* Send a message to the remote muxer, informing them no more data messages
* will be sent by this end of the stream
*/
abstract sendCloseWrite (): void
abstract sendCloseWrite (): void | Promise<void>

/**
* Send a message to the remote muxer, informing them no more data messages
* will be read by this end of the stream
*/
abstract sendCloseRead (): void
abstract sendCloseRead (): void | Promise<void>
}

0 comments on commit cfcd6d7

Please sign in to comment.