Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Commit

Permalink
chore: remove peer-info usage (#102)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: using new topology api with peer-id instead of peer-info and new pubsub internal peer data structure
  • Loading branch information
vasco-santos committed Apr 23, 2020
1 parent 6641c65 commit cb3497c
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 98 deletions.
8 changes: 4 additions & 4 deletions README.md
Expand Up @@ -38,7 +38,7 @@ js-libp2p-floodsub
const FloodSub = require('libp2p-floodsub')

// registrar is provided by libp2p
const fsub = new FloodSub(peerInfo, registrar, options)
const fsub = new FloodSub(peerId, registrar, options)

await fsub.start()

Expand All @@ -56,7 +56,7 @@ fsub.publish('fruit', new Buffer('banana'))

```js
const options = {…}
const floodsub = new Floodsub(peerInfo, registrar, options)
const floodsub = new Floodsub(peerId, registrar, options)
```

Options is an optional object with the following key-value pairs:
Expand All @@ -75,9 +75,9 @@ Floodsub emits two kinds of events:
- `data`: a Buffer containing the data that was published to the topic
2. `floodsub:subscription-change` when the local peer receives an update to the subscriptions of a remote peer.
```Javascript
fsub.on('floodsub:subscription-change', (peerInfo, topics, changes) => { ... })
fsub.on('floodsub:subscription-change', (peerId, topics, changes) => { ... })
```
- `peerInfo`: a [PeerInfo](https://github.com/libp2p/js-peer-info) object
- `peerId`: a [PeerId](https://github.com/libp2p/js-peer-id) object
- `topics`: the topics that the peer is now subscribed to
- `changes`: an array of `{ topicID: <topic>, subscribe: <boolean> }`
eg `[ { topicID: 'fruit', subscribe: true }, { topicID: 'vegetables': false } ]`
Expand Down
5 changes: 2 additions & 3 deletions package.json
Expand Up @@ -52,17 +52,16 @@
"lodash": "^4.17.15",
"multiaddr": "^7.1.0",
"p-defer": "^3.0.0",
"peer-id": "~0.13.3",
"peer-info": "~0.17.0",
"sinon": "^9.0.1"
},
"dependencies": {
"async.nexttick": "^0.5.2",
"debug": "^4.1.1",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.0.1",
"libp2p-pubsub": "~0.4.0",
"libp2p-pubsub": "^0.5.0",
"p-map": "^3.0.0",
"peer-id": "~0.13.3",
"protons": "^1.0.1",
"time-cache": "^0.3.0"
},
Expand Down
30 changes: 15 additions & 15 deletions src/index.js
Expand Up @@ -11,7 +11,7 @@ const pMap = require('p-map')
const TimeCache = require('time-cache')
const nextTick = require('async.nexttick')

const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const BaseProtocol = require('libp2p-pubsub')
const { message, utils } = require('libp2p-pubsub')
const { multicodec } = require('./config')
Expand Down Expand Up @@ -43,7 +43,7 @@ function validateRegistrar (registrar) {
*/
class FloodSub extends BaseProtocol {
/**
* @param {PeerInfo} peerInfo instance of the peer's PeerInfo
* @param {PeerId} peerId instance of the peer's PeerId
* @param {Object} registrar
* @param {function} registrar.handle
* @param {function} registrar.register
Expand All @@ -52,17 +52,17 @@ class FloodSub extends BaseProtocol {
* @param {boolean} options.emitSelf if publish should emit to self, if subscribed, defaults to false
* @constructor
*/
constructor (peerInfo, registrar, options = {}) {
if (!PeerInfo.isPeerInfo(peerInfo)) {
throw new Error('peer info must be an instance of `peer-info`')
constructor (peerId, registrar, options = {}) {
if (!PeerId.isPeerId(peerId)) {
throw new Error('peerId must be an instance of `peer-id`')
}

validateRegistrar(registrar)

super({
debugName: debugName,
multicodecs: multicodec,
peerInfo: peerInfo,
peerId: peerId,
registrar: registrar,
...options
})
Expand Down Expand Up @@ -94,13 +94,13 @@ class FloodSub extends BaseProtocol {
/**
* Peer connected successfully with pubsub protocol.
* @override
* @param {PeerInfo} peerInfo peer info
* @param {PeerId} peerId peer id
* @param {Connection} conn connection to the peer
* @returns {Promise<void>}
*/
async _onPeerConnected (peerInfo, conn) {
await super._onPeerConnected(peerInfo, conn)
const idB58Str = peerInfo.id.toB58String()
async _onPeerConnected (peerId, conn) {
await super._onPeerConnected(peerId, conn)
const idB58Str = peerId.toB58String()
const peer = this.peers.get(idB58Str)

if (peer && peer.isWritable) {
Expand All @@ -115,7 +115,7 @@ class FloodSub extends BaseProtocol {
* @override
* @param {string} idB58Str peer id string in base58
* @param {Connection} conn connection
* @param {PeerInfo} peer peer info
* @param {Peer} peer peer
* @returns {void}
*
*/
Expand All @@ -134,7 +134,7 @@ class FloodSub extends BaseProtocol {
}
)
} catch (err) {
this._onPeerDisconnected(peer, err)
this._onPeerDisconnected(peer.id, err)
}
}

Expand All @@ -161,7 +161,7 @@ class FloodSub extends BaseProtocol {

if (peer && subs && subs.length) {
peer.updateSubscriptions(subs)
this.emit('floodsub:subscription-change', peer.info, peer.topics, subs)
this.emit('floodsub:subscription-change', peer.id, peer.topics, subs)
}
}

Expand Down Expand Up @@ -222,7 +222,7 @@ class FloodSub extends BaseProtocol {

peer.sendMessages(utils.normalizeOutRpcMessages(messages))

log('publish msgs on topics', topics, peer.info.id.toB58String())
log('publish msgs on topics', topics, peer.id.toB58String())
})
}

Expand Down Expand Up @@ -254,7 +254,7 @@ class FloodSub extends BaseProtocol {
topics = ensureArray(topics)
messages = ensureArray(messages)

const from = this.peerInfo.id.toB58String()
const from = this.peerId.toB58String()

const buildMessage = (msg) => {
const seqno = utils.randomSeqno()
Expand Down
46 changes: 23 additions & 23 deletions test/2-nodes.spec.js
Expand Up @@ -15,7 +15,7 @@ const { multicodec } = require('../src')
const {
defOptions,
first,
createPeerInfo,
createPeerId,
createMockRegistrar,
expectSet,
ConnectionPair
Expand All @@ -27,21 +27,21 @@ function shouldNotHappen (_) {

describe('basics between 2 nodes', () => {
describe('fresh nodes', () => {
let peerInfoA, peerInfoB
let peerIdA, peerIdB
let fsA, fsB

const registrarRecordA = {}
const registrarRecordB = {}

// Mount pubsub protocol
before(async () => {
[peerInfoA, peerInfoB] = await Promise.all([
createPeerInfo(),
createPeerInfo()
[peerIdA, peerIdB] = await Promise.all([
createPeerId(),
createPeerId()
])

fsA = new FloodSub(peerInfoA, createMockRegistrar(registrarRecordA), defOptions)
fsB = new FloodSub(peerInfoB, createMockRegistrar(registrarRecordB), defOptions)
fsA = new FloodSub(peerIdA, createMockRegistrar(registrarRecordA), defOptions)
fsB = new FloodSub(peerIdB, createMockRegistrar(registrarRecordB), defOptions)

expect(fsA.peers.size).to.be.eql(0)
expect(fsA.subscriptions.size).to.eql(0)
Expand All @@ -62,13 +62,13 @@ describe('basics between 2 nodes', () => {

// Notice peers of connection
const [c0, c1] = ConnectionPair()
await onConnectA(peerInfoB, c0)
await onConnectA(peerIdB, c0)

await handleB({
protocol: multicodec,
stream: c1.stream,
connection: {
remotePeer: peerInfoA.id
remotePeer: peerIdA
}
})

Expand All @@ -87,11 +87,11 @@ describe('basics between 2 nodes', () => {
const defer = pDefer()

fsA.subscribe('Z')
fsB.once('floodsub:subscription-change', (changedPeerInfo, changedTopics, changedSubs) => {
fsB.once('floodsub:subscription-change', (changedPeerId, changedTopics, changedSubs) => {
expectSet(fsA.subscriptions, ['Z'])
expect(fsB.peers.size).to.equal(1)
expectSet(first(fsB.peers).topics, ['Z'])
expect(changedPeerInfo.id.toB58String()).to.equal(first(fsB.peers).info.id.toB58String())
expect(changedPeerId.toB58String()).to.equal(first(fsB.peers).id.toB58String())
expectSet(changedTopics, ['Z'])
expect(changedSubs).to.be.eql([{ topicID: 'Z', subscribe: true }])
defer.resolve()
Expand Down Expand Up @@ -147,7 +147,7 @@ describe('basics between 2 nodes', () => {

function receivedMsg (msg) {
expect(msg.data.toString()).to.equal('banana')
expect(msg.from).to.be.eql(fsB.peerInfo.id.toB58String())
expect(msg.from).to.be.eql(fsB.peerId.toB58String())
expect(Buffer.isBuffer(msg.seqno)).to.be.true()
expect(msg.topicIDs).to.be.eql(['Z'])

Expand All @@ -172,7 +172,7 @@ describe('basics between 2 nodes', () => {

function receivedMsg (msg) {
expect(msg.data.toString()).to.equal('banana')
expect(msg.from).to.be.eql(fsB.peerInfo.id.toB58String())
expect(msg.from).to.be.eql(fsB.peerId.toB58String())
expect(Buffer.isBuffer(msg.seqno)).to.be.true()
expect(msg.topicIDs).to.be.eql(['Z'])

Expand All @@ -197,10 +197,10 @@ describe('basics between 2 nodes', () => {
fsA.unsubscribe('Z')
expect(fsA.subscriptions.size).to.equal(0)

fsB.once('floodsub:subscription-change', (changedPeerInfo, changedTopics, changedSubs) => {
fsB.once('floodsub:subscription-change', (changedPeerId, changedTopics, changedSubs) => {
expect(fsB.peers.size).to.equal(1)
expectSet(first(fsB.peers).topics, [])
expect(changedPeerInfo.id.toB58String()).to.equal(first(fsB.peers).info.id.toB58String())
expect(changedPeerId.toB58String()).to.equal(first(fsB.peers).id.toB58String())
expectSet(changedTopics, [])
expect(changedSubs).to.be.eql([{ topicID: 'Z', subscribe: false }])

Expand Down Expand Up @@ -230,21 +230,21 @@ describe('basics between 2 nodes', () => {
})

describe('nodes send state on connection', () => {
let peerInfoA, peerInfoB
let peerIdA, peerIdB
let fsA, fsB

const registrarRecordA = {}
const registrarRecordB = {}

// Mount pubsub protocol
before(async () => {
[peerInfoA, peerInfoB] = await Promise.all([
createPeerInfo(),
createPeerInfo()
[peerIdA, peerIdB] = await Promise.all([
createPeerId(),
createPeerId()
])

fsA = new FloodSub(peerInfoA, createMockRegistrar(registrarRecordA), defOptions)
fsB = new FloodSub(peerInfoB, createMockRegistrar(registrarRecordB), defOptions)
fsA = new FloodSub(peerIdA, createMockRegistrar(registrarRecordA), defOptions)
fsB = new FloodSub(peerIdB, createMockRegistrar(registrarRecordB), defOptions)
})

// Start pubsub
Expand Down Expand Up @@ -278,8 +278,8 @@ describe('basics between 2 nodes', () => {

// Notice peers of connection
const [c0, c1] = ConnectionPair()
await onConnectA(peerInfoB, c0)
await onConnectB(peerInfoA, c1)
await onConnectA(peerIdB, c0)
await onConnectB(peerIdA, c1)
}

await Promise.all([
Expand Down
12 changes: 6 additions & 6 deletions test/emit-self.spec.js
Expand Up @@ -9,20 +9,20 @@ const expect = chai.expect
const FloodSub = require('../src')

const {
createPeerInfo, mockRegistrar
createPeerId, mockRegistrar
} = require('./utils')

const shouldNotHappen = (_) => expect.fail()

describe('emit self', () => {
let floodsub
let peerInfo
let peerId
const topic = 'Z'

describe('enabled', () => {
before(async () => {
peerInfo = await createPeerInfo()
floodsub = new FloodSub(peerInfo, mockRegistrar, { emitSelf: true })
peerId = await createPeerId()
floodsub = new FloodSub(peerId, mockRegistrar, { emitSelf: true })
})

before(async () => {
Expand All @@ -44,8 +44,8 @@ describe('emit self', () => {

describe('disabled', () => {
before(async () => {
peerInfo = await createPeerInfo()
floodsub = new FloodSub(peerInfo, mockRegistrar, { emitSelf: false })
peerId = await createPeerId()
floodsub = new FloodSub(peerId, mockRegistrar, { emitSelf: false })
})

before(async () => {
Expand Down

0 comments on commit cb3497c

Please sign in to comment.