Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to support libp2p >=0.29.0 #81

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 .aegir.js 100644 → 100755
@@ -1,22 +1,20 @@
'use strict'

const Libp2p = require('libp2p')
const PeerInfo = require('peer-info')
const { config } = require('./test/utils/create-libp2p')

let relay

module.exports = {
hooks: {
pre: async () => {
const peerInfo = await PeerInfo.create()
peerInfo.multiaddrs.add('/ip4/127.0.0.1/tcp/24642/ws')

const defaultConfig = await config()
const defaultConfig = await config(true)

relay = new Libp2p({
...defaultConfig,
peerInfo,
addresses: {
listen: ['/ip4/127.0.0.1/tcp/24642/ws'],
},
config: {
...defaultConfig.config,
relay: {
0 .gitignore 100644 → 100755
Empty file.
0 .travis.yml 100644 → 100755
Empty file.
0 CHANGELOG.md 100644 → 100755
Empty file.
0 LICENSE 100644 → 100755
Empty file.
6 README.md 100644 → 100755
@@ -71,11 +71,11 @@ const room = Room(libp2p, 'some-room-name')

## room.broadcast(message)

Broacasts message (string or buffer).
Broacasts message (string).

## room.sendTo(cid, message)

Sends message (string or buffer) to peer.
Sends message (string) to peer.

## async room.leave()

@@ -94,7 +94,7 @@ Returns a boolean indicating if the given peer is present in the room.
Listens for messages. A `message` is an object containing the following properties:

* `from` (string): peer id
* `data` (Buffer): message content
* `data` (string): message content

## room.on('peer joined', (cid) => {})

13 package.json 100644 → 100755
@@ -43,12 +43,13 @@
"chai": "^4.2.0",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"libp2p": "0.27.0-rc.0",
"libp2p-gossipsub": "0.2.1",
"libp2p-mplex": "^0.9.3",
"libp2p-secio": "^0.12.2",
"libp2p-websockets": "^0.13.2",
"peer-info": "^0.17.1",
"libp2p": "^0.29.0",
"libp2p-gossipsub": "^0.6.1",
"libp2p-mplex": "^0.10.0",
"libp2p-secio": "^0.13.1",
"libp2p-websockets": "^0.14.0",
"multiaddr": "^8.0.0",
"peer-id": "^0.14.1",
"rimraf": "^3.0.0"
},
"contributors": [
8 src/connection.js 100644 → 100755
@@ -2,9 +2,9 @@

const EventEmitter = require('events')
const pipe = require('it-pipe')
const PeerId = require('peer-id')

const PROTOCOL = require('./protocol')
const encoding = require('./encoding')

module.exports = class Connection extends EventEmitter {
constructor (remoteId, libp2p, room) {
@@ -18,7 +18,7 @@ module.exports = class Connection extends EventEmitter {

push (message) {
if (this._connection) {
this._connection.push(encoding(message))
this._connection.push(message)

return
}
@@ -47,8 +47,8 @@ module.exports = class Connection extends EventEmitter {
return // early
}

const peerInfo = this._libp2p.peerStore.get(this._remoteId)
const { stream } = await this._libp2p.dialProtocol(peerInfo, PROTOCOL)
const remotePeerId = await PeerId.createFromB58String(this._remoteId)
const { stream } = await this._libp2p.dialProtocol(remotePeerId, PROTOCOL)
this._connection = new FiFoMessageQueue()

pipe(this._connection, stream, async (source) => {
@@ -0,0 +1,11 @@
'use strict'

const uint8ArrayToString = require('uint8arrays/to-string')

module.exports = (_message) => {
let message = _message
if (message.constructor === Uint8Array) {
message = String(uint8ArrayToString(message))
}
return message
}
6 src/direct-connection-handler.js 100644 → 100755
@@ -2,6 +2,7 @@

const EventEmitter = require('events')
const pipe = require('it-pipe')
const decoding = require('./decoding')

const emitter = new EventEmitter()

@@ -15,7 +16,7 @@ function handler ({ connection, stream }) {
let msg

try {
msg = JSON.parse(message.toString())
msg = JSON.parse(message)
} catch (err) {
emitter.emit('warning', err.message)
continue // early
@@ -32,9 +33,6 @@ function handler ({ connection, stream }) {
continue // early
}

msg.data = Buffer.from(msg.data, 'hex')
msg.seqno = Buffer.from(msg.seqno, 'hex')

topicIDs.forEach((topic) => {
emitter.emit(topic, msg)
})
6 src/encoding.js 100644 → 100755
@@ -1,9 +1,11 @@
'use strict'

const uint8ArrayFromString = require('uint8arrays/from-string')

module.exports = (_message) => {
let message = _message
if (!Buffer.isBuffer(message)) {
message = Buffer.from(message)
if (message.constructor !== Uint8Array) {
message = uint8ArrayFromString(String(message))
}
return message
}
30 src/index.js 100644 → 100755
@@ -7,6 +7,7 @@ const clone = require('lodash.clonedeep')
const PROTOCOL = require('./protocol')
const Connection = require('./connection')
const encoding = require('./encoding')
const decoding = require('./decoding')
const directConnection = require('./direct-connection-handler')

const DEFAULT_OPTIONS = {
@@ -39,7 +40,8 @@ class PubSubRoom extends EventEmitter {
this._libp2p.handle(PROTOCOL, directConnection.handler)
directConnection.emitter.on(this._topic, this._handleDirectMessage)

this._libp2p.pubsub.subscribe(this._topic, this._handleMessage)
this._libp2p.pubsub.on(this._topic, this._handleMessage)
this._libp2p.pubsub.subscribe(this._topic)

this._idx = index++
}
@@ -59,7 +61,9 @@ class PubSubRoom extends EventEmitter {
})
directConnection.emitter.removeListener(this._topic, this._handleDirectMessage)
this._libp2p.unhandle(PROTOCOL, directConnection.handler)
await this._libp2p.pubsub.unsubscribe(this._topic, this._handleMessage)

await this._libp2p.pubsub.removeListener(this._topic, this._handleMessage)
await this._libp2p.pubsub.unsubscribe(this._topic)
}

async broadcast (_message) {
@@ -83,23 +87,21 @@ class PubSubRoom extends EventEmitter {
}

// We should use the same sequence number generation as js-libp2p-floosub does:
// const seqno = Buffer.from(utils.randomSeqno())
// const seqno = encoding(utils.randomSeqno())

// Until we figure out a good way to bring in the js-libp2p-floosub's randomSeqno
// generator, let's use 0 as the sequence number for all private messages
// const seqno = Buffer.from([0])
const seqno = Buffer.from([0])
const seqno = 0

const msg = {
to: peer,
from: this._libp2p.peerInfo.id.toB58String(),
data: Buffer.from(message).toString('hex'),
seqno: seqno.toString('hex'),
from: this._libp2p.peerId.toB58String(),
data: message,
seqno: seqno,
topicIDs: [this._topic],
topicCIDs: [this._topic]
}

conn.push(Buffer.from(JSON.stringify(msg)))
conn.push(encoding(JSON.stringify(msg)))
}

async _pollPeers () {
@@ -114,17 +116,21 @@ class PubSubRoom extends EventEmitter {
const differences = diff(this._peers, newPeers)

differences.added.forEach((peer) => this.emit('peer joined', peer))
differences.removed.forEach((peer) => this.emit('peer left', peer))
differences.removed.forEach((peer) => {
delete this._connections[peer]
this.emit('peer left', peer)
})

return differences.added.length > 0 || differences.removed.length > 0
}

_onMessage (message) {
message.data = decoding(message.data)
this.emit('message', message)
}

_handleDirectMessage (message) {
if (message.to.toString() === this._libp2p.peerInfo.id.toB58String()) {
if (message.to.toString() === this._libp2p.peerId.toB58String()) {
const m = Object.assign({}, message)
delete m.to
this.emit('message', m)
0 src/protocol.js 100644 → 100755
Empty file.
6 test/concurrent-rooms.spec.js 100644 → 100755
@@ -23,12 +23,12 @@ describe('concurrent rooms', function () {

before(async () => {
node1 = await createLibp2p()
id1 = node1.peerInfo.id.toB58String()
id1 = node1.peerId.toB58String()
})

before(async () => {
node2 = await createLibp2p(node1)
id2 = node2.peerInfo.id.toB58String()
id2 = node2.peerId.toB58String()
})

after(() => {
@@ -107,7 +107,7 @@ describe('concurrent rooms', function () {
room2B.on('message', crash)
room2A.once('message', (message) => {
expect(message.from.toString()).to.equal(id1.toString())
expect(message.seqno.toString()).to.equal(Buffer.from([0]).toString())
expect(message.seqno).to.equal(0)
expect(message.topicIDs).to.deep.equal([topicA])
expect(message.topicCIDs).to.deep.equal([topicA])
expect(message.data.toString()).to.equal('message 2')
6 test/room.spec.js 100644 → 100755
@@ -18,12 +18,12 @@ describe('room', function () {

before(async () => {
node1 = await createLibp2p()
id1 = node1.peerInfo.id.toB58String()
id1 = node1.peerId.toB58String()
})

before(async () => {
node2 = await createLibp2p(node1)
id2 = node2.peerInfo.id.toB58String()
id2 = node2.peerId.toB58String()
})

const rooms = []
@@ -89,7 +89,7 @@ describe('room', function () {
}
gotMessage = true
expect(message.from).to.deep.equal(id1)
expect(message.seqno.toString()).to.equal(Buffer.from([0]).toString())
expect(message.seqno).to.equal(0)
expect(message.topicIDs).to.deep.equal([topic])
expect(message.topicCIDs).to.deep.equal([topic])
expect(message.data.toString()).to.equal('message 2')
4 test/same-node.spec.js 100644 → 100755
@@ -36,9 +36,9 @@ describe('same node', function () {

it('mirrors broadcast', (done) => {
rooms[0].once('message', (message) => {
expect(message.data.toString()).to.equal('message 1')
expect(message.data).to.equal('message 1')
rooms[0].once('message', (message) => {
expect(message.data.toString()).to.equal('message 2')
expect(message.data).to.equal('message 2')
done()
})
})
20 test/utils/create-libp2p.js 100644 → 100755
@@ -5,13 +5,19 @@ const WS = require('libp2p-websockets')
const Multiplex = require('libp2p-mplex')
const SECIO = require('libp2p-secio')
const GossipSub = require('libp2p-gossipsub')
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const multiaddr = require('multiaddr')

const RELAY_MULTIADDR = '/ip4/127.0.0.1/tcp/24642/ws'
const RELAY_PEER_ID_JSON = {
id: 'QmQLNqWfXdTAEPsCWtiaVhDCCFfsYian7YLryRdQn3fgL4',
privKey: 'CAASqAkwggSkAgEAAoIBAQDVCnVIHt/xx3LlS0bHUlS6A1oxuqfwrzCSNrr0T68RM1i/2zc1Pl6dMLdhpzIrMHrt6J4nMA2nzC71vHNvymQeLWvKmAWTEcKbGR8lQGHibBMP/1vwOwFlKxy3JzaqDS8hP6qTnMgGpMJosiroitTNHAJg2PCQM0zX+Q8ehsiwhoJ5IaIoCzutyo7S5X7uubOjUMj+tAVuX34/lz8ynQfnfFeS6GuCddsljg/RZ3c+wS+EPwsNd0VJjm3L5M9b+Ofh8IW5FI/MMmHg2+dpHJnZv9QCtclvCZJj0xUKpxtluM5NHd64h+fBQsmpB75b2eUkB3RUNLy/ngC50NbAIRN1AgMBAAECggEBAJW/HiUtnpgia76EpSGh23BMvu9JlpZ1bhy4X70u7Y2XnABvpGTGjFbNUXlQvtDg6OelpNVCz7ZsrW2Jo1Km3qzfnG7xYKm5yCKhC+VxVdyDvvp1sjgwIZDtNuf+pkvtrH0gdVQA1hDlasmQwtxmCaKK15kfpCiYBqGgrWH1t8dr3H4Vk2BUkQeryQ1kmRZ3fUJw+VNiB7yhmw+tx5ytFB3OMNxvUPylHM07s8VjpZ7B8Xde3jOlJYJFBQ7k1rkfuULtc5XD+7DrsF0DutN7X3N1TZf2iBYGwttN6CfHC8yRxwIgWzli53ldUV1YhtBB8MgCWUMbttfmoe9Fo9yMNoECgYEA8PTwPXwljlx4eAzr8FZD8HifXXTYcuVhpepFx5jlMwJ1sTYr2nIfaEf136iweYIv6A/5+7/S8yFmdlaf3Ito9m/6qXGUD8I5h90YSUyeBJy1IEPg1dqM+nIufMh9Oo++VPs/xnUEbcpCVfMUK4Gr1l/iLes70nEFjcI9VZVvZdECgYEA4ldbUfGvc/cwUbIgVTHFIeg0czd1XlOic8ur9x1MdhVIi0AjVZzV223E0X2qg90uCV9Ztn7kz9e9Z5e2tRidcfJ+1/t7M5l/NSCbD85H3HSZVBklwOft968KauboLtE2cFyve/ZM7s9aG8fEwi88IktsxHk9Dvb4x35JQOQBaGUCgYEAhPsZL0W50GS2U8MF36EsY6Wehkx7PIXdq1ys4ChArjM4UvILp8Z+EOZOCv6lTpoL6G4Qz+ChAm+3ha3vEh+acQ+B7kvxo/TUHWhnA+UV/IOj7senaT7xuTKU92cKvewg5fO30cY5CIKss5Sw2AX7mRdX03HUlSKtJvxBL1+GmFECgYADYZCwqa6YSeID5mhLPYIXXpOiAPsU3KT5m9pGx75DqU+7HMsqVTxwmbQt+PWaIKy2YSFC86RRYoSmzoJhNCvt7tRsP4p4m9tlnMYUN12lcmxz8Cg7OHu6jnfWXvqq8F8i0I+ih2xgyOIsthA/YltAm+XVDYaW+aN/v2gyuvU2bQKBgH2wWDWTQEJ4eX9ZwDmzCWKZlS6Hgb7bxxxE6g9asE6t0GwjcaMsLL4H7NajuEmxYWmDOVcGN04n/9NbTwZ+WQeTCLUGVLZxRo7bfyNVrbUoY21X7FrMptwX2GXg4U/JmkOYXN4xhgHXbzZVZwCiIjuXi9h/lJ4s+djses5P5T32',
pubKey: 'CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDVCnVIHt/xx3LlS0bHUlS6A1oxuqfwrzCSNrr0T68RM1i/2zc1Pl6dMLdhpzIrMHrt6J4nMA2nzC71vHNvymQeLWvKmAWTEcKbGR8lQGHibBMP/1vwOwFlKxy3JzaqDS8hP6qTnMgGpMJosiroitTNHAJg2PCQM0zX+Q8ehsiwhoJ5IaIoCzutyo7S5X7uubOjUMj+tAVuX34/lz8ynQfnfFeS6GuCddsljg/RZ3c+wS+EPwsNd0VJjm3L5M9b+Ofh8IW5FI/MMmHg2+dpHJnZv9QCtclvCZJj0xUKpxtluM5NHd64h+fBQsmpB75b2eUkB3RUNLy/ngC50NbAIRN1AgMBAAE='
}

const config = async () => {
const config = async (isRelay) => {
return {
peerInfo: await PeerInfo.create(),
peerId: isRelay ? await PeerId.createFromJSON(RELAY_PEER_ID_JSON) : await PeerId.create(),
dialer: {
maxParallelDials: 150, // 150 total parallel multiaddr dials
maxDialsPerPeer: 4, // Allow 4 multiaddrs to be dialed per peer in parallel
@@ -50,13 +56,15 @@ module.exports = async (otherNode) => {
await node.start()

// connect to relay peer
await node.dial(RELAY_MULTIADDR)
const relayPeerId = await PeerId.createFromJSON(RELAY_PEER_ID_JSON)
node.peerStore.addressBook.add(relayPeerId, [multiaddr(RELAY_MULTIADDR)])
await node.dial(relayPeerId)

// both nodes created, get them to dial each other via the relay
if (otherNode) {
const relayId = node.connections.keys().next().value
const otherNodeId = otherNode.peerInfo.id.toB58String()
const nodeId = node.peerInfo.id.toB58String()
const otherNodeId = otherNode.peerId.toB58String()
const nodeId = node.peerId.toB58String()

await node.dial(`${RELAY_MULTIADDR}/p2p/${relayId}/p2p-circuit/p2p/${otherNodeId}`)
await otherNode.dial(`${RELAY_MULTIADDR}/p2p/${relayId}/p2p-circuit/p2p/${nodeId}`)