From b74b7489143130c7663e6a524d7b6f379c4cad52 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 24 Feb 2021 15:54:09 -0500 Subject: [PATCH 01/14] feat(encryption): Support reading & writing newGroupKey. Delays actual groupKey change until message after next. --- src/publish/Encrypt.ts | 5 ++- src/stream/KeyExchange.js | 54 +++++++++++++++++------- src/subscribe/Decrypt.js | 1 + test/integration/Encryption.test.js | 64 +++++++++++++++++++++++++++++ 4 files changed, 108 insertions(+), 16 deletions(-) diff --git a/src/publish/Encrypt.ts b/src/publish/Encrypt.ts index f3a752708..9b63784d2 100644 --- a/src/publish/Encrypt.ts +++ b/src/publish/Encrypt.ts @@ -39,8 +39,9 @@ export default function Encrypt(client: StreamrClient) { if (streamMessage.messageType !== StreamMessage.MESSAGE_TYPES.MESSAGE) { return } - const groupKey = await getPublisherKeyExchange().useGroupKey(stream.id) - await EncryptionUtil.encryptStreamMessage(streamMessage, groupKey) + + const [groupKey, nextGroupKey] = await getPublisherKeyExchange().useGroupKey(stream.id) + await EncryptionUtil.encryptStreamMessage(streamMessage, groupKey, nextGroupKey) } return Object.assign(encrypt, { diff --git a/src/stream/KeyExchange.js b/src/stream/KeyExchange.js index 8aee4be89..6e499af3a 100644 --- a/src/stream/KeyExchange.js +++ b/src/stream/KeyExchange.js @@ -65,7 +65,7 @@ function GroupKeyStore({ groupKeys = new Map() }) { }) let currentGroupKeyId // current key id if any - let nextGroupKey // key to use next, disappears if not actually used. + const nextGroupKeys = [] // the keys to use next, disappears if not actually used. Max queue size 2 store.forEach((groupKey) => { GroupKey.validate(GroupKey.from(groupKey)) @@ -79,7 +79,8 @@ function GroupKeyStore({ groupKeys = new Map() }) { const existingKey = GroupKey.from(store.get(groupKey.id)) if (!existingKey.equals(groupKey)) { throw new GroupKey.InvalidGroupKeyError( - `Trying to add groupKey ${groupKey.id} but key exists & is not equivalent to new GroupKey: ${groupKey}.` + `Trying to add groupKey ${groupKey.id} but key exists & is not equivalent to new GroupKey: ${groupKey}.`, + groupKey ) } @@ -96,29 +97,47 @@ function GroupKeyStore({ groupKeys = new Map() }) { has(groupKeyId) { if (currentGroupKeyId === groupKeyId) { return true } - if (nextGroupKey && nextGroupKey.id === groupKeyId) { return true } + if (nextGroupKeys.some((nextKey) => nextKey.id === groupKeyId)) { return true } return store.has(groupKeyId) }, isEmpty() { - return !nextGroupKey && store.size === 0 + return nextGroupKeys.length === 0 && store.size === 0 }, useGroupKey() { - if (nextGroupKey) { - // next key becomes current key + const nextGroupKey = nextGroupKeys.pop() + // first message + if (!currentGroupKeyId && nextGroupKey) { storeKey(nextGroupKey) + currentGroupKeyId = nextGroupKey.id + return [ + this.get(currentGroupKeyId), + undefined, + ] + } + // key changed + if (currentGroupKeyId && nextGroupKey) { + storeKey(nextGroupKey) + const prevGroupKey = this.get(currentGroupKeyId) currentGroupKeyId = nextGroupKey.id - nextGroupKey = undefined + // use current key one more time + return [ + prevGroupKey, + nextGroupKey, + ] } + // generate & use key if none already set if (!currentGroupKeyId) { - // generate & use key if none already set this.rotateGroupKey() return this.useGroupKey() } - return this.get(currentGroupKeyId) + return [ + this.get(currentGroupKeyId), + nextGroupKey + ] }, get(groupKeyId) { const groupKey = store.get(groupKeyId) @@ -127,7 +146,7 @@ function GroupKeyStore({ groupKeys = new Map() }) { }, clear() { currentGroupKeyId = undefined - nextGroupKey = undefined + nextGroupKeys.length = 0 return store.clear() }, rotateGroupKey() { @@ -138,7 +157,8 @@ function GroupKeyStore({ groupKeys = new Map() }) { }, setNextGroupKey(newKey) { GroupKey.validate(newKey) - nextGroupKey = newKey + nextGroupKeys.unshift(newKey) + nextGroupKeys.length = Math.min(nextGroupKeys.length, 2) } } } @@ -341,7 +361,7 @@ async function getGroupKeysFromStreamMessage(streamMessage, encryptionUtil) { async function SubscriberKeyExhangeSubscription(client, getGroupKeyStore, encryptionUtil) { let sub - async function onKeyExchangeMessage(parsedContent, streamMessage) { + async function onKeyExchangeMessage(_parsedContent, streamMessage) { try { const { messageType } = streamMessage const { MESSAGE_TYPES } = StreamMessage @@ -537,9 +557,9 @@ export function SubscriberKeyExchange(client, { groupKeys = {} } = {}) { }) async function getGroupKey(streamMessage) { - if (!streamMessage.groupKeyId) { return undefined } + if (!streamMessage.groupKeyId) { return [] } await next() - if (!enabled) { return undefined } + if (!enabled) { return [] } return getKey(streamMessage) } @@ -549,6 +569,12 @@ export function SubscriberKeyExchange(client, { groupKeys = {} } = {}) { enabled = true return next() }, + addNewKey(streamMessage) { + if (!streamMessage.newGroupKey) { return } + const streamId = streamMessage.getStreamId() + const groupKeyStore = getGroupKeyStore(streamId) + groupKeyStore.add(streamMessage.newGroupKey) + }, async stop() { enabled = false return next() diff --git a/src/subscribe/Decrypt.js b/src/subscribe/Decrypt.js index 25ab37f31..6e97f9f76 100644 --- a/src/subscribe/Decrypt.js +++ b/src/subscribe/Decrypt.js @@ -46,6 +46,7 @@ export default function Decrypt(client, options = {}) { throw new UnableToDecryptError(`Group key not found: ${streamMessage.groupKeyId}`, streamMessage) } await EncryptionUtil.decryptStreamMessage(streamMessage, groupKey) + requestKey.addNewKey(streamMessage) } catch (err) { await onError(err, streamMessage) } finally { diff --git a/test/integration/Encryption.test.js b/test/integration/Encryption.test.js index b98c106ad..69fd82c35 100644 --- a/test/integration/Encryption.test.js +++ b/test/integration/Encryption.test.js @@ -180,6 +180,70 @@ describe('decryption', () => { await client.unsubscribe(sub) }, 2 * TIMEOUT) + it('changing group key injects group key into next stream message', async () => { + const done = Defer() + const msgs = [Msg(), Msg(), Msg()] + const received = [] + // subscribe without knowing the group key to decrypt stream messages + const sub = await client.subscribe({ + stream: stream.id, + }, done.wrapError((_parsedContent, streamMessage) => { + // Check signature stuff + received.push(streamMessage) + if (received.length === msgs.length) { + done.resolve() + } + })) + + sub.once('error', done.reject) + + const onEncryptionMessageErr = checkEncryptionMessages(client) + // id | groupKeyId | newGroupKey (encrypted by groupKeyId) + // msg1 gk2 - + // msg2 gk2 gk3 + // msg3 gk3 - + const groupKey1 = GroupKey.generate() + const groupKey2 = GroupKey.generate() + await client.setNextGroupKey(stream.id, groupKey1) + await client.publish(stream.id, msgs[0]) + await client.setNextGroupKey(stream.id, groupKey2) + await client.publish(stream.id, msgs[1]) + await client.publish(stream.id, msgs[2]) + await done + expect(received.map((m) => m.getParsedContent())).toEqual(msgs) + received.forEach((streamMessage, index) => { + expect(streamMessage.signatureType).toBe(StreamMessage.SIGNATURE_TYPES.ETH) + expect(streamMessage.getPublisherId()) + expect(streamMessage.signature) + switch (index) { + case 0: { + expect(streamMessage.newGroupKey).toEqual(null) + expect(streamMessage.groupKeyId).toEqual(groupKey1.id) + break + } + case 1: { + expect(streamMessage.newGroupKey).toEqual(groupKey2) + expect(streamMessage.groupKeyId).toEqual(groupKey1.id) + break + } + case 2: { + expect(streamMessage.newGroupKey).toEqual(null) + expect(streamMessage.groupKeyId).toEqual(groupKey2.id) + break + } + default: { + throw new Error(`should not get here: ${index}`) + } + + } + }) + + onEncryptionMessageErr.resolve() // will be ignored if errored + await onEncryptionMessageErr + // All good, unsubscribe + await client.unsubscribe(sub) + }, 2 * TIMEOUT) + it('errors if rotating group key for no stream', async () => { expect(async () => ( client.rotateGroupKey() From da847fe571ff0f1408020c45d6c4de61c539dd5a Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 24 Feb 2021 16:54:09 -0500 Subject: [PATCH 02/14] feat(encryption): Add client.rekey(streamId). Test revocation works. Avoid test assertions in event handlers. --- src/StreamrClient.ts | 4 ++ src/publish/Encrypt.ts | 3 ++ src/publish/index.js | 6 +++ src/stream/KeyExchange.js | 17 ++++++- test/integration/Encryption.test.js | 79 +++++++++++++++++++++++++++++ 5 files changed, 108 insertions(+), 1 deletion(-) diff --git a/src/StreamrClient.ts b/src/StreamrClient.ts index 053719c4f..d821c2417 100644 --- a/src/StreamrClient.ts +++ b/src/StreamrClient.ts @@ -348,6 +348,10 @@ export class StreamrClient extends EventEmitter { // eslint-disable-line no-rede return this.publisher.rotateGroupKey(...args) } + rekey(...args: Todo) { + return this.publisher.rekey(...args) + } + /** * @category Important */ diff --git a/src/publish/Encrypt.ts b/src/publish/Encrypt.ts index 9b63784d2..ebee57bae 100644 --- a/src/publish/Encrypt.ts +++ b/src/publish/Encrypt.ts @@ -51,6 +51,9 @@ export default function Encrypt(client: StreamrClient) { rotateGroupKey(...args: Parameters) { return getPublisherKeyExchange().rotateGroupKey(...args) }, + rekey(...args: Parameters) { + return getPublisherKeyExchange().rekey(...args) + }, start() { return getPublisherKeyExchange().start() }, diff --git a/src/publish/index.js b/src/publish/index.js index 40f7bbf3a..485dcdf4c 100644 --- a/src/publish/index.js +++ b/src/publish/index.js @@ -189,6 +189,9 @@ function getCreateStreamMessage(client) { rotateGroupKey(maybeStreamId) { return encrypt.rotateGroupKey(maybeStreamId) }, + rekey(maybeStreamId) { + return encrypt.rekey(maybeStreamId) + }, startKeyExchange() { return encrypt.start() }, @@ -315,6 +318,9 @@ export default function Publisher(client) { }, setNextGroupKey(streamId, newKey) { return createStreamMessage.setNextGroupKey(streamId, newKey) + }, + rekey(streamId) { + return createStreamMessage.rekey(streamId) } } } diff --git a/src/stream/KeyExchange.js b/src/stream/KeyExchange.js index 6e499af3a..6c5f10101 100644 --- a/src/stream/KeyExchange.js +++ b/src/stream/KeyExchange.js @@ -159,6 +159,12 @@ function GroupKeyStore({ groupKeys = new Map() }) { GroupKey.validate(newKey) nextGroupKeys.unshift(newKey) nextGroupKeys.length = Math.min(nextGroupKeys.length, 2) + }, + rekey() { + const newKey = GroupKey.generate() + storeKey(newKey) + currentGroupKeyId = newKey.id + nextGroupKeys.length = 0 } } } @@ -235,7 +241,8 @@ async function PublisherKeyExhangeSubscription(client, getGroupKeyStore) { const subscriberId = streamMessage.getPublisherId() const groupKeyStore = getGroupKeyStore(streamId) - const encryptedGroupKeys = groupKeyIds.map((id) => { + const isSubscriber = await client.isStreamSubscriber(streamId, subscriberId) + const encryptedGroupKeys = !isSubscriber ? [] : groupKeyIds.map((id) => { const groupKey = groupKeyStore.get(id) if (!groupKey) { return null // will be filtered out @@ -336,9 +343,17 @@ export function PublisherKeyExhange(client, { groupKeys = {} } = {}) { return !groupKeyStore.isEmpty() } + async function rekey(streamId) { + if (!enabled) { return } + const groupKeyStore = getGroupKeyStore(streamId) + groupKeyStore.rekey() + await next() + } + return { setNextGroupKey, useGroupKey, + rekey, rotateGroupKey, hasAnyGroupKey, async start() { diff --git a/test/integration/Encryption.test.js b/test/integration/Encryption.test.js index 69fd82c35..58a79a401 100644 --- a/test/integration/Encryption.test.js +++ b/test/integration/Encryption.test.js @@ -616,4 +616,83 @@ describe('decryption', () => { expect(onSubError).toHaveBeenCalledTimes(1) }) + + describe('revoking permissions', () => { + let client2 + beforeEach(async () => { + client2 = createClient() + await client2.connect() + await client2.session.getSessionToken() + }) + + afterEach(async () => { + await client2.disconnect() + }) + + it('fails gracefully if permission revoked', async () => { + const MAX_MESSAGES = 10 + await client.rotateGroupKey(stream.id) + + const p1 = await stream.grantPermission('stream_get', client2.getPublisherId()) + const p2 = await stream.grantPermission('stream_subscribe', client2.getPublisherId()) + + const sub = await client2.subscribe({ + stream: stream.id, + }) + + const errs = [] + const onSubError = jest.fn((err) => { + errs.push(err) + throw err + }) + + sub.on('error', onSubError) + + const received = [] + // Publish after subscribed + let count = 0 + const REVOKE_AFTER = 6 + const gotMessages = Defer() + // do publish in background otherwise permission is revoked before subscriber starts processing + const publishTask = publishTestMessages(MAX_MESSAGES, { + timestamp: 1111111, + async afterEach() { + count += 1 + if (count === REVOKE_AFTER) { + await gotMessages + await stream.revokePermission(p1.id) + await stream.revokePermission(p2.id) + await client.rekey(stream.id) + } + } + }) + + let t + await expect(async () => { + for await (const m of sub) { + received.push(m.getParsedContent()) + if (received.length === REVOKE_AFTER) { + gotMessages.resolve() + clearTimeout(t) + t = setTimeout(() => { + sub.cancel() + }, 2000) + } + + if (received.length === MAX_MESSAGES) { + clearTimeout(t) + break + } + } + }).rejects.toThrow('decrypt') + clearTimeout(t) + const published = await publishTask + + expect(received).toEqual([ + ...published.slice(0, REVOKE_AFTER), + ]) + + expect(onSubError).toHaveBeenCalledTimes(MAX_MESSAGES - REVOKE_AFTER + 1) // + 1 for final err + }) + }) }) From c4cfb2d589a59242581632f16e61c890bc166e49 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Mon, 11 Jan 2021 09:49:52 -0500 Subject: [PATCH 03/14] Add failing multiple publishers + late subscriber test. --- test/integration/MultipleClients.test.js | 215 +++++++++++++++++++++++ 1 file changed, 215 insertions(+) diff --git a/test/integration/MultipleClients.test.js b/test/integration/MultipleClients.test.js index 101f0f5b6..7e510fff4 100644 --- a/test/integration/MultipleClients.test.js +++ b/test/integration/MultipleClients.test.js @@ -512,6 +512,221 @@ describeRepeats('PubSub with multiple clients', () => { }, 60000) }) + test('works with multiple publishers on one stream', async () => { + const onEnd = [] + async function createPublisher() { + const pubClient = createClient({ + auth: { + privateKey: fakePrivateKey(), + } + }) + onEnd.push(() => pubClient.disconnect()) + pubClient.on('error', getOnError(errors)) + const pubUser = await pubClient.getUserInfo() + await stream.grantPermission('stream_get', pubUser.username) + await stream.grantPermission('stream_publish', pubUser.username) + // needed to check last + await stream.grantPermission('stream_subscribe', pubUser.username) + await pubClient.session.getSessionToken() + await pubClient.connect() + return pubClient + } + + try { + await mainClient.session.getSessionToken() + await mainClient.connect() + + otherClient = createClient({ + auth: { + privateKey + } + }) + otherClient.on('error', getOnError(errors)) + await otherClient.session.getSessionToken() + const otherUser = await otherClient.getUserInfo() + await stream.grantPermission('stream_get', otherUser.username) + await stream.grantPermission('stream_subscribe', otherUser.username) + await otherClient.connect() + + const receivedMessagesOther = {} + const receivedMessagesMain = {} + // subscribe to stream from other client instance + await otherClient.subscribe({ + stream: stream.id, + }, (msg, streamMessage) => { + const msgs = receivedMessagesOther[streamMessage.getPublisherId()] || [] + msgs.push(msg) + receivedMessagesOther[streamMessage.getPublisherId()] = msgs + }) + + // subscribe to stream from main client instance + await mainClient.subscribe({ + stream: stream.id, + }, (msg, streamMessage) => { + const msgs = receivedMessagesMain[streamMessage.getPublisherId()] || [] + msgs.push(msg) + receivedMessagesMain[streamMessage.getPublisherId()] = msgs + }) + + /* eslint-disable no-await-in-loop */ + const publishers = [] + for (let i = 0; i < 3; i++) { + publishers.push(await createPublisher()) + } + /* eslint-enable no-await-in-loop */ + const published = {} + await Promise.all(publishers.map(async (pubClient) => { + const publisherId = await pubClient.getPublisherId() + const publishTestMessages = getPublishTestMessages(pubClient, { + stream, + waitForLast: true, + }) + await publishTestMessages(10, { + delay: 500 + Math.random() * 1500, + afterEach(msg) { + published[publisherId] = published[publisherId] || [] + published[publisherId].push(msg) + } + }) + })) + + await wait(5000) + + mainClient.debug('%j', { + published, + receivedMessagesMain, + receivedMessagesOther, + }) + + // eslint-disable-next-line no-inner-declarations + function checkMessages(received) { + for (const [key, msgs] of Object.entries(published)) { + expect(received[key]).toEqual(msgs) + } + } + + checkMessages(receivedMessagesMain) + checkMessages(receivedMessagesOther) + } finally { + await Promise.all(onEnd.map((fn) => fn())) + } + }, 40000) + + test('works with multiple publishers on one stream with late subscriber', async () => { + const onEnd = [] + async function createPublisher() { + const pubClient = createClient({ + auth: { + privateKey: fakePrivateKey(), + } + }) + onEnd.push(() => pubClient.disconnect()) + pubClient.on('error', getOnError(errors)) + const pubUser = await pubClient.getUserInfo() + await stream.grantPermission('stream_get', pubUser.username) + await stream.grantPermission('stream_publish', pubUser.username) + // needed to check last + await stream.grantPermission('stream_subscribe', pubUser.username) + await pubClient.session.getSessionToken() + await pubClient.connect() + return pubClient + } + + const published = {} + function checkMessages(received) { + for (const [key, msgs] of Object.entries(published)) { + expect(received[key]).toEqual(msgs) + } + } + + const MAX_MESSAGES = 10 + + try { + await mainClient.session.getSessionToken() + await mainClient.connect() + + otherClient = createClient({ + auth: { + privateKey + } + }) + otherClient.on('error', getOnError(errors)) + await otherClient.session.getSessionToken() + const otherUser = await otherClient.getUserInfo() + await stream.grantPermission('stream_get', otherUser.username) + await stream.grantPermission('stream_subscribe', otherUser.username) + await otherClient.connect() + + const receivedMessagesOther = {} + const receivedMessagesMain = {} + + // subscribe to stream from main client instance + const mainSub = await mainClient.subscribe({ + stream: stream.id, + }, (msg, streamMessage) => { + const msgs = receivedMessagesMain[streamMessage.getPublisherId()] || [] + msgs.push(msg) + receivedMessagesMain[streamMessage.getPublisherId()] = msgs + if (Object.values(receivedMessagesMain).every((m) => m.length === MAX_MESSAGES)) { + mainSub.end() + } + }) + + /* eslint-disable no-await-in-loop */ + const publishers = [] + for (let i = 0; i < 3; i++) { + publishers.push(await createPublisher()) + } + + let counter = 0 + /* eslint-enable no-await-in-loop */ + await Promise.all(publishers.map(async (pubClient) => { + const publisherId = await pubClient.getPublisherId() + const publishTestMessages = getPublishTestMessages(pubClient, { + stream, + waitForLast: true, + }) + await publishTestMessages(MAX_MESSAGES, { + delay: 500 + Math.random() * 1500, + async afterEach(pubMsg) { + published[publisherId] = published[publisherId] || [] + published[publisherId].push(pubMsg) + counter += 1 + if (counter === 3) { + // late subscribe to stream from other client instance + const otherSub = await otherClient.subscribe({ + stream: stream.id, + resend: { + last: 1000, + } + }, (msg, streamMessage) => { + const msgs = receivedMessagesOther[streamMessage.getPublisherId()] || [] + msgs.push(msg) + receivedMessagesOther[streamMessage.getPublisherId()] = msgs + if (msgs.length === MAX_MESSAGES) { + return otherSub.end() + } + }) + } + } + }) + })) + + await wait(15000) + + mainClient.debug('%j', { + published, + receivedMessagesMain, + receivedMessagesOther, + }) + + checkMessages(receivedMessagesMain) + checkMessages(receivedMessagesOther) + } finally { + await Promise.all(onEnd.map((fn) => fn())) + } + }, 60000) + test('disconnecting one client does not disconnect the other', async () => { otherClient = createClient({ id: 'other', From 9f849e87ed76ff94f2e10d2a27a246b23d5abd14 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Thu, 4 Mar 2021 14:45:59 -0500 Subject: [PATCH 04/14] test: Passing integration/Encryption.test.js with revoke tests. Cleaned up, but still WIP encryption/subscription error handling. Tracking down unhandled error. Fix unhandled rejection in keyexchange. Improve subscription error handling. Update Encryption tests. Passing integration/Encryption.test.js with revoke tests. Fix publish/Encrypt typing. --- src/publish/index.js | 3 + src/subscribe/index.ts | 46 +++++-- src/subscribe/pipeline.js | 1 + test/integration/Encryption.test.js | 161 ++++++++++++++++++++++- test/integration/MultipleClients.test.js | 8 +- 5 files changed, 201 insertions(+), 18 deletions(-) diff --git a/src/publish/index.js b/src/publish/index.js index 485dcdf4c..b14af0915 100644 --- a/src/publish/index.js +++ b/src/publish/index.js @@ -189,6 +189,9 @@ function getCreateStreamMessage(client) { rotateGroupKey(maybeStreamId) { return encrypt.rotateGroupKey(maybeStreamId) }, + startKeyExchange() { + return encrypt.start() + }, rekey(maybeStreamId) { return encrypt.rekey(maybeStreamId) }, diff --git a/src/subscribe/index.ts b/src/subscribe/index.ts index 40c79a84c..44d9a7aef 100644 --- a/src/subscribe/index.ts +++ b/src/subscribe/index.ts @@ -50,7 +50,9 @@ export class Subscription extends Emitter { this.client = client this.options = validateOptions(opts) this.key = this.options.key - this.id = counterId(`Subscription.${this.options.id || ''}${this.key}`) + this.id = counterId(`Subscription:${this.options.id || ''}${this.key}`) + this.debug = client.debug.extend(this.id) + this.debug('create') this.streamId = this.options.streamId this.streamPartition = this.options.streamPartition @@ -75,14 +77,20 @@ export class Subscription extends Emitter { if (event !== 'error') { return super.emit(event, ...args) } + const [error] = args + if (!this.listenerCount('error')) { + this.debug('emitting error but no error listeners, cancelling subscription', error) + this.cancel(error) + return false + } try { - if (this.listenerCount('error')) { - // debugger - return super.emit('error', ...args) - } - throw args[0] + this.debug('emit error', error) + return super.emit('error', ...args) } catch (err) { + if (err !== error) { + this.debug('error emitting error!', err) + } this.cancel(err) return false } @@ -94,6 +102,7 @@ export class Subscription extends Emitter { */ async onPipelineEnd(err?: Error) { + this.debug('onPipelineEnd', err) let error = err try { await this._onFinally(error) @@ -205,6 +214,9 @@ class SubscriptionSession extends Emitter { this.subscriptions = new Set() // active subs this.deletedSubscriptions = new Set() // hold so we can clean up + this.id = counterId(`SubscriptionSession:${this.options.id || ''}${this.options.key}`) + this.debug = this.client.debug.extend(this.id) + this.debug('create') this._init() } @@ -322,6 +334,12 @@ class SubscriptionSession extends Emitter { emit(...args: Todo[]) { const subs = this._getSubs() + if (args[0] === 'error') { + this.debug(args[0], args[1]) + } else { + this.debug(args[0]) + } + try { multiEmit(subs, ...args) } catch (error) { @@ -345,6 +363,7 @@ class SubscriptionSession extends Emitter { async add(sub: Todo) { this.subscriptions.add(sub) + this.debug('add', sub && sub.id) const { connection } = this.client await connection.addHandle(`adding${sub.id}`) try { @@ -366,11 +385,18 @@ class SubscriptionSession extends Emitter { return } + if (this.subscriptions.has(sub)) { + this.debug('remove', sub && sub.id) + } + const cancelTask = sub.cancel() - this.subscriptions.delete(sub) - this.deletedSubscriptions.add(sub) - await this.step() - await cancelTask + try { + this.subscriptions.delete(sub) + this.deletedSubscriptions.add(sub) + await this.step() + } finally { + await cancelTask + } } /** diff --git a/src/subscribe/pipeline.js b/src/subscribe/pipeline.js index 0beb14294..745d2e02f 100644 --- a/src/subscribe/pipeline.js +++ b/src/subscribe/pipeline.js @@ -39,6 +39,7 @@ export default function MessagePipeline(client, opts = {}, onFinally = async (er const seenErrors = new WeakSet() const onErrorFn = options.onError ? options.onError : (error) => { throw error } const onError = async (err) => { + // don't handle same error multiple times if (seenErrors.has(err)) { return } diff --git a/test/integration/Encryption.test.js b/test/integration/Encryption.test.js index 58a79a401..b55d02426 100644 --- a/test/integration/Encryption.test.js +++ b/test/integration/Encryption.test.js @@ -619,17 +619,172 @@ describe('decryption', () => { describe('revoking permissions', () => { let client2 + beforeEach(async () => { - client2 = createClient() + client2 = createClient({ id: 'subscriber' }) await client2.connect() await client2.session.getSessionToken() + client2.debug('new SUBSCRIBER') }) afterEach(async () => { await client2.disconnect() }) - it('fails gracefully if permission revoked', async () => { + it('fails gracefully if permission revoked with low cache maxAge', async () => { + await client.disconnect() + await setupClient({ + id: 'publisher', + cache: { + maxAge: 1, + } + }) + + const MAX_MESSAGES = 6 + await client.rotateGroupKey(stream.id) + + const p1 = await stream.grantPermission('stream_get', client2.getPublisherId()) + const p2 = await stream.grantPermission('stream_subscribe', client2.getPublisherId()) + + const sub = await client2.subscribe({ + stream: stream.id, + }) + + const errs = [] + const onSubError = jest.fn((err) => { + errs.push(err) + throw err + }) + + sub.on('error', onSubError) + + const received = [] + // Publish after subscribed + let count = 0 + const REVOKE_AFTER = 3 + const gotMessages = Defer() + // do publish in background otherwise permission is revoked before subscriber starts processing + const publishTask = publishTestMessages(MAX_MESSAGES, { + timestamp: 1111111, + async afterEach() { + count += 1 + if (count === REVOKE_AFTER) { + await gotMessages + await stream.revokePermission(p1.id) + await stream.revokePermission(p2.id) + await client.rekey(stream.id) + } + } + }) + let t + await expect(async () => { + for await (const m of sub) { + received.push(m.getParsedContent()) + if (received.length === REVOKE_AFTER) { + gotMessages.resolve() + clearTimeout(t) + t = setTimeout(() => { + sub.cancel() + }, 10000) + } + + if (received.length === MAX_MESSAGES) { + clearTimeout(t) + break + } + } + }).rejects.toThrow('not a subscriber') + clearTimeout(t) + const published = await publishTask + + expect(received).toEqual([ + ...published.slice(0, REVOKE_AFTER), + ]) + + expect(onSubError).toHaveBeenCalledTimes(1) + }) + + it('fails gracefully if permission revoked with low cache maxAge fail first message', async () => { + await client.disconnect() + await setupClient({ + id: 'publisher', + cache: { + maxAge: 1, + } + }) + const MAX_MESSAGES = 3 + await client.rotateGroupKey(stream.id) + + const p1 = await stream.grantPermission('stream_get', client2.getPublisherId()) + const p2 = await stream.grantPermission('stream_subscribe', client2.getPublisherId()) + + const sub = await client2.subscribe({ + stream: stream.id, + }) + sub.debug('sub', sub.id) + + const errs = [] + const onSubError = jest.fn((err) => { + errs.push(err) + throw err + }) + + sub.on('error', onSubError) + + const received = [] + // Publish after subscribed + let count = 0 + const REVOKE_AFTER = 1 + const gotMessages = Defer() + // do publish in background otherwise permission is revoked before subscriber starts processing + const publishTask = publishTestMessages(MAX_MESSAGES, { + timestamp: 1111111, + async afterEach() { + count += 1 + if (count === REVOKE_AFTER) { + await gotMessages + await stream.revokePermission(p1.id) + await stream.revokePermission(p2.id) + await client.rekey(stream.id) + } + } + }) + let t + await expect(async () => { + for await (const m of sub) { + received.push(m.getParsedContent()) + if (received.length === REVOKE_AFTER) { + gotMessages.resolve() + clearTimeout(t) + t = setTimeout(() => { + sub.cancel() + }, 10000) + } + + if (received.length === MAX_MESSAGES) { + clearTimeout(t) + break + } + } + }).rejects.toThrow('not a subscriber') + clearTimeout(t) + const published = await publishTask + + expect(received).toEqual([ + ...published.slice(0, REVOKE_AFTER), + ]) + + expect(onSubError).toHaveBeenCalledTimes(1) + }) + + it('fails gracefully if permission revoked with high cache maxAge', async () => { + await client.disconnect() + await setupClient({ + cache: { + maxAge: 99999999, + } + }) + const MAX_MESSAGES = 10 await client.rotateGroupKey(stream.id) @@ -692,7 +847,7 @@ describe('decryption', () => { ...published.slice(0, REVOKE_AFTER), ]) - expect(onSubError).toHaveBeenCalledTimes(MAX_MESSAGES - REVOKE_AFTER + 1) // + 1 for final err + expect(onSubError).toHaveBeenCalledTimes(1) }) }) }) diff --git a/test/integration/MultipleClients.test.js b/test/integration/MultipleClients.test.js index 7e510fff4..02b06907e 100644 --- a/test/integration/MultipleClients.test.js +++ b/test/integration/MultipleClients.test.js @@ -639,8 +639,6 @@ describeRepeats('PubSub with multiple clients', () => { } } - const MAX_MESSAGES = 10 - try { await mainClient.session.getSessionToken() await mainClient.connect() @@ -681,7 +679,7 @@ describeRepeats('PubSub with multiple clients', () => { let counter = 0 /* eslint-enable no-await-in-loop */ await Promise.all(publishers.map(async (pubClient) => { - const publisherId = await pubClient.getPublisherId() + const publisherId = pubClient.getPublisherId() const publishTestMessages = getPublishTestMessages(pubClient, { stream, waitForLast: true, @@ -699,12 +697,12 @@ describeRepeats('PubSub with multiple clients', () => { resend: { last: 1000, } - }, (msg, streamMessage) => { + }, async (msg, streamMessage) => { const msgs = receivedMessagesOther[streamMessage.getPublisherId()] || [] msgs.push(msg) receivedMessagesOther[streamMessage.getPublisherId()] = msgs if (msgs.length === MAX_MESSAGES) { - return otherSub.end() + await otherSub.end() } }) } From 6e52f77d0588587184fdb0036afb186cb0a830b5 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Thu, 11 Mar 2021 16:15:53 -0500 Subject: [PATCH 05/14] Linting. --- src/publish/index.js | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/publish/index.js b/src/publish/index.js index b14af0915..485dcdf4c 100644 --- a/src/publish/index.js +++ b/src/publish/index.js @@ -189,9 +189,6 @@ function getCreateStreamMessage(client) { rotateGroupKey(maybeStreamId) { return encrypt.rotateGroupKey(maybeStreamId) }, - startKeyExchange() { - return encrypt.start() - }, rekey(maybeStreamId) { return encrypt.rekey(maybeStreamId) }, From 5b3224a579e741dec90c8f979f25b855f7b67fbc Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Mon, 15 Mar 2021 10:33:23 -0400 Subject: [PATCH 06/14] Fixing MultipleClients test. WIP. --- package-lock.json | 44 +++++++++++++----------- test/integration/MessagePipeline.test.js | 3 +- test/integration/MultipleClients.test.js | 23 +++---------- 3 files changed, 29 insertions(+), 41 deletions(-) diff --git a/package-lock.json b/package-lock.json index 0aa6d1826..606086c35 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5243,6 +5243,17 @@ "requires": { "array-ify": "^1.0.0", "dot-prop": "^5.1.0" + }, + "dependencies": { + "dot-prop": { + "version": "5.3.0", + "resolved": "https://registry.npmjs.org/dot-prop/-/dot-prop-5.3.0.tgz", + "integrity": "sha512-QM8q3zDe58hqUqjraQOmzZ1LIH9SWQJTlEKCH4kJ2oQvLZk7RbQXvtDM2XEq3fwkV9CCvvH4LA0AV+ogFsBM2Q==", + "dev": true, + "requires": { + "is-obj": "^2.0.0" + } + } } }, "component-emitter": { @@ -5821,15 +5832,6 @@ } } }, - "dot-prop": { - "version": "5.3.0", - "resolved": "https://registry.npmjs.org/dot-prop/-/dot-prop-5.3.0.tgz", - "integrity": "sha512-QM8q3zDe58hqUqjraQOmzZ1LIH9SWQJTlEKCH4kJ2oQvLZk7RbQXvtDM2XEq3fwkV9CCvvH4LA0AV+ogFsBM2Q==", - "dev": true, - "requires": { - "is-obj": "^2.0.0" - } - }, "dotenv": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-7.0.0.tgz", @@ -7843,15 +7845,6 @@ "minimalistic-crypto-utils": "^1.0.1" } }, - "hosted-git-info": { - "version": "4.0.2", - "resolved": "https://registry.npmjs.org/hosted-git-info/-/hosted-git-info-4.0.2.tgz", - "integrity": "sha512-c9OGXbZ3guC/xOlCg1Ci/VgWlwsqDv1yMQL1CWqXDL0hDjXuNcq0zuR4xqPSuasI3kqFDhqSyTjREz5gzq0fXg==", - "dev": true, - "requires": { - "lru-cache": "^6.0.0" - } - }, "html-encoding-sniffer": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/html-encoding-sniffer/-/html-encoding-sniffer-2.0.1.tgz", @@ -10641,6 +10634,15 @@ "yargs-parser": "^20.2.3" }, "dependencies": { + "hosted-git-info": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/hosted-git-info/-/hosted-git-info-4.0.1.tgz", + "integrity": "sha512-eT7NrxAsppPRQEBSwKSosReE+v8OzABwEScQYk5d4uxaEPlzxTIku7LINXtBGalthkLhJnq5lBI89PfK43zAKg==", + "dev": true, + "requires": { + "lru-cache": "^6.0.0" + } + }, "normalize-package-data": { "version": "3.0.2", "resolved": "https://registry.npmjs.org/normalize-package-data/-/normalize-package-data-3.0.2.tgz", @@ -14430,9 +14432,9 @@ "dev": true }, "uglify-js": { - "version": "3.13.4", - "resolved": "https://registry.npmjs.org/uglify-js/-/uglify-js-3.13.4.tgz", - "integrity": "sha512-kv7fCkIXyQIilD5/yQy8O+uagsYIOt5cZvs890W40/e/rvjMSzJw81o9Bg0tkURxzZBROtDQhW2LFjOGoK3RZw==", + "version": "3.13.5", + "resolved": "https://registry.npmjs.org/uglify-js/-/uglify-js-3.13.5.tgz", + "integrity": "sha512-xtB8yEqIkn7zmOyS2zUNBsYCBRhDkvlNxMMY2smuJ/qA8NCHeQvKCF3i9Z4k8FJH4+PJvZRtMrPynfZ75+CSZw==", "dev": true, "optional": true }, diff --git a/test/integration/MessagePipeline.test.js b/test/integration/MessagePipeline.test.js index 54f807498..2e6aa6403 100644 --- a/test/integration/MessagePipeline.test.js +++ b/test/integration/MessagePipeline.test.js @@ -177,8 +177,7 @@ describe('MessagePipeline', () => { const onPipelineError = jest.fn((err) => { throw err }) - const msgStream = new PushQueue([ - ]) + const msgStream = new PushQueue([]) const p = MessagePipeline(client, { ...MOCK_INVALID_GROUP_KEY_MESSAGE.messageId, msgStream, diff --git a/test/integration/MultipleClients.test.js b/test/integration/MultipleClients.test.js index 02b06907e..064137f72 100644 --- a/test/integration/MultipleClients.test.js +++ b/test/integration/MultipleClients.test.js @@ -449,6 +449,7 @@ describeRepeats('PubSub with multiple clients', () => { addAfter(() => { counterId.clear(publisherId) // prevent overflows in counter }) + const publishTestMessages = getPublishTestMessages(pubClient, { stream, waitForLast: true, @@ -576,7 +577,7 @@ describeRepeats('PubSub with multiple clients', () => { /* eslint-enable no-await-in-loop */ const published = {} await Promise.all(publishers.map(async (pubClient) => { - const publisherId = await pubClient.getPublisherId() + const publisherId = (await pubClient.getPublisherId()).toLowerCase() const publishTestMessages = getPublishTestMessages(pubClient, { stream, waitForLast: true, @@ -590,14 +591,6 @@ describeRepeats('PubSub with multiple clients', () => { }) })) - await wait(5000) - - mainClient.debug('%j', { - published, - receivedMessagesMain, - receivedMessagesOther, - }) - // eslint-disable-next-line no-inner-declarations function checkMessages(received) { for (const [key, msgs] of Object.entries(published)) { @@ -666,7 +659,7 @@ describeRepeats('PubSub with multiple clients', () => { msgs.push(msg) receivedMessagesMain[streamMessage.getPublisherId()] = msgs if (Object.values(receivedMessagesMain).every((m) => m.length === MAX_MESSAGES)) { - mainSub.end() + mainSub.cancel() } }) @@ -679,7 +672,7 @@ describeRepeats('PubSub with multiple clients', () => { let counter = 0 /* eslint-enable no-await-in-loop */ await Promise.all(publishers.map(async (pubClient) => { - const publisherId = pubClient.getPublisherId() + const publisherId = (await pubClient.getPublisherId()).toString() const publishTestMessages = getPublishTestMessages(pubClient, { stream, waitForLast: true, @@ -702,7 +695,7 @@ describeRepeats('PubSub with multiple clients', () => { msgs.push(msg) receivedMessagesOther[streamMessage.getPublisherId()] = msgs if (msgs.length === MAX_MESSAGES) { - await otherSub.end() + await otherSub.cancel() } }) } @@ -712,12 +705,6 @@ describeRepeats('PubSub with multiple clients', () => { await wait(15000) - mainClient.debug('%j', { - published, - receivedMessagesMain, - receivedMessagesOther, - }) - checkMessages(receivedMessagesMain) checkMessages(receivedMessagesOther) } finally { From f2c73862d350ea04e904906cf17169e59e3e3499 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Tue, 23 Mar 2021 13:32:42 -0400 Subject: [PATCH 07/14] types(subscribe): Fix subscription & stream validateOptions types. --- src/subscribe/index.ts | 31 ++++++++++++++++++++----------- src/utils/index.ts | 2 +- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/src/subscribe/index.ts b/src/subscribe/index.ts index 44d9a7aef..800fea9fb 100644 --- a/src/subscribe/index.ts +++ b/src/subscribe/index.ts @@ -29,21 +29,25 @@ export class Subscription extends Emitter { /** @internal */ client: StreamrClient /** @internal */ - options: Todo + options: ReturnType & { + id?: string + } /** @internal */ - key: Todo + key /** @internal */ - id: Todo + id /** @internal */ - _onDone: Todo + _onDone: ReturnType /** @internal */ - _onFinally: Todo + _onFinally /** @internal */ - pipeline: Todo + pipeline /** @internal */ - msgStream: Todo + msgStream /** @internal */ iterated?: Todo + /** @internal */ + debug constructor(client: StreamrClient, opts: Todo, onFinally = defaultOnFinally) { super() @@ -194,11 +198,16 @@ function multiEmit(emitters: Todo, ...args: Todo[]) { */ class SubscriptionSession extends Emitter { - + id + debug client: StreamrClient - options: Todo - validate: Todo - subscriptions: Set + options: ReturnType & { + id: string + subscribe: typeof subscribe + unsubscribe: typeof unsubscribe + } + validate + subscriptions: Set deletedSubscriptions: Set step?: Todo _subscribe diff --git a/src/utils/index.ts b/src/utils/index.ts index 3af15be77..caf748c8b 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -272,7 +272,7 @@ export function Defer(executor: (...args: Parameters['then']>) => } } - function handleErrBack(err: Error) { + function handleErrBack(err?: Error) { if (err) { reject(err) } else { From 7c7c7d18bff24ab623bf293b5c3fcf81b5744392 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Tue, 23 Mar 2021 13:33:30 -0400 Subject: [PATCH 08/14] refactor(test): Tidy MultipleClients late sub test. --- test/integration/MultipleClients.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/MultipleClients.test.js b/test/integration/MultipleClients.test.js index 064137f72..39e963a7a 100644 --- a/test/integration/MultipleClients.test.js +++ b/test/integration/MultipleClients.test.js @@ -477,7 +477,7 @@ describeRepeats('PubSub with multiple clients', () => { addAfter(async () => { await lateSub.unsubscribe() }) - } + } await publishTestMessages(MAX_MESSAGES, { waitForLast: true, From 0f30ab20de3443d3dac2aa160e35795a882eb33a Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Tue, 23 Mar 2021 13:43:45 -0400 Subject: [PATCH 09/14] types(client): Explicitly type top-level groupkey handling methods. --- src/StreamrClient.ts | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/StreamrClient.ts b/src/StreamrClient.ts index d821c2417..365da657f 100644 --- a/src/StreamrClient.ts +++ b/src/StreamrClient.ts @@ -23,6 +23,7 @@ import { BigNumber } from '@ethersproject/bignumber' import { getAddress } from '@ethersproject/address' import { Contract } from '@ethersproject/contracts' import { StreamPartDefinition } from './stream' +import type { GroupKey } from './stream/Encryption' // TODO get metadata type from streamr-protocol-js project (it doesn't export the type definitions yet) export type OnMessageCallback = MaybeAsync<(message: any, metadata: any) => void> @@ -160,7 +161,7 @@ function Plugin(targetInstance: any, srcInstance: any) { } // these are mixed in via Plugin function above -export interface StreamrClient extends StreamEndpoints, LoginEndpoints {} +export interface StreamrClient extends StreamEndpoints, LoginEndpoints, ReturnType, Subscriber {} /** * @category Important @@ -340,16 +341,16 @@ export class StreamrClient extends EventEmitter { // eslint-disable-line no-rede return getUserId(this) } - setNextGroupKey(...args: Todo) { - return this.publisher.setNextGroupKey(...args) + setNextGroupKey(streamId: string, newKey: GroupKey) { + return this.publisher.setNextGroupKey(streamId, newKey) } - rotateGroupKey(...args: Todo) { - return this.publisher.rotateGroupKey(...args) + rotateGroupKey(streamId: string) { + return this.publisher.rotateGroupKey(streamId) } - rekey(...args: Todo) { - return this.publisher.rekey(...args) + rekey(streamId: string) { + return this.publisher.rekey(streamId) } /** From 3d7602aeba066713f18d93478a2dfb78166da26d Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 24 Mar 2021 10:28:48 -0400 Subject: [PATCH 10/14] refactor(keyexchange): Tidy up useGroupKey with switch instead of multiple if + early return. --- src/stream/KeyExchange.js | 64 ++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/src/stream/KeyExchange.js b/src/stream/KeyExchange.js index 6c5f10101..0f46c36ba 100644 --- a/src/stream/KeyExchange.js +++ b/src/stream/KeyExchange.js @@ -106,38 +106,40 @@ function GroupKeyStore({ groupKeys = new Map() }) { }, useGroupKey() { const nextGroupKey = nextGroupKeys.pop() - // first message - if (!currentGroupKeyId && nextGroupKey) { - storeKey(nextGroupKey) - currentGroupKeyId = nextGroupKey.id - return [ - this.get(currentGroupKeyId), - undefined, - ] - } - - // key changed - if (currentGroupKeyId && nextGroupKey) { - storeKey(nextGroupKey) - const prevGroupKey = this.get(currentGroupKeyId) - currentGroupKeyId = nextGroupKey.id - // use current key one more time - return [ - prevGroupKey, - nextGroupKey, - ] - } - - // generate & use key if none already set - if (!currentGroupKeyId) { - this.rotateGroupKey() - return this.useGroupKey() + switch (true) { + // First use of group key on this stream, no current key. Make next key current. + case (!currentGroupKeyId && nextGroupKey): { + storeKey(nextGroupKey) + currentGroupKeyId = nextGroupKey.id + return [ + this.get(currentGroupKeyId), + undefined, + ] + } + // Keep using current key (empty next) + case (currentGroupKeyId && !nextGroupKey): { + return [ + this.get(currentGroupKeyId), + undefined + ] + } + // Key changed (non-empty next). return current + next. Make next key current. + case (currentGroupKeyId && nextGroupKey): { + storeKey(nextGroupKey) + const prevGroupKey = this.get(currentGroupKeyId) + currentGroupKeyId = nextGroupKey.id + // use current key one more time + return [ + prevGroupKey, + nextGroupKey, + ] + } + // Generate & use new key if none already set. + default: { + this.rotateGroupKey() + return this.useGroupKey() + } } - - return [ - this.get(currentGroupKeyId), - nextGroupKey - ] }, get(groupKeyId) { const groupKey = store.get(groupKeyId) From 7a8ed9faeaf9964c5127413497a53936d4966ec1 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 24 Mar 2021 12:24:02 -0400 Subject: [PATCH 11/14] test(multipleclients): Fix late sub test. --- test/integration/MultipleClients.test.js | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/test/integration/MultipleClients.test.js b/test/integration/MultipleClients.test.js index 39e963a7a..d1486870a 100644 --- a/test/integration/MultipleClients.test.js +++ b/test/integration/MultipleClients.test.js @@ -477,13 +477,11 @@ describeRepeats('PubSub with multiple clients', () => { addAfter(async () => { await lateSub.unsubscribe() }) - } + } - await publishTestMessages(MAX_MESSAGES, { + published[publisherId] = await publishTestMessages(MAX_MESSAGES, { waitForLast: true, - async afterEach(pubMsg, req) { - published[publisherId] = published[publisherId] || [] - published[publisherId].push(pubMsg) + async afterEach(_pubMsg, req) { counter += 1 if (counter === 3) { await waitForStorage(req) // make sure lastest message has hit storage From b0354198ed7969058658c856e76b323b4b5ff2ec Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 24 Mar 2021 12:55:41 -0400 Subject: [PATCH 12/14] fix(keyexchange): Fix bad switch case boolean. --- src/stream/KeyExchange.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stream/KeyExchange.js b/src/stream/KeyExchange.js index 0f46c36ba..778f78d30 100644 --- a/src/stream/KeyExchange.js +++ b/src/stream/KeyExchange.js @@ -108,7 +108,7 @@ function GroupKeyStore({ groupKeys = new Map() }) { const nextGroupKey = nextGroupKeys.pop() switch (true) { // First use of group key on this stream, no current key. Make next key current. - case (!currentGroupKeyId && nextGroupKey): { + case !!(!currentGroupKeyId && nextGroupKey): { storeKey(nextGroupKey) currentGroupKeyId = nextGroupKey.id return [ @@ -117,14 +117,14 @@ function GroupKeyStore({ groupKeys = new Map() }) { ] } // Keep using current key (empty next) - case (currentGroupKeyId && !nextGroupKey): { + case !!(currentGroupKeyId && !nextGroupKey): { return [ this.get(currentGroupKeyId), undefined ] } // Key changed (non-empty next). return current + next. Make next key current. - case (currentGroupKeyId && nextGroupKey): { + case !!(currentGroupKeyId && nextGroupKey): { storeKey(nextGroupKey) const prevGroupKey = this.get(currentGroupKeyId) currentGroupKeyId = nextGroupKey.id From 793fb543e5a3abd995c6ab13f08c649058224149 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Tue, 4 May 2021 16:30:57 -0400 Subject: [PATCH 13/14] test(keyexchange): Add test repeats & reduce test timeout for Encryption tests. --- test/integration/Encryption.test.js | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/test/integration/Encryption.test.js b/test/integration/Encryption.test.js index b55d02426..8822a9925 100644 --- a/test/integration/Encryption.test.js +++ b/test/integration/Encryption.test.js @@ -1,7 +1,7 @@ import { wait } from 'streamr-test-utils' import { MessageLayer } from 'streamr-client-protocol' -import { fakePrivateKey, uid, Msg, getPublishTestMessages } from '../utils' +import { describeRepeats, fakePrivateKey, uid, Msg, getPublishTestMessages } from '../utils' import { Defer } from '../../src/utils' import { StreamrClient } from '../../src/StreamrClient' import { GroupKey } from '../../src/stream/Encryption' @@ -10,11 +10,11 @@ import { StorageNode } from '../../src/stream/StorageNode' import config from './config' -const TIMEOUT = 30 * 1000 +const TIMEOUT = 10 * 1000 const { StreamMessage } = MessageLayer -describe('decryption', () => { +describeRepeats('decryption', () => { let publishTestMessages let expectErrors = 0 // check no errors by default let errors = [] @@ -178,7 +178,7 @@ describe('decryption', () => { await onEncryptionMessageErr // All good, unsubscribe await client.unsubscribe(sub) - }, 2 * TIMEOUT) + }, TIMEOUT) it('changing group key injects group key into next stream message', async () => { const done = Defer() @@ -242,7 +242,7 @@ describe('decryption', () => { await onEncryptionMessageErr // All good, unsubscribe await client.unsubscribe(sub) - }, 2 * TIMEOUT) + }, TIMEOUT) it('errors if rotating group key for no stream', async () => { expect(async () => ( @@ -306,7 +306,7 @@ describe('decryption', () => { await otherClient.logout() } } - }, 2 * TIMEOUT) + }, TIMEOUT) it('does not encrypt messages in stream without groupkey', async () => { const name = uid('stream') @@ -379,7 +379,7 @@ describe('decryption', () => { onEncryptionMessageErr.resolve() // will be ignored if errored await onEncryptionMessageErr expect(didFindStream2).toBeTruthy() - }, 2 * TIMEOUT) + }, TIMEOUT) it('sets group key per-stream', async () => { const name = uid('stream') @@ -452,7 +452,7 @@ describe('decryption', () => { ]) onEncryptionMessageErr.resolve() // will be ignored if errored await onEncryptionMessageErr - }, 2 * TIMEOUT) + }, TIMEOUT) it('client.subscribe can get the group key and decrypt multiple encrypted messages using an RSA key pair', async () => { // subscribe without knowing the group key to decrypt stream messages @@ -476,7 +476,7 @@ describe('decryption', () => { // All good, unsubscribe await client.unsubscribe(sub) - }, 2 * TIMEOUT) + }, TIMEOUT) it('subscribe with changing group key', async () => { // subscribe without knowing the group key to decrypt stream messages @@ -502,7 +502,7 @@ describe('decryption', () => { // All good, unsubscribe await client.unsubscribe(sub) - }, 2 * TIMEOUT) + }, TIMEOUT) it('client.resend last can get the historical keys for previous encrypted messages', async () => { // Publish encrypted messages with different keys @@ -525,7 +525,7 @@ describe('decryption', () => { expect(received).toEqual(published.slice(-2)) await client.unsubscribe(sub) - }, 2 * TIMEOUT) + }, TIMEOUT) it('client.subscribe with resend last can get the historical keys for previous encrypted messages', async () => { // Publish encrypted messages with different keys @@ -554,7 +554,7 @@ describe('decryption', () => { expect(received).toEqual(published.slice(-2)) await client.unsubscribe(sub) - }, 2 * TIMEOUT) + }, TIMEOUT) it('fails gracefully if cannot decrypt', async () => { const MAX_MESSAGES = 10 From e229b81fe9b1251d8c3bf25b868ac9631dc4231a Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 5 May 2021 16:10:56 -0400 Subject: [PATCH 14/14] test: Make MultipleClients test more reliable. --- test/integration/MultipleClients.test.js | 342 +++++++++++------------ 1 file changed, 161 insertions(+), 181 deletions(-) diff --git a/test/integration/MultipleClients.test.js b/test/integration/MultipleClients.test.js index d1486870a..6c55c5e72 100644 --- a/test/integration/MultipleClients.test.js +++ b/test/integration/MultipleClients.test.js @@ -419,9 +419,10 @@ describeRepeats('PubSub with multiple clients', () => { const mainSub = await mainClient.subscribe({ stream: stream.id, }, (msg, streamMessage) => { - const msgs = receivedMessagesMain[streamMessage.getPublisherId().toLowerCase()] || [] + const key = streamMessage.getPublisherId().toLowerCase() + const msgs = receivedMessagesMain[key] || [] msgs.push(msg) - receivedMessagesMain[streamMessage.getPublisherId().toLowerCase()] = msgs + receivedMessagesMain[key] = msgs if (Object.values(receivedMessagesMain).every((m) => m.length === MAX_MESSAGES)) { mainSub.unsubscribe() } @@ -469,9 +470,10 @@ describeRepeats('PubSub with multiple clients', () => { last: 1000, } }, (msg, streamMessage) => { - const msgs = receivedMessagesOther[streamMessage.getPublisherId().toLowerCase()] || [] + const key = streamMessage.getPublisherId().toLowerCase() + const msgs = receivedMessagesOther[key] || [] msgs.push(msg) - receivedMessagesOther[streamMessage.getPublisherId().toLowerCase()] = msgs + receivedMessagesOther[key] = msgs }) addAfter(async () => { @@ -499,215 +501,193 @@ describeRepeats('PubSub with multiple clients', () => { } catch (err) { return false } - }, 15000).catch((err) => { + }, 15000, 300).catch((err) => { // convert timeout to actual error checkMessages(published, receivedMessagesMain) checkMessages(published, receivedMessagesOther) throw err }) - - checkMessages(published, receivedMessagesMain) - checkMessages(published, receivedMessagesOther) }, 60000) }) test('works with multiple publishers on one stream', async () => { - const onEnd = [] - async function createPublisher() { - const pubClient = createClient({ - auth: { - privateKey: fakePrivateKey(), - } - }) - onEnd.push(() => pubClient.disconnect()) - pubClient.on('error', getOnError(errors)) - const pubUser = await pubClient.getUserInfo() - await stream.grantPermission('stream_get', pubUser.username) - await stream.grantPermission('stream_publish', pubUser.username) - // needed to check last - await stream.grantPermission('stream_subscribe', pubUser.username) - await pubClient.session.getSessionToken() - await pubClient.connect() - return pubClient - } - - try { - await mainClient.session.getSessionToken() - await mainClient.connect() + await mainClient.session.getSessionToken() + await mainClient.connect() - otherClient = createClient({ - auth: { - privateKey - } - }) - otherClient.on('error', getOnError(errors)) - await otherClient.session.getSessionToken() - const otherUser = await otherClient.getUserInfo() - await stream.grantPermission('stream_get', otherUser.username) - await stream.grantPermission('stream_subscribe', otherUser.username) - await otherClient.connect() + otherClient = createClient({ + auth: { + privateKey + } + }) + otherClient.on('error', getOnError(errors)) + await otherClient.session.getSessionToken() + const otherUser = await otherClient.getUserInfo() + await stream.grantPermission('stream_get', otherUser.username) + await stream.grantPermission('stream_subscribe', otherUser.username) + await otherClient.connect() - const receivedMessagesOther = {} - const receivedMessagesMain = {} - // subscribe to stream from other client instance - await otherClient.subscribe({ - stream: stream.id, - }, (msg, streamMessage) => { - const msgs = receivedMessagesOther[streamMessage.getPublisherId()] || [] - msgs.push(msg) - receivedMessagesOther[streamMessage.getPublisherId()] = msgs - }) + const receivedMessagesOther = {} + const receivedMessagesMain = {} + // subscribe to stream from other client instance + await otherClient.subscribe({ + stream: stream.id, + }, (msg, streamMessage) => { + const key = streamMessage.getPublisherId().toLowerCase() + const msgs = receivedMessagesOther[key] || [] + msgs.push(msg) + receivedMessagesOther[key] = msgs + }) - // subscribe to stream from main client instance - await mainClient.subscribe({ - stream: stream.id, - }, (msg, streamMessage) => { - const msgs = receivedMessagesMain[streamMessage.getPublisherId()] || [] - msgs.push(msg) - receivedMessagesMain[streamMessage.getPublisherId()] = msgs - }) + // subscribe to stream from main client instance + await mainClient.subscribe({ + stream: stream.id, + }, (msg, streamMessage) => { + const key = streamMessage.getPublisherId().toLowerCase() + const msgs = receivedMessagesMain[key] || [] + msgs.push(msg) + receivedMessagesMain[key] = msgs + }) - /* eslint-disable no-await-in-loop */ - const publishers = [] - for (let i = 0; i < 3; i++) { - publishers.push(await createPublisher()) - } - /* eslint-enable no-await-in-loop */ - const published = {} - await Promise.all(publishers.map(async (pubClient) => { - const publisherId = (await pubClient.getPublisherId()).toLowerCase() - const publishTestMessages = getPublishTestMessages(pubClient, { - stream, - waitForLast: true, - }) - await publishTestMessages(10, { - delay: 500 + Math.random() * 1500, - afterEach(msg) { - published[publisherId] = published[publisherId] || [] - published[publisherId].push(msg) - } - }) - })) + /* eslint-disable no-await-in-loop */ + const publishers = [] + for (let i = 0; i < 3; i++) { + publishers.push(await createPublisher()) + } - // eslint-disable-next-line no-inner-declarations - function checkMessages(received) { - for (const [key, msgs] of Object.entries(published)) { - expect(received[key]).toEqual(msgs) + /* eslint-enable no-await-in-loop */ + const published = {} + await Promise.all(publishers.map(async (pubClient) => { + const publisherId = (await pubClient.getPublisherId()).toLowerCase() + const publishTestMessages = getPublishTestMessages(pubClient, { + stream, + waitForLast: true, + }) + await publishTestMessages(10, { + delay: 500 + Math.random() * 1500, + afterEach(msg) { + published[publisherId] = published[publisherId] || [] + published[publisherId].push(msg) } + }) + })) + + await waitForCondition(() => { + try { + checkMessages(published, receivedMessagesMain) + checkMessages(published, receivedMessagesOther) + return true + } catch (err) { + return false } + }, 5000).catch(() => { + checkMessages(published, receivedMessagesMain) + checkMessages(published, receivedMessagesOther) + }) - checkMessages(receivedMessagesMain) - checkMessages(receivedMessagesOther) - } finally { - await Promise.all(onEnd.map((fn) => fn())) - } }, 40000) test('works with multiple publishers on one stream with late subscriber', async () => { - const onEnd = [] - async function createPublisher() { - const pubClient = createClient({ - auth: { - privateKey: fakePrivateKey(), - } - }) - onEnd.push(() => pubClient.disconnect()) - pubClient.on('error', getOnError(errors)) - const pubUser = await pubClient.getUserInfo() - await stream.grantPermission('stream_get', pubUser.username) - await stream.grantPermission('stream_publish', pubUser.username) - // needed to check last - await stream.grantPermission('stream_subscribe', pubUser.username) - await pubClient.session.getSessionToken() - await pubClient.connect() - return pubClient - } - const published = {} - function checkMessages(received) { - for (const [key, msgs] of Object.entries(published)) { - expect(received[key]).toEqual(msgs) - } - } + await mainClient.session.getSessionToken() + await mainClient.connect() - try { - await mainClient.session.getSessionToken() - await mainClient.connect() + otherClient = createClient({ + auth: { + privateKey + } + }) + otherClient.on('error', getOnError(errors)) + await otherClient.session.getSessionToken() + const otherUser = await otherClient.getUserInfo() + await stream.grantPermission('stream_get', otherUser.username) + await stream.grantPermission('stream_subscribe', otherUser.username) + await otherClient.connect() - otherClient = createClient({ - auth: { - privateKey - } - }) - otherClient.on('error', getOnError(errors)) - await otherClient.session.getSessionToken() - const otherUser = await otherClient.getUserInfo() - await stream.grantPermission('stream_get', otherUser.username) - await stream.grantPermission('stream_subscribe', otherUser.username) - await otherClient.connect() + const receivedMessagesOther = {} + const receivedMessagesMain = {} + + // subscribe to stream from main client instance + const mainSub = await mainClient.subscribe({ + stream: stream.id, + }, (msg, streamMessage) => { + const key = streamMessage.getPublisherId().toLowerCase() + const msgs = receivedMessagesMain[key] || [] + msgs.push(msg) + receivedMessagesMain[key] = msgs + if (Object.values(receivedMessagesMain).every((m) => m.length === MAX_MESSAGES)) { + mainSub.cancel() + } + }) - const receivedMessagesOther = {} - const receivedMessagesMain = {} + /* eslint-disable no-await-in-loop */ + const publishers = [] + for (let i = 0; i < 3; i++) { + publishers.push(await createPublisher()) + } - // subscribe to stream from main client instance - const mainSub = await mainClient.subscribe({ - stream: stream.id, - }, (msg, streamMessage) => { - const msgs = receivedMessagesMain[streamMessage.getPublisherId()] || [] - msgs.push(msg) - receivedMessagesMain[streamMessage.getPublisherId()] = msgs - if (Object.values(receivedMessagesMain).every((m) => m.length === MAX_MESSAGES)) { - mainSub.cancel() - } + let counter = 0 + /* eslint-enable no-await-in-loop */ + await Promise.all(publishers.map(async (pubClient) => { + const waitForStorage = getWaitForStorage(pubClient, { + stream, + timeout: 10000, + count: MAX_MESSAGES * publishers.length, }) - /* eslint-disable no-await-in-loop */ - const publishers = [] - for (let i = 0; i < 3; i++) { - publishers.push(await createPublisher()) - } + const publisherId = (await pubClient.getPublisherId()).toString().toLowerCase() + const publishTestMessages = getPublishTestMessages(pubClient, { + stream, + waitForLast: true, + waitForLastTimeout: 10000, + waitForLastCount: MAX_MESSAGES * publishers.length, + delay: 500 + Math.random() * 1500, + }) - let counter = 0 - /* eslint-enable no-await-in-loop */ - await Promise.all(publishers.map(async (pubClient) => { - const publisherId = (await pubClient.getPublisherId()).toString() - const publishTestMessages = getPublishTestMessages(pubClient, { - stream, - waitForLast: true, - }) - await publishTestMessages(MAX_MESSAGES, { - delay: 500 + Math.random() * 1500, - async afterEach(pubMsg) { - published[publisherId] = published[publisherId] || [] - published[publisherId].push(pubMsg) - counter += 1 - if (counter === 3) { - // late subscribe to stream from other client instance - const otherSub = await otherClient.subscribe({ - stream: stream.id, - resend: { - last: 1000, - } - }, async (msg, streamMessage) => { - const msgs = receivedMessagesOther[streamMessage.getPublisherId()] || [] - msgs.push(msg) - receivedMessagesOther[streamMessage.getPublisherId()] = msgs - if (msgs.length === MAX_MESSAGES) { - await otherSub.cancel() - } - }) - } + async function addLateSubscriber() { + // late subscribe to stream from other client instance + const lateSub = await otherClient.subscribe({ + stream: stream.id, + resend: { + last: 1000, } + }, (msg, streamMessage) => { + const key = streamMessage.getPublisherId().toLowerCase() + const msgs = receivedMessagesOther[key] || [] + msgs.push(msg) + receivedMessagesOther[key] = msgs }) - })) - await wait(15000) + addAfter(async () => { + await lateSub.unsubscribe() + }) + } - checkMessages(receivedMessagesMain) - checkMessages(receivedMessagesOther) - } finally { - await Promise.all(onEnd.map((fn) => fn())) - } + await publishTestMessages(MAX_MESSAGES, { + async afterEach(pubMsg, req) { + published[publisherId] = published[publisherId] || [] + published[publisherId].push(pubMsg) + counter += 1 + if (counter === 3) { + await waitForStorage(req) // make sure lastest message has hit storage + // late subscribe to stream from other client instance + await addLateSubscriber() + } + } + }) + })) + + await waitForCondition(() => { + try { + checkMessages(published, receivedMessagesMain) + checkMessages(published, receivedMessagesOther) + return true + } catch (err) { + return false + } + }, 15000, 300).catch(() => { + checkMessages(published, receivedMessagesMain) + checkMessages(published, receivedMessagesOther) + }) }, 60000) test('disconnecting one client does not disconnect the other', async () => {