Skip to content

Commit

Permalink
fix: sendTo uses shared connection - part 2: making sure messages get…
Browse files Browse the repository at this point in the history
… delivered to the correct rooms by using a global handler. Should fix #25
  • Loading branch information
pgte committed Dec 5, 2017
1 parent c9884a2 commit 88f2ff0
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 52 deletions.
61 changes: 61 additions & 0 deletions src/direct-connection-handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict'

const pull = require('pull-stream')
const Buffer = require('safe-buffer').Buffer
const EventEmitter = require('events')

const emitter = new EventEmitter()

function handler (protocol, conn) {
conn.getPeerInfo((err, peerInfo) => {
if (err) {
return this.emit('error', err)
}

const peerId = peerInfo.id.toB58String()

pull(
conn,
pull.map((message) => {
let msg
try {
msg = JSON.parse(message.toString())
} catch (err) {
emitter.emit('warning', err.message)
return // early
}

if (peerId !== msg.from) {
emitter.emit('warning', 'no peerid match ' + msg.from)
return // early
}

const topicIDs = msg.topicIDs
if (!Array.isArray(topicIDs)) {
emitter.emit('warning', 'no topic IDs')
return // early
}

msg.data = Buffer.from(msg.data, 'hex')
msg.seqno = Buffer.from(msg.seqno, 'hex')

topicIDs.forEach((topic) => {
emitter.emit(topic, msg)
})

return msg
}),
pull.onEnd((err) => {
// do nothinfg
if (err) {
emitter.emit('error', err)
}
})
)
})
}

exports = module.exports = {
handler: handler,
emitter: emitter
}
64 changes: 15 additions & 49 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ const diff = require('hyperdiff')
const EventEmitter = require('events')
const timers = require('timers')
const clone = require('lodash.clonedeep')
const pull = require('pull-stream')
const Buffer = require('safe-buffer').Buffer

const PROTOCOL = require('./protocol')
const Connection = require('./connection')
const encoding = require('./encoding')
const directConnection = require('./direct-connection-handler')

const DEFAULT_OPTIONS = {
pollInterval: 1000
Expand All @@ -28,6 +28,8 @@ class PubSubRoom extends EventEmitter {
this._peers = []
this._connections = {}

this._handleDirectMessage = this._handleDirectMessage.bind(this)

if (!this._ipfs.pubsub) {
throw new Error('This IPFS node does not have pubsub.')
}
Expand All @@ -54,6 +56,7 @@ class PubSubRoom extends EventEmitter {
Object.keys(this._connections).forEach((peer) => {
this._connections[peer].stop()
})
directConnection.emitter.removeListener(this._topic, this._handleDirectMessage)
this.emit('stop')
}

Expand Down Expand Up @@ -82,7 +85,8 @@ class PubSubRoom extends EventEmitter {
const seqno = Buffer.from([0])

const msg = {
from: this._ipfs._peerInfo.id._idB58String,
to: peer,
from: this._ipfs._peerInfo.id.toB58String(),
data: Buffer.from(message).toString('hex'),
seqno: seqno.toString('hex'),
topicIDs: [ this._topic ],
Expand Down Expand Up @@ -110,7 +114,9 @@ class PubSubRoom extends EventEmitter {
this._ipfs.pubsub.unsubscribe(this._topic, listener)
})

this._ipfs._libp2pNode.handle(PROTOCOL, this._handleDirectConnection.bind(this))
this._ipfs._libp2pNode.handle(PROTOCOL, directConnection.handler)

directConnection.emitter.on(this._topic, this._handleDirectMessage)
}

_pollPeers () {
Expand Down Expand Up @@ -141,51 +147,11 @@ class PubSubRoom extends EventEmitter {
this.emit('message', message)
}

_handleDirectConnection (protocol, conn) {
conn.getPeerInfo((err, peerInfo) => {
if (err) {
return this.emit('error', err)
}

const peerId = peerInfo.id.toB58String()

pull(
conn,
pull.map((message) => {
let msg
try {
msg = JSON.parse(message.toString())
} catch (err) {
this.emit('warning', err.message)
return // early
}

if (peerId !== msg.from) {
this.emit('warning', 'no peerid match ' + msg.from)
return // early
}

const topicIDs = msg.topicIDs
if (!Array.isArray(topicIDs)) {
this.emit('warning', 'no topic IDs')
return // early
}

if (topicIDs.indexOf(this._topic) >= 0) {
msg.data = Buffer.from(msg.data, 'hex')
msg.seqno = Buffer.from(msg.seqno, 'hex')
this.emit('message', msg)
}

return message
}),
pull.onEnd((err) => {
// do nothinfg
if (err) {
this.emit('error', err)
}
})
)
})
_handleDirectMessage (message) {
if (message.to === this._ipfs._peerInfo.id.toB58String()) {
const m = Object.assign({}, message)
delete m.to
this.emit('message', m)
}
}
}
163 changes: 163 additions & 0 deletions test/concurrent-rooms.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 5] */
'use strict'

const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect

const IPFS = require('ipfs')
const each = require('async/each')
const clone = require('lodash.clonedeep')

const Room = require('../')
const createRepo = require('./utils/create-repo-node')

const topic = 'pubsub-room-concurrency-test-' + Date.now() + '-' + Math.random()

const ipfsOptions = {
EXPERIMENTAL: {
pubsub: true
},
config: {
Addresses: {
Swarm: [
'/dns4/ws-star.discovery.libp2p.io/tcp/443/wss/p2p-websocket-star'
]
}
}
}

describe('concurrent rooms', function () {
this.timeout(30000)
const repos = []
let node1, node2
let id1, id2
let room1A, room1B, room2A, room2B
const topicA = topic + '-A'
const topicB = topic + '-B'

before((done) => {
const repo = createRepo()
repos.push(repo)
const options = Object.assign({}, clone(ipfsOptions), {
repo: repo
})
node1 = new IPFS(options)
node1.once('ready', () => {
node1.id((err, info) => {
expect(err).to.not.exist()
id1 = info.id
done()
})
})
})

before((done) => {
const repo = createRepo()
repos.push(repo)
const options = Object.assign({}, clone(ipfsOptions), {
repo: repo
})
node2 = new IPFS(options)
node2.once('ready', () => {
node2.id((err, info) => {
expect(err).to.not.exist()
id2 = info.id
done()
})
})
})

after((done) => each(repos, (repo, cb) => { repo.teardown(cb) }, done))

it('can create a room, and they find each other', (done) => {
room1A = Room(node1, topicA)
room2A = Room(node2, topicA)
room1B = Room(node1, topicB)
room2B = Room(node2, topicB)
room1A.on('warning', console.log)
room2A.on('warning', console.log)
room1B.on('warning', console.log)
room2B.on('warning', console.log)

const roomNodes = [
[room1A, id2],
[room2A, id1],
[room1B, id2],
[room2A, id1]
]

each(roomNodes, (roomNode, cb) => {
const room = roomNode[0]
const waitingFor = roomNode[1]
room.once('peer joined', (id) => {
expect(id).to.equal(waitingFor)
cb()
})
}, done)
})

it('has peer', (done) => {
expect(room1A.getPeers()).to.deep.equal([id2])
expect(room1B.getPeers()).to.deep.equal([id2])
expect(room2A.getPeers()).to.deep.equal([id1])
expect(room2B.getPeers()).to.deep.equal([id1])
done()
})

it('can broadcast', (done) => {
let gotMessage = false
const crash = Crash('no broadcast message should leak to room B')
room1B.on('message', crash)
room1A.once('message', (message) => {
if (gotMessage) {
throw new Error('double message')
}
gotMessage = true
expect(message.from).to.equal(id2)
expect(message.data.toString()).to.equal('message 1')

room1B.removeListener('message', crash)
done()
})
room2A.broadcast('message 1')
})

it('can send private message', (done) => {
const crash = Crash('no private message should leak to room B')

room2B.on('message', crash)
room2A.once('message', (message) => {
expect(message.from).to.equal(id1)
expect(message.seqno.toString()).to.equal(Buffer.from([0]).toString())
expect(message.topicIDs).to.deep.equal([topicA])
expect(message.topicCIDs).to.deep.equal([topicA])
expect(message.data.toString()).to.equal('message 2')
room2B.removeListener('message', crash)
done()
})
room1A.sendTo(id2, 'message 2')
})

it('can leave room', (done) => {
room1A.once('peer left', (peer) => {
expect(peer).to.equal(id2)
done()
})
room2A.leave()
})

it('after leaving, it does not receive more messages', (done) => {
room2A.on('message', Crash('should not receive this'))
room2A.leave()
room1A.broadcast('message 3')
setTimeout(done, 3000)
})
})

function Crash (message) {
return function () {
throw new Error(message)
}
}
6 changes: 3 additions & 3 deletions test/room.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ const ipfsOptions = {
}
}

describe('sync', function () {
describe('room', function () {
this.timeout(30000)
const repos = []
let node1, node2
let id1, id2
let room1, room2

before((done) => {
const repo = createRepo()
Expand Down Expand Up @@ -71,6 +70,7 @@ describe('sync', function () {

;([1, 2].forEach((n) => {
const topic = topicBase + '-' + n
let room1, room2
describe('topic ' + n, () => {
it('can create a room, and they find each other', (done) => {
room1 = Room(node1, topic)
Expand Down Expand Up @@ -103,7 +103,7 @@ describe('sync', function () {
let gotMessage = false
room1.on('message', (message) => {
if (gotMessage) {
throw new Error('double message')
throw new Error('double message:' + message.data.toString())
}
gotMessage = true
expect(message.from).to.equal(id2)
Expand Down

0 comments on commit 88f2ff0

Please sign in to comment.