From 1793e709d28f1625147004956dbb7beea53210fe Mon Sep 17 00:00:00 2001 From: haad Date: Wed, 23 Nov 2016 17:38:50 +0100 Subject: [PATCH] feat(pubsub): Add pubsub api --- README.md | 24 ++++ src/api/pubsub.js | 162 +++++++++++++++++++++++++ src/load-commands.js | 1 + src/pubsub-message-stream.js | 33 +++++ src/pubsub-message-utils.js | 39 ++++++ test/interface/pubsub.spec.js | 23 ++++ test/ipfs-factory/daemon-spawner.js | 2 +- test/pubsub-in-browser.spec.js | 182 ++++++++++++++++++++++++++++ 8 files changed, 465 insertions(+), 1 deletion(-) create mode 100644 src/api/pubsub.js create mode 100644 src/pubsub-message-stream.js create mode 100644 src/pubsub-message-utils.js create mode 100644 test/interface/pubsub.spec.js create mode 100644 test/pubsub-in-browser.spec.js diff --git a/README.md b/README.md index 5d853e00c..7f29a44b4 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,30 @@ $ ipfs config --json API.HTTPHeaders.Access-Control-Allow-Methods "[\"PUT\", \"P > `js-ipfs-api` follows the spec defined by [`interface-ipfs-core`](https://github.com/ipfs/interface-ipfs-core), which concerns the interface to expect from IPFS implementations. This interface is a currently active endeavor. You can use it today to consult the methods available. +#### Caveats + +##### Pubsub + +**Currently, the [PubSub API only works in Node.js envinroment](https://github.com/ipfs/js-ipfs-api/issues/518)** + +We currently don't support pubsub when run in the browser, and we test it with separate set of tests to make sure if it's being used in the browser, pubsub errors. + +More info: https://github.com/ipfs/js-ipfs-api/issues/518 + +This means: +- You can use pubsub from js-ipfs-api in Node.js +- You can use pubsub from js-ipfs-api in Electron + (when js-ipfs-api is ran in the main process of Electron) +- You can't use pubsub from js-ipfs-api in the browser +- You can't use pubsub from js-ipfs-api in Electron's + renderer process +- You can use pubsub from js-ipfs in the browsers +- You can use pubsub from js-ipfs in Node.js +- You can use pubsub from js-ipfs in Electron + (in both the main process and the renderer process) +- See https://github.com/ipfs/js-ipfs for details on + pubsub in js-ipfs + ##### [bitswap]() - [`ipfs.bitswap.wantlist()`]() diff --git a/src/api/pubsub.js b/src/api/pubsub.js new file mode 100644 index 000000000..1afd05fab --- /dev/null +++ b/src/api/pubsub.js @@ -0,0 +1,162 @@ +'use strict' + +const promisify = require('promisify-es6') +const EventEmitter = require('events') +const eos = require('end-of-stream') +const isNode = require('detect-node') +const PubsubMessageStream = require('../pubsub-message-stream') +const stringlistToArray = require('../stringlist-to-array') + +const NotSupportedError = () => new Error('pubsub is currently not supported when run in the browser') + +/* Public API */ +module.exports = (send) => { + /* Internal subscriptions state and functions */ + const ps = new EventEmitter() + const subscriptions = {} + ps.id = Math.random() + return { + subscribe: (topic, options, handler, callback) => { + const defaultOptions = { + discover: false + } + + if (typeof options === 'function') { + callback = handler + handler = options + options = defaultOptions + } + + if (!options) { + options = defaultOptions + } + + // Throw an error if ran in the browsers + if (!isNode) { + if (!callback) { + return Promise.reject(NotSupportedError()) + } + return callback(NotSupportedError()) + } + + // promisify doesn't work as we always pass a + // function as last argument (`handler`) + if (!callback) { + return new Promise((resolve, reject) => { + subscribe(topic, options, handler, (err) => { + if (err) { + return reject(err) + } + resolve() + }) + }) + } + + subscribe(topic, options, handler, callback) + }, + unsubscribe: (topic, handler) => { + if (!isNode) { + throw NotSupportedError() + } + + if (ps.listenerCount(topic) === 0 || !subscriptions[topic]) { + throw new Error(`Not subscribed to '${topic}'`) + } + + ps.removeListener(topic, handler) + + // Drop the request once we are actualy done + if (ps.listenerCount(topic) === 0) { + subscriptions[topic].abort() + subscriptions[topic] = null + } + }, + publish: promisify((topic, data, callback) => { + if (!isNode) { + return callback(NotSupportedError()) + } + + if (!Buffer.isBuffer(data)) { + return callback(new Error('data must be a Buffer')) + } + + const request = { + path: 'pubsub/pub', + args: [topic, data] + } + + send(request, callback) + }), + ls: promisify((callback) => { + if (!isNode) { + return callback(NotSupportedError()) + } + + const request = { + path: 'pubsub/ls' + } + + send.andTransform(request, stringlistToArray, callback) + }), + peers: promisify((topic, callback) => { + if (!isNode) { + return callback(NotSupportedError()) + } + + const request = { + path: 'pubsub/peers', + args: [topic] + } + + send.andTransform(request, stringlistToArray, callback) + }), + setMaxListeners (n) { + return ps.setMaxListeners(n) + } + } + + function subscribe (topic, options, handler, callback) { + ps.on(topic, handler) + if (subscriptions[topic]) { + return callback() + } + + // Request params + const request = { + path: 'pubsub/sub', + args: [topic], + qs: { + discover: options.discover + } + } + + // Start the request and transform the response + // stream to Pubsub messages stream + subscriptions[topic] = send.andTransform(request, PubsubMessageStream.from, (err, stream) => { + if (err) { + subscriptions[topic] = null + ps.removeListener(topic, handler) + return callback(err) + } + + stream.on('data', (msg) => { + ps.emit(topic, msg) + }) + + stream.on('error', (err) => { + ps.emit('error', err) + }) + + eos(stream, (err) => { + if (err) { + ps.emit('error', err) + } + + subscriptions[topic] = null + ps.removeListener(topic, handler) + }) + + callback() + }) + } +} diff --git a/src/load-commands.js b/src/load-commands.js index b69c197cb..1246ef5ef 100644 --- a/src/load-commands.js +++ b/src/load-commands.js @@ -25,6 +25,7 @@ function requireCommands () { refs: require('./api/refs'), repo: require('./api/repo'), swarm: require('./api/swarm'), + pubsub: require('./api/pubsub'), update: require('./api/update'), version: require('./api/version') } diff --git a/src/pubsub-message-stream.js b/src/pubsub-message-stream.js new file mode 100644 index 000000000..b6631726f --- /dev/null +++ b/src/pubsub-message-stream.js @@ -0,0 +1,33 @@ +'use strict' + +const TransformStream = require('readable-stream').Transform +const PubsubMessage = require('./pubsub-message-utils') + +class PubsubMessageStream extends TransformStream { + constructor (options) { + const opts = Object.assign(options || {}, { objectMode: true }) + super(opts) + } + + static from (inputStream, callback) { + let outputStream = inputStream.pipe(new PubsubMessageStream()) + inputStream.on('end', () => outputStream.emit('end')) + callback(null, outputStream) + } + + _transform (obj, enc, callback) { + let msg + try { + msg = PubsubMessage.deserialize(obj, 'base64') + } catch (e) { + // Not a valid pubsub message + // go-ipfs returns '{}' as the very first object atm, we skip that + return callback() + } + + this.push(msg) + callback() + } +} + +module.exports = PubsubMessageStream diff --git a/src/pubsub-message-utils.js b/src/pubsub-message-utils.js new file mode 100644 index 000000000..05be7fa14 --- /dev/null +++ b/src/pubsub-message-utils.js @@ -0,0 +1,39 @@ +'use strict' + +const bs58 = require('bs58') + +module.exports = { + deserialize (data, enc = 'json') { + enc = enc ? enc.toLowerCase() : null + + if (enc === 'json') { + return deserializeFromJson(data) + } else if (enc === 'base64') { + return deserializeFromBase64(data) + } + + throw new Error(`Unsupported encoding: '${enc}'`) + } +} + +function deserializeFromJson (data) { + const json = JSON.parse(data) + return deserializeFromBase64(json) +} + +function deserializeFromBase64 (obj) { + if (!isPubsubMessage(obj)) { + throw new Error(`Not a pubsub message`) + } + + return { + from: bs58.encode(new Buffer(obj.from, 'base64')).toString(), + seqno: new Buffer(obj.seqno, 'base64'), + data: new Buffer(obj.data, 'base64'), + topicCIDs: obj.topicIDs || obj.topicCIDs + } +} + +function isPubsubMessage (obj) { + return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs) +} diff --git a/test/interface/pubsub.spec.js b/test/interface/pubsub.spec.js new file mode 100644 index 000000000..5448ccc97 --- /dev/null +++ b/test/interface/pubsub.spec.js @@ -0,0 +1,23 @@ +/* eslint-env mocha */ + +'use strict' + +const test = require('interface-ipfs-core') +const FactoryClient = require('../ipfs-factory/client') +const isNode = require('detect-node') + +if (isNode) { + let fc + + const common = { + setup: function (callback) { + fc = new FactoryClient() + callback(null, fc) + }, + teardown: function (callback) { + fc.dismantle(callback) + } + } + + test.pubsub(common) +} diff --git a/test/ipfs-factory/daemon-spawner.js b/test/ipfs-factory/daemon-spawner.js index 7904ea43e..0d861c402 100644 --- a/test/ipfs-factory/daemon-spawner.js +++ b/test/ipfs-factory/daemon-spawner.js @@ -84,7 +84,7 @@ function spawnEphemeralNode (callback) { node.setConfig(configKey, configVal, cb) }, cb) }, - (cb) => node.startDaemon(cb) + (cb) => node.startDaemon(['--enable-pubsub-experiment'], cb) ], (err) => callback(err, node)) }) } diff --git a/test/pubsub-in-browser.spec.js b/test/pubsub-in-browser.spec.js new file mode 100644 index 000000000..53b50d1ec --- /dev/null +++ b/test/pubsub-in-browser.spec.js @@ -0,0 +1,182 @@ +/* + We currently don't support pubsub when run in the browser, + and we test it with separate set of tests to make sure + if it's being used in the browser, pubsub errors. + + More info: https://github.com/ipfs/js-ipfs-api/issues/518 + + This means: + - You can use pubsub from js-ipfs-api in Node.js + - You can use pubsub from js-ipfs-api in Electron + (when js-ipfs-api is ran in the main process of Electron) + + - You can't use pubsub from js-ipfs-api in the browser + - You can't use pubsub from js-ipfs-api in Electron's + renderer process + + - You can use pubsub from js-ipfs in the browsers + - You can use pubsub from js-ipfs in Node.js + - You can use pubsub from js-ipfs in Electron + (in both the main process and the renderer process) + - See https://github.com/ipfs/js-ipfs for details on + pubsub in js-ipfs +*/ + +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ['error', 8] */ +'use strict' + +const series = require('async/series') +const waterfall = require('async/waterfall') +const isNode = require('detect-node') +const FactoryClient = require('./ipfs-factory/client') +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const expectedError = 'pubsub is currently not supported when run in the browser' + +function spawnWithId (factory, callback) { + waterfall([ + (cb) => factory.spawnNode(cb), + (node, cb) => node.id((err, res) => { + if (err) { + return cb(err) + } + node.peerId = res + cb(null, node) + }) + ], callback) +} + +if (!isNode) { + describe('.pubsub-browser (pubsub not supported in the browsers currently)', () => { + const topic = 'pubsub-tests' + + let factory + let ipfs1 + + before((done) => { + factory = new FactoryClient() + + series([ + (cb) => spawnWithId(factory, cb) + ], (err, nodes) => { + if (err) { + return done(err) + } + + ipfs1 = nodes[0] + done() + }) + }) + + after((done) => { + factory.dismantle(done) + }) + + describe('everything errors', () => { + describe('Callback API', () => { + describe('.publish', () => { + it('throws an error if called in the browser', (done) => { + ipfs1.pubsub.publish(topic, 'hello friend', (err, topics) => { + expect(err).to.exist() + expect(err.message).to.equal(expectedError) + done() + }) + }) + }) + + describe('.subscribe', () => { + const handler = () => {} + it('throws an error if called in the browser', (done) => { + ipfs1.pubsub.subscribe(topic, {}, handler, (err, topics) => { + expect(err).to.exist() + expect(err.message).to.equal(expectedError) + done() + }) + }) + }) + + describe('.peers', () => { + it('throws an error if called in the browser', (done) => { + ipfs1.pubsub.peers(topic, (err, topics) => { + expect(err).to.exist() + expect(err.message).to.equal(expectedError) + done() + }) + }) + }) + + describe('.ls', () => { + it('throws an error if called in the browser', (done) => { + ipfs1.pubsub.ls((err, topics) => { + expect(err).to.exist() + expect(err.message).to.equal(expectedError) + done() + }) + }) + }) + }) + + describe('Promise API', () => { + describe('.publish', () => { + it('throws an error if called in the browser', () => { + return ipfs1.pubsub.publish(topic, 'hello friend') + .catch((err) => { + expect(err).to.exist() + expect(err.message).to.equal(expectedError) + }) + }) + }) + + describe('.subscribe', () => { + const handler = () => {} + it('throws an error if called in the browser', (done) => { + ipfs1.pubsub.subscribe(topic, {}, handler) + .catch((err) => { + expect(err).to.exist() + expect(err.message).to.equal(expectedError) + done() + }) + }) + }) + + describe('.peers', () => { + it('throws an error if called in the browser', (done) => { + ipfs1.pubsub.peers(topic) + .catch((err) => { + expect(err).to.exist() + expect(err.message).to.equal(expectedError) + done() + }) + }) + }) + + describe('.ls', () => { + it('throws an error if called in the browser', () => { + return ipfs1.pubsub.ls() + .catch((err) => { + expect(err).to.exist() + expect(err.message).to.equal(expectedError) + }) + }) + }) + }) + + describe('.unsubscribe', () => { + it('throws an error if called in the browser', (done) => { + try { + ipfs1.pubsub.unsubscribe() + done('unsubscribe() didn\'t throw an error') + } catch (err) { + expect(err).to.exist() + expect(err.message).to.equal(expectedError) + done() + } + }) + }) + }) + }) +}