Skip to content

Commit

Permalink
test: add more test-case and example for channel end()
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobbubu committed May 4, 2020
1 parent 5776d52 commit c33d1c8
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 12 deletions.
90 changes: 90 additions & 0 deletions examples/nested-end.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
process.env.DEBUG = process.env.DEBUG ?? 'plex*'
process.env.DEBUG_NAME_WIDTH = '24'

import * as pull from 'pull-stream'
import { Plex, Channel } from '../src'

const main = function () {
return new Promise((resolve) => {
const generator = function (initial: string) {
let counter = 0
return () => `${initial}-${++counter}`
}
const d1 = {
source: pull(
pull.infinite(generator('d1')),
pull.asyncMap((data, cb) => {
setTimeout(() => cb(null, data), 30)
})
),
sink: pull.drain(),
}

const d2 = {
source: pull(
pull.infinite(generator('d2')),
pull.asyncMap((data, cb) => {
setTimeout(() => cb(null, data), 30)
})
),
sink: pull.drain(),
}

const plex1 = new Plex({ name: 'p1', level: 1 })
const plex2 = new Plex({ name: 'p2', level: 1 })

let localChildPlexClosed = false
let remoteChildPlexClosed = false
let localChannelClosed = false
let remoteChannelClosed = false

const hasDone = () => {
if (
localChildPlexClosed &&
remoteChildPlexClosed &&
localChannelClosed &&
remoteChannelClosed
) {
resolve()
}
}

const childPlex = plex1.createPlex({ name: 'child', level: 2 })
childPlex.on('close', () => {
localChildPlexClosed = true
hasDone()
})

const a = childPlex.createChannel('a')
a.on('close', () => {
localChannelClosed = true
hasDone()
})
pull(a, d1, a)

plex2.on('plex', (remoteChild) => {
remoteChild.on('close', (_) => {
remoteChildPlexClosed = true
hasDone()
})
remoteChild.on('channel', (channel: Channel) => {
channel.on('close', (ch) => {
remoteChannelClosed = true
remoteChild.end()
hasDone()
})

setTimeout(() => {
a.end()
}, 100)

pull(channel, d2, channel)
})
})

pull(plex1, plex2, plex1)
})
}

// tslint:disable-next-line no-floating-promises
main()
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
"precommit": "lint-staged",
"example:ex1": "tsnd --respawn examples/ex1.ts",
"example:net": "tsnd --respawn examples/net.ts",
"example:nested": "tsnd --respawn examples/nested.ts"
"example:nested": "tsnd --respawn examples/nested.ts",
"example:nested-end": "tsnd --respawn examples/nested-end.ts"
},
"lint-staged": {
"{src,test}/**/*.ts": [
Expand Down
12 changes: 6 additions & 6 deletions src/plex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,13 @@ export class Plex extends EventEmitter {
pl._sendSinkEnd(true)

delete this._plexes[pl.name]
this.logger.debug(`plex "${pl.name}" closed`)
this.logger.debug(`plex "${pl.getDisplayName()}" closed`)
})

if (initiator) {
this.pushToSource(OpenPlex(plex.name, plex.meta))
}
this.logger.debug(`plex "${plex.name}" opened`)
this.logger.debug(`plex "${plex.getDisplayName()}" opened`)
return plex
}

Expand All @@ -260,24 +260,24 @@ export class Plex extends EventEmitter {
private _openChannel(id: number, name: string, initiator: boolean) {
const channels = this._channels
if (channels[id]) {
throw new Error(`Channel("${id}/${name}") exists`)
throw new Error(`Channel("${channels[id].getDisplayName()}") exists`)
}

const ch = this.findChannelByName(name)
if (ch) {
this.logger.warn(`Channel("${name}") exists with id ${id}`)
this.logger.warn(`Channel("${ch.getDisplayName()}")`)
this.emit('channelNameConflict', id, ch)
}

const channel = new Channel(id, name, this)
channel.on('close', (ch) => {
delete this._channels[ch.id]
this.logger.debug(`channel "${ch.id}/${ch.name}" closed`)
this.logger.debug(`channel "${ch.getDisplayName()}" closed`)
})
channel.open(initiator)
channels[id] = channel

this.logger.debug(`channel "${channel.id}/${channel.name}" opened`)
this.logger.debug(`channel "${channel.getDisplayName()}" opened`)
return channel
}

Expand Down
89 changes: 84 additions & 5 deletions test/nested.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ describe('nested', () => {
const plex2 = new Plex({ name: 'p2', level: 1 })
const plex2PeerMetaEvent = jest.fn()

let localChildClosed = false
let remoteChildClosed = false
let localChildPlexClosed = false
let remoteChildPlexClosed = false
let localChannelClosed = false

const hasDone = () => {
if (localChildClosed && remoteChildClosed && localChannelClosed) {
if (localChildPlexClosed && remoteChildPlexClosed && localChannelClosed) {
done()
}
}
Expand All @@ -24,7 +24,7 @@ describe('nested', () => {

const childPlex = plex1.createPlex({ name: 'child', level: 2 })
childPlex.on('close', () => {
localChildClosed = true
localChildPlexClosed = true
hasDone()
})
expect(childPlex.meta).toEqual({ name: 'child', level: 2 })
Expand All @@ -45,7 +45,7 @@ describe('nested', () => {
expect(plex2PeerMetaEvent).toBeCalledTimes(1)
expect(plex2PeerMetaEvent.mock.calls[0][0]).toEqual({ name: 'p1', level: 1 })

remoteChildClosed = true
remoteChildPlexClosed = true
hasDone()
})
remoteChild.on('channel', (channel: Channel) => {
Expand All @@ -58,4 +58,83 @@ describe('nested', () => {

pull(plex1, plex2, plex1)
})

it('channel end on child plex', (done) => {
const generator = function (initial: string) {
return () => initial
}
const d1 = {
source: pull(
pull.infinite(generator('d1')),
pull.asyncMap((data, cb) => {
setTimeout(() => cb(null, data), 30)
})
),
sink: pull.drain(),
}

const d2 = {
source: pull(
pull.infinite(generator('d2')),
pull.asyncMap((data, cb) => {
setTimeout(() => cb(null, data), 30)
})
),
sink: pull.drain(),
}

const plex1 = new Plex({ name: 'p1', level: 1 })
const plex2 = new Plex({ name: 'p2', level: 1 })

let localChildPlexClosed = false
let remoteChildPlexClosed = false
let localChannelClosed = false
let remoteChannelClosed = false

const hasDone = () => {
if (
localChildPlexClosed &&
remoteChildPlexClosed &&
localChannelClosed &&
remoteChannelClosed
) {
done()
}
}

const childPlex = plex1.createPlex({ name: 'child', level: 2 })
childPlex.on('close', () => {
localChildPlexClosed = true
hasDone()
})

const a = childPlex.createChannel('a')
a.on('close', () => {
localChannelClosed = true
hasDone()
})
pull(a, d1, a)

plex2.on('plex', (remoteChild) => {
remoteChild.on('close', (_) => {
remoteChildPlexClosed = true
hasDone()
})
remoteChild.on('channel', (channel: Channel) => {
channel.on('close', (ch) => {
remoteChannelClosed = true
remoteChild.end()
hasDone()
})

setTimeout(() => {
a.end()
}, 100)

pull(channel, d2, channel)
})
})

pull(plex1, plex2, plex1)
})
})

0 comments on commit c33d1c8

Please sign in to comment.