Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
feat: use PubSub API directly from libp2p
Browse files Browse the repository at this point in the history
  • Loading branch information
daviddias committed Feb 15, 2018
1 parent 905bdc0 commit 11e8f77
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 101 deletions.
5 changes: 2 additions & 3 deletions src/core/components/libp2p.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module.exports = function libp2p (self) {
bootstrap: get(config, 'Bootstrap'),
modules: self._libp2pModules,
// EXPERIMENTAL
pubsub: get(self._options, 'EXPERIMENTAL.pubsub', false),
dht: get(self._options, 'EXPERIMENTAL.dht', false),
relay: {
enabled: get(config, 'EXPERIMENTAL.relay.enabled', false),
Expand Down Expand Up @@ -50,9 +51,7 @@ module.exports = function libp2p (self) {
})

self._libp2pNode.start((err) => {
if (err) {
return callback(err)
}
if (err) { return callback(err) }

self._libp2pNode.peerInfo.multiaddrs.forEach((ma) => {
console.log('Swarm listening on', ma.toString())
Expand Down
24 changes: 0 additions & 24 deletions src/core/components/no-floodsub.js

This file was deleted.

62 changes: 7 additions & 55 deletions src/core/components/pubsub.js
Original file line number Diff line number Diff line change
@@ -1,96 +1,48 @@
'use strict'

const promisify = require('promisify-es6')
const setImmediate = require('async/setImmediate')

const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR

module.exports = function pubsub (self) {
return {
subscribe: (topic, options, handler, callback) => {
if (!self.isOnline()) {
throw new Error(OFFLINE_ERROR)
}

if (typeof options === 'function') {
callback = handler
handler = options
options = {}
}

function subscribe (cb) {
if (self._pubsub.listenerCount(topic) === 0) {
self._pubsub.subscribe(topic)
}

self._pubsub.on(topic, handler)
setImmediate(cb)
}

if (!callback) {
return new Promise((resolve, reject) => {
subscribe((err) => {
self.libp2p.pubsub.subscribe(topic, options, handler, (err) => {
if (err) {
return reject(err)
}
resolve()
})
})
} else {
subscribe(callback)
self.libp2p.pubsub.subscribe(topic, options, handler, callback)
}
},

unsubscribe: (topic, handler) => {
self._pubsub.removeListener(topic, handler)

if (self._pubsub.listenerCount(topic) === 0) {
self._pubsub.unsubscribe(topic)
}
self.libp2p.pubsub.unsubscribe(topic, handler)
},

publish: promisify((topic, data, callback) => {
if (!self.isOnline()) {
return setImmediate(() => callback(new Error(OFFLINE_ERROR)))
}

if (!Buffer.isBuffer(data)) {
return setImmediate(() => callback(new Error('data must be a Buffer')))
}

self._pubsub.publish(topic, data)
setImmediate(() => callback())
self.libp2p.pubsub.publish(topic, data, callback)
}),

ls: promisify((callback) => {
if (!self.isOnline()) {
return setImmediate(() => callback(new Error(OFFLINE_ERROR)))
}

const subscriptions = Array.from(self._pubsub.subscriptions)

setImmediate(() => callback(null, subscriptions))
self.libp2p.pubsub.ls(callback)
}),

peers: promisify((topic, callback) => {
if (!self.isOnline()) {
return setImmediate(() => callback(new Error(OFFLINE_ERROR)))
}

if (typeof topic === 'function') {
callback = topic
topic = null
}

const peers = Array.from(self._pubsub.peers.values())
.filter((peer) => topic ? peer.topics.has(topic) : true)
.map((peer) => peer.info.id.toB58String())

setImmediate(() => callback(null, peers))
self.libp2p.pubsub.peers(topic, callback)
}),

setMaxListeners (n) {
return self._pubsub.setMaxListeners(n)
self.libp2p.pubsub.setMaxListeners(n)
}
}
}
12 changes: 2 additions & 10 deletions src/core/components/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

const series = require('async/series')
const Bitswap = require('ipfs-bitswap')
const FloodSub = require('libp2p-floodsub')
const NoFloodSub = require('./no-floodsub')
const setImmediate = require('async/setImmediate')
const promisify = require('promisify-es6')

Expand Down Expand Up @@ -38,9 +36,7 @@ module.exports = (self) => {
(cb) => self.preStart(cb),
(cb) => self.libp2p.start(cb)
], (err) => {
if (err) {
return done(err)
}
if (err) { return done(err) }

self._bitswap = new Bitswap(
self._libp2pNode,
Expand All @@ -50,11 +46,7 @@ module.exports = (self) => {

self._bitswap.start()
self._blockService.setExchange(self._bitswap)

self._pubsub = self._options.EXPERIMENTAL.pubsub
? new FloodSub(self._libp2pNode)
: new NoFloodSub()
self._pubsub.start(done)
done()
})
})
}
1 change: 0 additions & 1 deletion src/core/components/stop.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ module.exports = (self) => {
self._bitswap.stop()

series([
(cb) => self._pubsub.stop(cb),
(cb) => self.libp2p.stop(cb),
(cb) => self._repo.close(cb)
], done)
Expand Down
15 changes: 7 additions & 8 deletions test/core/bitswap.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ const isNode = require('detect-node')
const multihashing = require('multihashing-async')
const CID = require('cids')

const DaemonFactory = require('ipfsd-ctl')
const df = DaemonFactory.create({ type: 'js' })

const dfProc = DaemonFactory.create({ type: 'proc' })
const IPFSFactory = require('ipfsd-ctl')
const fDaemon = IPFSFactory.create({ type: 'js' })
const fInProc = IPFSFactory.create({ type: 'proc' })

// This gets replaced by '../utils/create-repo-browser.js' in the browser
const createTempRepo = require('../utils/create-repo-nodejs.js')
Expand Down Expand Up @@ -69,7 +68,7 @@ function connectNodes (remoteNode, inProcNode, callback) {
let nodes = []

function addNode (inProcNode, callback) {
df.spawn({
fDaemon.spawn({
exec: './src/cli/bin.js',
config: {
Addresses: {
Expand All @@ -89,7 +88,7 @@ function addNode (inProcNode, callback) {
})
}

describe('bitswap', function () {
describe.only('bitswap', function () {
this.timeout(80 * 1000)

let inProcNode // Node spawned inside this process
Expand Down Expand Up @@ -119,7 +118,7 @@ describe('bitswap', function () {
})
}

dfProc.spawn({ exec: IPFS, config }, (err, _ipfsd) => {
fInProc.spawn({ exec: IPFS, config: config }, (err, _ipfsd) => {
expect(err).to.not.exist()
nodes.push(_ipfsd)
inProcNode = _ipfsd.api
Expand All @@ -137,7 +136,7 @@ describe('bitswap', function () {
})
})

describe('transfer a block between', () => {
describe.only('transfer a block between', () => {
it('2 peers', function (done) {
this.timeout(80 * 1000)

Expand Down

0 comments on commit 11e8f77

Please sign in to comment.