Skip to content
This repository has been archived by the owner on Feb 26, 2021. It is now read-only.

Commit

Permalink
feat: fix stream closing behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
mkg20001 authored and jacobheun committed Oct 2, 2018
1 parent 50de6af commit 9953f95
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 10 deletions.
63 changes: 53 additions & 10 deletions src/listener.js
Expand Up @@ -14,6 +14,7 @@ const setImmediate = require('async/setImmediate')
const utils = require('./utils')
const cleanUrlSIO = utils.cleanUrlSIO
const crypto = require('libp2p-crypto')
const pull = require('pull-stream')

const noop = once(() => {})

Expand All @@ -38,6 +39,8 @@ class Listener extends EE {
this._handler = options.handler || noop
this.listeners_list = options.listeners || {}
this.flag = options.flag
this.conns = []
this.connected = false
}

// "private" functions
Expand Down Expand Up @@ -209,6 +212,11 @@ class Listener extends EE {
this.listeners_list[this.server] = this
callback = callback ? once(callback) : noop

if (this.connected) { // listener was .close()'d yet not all conns disconnected. we're still connected, so don't do anything
this.closing = false
return setImmediate(() => callback())
}

series([
(cb) => this._up(cb),
(cb) => this._crypto(cb)
Expand All @@ -220,7 +228,10 @@ class Listener extends EE {
this.emit('error', err)
this.emit('close')
return callback(err)
} else this.log('success')
}

this.log('success')
this.connected = true

this.io.on('reconnect', () => {
// force to get a new signature
Expand All @@ -247,14 +258,52 @@ class Listener extends EE {
setImmediate(() => callback(null, this.ma ? [this.ma] : []))
}

get activeConnections () {
this.conns = this.conns.filter(c => c.sink || c.source)
return Boolean(this.conns.length)
}

maybeClose () {
if (!this.activeConnections && this.closing) {
this.connected = false
this.closing = false
this.log('no more connections and listener is offline - closing')
this._down()
}
}

close (callback) {
callback = callback ? once(callback) : noop

this._down()
this.closing = true // will close once the last connection quits
this.maybeClose()

callback()
}

stateWatch (sink, source) {
let cstate = {sink: true, source: true}
const watch = (name) => pull.through(v => v, e => {
cstate[name] = false
if (!cstate.sink && !cstate.source) {
this.maybeClose()
}
})

this.conns.push(cstate)

return {
sink: pull(
watch('sink'),
sink
),
source: pull(
source,
watch('source')
)
}
}

// called from transport
/**
* Dials a peer
Expand Down Expand Up @@ -293,14 +342,8 @@ class Listener extends EE {
if (err) return callback(err instanceof Error ? err : new Error(err))
dlog(err ? 'error: ' + err.toString() : 'success')
const source = io.createSource(dialId + '.listener')
conn.setInnerConn(
{
sink: sink,
source: source
}, {
getObservedAddrs: (cb) => cb(null, [_ma])
}
)

conn.setInnerConn(this.stateWatch(sink, source), { getObservedAddrs: (cb) => cb(null, [_ma]) })
callback(null, conn)
})

Expand Down
17 changes: 17 additions & 0 deletions test/dial.spec.js
Expand Up @@ -96,6 +96,23 @@ describe('dial', () => {
})
})

it('dial on IPv4, close listener, prevent end, re-start listener', (done) => {
ws1.dial(ma2, (err, conn) => {
expect(err).to.not.exist()

pull(
(end, cb) => {},
conn,
pull.drain(() => {
expect('Stream should never end').to.not.exist()
})
)

listeners[0].close(() => {})
listeners[0].listen(ma1, done)
})
})

it('dial offline / non-exist()ent node on IPv4, check callback', (done) => {
const maOffline = multiaddr('/ip4/127.0.0.1/tcp/40404/ws/p2p-websocket-star/ipfs/ABCD')

Expand Down

0 comments on commit 9953f95

Please sign in to comment.