From 5cb17fd55986e99fd396ad2830aa673af259c8ab Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Mon, 6 May 2019 13:08:16 +0200 Subject: [PATCH] feat: add support for message signing BREAKING CHANGE: as .publish should now sign messages (via _buildMessage) it now requires a callback since signing is async. This also adds an options param to the pubsub constructor to allow for disabling signing. While this change shouldnt break things upstream, implementations need to be sure to call _buildMessage for each message they will publish. --- src/index.js | 32 ++++++++++++++++++++++-- src/message/index.js | 4 +++ src/message/rpc.proto.js | 4 ++- src/message/sign.js | 33 +++++++++++++++++++++++++ src/peer.js | 6 ++--- src/utils.js | 16 ++++++------ test/pubsub.js | 44 ++++++++++++++++++++++++++------- test/sign.spec.js | 53 ++++++++++++++++++++++++++++++++++++++++ 8 files changed, 170 insertions(+), 22 deletions(-) create mode 100644 src/message/sign.js create mode 100644 test/sign.spec.js diff --git a/src/index.js b/src/index.js index 1a9c0cd714..271cf3e105 100644 --- a/src/index.js +++ b/src/index.js @@ -10,6 +10,7 @@ const errcode = require('err-code') const Peer = require('./peer') const message = require('./message') +const { signMessage } = require('./message/sign') const utils = require('./utils') const nextTick = require('async/nextTick') @@ -22,17 +23,28 @@ class PubsubBaseProtocol extends EventEmitter { * @param {String} debugName * @param {String} multicodec * @param {Object} libp2p libp2p implementation + * @param {Object} options + * @param {boolean} options.signMessages if messages should be signed, defaults to true * @constructor */ - constructor (debugName, multicodec, libp2p) { + constructor (debugName, multicodec, libp2p, options) { super() + options = { + signMessages: true, + ...options + } + this.log = debug(debugName) this.log.err = debug(`${debugName}:error`) this.multicodec = multicodec this.libp2p = libp2p this.started = false + if (options.signMessages) { + this.peerId = this.libp2p.peerInfo.id + } + /** * Map of topics to which peers are subscribed to * @@ -225,16 +237,32 @@ class PubsubBaseProtocol extends EventEmitter { this._removePeer(peer) } + /** + * Normalizes the message and signs it, if signing is enabled + * + * @param {Message} message + * @param {function(Error, Message)} callback + */ + _buildMessage (message, callback) { + const msg = utils.normalizeOutRpcMessage(message) + if (this.peerId) { + signMessage(this.peerId, msg, callback) + } else { + nextTick(callback, null, msg) + } + } + /** * Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation. * For example, a Floodsub implementation might simply publish each message to each topic for every peer * @abstract * @param {Array|string} topics * @param {Array|any} messages + * @param {function(Error)} callback * @returns {undefined} * */ - publish (topics, messages) { + publish (topics, messages, callback) { throw errcode('publish must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') } diff --git a/src/message/index.js b/src/message/index.js index ed860a60ef..320ab4cf7d 100644 --- a/src/message/index.js +++ b/src/message/index.js @@ -3,8 +3,12 @@ const protons = require('protons') const rpcProto = protons(require('./rpc.proto.js')) +const RPC = rpcProto.RPC const topicDescriptorProto = protons(require('./topic-descriptor.proto.js')) exports = module.exports exports.rpc = rpcProto exports.td = topicDescriptorProto +exports.RPC = RPC +exports.Message = RPC.Message +exports.SubOpts = RPC.SubOpts diff --git a/src/message/rpc.proto.js b/src/message/rpc.proto.js index 6f917bfcef..88b1f83427 100644 --- a/src/message/rpc.proto.js +++ b/src/message/rpc.proto.js @@ -13,6 +13,8 @@ message RPC { optional bytes from = 1; optional bytes data = 2; optional bytes seqno = 3; - repeated string topicIDs = 4; + repeated string topicIDs = 4; + optional bytes signature = 5; + optional bytes key = 6; } }` diff --git a/src/message/sign.js b/src/message/sign.js new file mode 100644 index 0000000000..a275214358 --- /dev/null +++ b/src/message/sign.js @@ -0,0 +1,33 @@ +'use strict' + +const { Message } = require('./index') +const SignPrefix = Buffer.from('libp2p-pubsub:') + +module.exports.SignPrefix = SignPrefix + +/** + * Signs the provided message with the given `peerId` + * + * @param {PeerId} peerId + * @param {Message} message + * @param {function(Error, Message)} callback + * @returns {void} + */ +module.exports.signMessage = function (peerId, message, callback) { + // Get the message in bytes, and prepend with the pubsub prefix + const bytes = Buffer.concat([ + SignPrefix, + Message.encode(message) + ]) + + // Sign the bytes with the private key + peerId.privKey.sign(bytes, (err, signature) => { + if (err) return callback(err) + + callback(null, { + ...message, + signature: signature, + key: peerId.pubKey.bytes + }) + }) +} diff --git a/src/peer.js b/src/peer.js index 45c4573222..7179b4e830 100644 --- a/src/peer.js +++ b/src/peer.js @@ -6,7 +6,7 @@ const pull = require('pull-stream') const setImmediate = require('async/setImmediate') const EventEmitter = require('events') -const rpc = require('./message').rpc.RPC +const { RPC } = require('./message') /** * The known state of a connected peer. @@ -109,7 +109,7 @@ class Peer extends EventEmitter { }) }) - this.write(rpc.encode({ + this.write(RPC.encode({ subscriptions: subs })) } @@ -139,7 +139,7 @@ class Peer extends EventEmitter { * @returns {undefined} */ sendMessages (msgs) { - this.write(rpc.encode({ + this.write(RPC.encode({ msgs: msgs })) } diff --git a/src/utils.js b/src/utils.js index 53c515216e..547aaf550d 100644 --- a/src/utils.js +++ b/src/utils.js @@ -81,15 +81,17 @@ exports.normalizeInRpcMessages = (messages) => { }) } +exports.normalizeOutRpcMessage = (message) => { + const m = Object.assign({}, message) + if (typeof message.from === 'string' || message.from instanceof String) { + m.from = bs58.decode(message.from) + } + return m +} + exports.normalizeOutRpcMessages = (messages) => { if (!messages) { return messages } - return messages.map((msg) => { - const m = Object.assign({}, msg) - if (typeof msg.from === 'string' || msg.from instanceof String) { - m.from = bs58.decode(msg.from) - } - return m - }) + return messages.map(exports.normalizeOutRpcMessage) } diff --git a/test/pubsub.js b/test/pubsub.js index 5691d85607..2c824a7611 100644 --- a/test/pubsub.js +++ b/test/pubsub.js @@ -8,7 +8,10 @@ const expect = chai.expect const series = require('async/series') const parallel = require('async/parallel') +const { Message } = require('../src/message') +const { SignPrefix } = require('../src/message/sign') const PubsubBaseProtocol = require('../src') +const { randomSeqno, normalizeOutRpcMessage } = require('../src/utils') const utils = require('./utils') const createNode = utils.createNode @@ -55,14 +58,7 @@ describe('pubsub base protocol', () => { }) }) - after((done) => { - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], done) - }) - - it('mount the pubsub protocol', (done) => { + before('mount the pubsub protocol', (done) => { psA = new PubsubImplementation(nodeA) psB = new PubsubImplementation(nodeB) @@ -73,13 +69,20 @@ describe('pubsub base protocol', () => { }, 50) }) - it('start both Pubsub', (done) => { + before('start both Pubsub', (done) => { parallel([ (cb) => psA.start(cb), (cb) => psB.start(cb) ], done) }) + after((done) => { + parallel([ + (cb) => nodeA.stop(cb), + (cb) => nodeB.stop(cb) + ], done) + }) + it('Dial from nodeA to nodeB', (done) => { series([ (cb) => nodeA.dial(nodeB.peerInfo, cb), @@ -90,6 +93,29 @@ describe('pubsub base protocol', () => { }, 1000) ], done) }) + + it('_buildMessage normalizes and signs messages', (done) => { + const message = { + from: 'QmABC', + data: 'hello', + seqno: randomSeqno(), + topicIDs: ['test-topic'] + } + + psA._buildMessage(message, (err, signedMessage) => { + expect(err).to.not.exist() + + const bytesToSign = Buffer.concat([ + SignPrefix, + Message.encode(normalizeOutRpcMessage(message)) + ]) + + psA.peerId.pubKey.verify(bytesToSign, signedMessage.signature, (err, verified) => { + expect(verified).to.eql(true) + done(err) + }) + }) + }) }) describe('dial the pubsub protocol on mount', () => { diff --git a/test/sign.spec.js b/test/sign.spec.js new file mode 100644 index 0000000000..e7bbd34e51 --- /dev/null +++ b/test/sign.spec.js @@ -0,0 +1,53 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect + +const { Message } = require('../src/message') +const { signMessage, SignPrefix } = require('../src/message/sign') +const PeerId = require('peer-id') +const { randomSeqno } = require('../src/utils') + +describe('message signing', () => { + let peerId + before((done) => { + peerId = PeerId.create({ + bits: 1024 + }, (err, id) => { + peerId = id + done(err) + }) + }) + + it('should be able to sign a message', (done) => { + const message = { + from: 'QmABC', + data: 'hello', + seqno: randomSeqno(), + topicIDs: ['test-topic'] + } + + const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) + + peerId.privKey.sign(bytesToSign, (err, expectedSignature) => { + if (err) return done(err) + + signMessage(peerId, message, (err, signedMessage) => { + if (err) return done(err) + + // Check the signature and public key + expect(signedMessage.signature).to.eql(expectedSignature) + expect(signedMessage.key).to.eql(peerId.pubKey.bytes) + + // Verify the signature + peerId.pubKey.verify(bytesToSign, signedMessage.signature, (err, verified) => { + expect(verified).to.eql(true) + done(err) + }) + }) + }) + }) +})