From 5aac9221cd2730aec6af519392decac00f565b14 Mon Sep 17 00:00:00 2001 From: mthambipillai Date: Wed, 25 Mar 2020 13:56:24 +0100 Subject: [PATCH 1/9] add createGroupKeyReset --- src/MessageCreationUtil.js | 17 ++++++++++ test/unit/MessageCreationUtil.test.js | 48 +++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/src/MessageCreationUtil.js b/src/MessageCreationUtil.js index d321ab9c2..4808bc1c8 100644 --- a/src/MessageCreationUtil.js +++ b/src/MessageCreationUtil.js @@ -178,6 +178,23 @@ export default class MessageCreationUtil { return streamMessage } + async createGroupKeyReset(subscriberAddress, streamId, encryptedGroupKey) { + if (!this._signer) { + throw new Error('Cannot create unsigned group key reset. Must authenticate with "privateKey" or "provider"') + } + const publisherId = await this.getPublisherId() + const data = { + streamId, + groupKey: encryptedGroupKey.groupKey, + start: encryptedGroupKey.start, + } + const idAndPrevRef = this.createDefaultMsgIdAndPrevRef(subscriberAddress.toLowerCase(), publisherId) + const streamMessage = StreamMessage.create(idAndPrevRef[0], idAndPrevRef[1], StreamMessage.CONTENT_TYPES.GROUP_KEY_RESET_SIMPLE, + StreamMessage.ENCRYPTION_TYPES.RSA, data, StreamMessage.SIGNATURE_TYPES.NONE, null) + await this._signer.signStreamMessage(streamMessage) + return streamMessage + } + async createErrorMessage(destinationAddress, error) { if (!this._signer) { throw new Error('Cannot create unsigned error message. Must authenticate with "privateKey" or "provider"') diff --git a/test/unit/MessageCreationUtil.test.js b/test/unit/MessageCreationUtil.test.js index ce2bed2ef..d72106cf8 100644 --- a/test/unit/MessageCreationUtil.test.js +++ b/test/unit/MessageCreationUtil.test.js @@ -402,6 +402,54 @@ describe('MessageCreationUtil', () => { }) }) + describe('createGroupKeyReset', () => { + const stream = new Stream(null, { + id: 'streamId', + partitions: 1, + }) + const auth = { + username: 'username', + } + it('should not be able to create unsigned group key reset', (done) => { + const util = new MessageCreationUtil(auth, null, () => Promise.resolve({ + username: 'username', + }), sinon.stub().resolves(stream)) + util.createGroupKeyReset('subscriberId', 'streamId', { + groupKey: 'group-key', + start: 34524, + }).catch((err) => { + assert.strictEqual(err.message, 'Cannot create unsigned group key reset. Must authenticate with "privateKey" or "provider"') + done() + }) + }) + it('creates correct group key reset', async () => { + const signer = { + signStreamMessage: (streamMessage) => { + /* eslint-disable no-param-reassign */ + streamMessage.signatureType = StreamMessage.SIGNATURE_TYPES.ETH + streamMessage.signature = 'signature' + /* eslint-enable no-param-reassign */ + return Promise.resolve() + }, + } + const util = new MessageCreationUtil(auth, signer, () => Promise.resolve({ + username: 'username', + }), sinon.stub().resolves(stream)) + const streamMessage = await util.createGroupKeyReset('subscriberId', 'streamId', { + groupKey: 'encrypted-group-key', + start: 34524, + }) + assert.strictEqual(streamMessage.getStreamId(), 'subscriberId'.toLowerCase()) // sending to subscriber's inbox stream + const content = streamMessage.getParsedContent() + assert.strictEqual(streamMessage.contentType, StreamMessage.CONTENT_TYPES.GROUP_KEY_RESET_SIMPLE) + assert.strictEqual(streamMessage.encryptionType, StreamMessage.ENCRYPTION_TYPES.RSA) + assert.strictEqual(content.streamId, 'streamId') + assert.deepStrictEqual(content.groupKey, 'encrypted-group-key') + assert.deepStrictEqual(content.start, 34524) + assert(streamMessage.signature) + }) + }) + describe('createErrorMessage', () => { const stream = new Stream(null, { id: 'streamId', From eda57a8019d5a26ce4f520d40f36399cebd3d741 Mon Sep 17 00:00:00 2001 From: mthambipillai Date: Wed, 25 Mar 2020 15:11:40 +0100 Subject: [PATCH 2/9] add nbSubToRevoke --- src/KeyExchangeUtil.js | 14 ++++++++++++++ test/unit/KeyExchangeUtil.test.js | 21 +++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/src/KeyExchangeUtil.js b/src/KeyExchangeUtil.js index 47d6ee360..c5e72df84 100644 --- a/src/KeyExchangeUtil.js +++ b/src/KeyExchangeUtil.js @@ -11,6 +11,7 @@ export default class KeyExchangeUtil { constructor(client) { this._client = client this.isSubscriberPromises = {} + this.localSubscribers = {} } async handleGroupKeyRequest(streamMessage) { @@ -107,6 +108,19 @@ export default class KeyExchangeUtil { return this.subscribersPromise } + async nbSubscribersToRevoke(streamId) { + const realSubscribersSet = await this._client.getStreamSubscribers(streamId) + let counter = 0 + const localSubscribersSet = this.localSubscribers[streamId] ? this.localSubscribers[streamId] : [] + localSubscribersSet.forEach((subscriber) => { + if (!realSubscribersSet.includes(subscriber)) { + counter += 1 + } + }) + this.localSubscribers[streamId] = realSubscribersSet + return counter + } + async isSubscriber(streamId, subscriberId) { if (!this.isSubscriberPromises[streamId]) { this.isSubscriberPromises[streamId] = {} diff --git a/test/unit/KeyExchangeUtil.test.js b/test/unit/KeyExchangeUtil.test.js index 206d5294a..ba41f7591 100644 --- a/test/unit/KeyExchangeUtil.test.js +++ b/test/unit/KeyExchangeUtil.test.js @@ -98,6 +98,27 @@ describe('KeyExchangeUtil', () => { assert(client.isStreamSubscriber.calledTwice) }) }) + describe('nbSubscribersToRevoke', () => { + it('correctly returns the number of subscribers to revoke at each call', async () => { + client.getStreamSubscribers.withArgs('streamId1').onCall(0).resolves(['subscriberId1', 'subscriberId2']) + client.getStreamSubscribers.withArgs('streamId1').onCall(1).resolves(['subscriberId1', 'subscriberId3']) + client.getStreamSubscribers.withArgs('streamId1').onCall(2).resolves(['subscriberId1', 'subscriberId3', 'subscriber8']) + client.getStreamSubscribers.withArgs('streamId1').onCall(3).resolves(['subscriberId4', 'subscriberId3', 'subscriberId2']) + client.getStreamSubscribers.withArgs('streamId2').onCall(0).resolves(['subscriberId1', 'subscriberId2']) + client.getStreamSubscribers.withArgs('streamId2').onCall(1).resolves(['subscriberId1', 'subscriberId2']) + client.getStreamSubscribers.withArgs('streamId2').onCall(2).resolves(['subscriberId5', 'subscriberId3', 'subscriberId8']) + client.getStreamSubscribers.withArgs('streamId2').onCall(3).resolves(['subscriberId9', 'subscriberId10', 'subscriberId11']) + + assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId1'), 0) + assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId2'), 0) + assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId1'), 1) + assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId2'), 0) + assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId1'), 0) + assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId2'), 2) + assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId1'), 2) + assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId2'), 3) + }) + }) describe('handleGroupKeyRequest', () => { it('should reject unsigned request', (done) => { const streamMessage = StreamMessage.create( From e70402367d44136ac1d2ac58dbb2725bec9c6094 Mon Sep 17 00:00:00 2001 From: mthambipillai Date: Wed, 25 Mar 2020 17:11:50 +0100 Subject: [PATCH 3/9] add auto revoke --- src/KeyExchangeUtil.js | 43 ++++++++++ test/unit/KeyExchangeUtil.test.js | 133 ++++++++++++++++++++++++++++-- 2 files changed, 168 insertions(+), 8 deletions(-) diff --git a/src/KeyExchangeUtil.js b/src/KeyExchangeUtil.js index c5e72df84..3f3d72661 100644 --- a/src/KeyExchangeUtil.js +++ b/src/KeyExchangeUtil.js @@ -1,3 +1,5 @@ +import crypto from 'crypto' + import debugFactory from 'debug' import EncryptionUtil from './EncryptionUtil' @@ -7,11 +9,15 @@ import InvalidGroupKeyError from './errors/InvalidGroupKeyError' const debug = debugFactory('KeyExchangeUtil') const SUBSCRIBERS_EXPIRATION_TIME = 5 * 60 * 1000 // 5 minutes +const REVOCATION_DELAY = 10 * 60 * 1000 // 10 minutes +const REVOCATION_THRESHOLD = 5 export default class KeyExchangeUtil { constructor(client) { this._client = client this.isSubscriberPromises = {} this.localSubscribers = {} + this.lastCallToCheckRevocation = Number.NEGATIVE_INFINITY // Date in millis + this.publicKeysStore = {} } async handleGroupKeyRequest(streamMessage) { @@ -51,6 +57,7 @@ export default class KeyExchangeUtil { start: keyObj.start, }) }) + this.publicKeysStore[subscriberId] = parsedContent.publicKey const response = await this._client.msgCreationUtil.createGroupKeyResponse(subscriberId, parsedContent.streamId, encryptedGroupKeys) return this._client.publishStreamMessage(response) } @@ -121,6 +128,40 @@ export default class KeyExchangeUtil { return counter } + async keyRevocationNeeded(streamId) { + const now = Date.now() + let res = false + if (this.lastCallToCheckRevocation + REVOCATION_DELAY < now) { + res = (await this.nbSubscribersToRevoke(streamId)) >= KeyExchangeUtil.REVOCATION_THRESHOLD + } + this.lastCallToCheckRevocation = now + return res + } + + async rekey(streamId) { + const groupKey = crypto.randomBytes(32) + const start = Date.now() + const realSubscribersSet = this.localSubscribers[streamId] ? this.localSubscribers[streamId] : [] + + const toRevoke = [] + const promises = [] + Object.keys(this.publicKeysStore).forEach(async (subscriberId) => { // iterating over local cache of Ethereum address --> RSA public key + if (realSubscribersSet.includes(subscriberId)) { // if still valid subscriber, send the new key + const encryptedGroupKey = { + groupKey: EncryptionUtil.encryptWithPublicKey(groupKey, this.publicKeysStore[subscriberId], true), + start, + } + const reset = await this._client.msgCreationUtil.createGroupKeyReset(subscriberId, streamId, encryptedGroupKey) + promises.push(this._client.publishStreamMessage(reset)) + } else { // no longer a valid subscriber, to be removed from local cache + toRevoke.push(subscriberId) + } + }) + toRevoke.forEach((revokedSubscriberId) => delete this.publicKeysStore[revokedSubscriberId]) + await Promise.all(promises) + this._client.keyStorageUtil.addKey(streamId, groupKey, start) + } + async isSubscriber(streamId, subscriberId) { if (!this.isSubscriberPromises[streamId]) { this.isSubscriberPromises[streamId] = {} @@ -143,3 +184,5 @@ export default class KeyExchangeUtil { } } KeyExchangeUtil.SUBSCRIBERS_EXPIRATION_TIME = SUBSCRIBERS_EXPIRATION_TIME +KeyExchangeUtil.REVOCATION_DELAY = REVOCATION_DELAY +KeyExchangeUtil.REVOCATION_THRESHOLD = REVOCATION_THRESHOLD diff --git a/test/unit/KeyExchangeUtil.test.js b/test/unit/KeyExchangeUtil.test.js index ba41f7591..0e3b6e58e 100644 --- a/test/unit/KeyExchangeUtil.test.js +++ b/test/unit/KeyExchangeUtil.test.js @@ -109,14 +109,131 @@ describe('KeyExchangeUtil', () => { client.getStreamSubscribers.withArgs('streamId2').onCall(2).resolves(['subscriberId5', 'subscriberId3', 'subscriberId8']) client.getStreamSubscribers.withArgs('streamId2').onCall(3).resolves(['subscriberId9', 'subscriberId10', 'subscriberId11']) - assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId1'), 0) - assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId2'), 0) - assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId1'), 1) - assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId2'), 0) - assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId1'), 0) - assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId2'), 2) - assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId1'), 2) - assert.deepStrictEqual(await util.nbSubscribersToRevoke('streamId2'), 3) + assert.strictEqual(await util.nbSubscribersToRevoke('streamId1'), 0) + assert.strictEqual(await util.nbSubscribersToRevoke('streamId2'), 0) + assert.strictEqual(await util.nbSubscribersToRevoke('streamId1'), 1) + assert.strictEqual(await util.nbSubscribersToRevoke('streamId2'), 0) + assert.strictEqual(await util.nbSubscribersToRevoke('streamId1'), 0) + assert.strictEqual(await util.nbSubscribersToRevoke('streamId2'), 2) + assert.strictEqual(await util.nbSubscribersToRevoke('streamId1'), 2) + assert.strictEqual(await util.nbSubscribersToRevoke('streamId2'), 3) + }) + }) + describe('keyRevocationNeeded', () => { + it('should not revoke if checked recently', async () => { + let res = await util.keyRevocationNeeded('streamId') + assert(client.getStreamSubscribers.calledOnce) + assert(!res) + res = await util.keyRevocationNeeded('streamId') + assert(client.getStreamSubscribers.calledOnce) + assert(!res) + }) + it('should not revoke if enough time elapsed but less than threshold', async () => { + const clock = sinon.useFakeTimers() + const initialSubscribers = [] + for (let i = 0; i < KeyExchangeUtil.REVOCATION_THRESHOLD - 1; i++) { + initialSubscribers.push(`subscriberId${i}`) + } + client.getStreamSubscribers.withArgs('streamId3').onCall(0).resolves(initialSubscribers) + client.getStreamSubscribers.withArgs('streamId3').onCall(1).resolves([]) // all subscribers need to be revoked + let res = await util.keyRevocationNeeded('streamId3') + assert(client.getStreamSubscribers.calledOnce) + assert(!res) + clock.tick(KeyExchangeUtil.REVOCATION_DELAY + 1000) + res = await util.keyRevocationNeeded('streamId3') + assert(client.getStreamSubscribers.calledTwice) + assert(!res) + clock.restore() + }) + it('should revoke if threshold reached', async () => { + const clock = sinon.useFakeTimers() + const initialSubscribers = [] + for (let i = 0; i < KeyExchangeUtil.REVOCATION_THRESHOLD; i++) { + initialSubscribers.push(`subscriberId${i}`) + } + client.getStreamSubscribers.withArgs('streamId3').onCall(0).resolves(initialSubscribers) + client.getStreamSubscribers.withArgs('streamId3').onCall(1).resolves([]) // all subscribers need to be revoked + let res = await util.keyRevocationNeeded('streamId3') + assert(client.getStreamSubscribers.calledOnce) + assert(!res) + clock.tick(KeyExchangeUtil.REVOCATION_DELAY + 1000) + res = await util.keyRevocationNeeded('streamId3') + assert(client.getStreamSubscribers.calledTwice) + assert(res) + clock.restore() + }) + }) + describe('revoke', () => { + it('should rekey by sending group key resets', async () => { + client.isStreamSubscriber.withArgs('streamId4', 'subscriber1').resolves(true) + client.isStreamSubscriber.withArgs('streamId4', 'subscriber2').resolves(true) + client.isStreamSubscriber.withArgs('streamId4', 'subscriber3').resolves(true) + client.getStreamSubscribers.withArgs('streamId4').resolves([]) + client.keyStorageUtil.addKey('streamId4', crypto.randomBytes(32), 5) + util.localSubscribers.streamId4 = ['subscriber1', 'subscriber3'] // fake call to 'keyRevocationNeeded', subscriber2 must be revoked + + const subscriberKeyPair1 = new EncryptionUtil() + const request1 = StreamMessage.create( + ['clientInboxAddress', 0, Date.now(), 0, 'subscriber1', ''], null, + StreamMessage.CONTENT_TYPES.GROUP_KEY_REQUEST, StreamMessage.ENCRYPTION_TYPES.NONE, { + streamId: 'streamId4', + publicKey: subscriberKeyPair1.getPublicKey(), + }, StreamMessage.SIGNATURE_TYPES.ETH, 'signature', + ) + const subscriberKeyPair2 = new EncryptionUtil() + const request2 = StreamMessage.create( + ['clientInboxAddress', 0, Date.now(), 0, 'subscriber2', ''], null, + StreamMessage.CONTENT_TYPES.GROUP_KEY_REQUEST, StreamMessage.ENCRYPTION_TYPES.NONE, { + streamId: 'streamId4', + publicKey: subscriberKeyPair2.getPublicKey(), + }, StreamMessage.SIGNATURE_TYPES.ETH, 'signature', + ) + const subscriberKeyPair3 = new EncryptionUtil() + const request3 = StreamMessage.create( + ['clientInboxAddress', 0, Date.now(), 0, 'subscriber3', ''], null, + StreamMessage.CONTENT_TYPES.GROUP_KEY_REQUEST, StreamMessage.ENCRYPTION_TYPES.NONE, { + streamId: 'streamId4', + publicKey: subscriberKeyPair3.getPublicKey(), + }, StreamMessage.SIGNATURE_TYPES.ETH, 'signature', + ) + + let resetKeySent1 = null + let resetKeySent3 = null + + client.msgCreationUtil = { + createGroupKeyResponse: sinon.stub().resolves({}), + createGroupKeyReset: (subscriberId, streamId, key) => { + assert.strictEqual(streamId, 'streamId4') + if (subscriberId === 'subscriber1') { + resetKeySent1 = { + groupKey: subscriberKeyPair1.decryptWithPrivateKey(key.groupKey, true), + start: key.start + } + return Promise.resolve('fake reset 1') + } + assert.strictEqual(subscriberId, 'subscriber3') + resetKeySent3 = { + groupKey: subscriberKeyPair3.decryptWithPrivateKey(key.groupKey, true), + start: key.start + } + return Promise.resolve('fake reset 3') + }, + } + + const published = [] + client.publishStreamMessage = (msg) => { + published.push(msg) + return Promise.resolve() + } + + await util.handleGroupKeyRequest(request1) + await util.handleGroupKeyRequest(request2) + await util.handleGroupKeyRequest(request3) + await util.rekey('streamId4') + assert.deepStrictEqual(resetKeySent1, resetKeySent3) + assert.deepStrictEqual(resetKeySent1, client.keyStorageUtil.getLatestKey('streamId4')) + assert((published[3] === 'fake reset 1' && published[4] === 'fake reset 3') + || (published[3] === 'fake reset 3' && published[4] === 'fake reset 1')) }) }) describe('handleGroupKeyRequest', () => { From 277edadd60f8223ce9cad38b340d100111ff4f44 Mon Sep 17 00:00:00 2001 From: mthambipillai Date: Wed, 25 Mar 2020 17:24:05 +0100 Subject: [PATCH 4/9] add handleGroupKeyReset --- src/KeyExchangeUtil.js | 37 ++++++++++++ src/errors/InvalidGroupKeyResetError.js | 8 +++ test/unit/KeyExchangeUtil.test.js | 76 +++++++++++++++++++++++++ 3 files changed, 121 insertions(+) create mode 100644 src/errors/InvalidGroupKeyResetError.js diff --git a/src/KeyExchangeUtil.js b/src/KeyExchangeUtil.js index 3f3d72661..2e2fdf675 100644 --- a/src/KeyExchangeUtil.js +++ b/src/KeyExchangeUtil.js @@ -6,6 +6,7 @@ import EncryptionUtil from './EncryptionUtil' import InvalidGroupKeyRequestError from './errors/InvalidGroupKeyRequestError' import InvalidGroupKeyResponseError from './errors/InvalidGroupKeyResponseError' import InvalidGroupKeyError from './errors/InvalidGroupKeyError' +import InvalidGroupKeyResetError from './errors/InvalidGroupKeyResetError' const debug = debugFactory('KeyExchangeUtil') const SUBSCRIBERS_EXPIRATION_TIME = 5 * 60 * 1000 // 5 minutes @@ -101,6 +102,42 @@ export default class KeyExchangeUtil { debug('INFO: Updated group key for stream "%s" and publisher "%s"', parsedContent.streamId, streamMessage.getPublisherId()) } + handleGroupKeyReset(streamMessage) { + // if it was signed, the StreamrClient already checked the signature. If not, StreamrClient accepted it since the stream + // does not require signed data for all types of messages. + if (!streamMessage.signature) { + throw new InvalidGroupKeyResetError('Received unsigned group key reset (it must be signed to avoid MitM attacks).') + } + // No need to check if parsedContent contains the necessary fields because it was already checked during deserialization + const parsedContent = streamMessage.getParsedContent() + // TODO: fix this hack in other PR + if (!this._client.subscribedStreamPartitions[parsedContent.streamId + '0']) { + throw new InvalidGroupKeyResetError('Received group key reset for a stream to which the client is not subscribed.') + } + + if (!this._client.encryptionUtil) { + throw new InvalidGroupKeyResetError('Cannot decrypt group key reset without the private key.') + } + const groupKey = this._client.encryptionUtil.decryptWithPrivateKey(parsedContent.groupKey, true) + try { + EncryptionUtil.validateGroupKey(groupKey) + } catch (err) { + if (err instanceof InvalidGroupKeyError) { + throw new InvalidGroupKeyResetError(err.message) + } else { + throw err + } + } + const decryptedGroupKey = { + groupKey, + start: parsedContent.start + } + /* eslint-disable no-underscore-dangle */ + this._client._setGroupKeys(parsedContent.streamId, streamMessage.getPublisherId(), [decryptedGroupKey]) + /* eslint-enable no-underscore-dangle */ + debug('INFO: Updated group key for stream "%s" and publisher "%s"', parsedContent.streamId, streamMessage.getPublisherId()) + } + async getSubscribers(streamId) { if (!this.subscribersPromise || (Date.now() - this.lastAccess) > SUBSCRIBERS_EXPIRATION_TIME) { this.subscribersPromise = this._client.getStreamSubscribers(streamId).then((subscribers) => { diff --git a/src/errors/InvalidGroupKeyResetError.js b/src/errors/InvalidGroupKeyResetError.js new file mode 100644 index 000000000..91a462c52 --- /dev/null +++ b/src/errors/InvalidGroupKeyResetError.js @@ -0,0 +1,8 @@ +export default class InvalidGroupKeyResetError extends Error { + constructor(message) { + super(message) + if (Error.captureStackTrace) { + Error.captureStackTrace(this, this.constructor) + } + } +} diff --git a/test/unit/KeyExchangeUtil.test.js b/test/unit/KeyExchangeUtil.test.js index 0e3b6e58e..344a4e476 100644 --- a/test/unit/KeyExchangeUtil.test.js +++ b/test/unit/KeyExchangeUtil.test.js @@ -9,6 +9,7 @@ import EncryptionUtil from '../../src/EncryptionUtil' import KeyStorageUtil from '../../src/KeyStorageUtil' import InvalidGroupKeyResponseError from '../../src/errors/InvalidGroupKeyResponseError' import InvalidGroupKeyRequestError from '../../src/errors/InvalidGroupKeyRequestError' +import InvalidGroupKeyResetError from '../../src/errors/InvalidGroupKeyResetError' const { StreamMessage } = MessageLayer const subscribers = ['0xb8CE9ab6943e0eCED004cDe8e3bBed6568B2Fa01'.toLowerCase(), 'subscriber2', 'subscriber3'] @@ -458,4 +459,79 @@ describe('KeyExchangeUtil', () => { return util.handleGroupKeyResponse(streamMessage) }) }) + describe('handleGroupKeyReset', () => { + it('should reject unsigned reset', () => { + const streamMessage = StreamMessage.create( + ['clientInboxAddress', 0, Date.now(), 0, 'publisherId', ''], null, + StreamMessage.CONTENT_TYPES.GROUP_KEY_RESET_SIMPLE, StreamMessage.ENCRYPTION_TYPES.RSA, { + streamId: 'streamId', + groupKey: 'encrypted-group-key', + start: 54256, + }, StreamMessage.SIGNATURE_TYPES.NONE, null, + ) + try { + util.handleGroupKeyReset(streamMessage) + } catch (err) { + assert(err instanceof InvalidGroupKeyResetError) + assert.strictEqual(err.message, 'Received unsigned group key reset (it must be signed to avoid MitM attacks).') + } + }) + it('should reject reset for a stream to which the client is not subscribed', () => { + const streamMessage = StreamMessage.create( + ['clientInboxAddress', 0, Date.now(), 0, 'publisherId', ''], null, + StreamMessage.CONTENT_TYPES.GROUP_KEY_RESET_SIMPLE, StreamMessage.ENCRYPTION_TYPES.RSA, { + streamId: 'wrong-streamId', + groupKey: 'encrypted-group-key', + start: 54256, + }, StreamMessage.SIGNATURE_TYPES.ETH, 'signature', + ) + try { + util.handleGroupKeyReset(streamMessage) + } catch (err) { + assert(err instanceof InvalidGroupKeyResetError) + assert.strictEqual(err.message, 'Received group key reset for a stream to which the client is not subscribed.') + } + }) + it('should reject reset with invalid group key', () => { + const encryptedGroupKey = EncryptionUtil.encryptWithPublicKey(crypto.randomBytes(16), client.encryptionUtil.getPublicKey(), true) + const streamMessage = StreamMessage.create( + ['clientInboxAddress', 0, Date.now(), 0, 'publisherId', ''], null, + StreamMessage.CONTENT_TYPES.GROUP_KEY_RESET_SIMPLE, StreamMessage.ENCRYPTION_TYPES.RSA, { + streamId: 'streamId', + groupKey: encryptedGroupKey, + start: 54256, + }, StreamMessage.SIGNATURE_TYPES.ETH, 'signature', + ) + try { + util.handleGroupKeyReset(streamMessage) + } catch (err) { + assert(err instanceof InvalidGroupKeyResetError) + assert.strictEqual(err.message, 'Group key must have a size of 256 bits, not 128') + } + }) + it('should update client options and subscriptions after reset with received group key', (done) => { + const groupKey = crypto.randomBytes(32) + const encryptedGroupKey = EncryptionUtil.encryptWithPublicKey(groupKey, client.encryptionUtil.getPublicKey(), true) + const streamMessage = StreamMessage.create( + ['clientInboxAddress', 0, Date.now(), 0, 'publisherId', ''], null, + StreamMessage.CONTENT_TYPES.GROUP_KEY_RESET_SIMPLE, StreamMessage.ENCRYPTION_TYPES.RSA, { + streamId: 'streamId', + groupKey: encryptedGroupKey, + start: 54256, + }, StreamMessage.SIGNATURE_TYPES.ETH, 'signature', + ) + /* eslint-disable no-underscore-dangle */ + client._setGroupKeys = (streamId, publisherId, keys) => { + assert.strictEqual(streamId, 'streamId') + assert.strictEqual(publisherId, 'publisherId') + assert.deepStrictEqual(keys, [{ + groupKey, + start: 54256 + }]) + done() + } + /* eslint-enable no-underscore-dangle */ + return util.handleGroupKeyReset(streamMessage) + }) + }) }) From 3bc001f4626624dabf606e16a8f3b39d66c45e0a Mon Sep 17 00:00:00 2001 From: mthambipillai Date: Wed, 25 Mar 2020 18:33:26 +0100 Subject: [PATCH 5/9] integrate with StreamrClient --- src/KeyExchangeUtil.js | 4 +-- src/StreamrClient.js | 32 +++++++++++++++++- test/integration/StreamrClient.test.js | 46 ++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 3 deletions(-) diff --git a/src/KeyExchangeUtil.js b/src/KeyExchangeUtil.js index 2e2fdf675..51bca881d 100644 --- a/src/KeyExchangeUtil.js +++ b/src/KeyExchangeUtil.js @@ -175,10 +175,10 @@ export default class KeyExchangeUtil { return res } - async rekey(streamId) { + async rekey(streamId, getSubscribersLocally = true) { const groupKey = crypto.randomBytes(32) const start = Date.now() - const realSubscribersSet = this.localSubscribers[streamId] ? this.localSubscribers[streamId] : [] + const realSubscribersSet = getSubscribersLocally ? this.localSubscribers[streamId] : (await this._client.getStreamSubscribers(streamId)) const toRevoke = [] const promises = [] diff --git a/src/StreamrClient.js b/src/StreamrClient.js index 05b9e2f0c..3c93b7eb7 100644 --- a/src/StreamrClient.js +++ b/src/StreamrClient.js @@ -43,6 +43,7 @@ import ResendUtil from './ResendUtil' import InvalidGroupKeyResponseError from './errors/InvalidGroupKeyResponseError' import InvalidContentTypeError from './errors/InvalidContentTypeError' import InvalidGroupKeyRequestError from './errors/InvalidGroupKeyRequestError' +import InvalidGroupKeyResetError from './errors/InvalidGroupKeyResetError' export default class StreamrClient extends EventEmitter { constructor(options, connection) { @@ -69,6 +70,7 @@ export default class StreamrClient extends EventEmitter { publisherGroupKeys: {}, // {streamId: groupKey} subscriberGroupKeys: {}, // {streamId: {publisherId: groupKey}} keyExchange: {}, + autoRevoke: true, streamrNodeAddress: '0xf3E5A65851C3779f468c9EcB32E6f25D9D68601a', streamrOperatorAddress: '0xc0aa4dC0763550161a6B59fa430361b5a26df28C', tokenAddress: '0x0Cf0Ee63788A0849fE5297F3407f701E122cC023', @@ -320,7 +322,22 @@ export default class StreamrClient extends EventEmitter { this.keyExchangeUtil.handleGroupKeyResponse(streamMessage) } else { throw new InvalidGroupKeyResponseError( - `Received group key from an invalid publisher ${streamMessage.getPublisherId()} for stream ${streamId}` + `Received group key response from an invalid publisher ${streamMessage.getPublisherId()} for stream ${streamId}` + ) + } + } + } else if (streamMessage.contentType === StreamMessage.CONTENT_TYPES.GROUP_KEY_RESET_SIMPLE) { + if (this.keyExchangeUtil) { + const { streamId } = streamMessage.getParsedContent() + // A valid publisher of the client's inbox stream could send key resets for other streams to which + // the publisher doesn't have write permissions. Thus the following additional check is necessary. + // TODO: fix this hack in other PR (and move logic to keyExchangeUtil) + const valid = await this.subscribedStreamPartitions[streamId + '0'].isValidPublisher(streamMessage.getPublisherId()) + if (valid) { + this.keyExchangeUtil.handleGroupKeyReset(streamMessage) + } else { + throw new InvalidGroupKeyResetError( + `Received group key reset from an invalid publisher ${streamMessage.getPublisherId()} for stream ${streamId}` ) } } @@ -426,6 +443,12 @@ export default class StreamrClient extends EventEmitter { if (this.isConnected()) { // If connected, emit a publish request + if (this.options.autoRevoke && this.keyExchangeUtil) { + const res = await this.keyExchangeUtil.keyRevocationNeeded(streamId) + if (res) { + await this.keyExchangeUtil.rekey(streamId) + } + } return this._requestPublish(streamMessage, sessionToken) } @@ -464,6 +487,13 @@ export default class StreamrClient extends EventEmitter { ) } + async rekey(streamId) { + if (!this.keyExchangeUtil) { + throw new Error('options.keyExchange must be defined in the StreamrClient constructor to use this function.') + } + return this.keyExchangeUtil.rekey(streamId, false) + } + async resend(optionsOrStreamId, callback) { const options = this._validateParameters(optionsOrStreamId, callback) diff --git a/test/integration/StreamrClient.test.js b/test/integration/StreamrClient.test.js index f3f86b482..70a118f6d 100644 --- a/test/integration/StreamrClient.test.js +++ b/test/integration/StreamrClient.test.js @@ -1006,6 +1006,52 @@ describe('StreamrClient', () => { }) }, 2 * TIMEOUT) + it('client.subscribe can get the new group key after a group key reset', async (done) => { + client.once('error', done) + const groupKey = crypto.randomBytes(32) + // subscribe without knowing the group key to decrypt stream messages + let receivedFirst = false + const sub = client.subscribe({ + stream: stream.id, + }, (parsedContent, streamMessage) => { + // Check signature stuff + assert.strictEqual(streamMessage.signatureType, StreamMessage.SIGNATURE_TYPES.ETH) + assert(streamMessage.getPublisherId()) + assert(streamMessage.signature) + + if (!receivedFirst) { + assert.strictEqual(parsedContent.data, 'msg1') + // Now the subscriber knows the initial group key + assert.deepStrictEqual(sub.groupKeys[streamMessage.getPublisherId().toLowerCase()], groupKey) + receivedFirst = true + } else { + assert.strictEqual(parsedContent.data, 'msg2') + // Now the subscriber knows another group key after reset + assert.notDeepStrictEqual(sub.groupKeys[streamMessage.getPublisherId().toLowerCase()], groupKey) + + // All good, unsubscribe + client.unsubscribe(sub) + sub.on('unsubscribed', () => { + done() + }) + } + }) + + // Publish after subscribed + sub.on('subscribed', async () => { + await client.publish(stream.id, { + data: 'msg1', + }, Date.now(), null, groupKey) + setTimeout(async () => { + await client.rekey(stream.id) + // publishing second message with the newly generated key + await client.publish(stream.id, { + data: 'msg2' + }) + }, 2000) + }) + }, 2 * TIMEOUT) + it('client.subscribe with resend last can get the historical keys for previous encrypted messages', (done) => { client.once('error', done) // Publish encrypted messages with different keys From 51c1e931fc1d0ba07a64d2edc77edd3579bc2852 Mon Sep 17 00:00:00 2001 From: mthambipillai Date: Thu, 26 Mar 2020 16:28:19 +0100 Subject: [PATCH 6/9] update README --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 460c31ca1..3a4352681 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,7 @@ publisherGroupKeys | {} | Object defining the group key as a hex string used to publisherStoreKeyHistory | true | If `true`, the client will locally store every key used to encrypt messages at some point. If set to `false`, the client will not be able to answer subscribers asking for historical keys during resend requests. subscriberGroupKeys | {} | Object defining, for each stream id, an object containing the group key used to decrypt for each publisher id. Not needed if `keyExchange` is defined. keyExchange | {} | Defines RSA key pair to use for group key exchange. Can define `publicKey` and `privateKey` fields as strings in the PEM format, or stay empty to generate a key pair automatically. Can be set to `null` if no key exchange is required. +autoRevoke | true | If set to true, periodically rekeys streams when publishing if the number of subscribers to revoke reaches a threshold of 5. Can also manually trigger a rekey with `client.rekey(streamId)`. ### Authentication options From 73d825f3511cd2c7789b1c7f6f8abe358e1b9493 Mon Sep 17 00:00:00 2001 From: mthambipillai Date: Fri, 27 Mar 2020 13:57:24 +0100 Subject: [PATCH 7/9] fix unit test --- test/unit/StubbedStreamrClient.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/unit/StubbedStreamrClient.js b/test/unit/StubbedStreamrClient.js index 3f2525c7e..08edbabda 100644 --- a/test/unit/StubbedStreamrClient.js +++ b/test/unit/StubbedStreamrClient.js @@ -13,6 +13,8 @@ export default class StubbedStreamrClient extends StreamrClient { id: 'streamId', partitions: 1, })) + + getStreamSubscribers = sinon.stub().resolves([]) } // publisherId is the hash of 'username' StubbedStreamrClient.hashedUsername = '0x16F78A7D6317F102BBD95FC9A4F3FF2E3249287690B8BDAD6B7810F82B34ACE3'.toLowerCase() From 9ba8feaca6c63183f11fbb6c03221db73068dcc9 Mon Sep 17 00:00:00 2001 From: mthambipillai Date: Mon, 30 Mar 2020 17:13:51 +0200 Subject: [PATCH 8/9] fix integration test --- test/integration/StreamrClient.test.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/integration/StreamrClient.test.js b/test/integration/StreamrClient.test.js index 70a118f6d..88e0f075f 100644 --- a/test/integration/StreamrClient.test.js +++ b/test/integration/StreamrClient.test.js @@ -1052,15 +1052,15 @@ describe('StreamrClient', () => { }) }, 2 * TIMEOUT) - it('client.subscribe with resend last can get the historical keys for previous encrypted messages', (done) => { + it('client.subscribe with resend last can get the historical keys for previous encrypted messages', async (done) => { client.once('error', done) // Publish encrypted messages with different keys const groupKey1 = crypto.randomBytes(32) const groupKey2 = crypto.randomBytes(32) - client.publish(stream.id, { + await client.publish(stream.id, { test: 'resent msg 1', }, Date.now(), null, groupKey1) - client.publish(stream.id, { + await client.publish(stream.id, { test: 'resent msg 2', }, Date.now(), null, groupKey2) From 9c431ccea09be16a0c3293731574d0ac4cb9b4dc Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Mon, 20 Apr 2020 10:51:46 -0400 Subject: [PATCH 9/9] Wait for encryptionutil to be ready in tests. --- test/unit/KeyExchangeUtil.test.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/unit/KeyExchangeUtil.test.js b/test/unit/KeyExchangeUtil.test.js index 344a4e476..13ca6dd55 100644 --- a/test/unit/KeyExchangeUtil.test.js +++ b/test/unit/KeyExchangeUtil.test.js @@ -174,6 +174,7 @@ describe('KeyExchangeUtil', () => { util.localSubscribers.streamId4 = ['subscriber1', 'subscriber3'] // fake call to 'keyRevocationNeeded', subscriber2 must be revoked const subscriberKeyPair1 = new EncryptionUtil() + await subscriberKeyPair1.onReady() const request1 = StreamMessage.create( ['clientInboxAddress', 0, Date.now(), 0, 'subscriber1', ''], null, StreamMessage.CONTENT_TYPES.GROUP_KEY_REQUEST, StreamMessage.ENCRYPTION_TYPES.NONE, { @@ -182,6 +183,7 @@ describe('KeyExchangeUtil', () => { }, StreamMessage.SIGNATURE_TYPES.ETH, 'signature', ) const subscriberKeyPair2 = new EncryptionUtil() + await subscriberKeyPair2.onReady() const request2 = StreamMessage.create( ['clientInboxAddress', 0, Date.now(), 0, 'subscriber2', ''], null, StreamMessage.CONTENT_TYPES.GROUP_KEY_REQUEST, StreamMessage.ENCRYPTION_TYPES.NONE, { @@ -190,6 +192,7 @@ describe('KeyExchangeUtil', () => { }, StreamMessage.SIGNATURE_TYPES.ETH, 'signature', ) const subscriberKeyPair3 = new EncryptionUtil() + await subscriberKeyPair3.onReady() const request3 = StreamMessage.create( ['clientInboxAddress', 0, Date.now(), 0, 'subscriber3', ''], null, StreamMessage.CONTENT_TYPES.GROUP_KEY_REQUEST, StreamMessage.ENCRYPTION_TYPES.NONE, {