Skip to content

Commit

Permalink
fix: sendTo uses shared connection: making sure messages get delivere…
Browse files Browse the repository at this point in the history
…d to the correct rooms. Should fix #25
  • Loading branch information
pgte committed Dec 4, 2017
1 parent 0fe6e17 commit 6d091d0
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 69 deletions.
4 changes: 1 addition & 3 deletions src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ module.exports = class Connection extends EventEmitter {
this._connection.push(encoding(message))
} else {
if (!this._connecting) {
this.once('connect', () => this.push(message))
this._getConnection()
}
this.once('connect', () => {
this.push(message)
})
}
}

Expand Down
62 changes: 45 additions & 17 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,24 @@ class PubSubRoom extends EventEmitter {
conn = new Connection(peer, this._ipfs, this)
conn.on('error', (err) => this.emit('error', err))
}
conn.push(message)

// We should use the same sequence number generation as js-libp2p-floosub does:
// const seqno = Buffer.from(utils.randomSeqno())

// Until we figure out a good way to bring in the js-libp2p-floosub's randomSeqno
// generator, let's use 0 as the sequence number for all private messages
// const seqno = Buffer.from([0])
const seqno = Buffer.from([0])

const msg = {
from: this._ipfs._peerInfo.id._idB58String,
data: Buffer.from(message).toString('hex'),
seqno: seqno.toString('hex'),
topicIDs: [ this._topic ],
topicCIDs: [ this._topic ]
}

conn.push(Buffer.from(JSON.stringify(msg)))
}

_start () {
Expand Down Expand Up @@ -127,34 +144,45 @@ class PubSubRoom extends EventEmitter {
_handleDirectConnection (protocol, conn) {
conn.getPeerInfo((err, peerInfo) => {
if (err) {
throw err
return this.emit('error', err)
}

const peerId = peerInfo.id.toB58String()

pull(
conn,
pull.map((message) => {
// We should use the same sequence number generation as js-libp2p-floosub does:
// const seqno = Buffer.from(utils.randomSeqno())

// Until we figure out a good way to bring in the js-libp2p-floosub's randomSeqno
// generator, let's use 0 as the sequence number for all private messages
const seqno = Buffer.from([0])

this.emit('message', {
from: peerId,
data: message,
seqno: seqno,
topicIDs: [ this._topic ],
topicCIDs: [ this._topic ]
})
let msg
try {
msg = JSON.parse(message.toString())
} catch (err) {
this.emit('warning', err.message)
return // early
}

if (peerId !== msg.from) {
this.emit('warning', 'no peerid match ' + msg.from)
return // early
}

const topicIDs = msg.topicIDs
if (!Array.isArray(topicIDs)) {
this.emit('warning', 'no topic IDs')
return // early
}

if (topicIDs.indexOf(this._topic) >= 0) {
msg.data = Buffer.from(msg.data, 'hex')
msg.seqno = Buffer.from(msg.seqno, 'hex')
this.emit('message', msg)
}

return message
}),
pull.onEnd((err) => {
// do nothinfg
if (err) {
this.emit('warning', err)
this.emit('error', err)
}
})
)
Expand Down
2 changes: 1 addition & 1 deletion src/protocol.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
'use strict'

module.exports = 'ipfs-pubsub-room/v1'
module.exports = 'ipfs-pubsub-room/v2'
116 changes: 68 additions & 48 deletions test/room.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 5] */
'use strict'

const chai = require('chai')
Expand All @@ -12,7 +13,7 @@ const clone = require('lodash.clonedeep')
const Room = require('../')
const createRepo = require('./utils/create-repo-node')

const topic = 'pubsub-room-test-' + Date.now() + '-' + Math.random()
const topicBase = 'pubsub-room-test-' + Date.now() + '-' + Math.random()

const ipfsOptions = {
EXPERIMENTAL: {
Expand Down Expand Up @@ -66,58 +67,77 @@ describe('sync', function () {
})
})

after((done) => each(repos, (repo, cb) => repo.teardown(cb), done))
after((done) => each(repos, (repo, cb) => { repo.teardown(cb) }, done))

it('can create a room, and they find each other', (done) => {
room1 = Room(node1, topic)
room2 = Room(node2, topic)
let left = 2
room1.once('peer joined', (id) => {
expect(id).to.equal(id2)
if (--left === 0) {
done()
}
})
room2.once('peer joined', (id) => {
expect(id).to.equal(id1)
if (--left === 0) {
;([1, 2].forEach((n) => {
const topic = topicBase + '-' + n
describe('topic ' + n, () => {
it('can create a room, and they find each other', (done) => {
room1 = Room(node1, topic)
room2 = Room(node2, topic)
room1.on('warning', console.log)
room2.on('warning', console.log)

let left = 2
room1.once('peer joined', (id) => {
expect(id).to.equal(id2)
if (--left === 0) {
done()
}
})
room2.once('peer joined', (id) => {
expect(id).to.equal(id1)
if (--left === 0) {
done()
}
})
})

it('has peer', (done) => {
expect(room1.getPeers()).to.deep.equal([id2])
expect(room2.getPeers()).to.deep.equal([id1])
done()
}
})
})
})

it('has peer', (done) => {
expect(room1.getPeers()).to.deep.equal([id2])
expect(room2.getPeers()).to.deep.equal([id1])
done()
})
it('can broadcast', (done) => {
let gotMessage = false
room1.on('message', (message) => {
if (gotMessage) {
throw new Error('double message')
}
gotMessage = true
expect(message.from).to.equal(id2)
expect(message.data.toString()).to.equal('message 1')
done()
})
room2.broadcast('message 1')
})

it('can broadcast', (done) => {
room1.broadcast('message 1')
room2.once('message', (message) => {
expect(message.from).to.equal(id1)
expect(message.data.toString()).to.equal('message 1')
done()
})
})
it('can send private message', (done) => {
let gotMessage = false

it('can send private message', (done) => {
room2.sendTo(id1, 'message 2')
room1.once('message', (message) => {
expect(message.from).to.equal(id2)
expect(message.seqno.toString()).to.equal(Buffer.from([0]).toString())
expect(message.topicIDs).to.deep.equal([room2._topic])
expect(message.topicCIDs).to.deep.equal([room2._topic])
expect(message.data.toString()).to.equal('message 2')
done()
})
})
room2.on('message', (message) => {
if (gotMessage) {
throw new Error('double message')
}
gotMessage = true
expect(message.from).to.equal(id1)
expect(message.seqno.toString()).to.equal(Buffer.from([0]).toString())
expect(message.topicIDs).to.deep.equal([topic])
expect(message.topicCIDs).to.deep.equal([topic])
expect(message.data.toString()).to.equal('message 2')
done()
})
room1.sendTo(id2, 'message 2')
})

it('can be stopped', (done) => {
room1.once('peer left', (peer) => {
expect(peer).to.equal(id2)
done()
it('can leave room', (done) => {
room1.once('peer left', (peer) => {
expect(peer).to.equal(id2)
done()
})
room2.leave()
})
})
room2.leave()
})
}))
})

0 comments on commit 6d091d0

Please sign in to comment.