diff --git a/package-lock.json b/package-lock.json index 0aa6d1826..606086c35 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5243,6 +5243,17 @@ "requires": { "array-ify": "^1.0.0", "dot-prop": "^5.1.0" + }, + "dependencies": { + "dot-prop": { + "version": "5.3.0", + "resolved": "https://registry.npmjs.org/dot-prop/-/dot-prop-5.3.0.tgz", + "integrity": "sha512-QM8q3zDe58hqUqjraQOmzZ1LIH9SWQJTlEKCH4kJ2oQvLZk7RbQXvtDM2XEq3fwkV9CCvvH4LA0AV+ogFsBM2Q==", + "dev": true, + "requires": { + "is-obj": "^2.0.0" + } + } } }, "component-emitter": { @@ -5821,15 +5832,6 @@ } } }, - "dot-prop": { - "version": "5.3.0", - "resolved": "https://registry.npmjs.org/dot-prop/-/dot-prop-5.3.0.tgz", - "integrity": "sha512-QM8q3zDe58hqUqjraQOmzZ1LIH9SWQJTlEKCH4kJ2oQvLZk7RbQXvtDM2XEq3fwkV9CCvvH4LA0AV+ogFsBM2Q==", - "dev": true, - "requires": { - "is-obj": "^2.0.0" - } - }, "dotenv": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-7.0.0.tgz", @@ -7843,15 +7845,6 @@ "minimalistic-crypto-utils": "^1.0.1" } }, - "hosted-git-info": { - "version": "4.0.2", - "resolved": "https://registry.npmjs.org/hosted-git-info/-/hosted-git-info-4.0.2.tgz", - "integrity": "sha512-c9OGXbZ3guC/xOlCg1Ci/VgWlwsqDv1yMQL1CWqXDL0hDjXuNcq0zuR4xqPSuasI3kqFDhqSyTjREz5gzq0fXg==", - "dev": true, - "requires": { - "lru-cache": "^6.0.0" - } - }, "html-encoding-sniffer": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/html-encoding-sniffer/-/html-encoding-sniffer-2.0.1.tgz", @@ -10641,6 +10634,15 @@ "yargs-parser": "^20.2.3" }, "dependencies": { + "hosted-git-info": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/hosted-git-info/-/hosted-git-info-4.0.1.tgz", + "integrity": "sha512-eT7NrxAsppPRQEBSwKSosReE+v8OzABwEScQYk5d4uxaEPlzxTIku7LINXtBGalthkLhJnq5lBI89PfK43zAKg==", + "dev": true, + "requires": { + "lru-cache": "^6.0.0" + } + }, "normalize-package-data": { "version": "3.0.2", "resolved": "https://registry.npmjs.org/normalize-package-data/-/normalize-package-data-3.0.2.tgz", @@ -14430,9 +14432,9 @@ "dev": true }, "uglify-js": { - "version": "3.13.4", - "resolved": "https://registry.npmjs.org/uglify-js/-/uglify-js-3.13.4.tgz", - "integrity": "sha512-kv7fCkIXyQIilD5/yQy8O+uagsYIOt5cZvs890W40/e/rvjMSzJw81o9Bg0tkURxzZBROtDQhW2LFjOGoK3RZw==", + "version": "3.13.5", + "resolved": "https://registry.npmjs.org/uglify-js/-/uglify-js-3.13.5.tgz", + "integrity": "sha512-xtB8yEqIkn7zmOyS2zUNBsYCBRhDkvlNxMMY2smuJ/qA8NCHeQvKCF3i9Z4k8FJH4+PJvZRtMrPynfZ75+CSZw==", "dev": true, "optional": true }, diff --git a/src/StreamrClient.ts b/src/StreamrClient.ts index 053719c4f..365da657f 100644 --- a/src/StreamrClient.ts +++ b/src/StreamrClient.ts @@ -23,6 +23,7 @@ import { BigNumber } from '@ethersproject/bignumber' import { getAddress } from '@ethersproject/address' import { Contract } from '@ethersproject/contracts' import { StreamPartDefinition } from './stream' +import type { GroupKey } from './stream/Encryption' // TODO get metadata type from streamr-protocol-js project (it doesn't export the type definitions yet) export type OnMessageCallback = MaybeAsync<(message: any, metadata: any) => void> @@ -160,7 +161,7 @@ function Plugin(targetInstance: any, srcInstance: any) { } // these are mixed in via Plugin function above -export interface StreamrClient extends StreamEndpoints, LoginEndpoints {} +export interface StreamrClient extends StreamEndpoints, LoginEndpoints, ReturnType, Subscriber {} /** * @category Important @@ -340,12 +341,16 @@ export class StreamrClient extends EventEmitter { // eslint-disable-line no-rede return getUserId(this) } - setNextGroupKey(...args: Todo) { - return this.publisher.setNextGroupKey(...args) + setNextGroupKey(streamId: string, newKey: GroupKey) { + return this.publisher.setNextGroupKey(streamId, newKey) } - rotateGroupKey(...args: Todo) { - return this.publisher.rotateGroupKey(...args) + rotateGroupKey(streamId: string) { + return this.publisher.rotateGroupKey(streamId) + } + + rekey(streamId: string) { + return this.publisher.rekey(streamId) } /** diff --git a/src/publish/Encrypt.ts b/src/publish/Encrypt.ts index f3a752708..ebee57bae 100644 --- a/src/publish/Encrypt.ts +++ b/src/publish/Encrypt.ts @@ -39,8 +39,9 @@ export default function Encrypt(client: StreamrClient) { if (streamMessage.messageType !== StreamMessage.MESSAGE_TYPES.MESSAGE) { return } - const groupKey = await getPublisherKeyExchange().useGroupKey(stream.id) - await EncryptionUtil.encryptStreamMessage(streamMessage, groupKey) + + const [groupKey, nextGroupKey] = await getPublisherKeyExchange().useGroupKey(stream.id) + await EncryptionUtil.encryptStreamMessage(streamMessage, groupKey, nextGroupKey) } return Object.assign(encrypt, { @@ -50,6 +51,9 @@ export default function Encrypt(client: StreamrClient) { rotateGroupKey(...args: Parameters) { return getPublisherKeyExchange().rotateGroupKey(...args) }, + rekey(...args: Parameters) { + return getPublisherKeyExchange().rekey(...args) + }, start() { return getPublisherKeyExchange().start() }, diff --git a/src/publish/index.js b/src/publish/index.js index 40f7bbf3a..485dcdf4c 100644 --- a/src/publish/index.js +++ b/src/publish/index.js @@ -189,6 +189,9 @@ function getCreateStreamMessage(client) { rotateGroupKey(maybeStreamId) { return encrypt.rotateGroupKey(maybeStreamId) }, + rekey(maybeStreamId) { + return encrypt.rekey(maybeStreamId) + }, startKeyExchange() { return encrypt.start() }, @@ -315,6 +318,9 @@ export default function Publisher(client) { }, setNextGroupKey(streamId, newKey) { return createStreamMessage.setNextGroupKey(streamId, newKey) + }, + rekey(streamId) { + return createStreamMessage.rekey(streamId) } } } diff --git a/src/stream/KeyExchange.js b/src/stream/KeyExchange.js index 8aee4be89..778f78d30 100644 --- a/src/stream/KeyExchange.js +++ b/src/stream/KeyExchange.js @@ -65,7 +65,7 @@ function GroupKeyStore({ groupKeys = new Map() }) { }) let currentGroupKeyId // current key id if any - let nextGroupKey // key to use next, disappears if not actually used. + const nextGroupKeys = [] // the keys to use next, disappears if not actually used. Max queue size 2 store.forEach((groupKey) => { GroupKey.validate(GroupKey.from(groupKey)) @@ -79,7 +79,8 @@ function GroupKeyStore({ groupKeys = new Map() }) { const existingKey = GroupKey.from(store.get(groupKey.id)) if (!existingKey.equals(groupKey)) { throw new GroupKey.InvalidGroupKeyError( - `Trying to add groupKey ${groupKey.id} but key exists & is not equivalent to new GroupKey: ${groupKey}.` + `Trying to add groupKey ${groupKey.id} but key exists & is not equivalent to new GroupKey: ${groupKey}.`, + groupKey ) } @@ -96,29 +97,49 @@ function GroupKeyStore({ groupKeys = new Map() }) { has(groupKeyId) { if (currentGroupKeyId === groupKeyId) { return true } - if (nextGroupKey && nextGroupKey.id === groupKeyId) { return true } + if (nextGroupKeys.some((nextKey) => nextKey.id === groupKeyId)) { return true } return store.has(groupKeyId) }, isEmpty() { - return !nextGroupKey && store.size === 0 + return nextGroupKeys.length === 0 && store.size === 0 }, useGroupKey() { - if (nextGroupKey) { - // next key becomes current key - storeKey(nextGroupKey) - - currentGroupKeyId = nextGroupKey.id - nextGroupKey = undefined - } - - if (!currentGroupKeyId) { - // generate & use key if none already set - this.rotateGroupKey() - return this.useGroupKey() + const nextGroupKey = nextGroupKeys.pop() + switch (true) { + // First use of group key on this stream, no current key. Make next key current. + case !!(!currentGroupKeyId && nextGroupKey): { + storeKey(nextGroupKey) + currentGroupKeyId = nextGroupKey.id + return [ + this.get(currentGroupKeyId), + undefined, + ] + } + // Keep using current key (empty next) + case !!(currentGroupKeyId && !nextGroupKey): { + return [ + this.get(currentGroupKeyId), + undefined + ] + } + // Key changed (non-empty next). return current + next. Make next key current. + case !!(currentGroupKeyId && nextGroupKey): { + storeKey(nextGroupKey) + const prevGroupKey = this.get(currentGroupKeyId) + currentGroupKeyId = nextGroupKey.id + // use current key one more time + return [ + prevGroupKey, + nextGroupKey, + ] + } + // Generate & use new key if none already set. + default: { + this.rotateGroupKey() + return this.useGroupKey() + } } - - return this.get(currentGroupKeyId) }, get(groupKeyId) { const groupKey = store.get(groupKeyId) @@ -127,7 +148,7 @@ function GroupKeyStore({ groupKeys = new Map() }) { }, clear() { currentGroupKeyId = undefined - nextGroupKey = undefined + nextGroupKeys.length = 0 return store.clear() }, rotateGroupKey() { @@ -138,7 +159,14 @@ function GroupKeyStore({ groupKeys = new Map() }) { }, setNextGroupKey(newKey) { GroupKey.validate(newKey) - nextGroupKey = newKey + nextGroupKeys.unshift(newKey) + nextGroupKeys.length = Math.min(nextGroupKeys.length, 2) + }, + rekey() { + const newKey = GroupKey.generate() + storeKey(newKey) + currentGroupKeyId = newKey.id + nextGroupKeys.length = 0 } } } @@ -215,7 +243,8 @@ async function PublisherKeyExhangeSubscription(client, getGroupKeyStore) { const subscriberId = streamMessage.getPublisherId() const groupKeyStore = getGroupKeyStore(streamId) - const encryptedGroupKeys = groupKeyIds.map((id) => { + const isSubscriber = await client.isStreamSubscriber(streamId, subscriberId) + const encryptedGroupKeys = !isSubscriber ? [] : groupKeyIds.map((id) => { const groupKey = groupKeyStore.get(id) if (!groupKey) { return null // will be filtered out @@ -316,9 +345,17 @@ export function PublisherKeyExhange(client, { groupKeys = {} } = {}) { return !groupKeyStore.isEmpty() } + async function rekey(streamId) { + if (!enabled) { return } + const groupKeyStore = getGroupKeyStore(streamId) + groupKeyStore.rekey() + await next() + } + return { setNextGroupKey, useGroupKey, + rekey, rotateGroupKey, hasAnyGroupKey, async start() { @@ -341,7 +378,7 @@ async function getGroupKeysFromStreamMessage(streamMessage, encryptionUtil) { async function SubscriberKeyExhangeSubscription(client, getGroupKeyStore, encryptionUtil) { let sub - async function onKeyExchangeMessage(parsedContent, streamMessage) { + async function onKeyExchangeMessage(_parsedContent, streamMessage) { try { const { messageType } = streamMessage const { MESSAGE_TYPES } = StreamMessage @@ -537,9 +574,9 @@ export function SubscriberKeyExchange(client, { groupKeys = {} } = {}) { }) async function getGroupKey(streamMessage) { - if (!streamMessage.groupKeyId) { return undefined } + if (!streamMessage.groupKeyId) { return [] } await next() - if (!enabled) { return undefined } + if (!enabled) { return [] } return getKey(streamMessage) } @@ -549,6 +586,12 @@ export function SubscriberKeyExchange(client, { groupKeys = {} } = {}) { enabled = true return next() }, + addNewKey(streamMessage) { + if (!streamMessage.newGroupKey) { return } + const streamId = streamMessage.getStreamId() + const groupKeyStore = getGroupKeyStore(streamId) + groupKeyStore.add(streamMessage.newGroupKey) + }, async stop() { enabled = false return next() diff --git a/src/subscribe/Decrypt.js b/src/subscribe/Decrypt.js index 25ab37f31..6e97f9f76 100644 --- a/src/subscribe/Decrypt.js +++ b/src/subscribe/Decrypt.js @@ -46,6 +46,7 @@ export default function Decrypt(client, options = {}) { throw new UnableToDecryptError(`Group key not found: ${streamMessage.groupKeyId}`, streamMessage) } await EncryptionUtil.decryptStreamMessage(streamMessage, groupKey) + requestKey.addNewKey(streamMessage) } catch (err) { await onError(err, streamMessage) } finally { diff --git a/src/subscribe/index.ts b/src/subscribe/index.ts index 40c79a84c..800fea9fb 100644 --- a/src/subscribe/index.ts +++ b/src/subscribe/index.ts @@ -29,28 +29,34 @@ export class Subscription extends Emitter { /** @internal */ client: StreamrClient /** @internal */ - options: Todo + options: ReturnType & { + id?: string + } /** @internal */ - key: Todo + key /** @internal */ - id: Todo + id /** @internal */ - _onDone: Todo + _onDone: ReturnType /** @internal */ - _onFinally: Todo + _onFinally /** @internal */ - pipeline: Todo + pipeline /** @internal */ - msgStream: Todo + msgStream /** @internal */ iterated?: Todo + /** @internal */ + debug constructor(client: StreamrClient, opts: Todo, onFinally = defaultOnFinally) { super() this.client = client this.options = validateOptions(opts) this.key = this.options.key - this.id = counterId(`Subscription.${this.options.id || ''}${this.key}`) + this.id = counterId(`Subscription:${this.options.id || ''}${this.key}`) + this.debug = client.debug.extend(this.id) + this.debug('create') this.streamId = this.options.streamId this.streamPartition = this.options.streamPartition @@ -75,14 +81,20 @@ export class Subscription extends Emitter { if (event !== 'error') { return super.emit(event, ...args) } + const [error] = args + if (!this.listenerCount('error')) { + this.debug('emitting error but no error listeners, cancelling subscription', error) + this.cancel(error) + return false + } try { - if (this.listenerCount('error')) { - // debugger - return super.emit('error', ...args) - } - throw args[0] + this.debug('emit error', error) + return super.emit('error', ...args) } catch (err) { + if (err !== error) { + this.debug('error emitting error!', err) + } this.cancel(err) return false } @@ -94,6 +106,7 @@ export class Subscription extends Emitter { */ async onPipelineEnd(err?: Error) { + this.debug('onPipelineEnd', err) let error = err try { await this._onFinally(error) @@ -185,11 +198,16 @@ function multiEmit(emitters: Todo, ...args: Todo[]) { */ class SubscriptionSession extends Emitter { - + id + debug client: StreamrClient - options: Todo - validate: Todo - subscriptions: Set + options: ReturnType & { + id: string + subscribe: typeof subscribe + unsubscribe: typeof unsubscribe + } + validate + subscriptions: Set deletedSubscriptions: Set step?: Todo _subscribe @@ -205,6 +223,9 @@ class SubscriptionSession extends Emitter { this.subscriptions = new Set() // active subs this.deletedSubscriptions = new Set() // hold so we can clean up + this.id = counterId(`SubscriptionSession:${this.options.id || ''}${this.options.key}`) + this.debug = this.client.debug.extend(this.id) + this.debug('create') this._init() } @@ -322,6 +343,12 @@ class SubscriptionSession extends Emitter { emit(...args: Todo[]) { const subs = this._getSubs() + if (args[0] === 'error') { + this.debug(args[0], args[1]) + } else { + this.debug(args[0]) + } + try { multiEmit(subs, ...args) } catch (error) { @@ -345,6 +372,7 @@ class SubscriptionSession extends Emitter { async add(sub: Todo) { this.subscriptions.add(sub) + this.debug('add', sub && sub.id) const { connection } = this.client await connection.addHandle(`adding${sub.id}`) try { @@ -366,11 +394,18 @@ class SubscriptionSession extends Emitter { return } + if (this.subscriptions.has(sub)) { + this.debug('remove', sub && sub.id) + } + const cancelTask = sub.cancel() - this.subscriptions.delete(sub) - this.deletedSubscriptions.add(sub) - await this.step() - await cancelTask + try { + this.subscriptions.delete(sub) + this.deletedSubscriptions.add(sub) + await this.step() + } finally { + await cancelTask + } } /** diff --git a/src/subscribe/pipeline.js b/src/subscribe/pipeline.js index 0beb14294..745d2e02f 100644 --- a/src/subscribe/pipeline.js +++ b/src/subscribe/pipeline.js @@ -39,6 +39,7 @@ export default function MessagePipeline(client, opts = {}, onFinally = async (er const seenErrors = new WeakSet() const onErrorFn = options.onError ? options.onError : (error) => { throw error } const onError = async (err) => { + // don't handle same error multiple times if (seenErrors.has(err)) { return } diff --git a/src/utils/index.ts b/src/utils/index.ts index 3af15be77..caf748c8b 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -272,7 +272,7 @@ export function Defer(executor: (...args: Parameters['then']>) => } } - function handleErrBack(err: Error) { + function handleErrBack(err?: Error) { if (err) { reject(err) } else { diff --git a/test/integration/Encryption.test.js b/test/integration/Encryption.test.js index b98c106ad..8822a9925 100644 --- a/test/integration/Encryption.test.js +++ b/test/integration/Encryption.test.js @@ -1,7 +1,7 @@ import { wait } from 'streamr-test-utils' import { MessageLayer } from 'streamr-client-protocol' -import { fakePrivateKey, uid, Msg, getPublishTestMessages } from '../utils' +import { describeRepeats, fakePrivateKey, uid, Msg, getPublishTestMessages } from '../utils' import { Defer } from '../../src/utils' import { StreamrClient } from '../../src/StreamrClient' import { GroupKey } from '../../src/stream/Encryption' @@ -10,11 +10,11 @@ import { StorageNode } from '../../src/stream/StorageNode' import config from './config' -const TIMEOUT = 30 * 1000 +const TIMEOUT = 10 * 1000 const { StreamMessage } = MessageLayer -describe('decryption', () => { +describeRepeats('decryption', () => { let publishTestMessages let expectErrors = 0 // check no errors by default let errors = [] @@ -178,7 +178,71 @@ describe('decryption', () => { await onEncryptionMessageErr // All good, unsubscribe await client.unsubscribe(sub) - }, 2 * TIMEOUT) + }, TIMEOUT) + + it('changing group key injects group key into next stream message', async () => { + const done = Defer() + const msgs = [Msg(), Msg(), Msg()] + const received = [] + // subscribe without knowing the group key to decrypt stream messages + const sub = await client.subscribe({ + stream: stream.id, + }, done.wrapError((_parsedContent, streamMessage) => { + // Check signature stuff + received.push(streamMessage) + if (received.length === msgs.length) { + done.resolve() + } + })) + + sub.once('error', done.reject) + + const onEncryptionMessageErr = checkEncryptionMessages(client) + // id | groupKeyId | newGroupKey (encrypted by groupKeyId) + // msg1 gk2 - + // msg2 gk2 gk3 + // msg3 gk3 - + const groupKey1 = GroupKey.generate() + const groupKey2 = GroupKey.generate() + await client.setNextGroupKey(stream.id, groupKey1) + await client.publish(stream.id, msgs[0]) + await client.setNextGroupKey(stream.id, groupKey2) + await client.publish(stream.id, msgs[1]) + await client.publish(stream.id, msgs[2]) + await done + expect(received.map((m) => m.getParsedContent())).toEqual(msgs) + received.forEach((streamMessage, index) => { + expect(streamMessage.signatureType).toBe(StreamMessage.SIGNATURE_TYPES.ETH) + expect(streamMessage.getPublisherId()) + expect(streamMessage.signature) + switch (index) { + case 0: { + expect(streamMessage.newGroupKey).toEqual(null) + expect(streamMessage.groupKeyId).toEqual(groupKey1.id) + break + } + case 1: { + expect(streamMessage.newGroupKey).toEqual(groupKey2) + expect(streamMessage.groupKeyId).toEqual(groupKey1.id) + break + } + case 2: { + expect(streamMessage.newGroupKey).toEqual(null) + expect(streamMessage.groupKeyId).toEqual(groupKey2.id) + break + } + default: { + throw new Error(`should not get here: ${index}`) + } + + } + }) + + onEncryptionMessageErr.resolve() // will be ignored if errored + await onEncryptionMessageErr + // All good, unsubscribe + await client.unsubscribe(sub) + }, TIMEOUT) it('errors if rotating group key for no stream', async () => { expect(async () => ( @@ -242,7 +306,7 @@ describe('decryption', () => { await otherClient.logout() } } - }, 2 * TIMEOUT) + }, TIMEOUT) it('does not encrypt messages in stream without groupkey', async () => { const name = uid('stream') @@ -315,7 +379,7 @@ describe('decryption', () => { onEncryptionMessageErr.resolve() // will be ignored if errored await onEncryptionMessageErr expect(didFindStream2).toBeTruthy() - }, 2 * TIMEOUT) + }, TIMEOUT) it('sets group key per-stream', async () => { const name = uid('stream') @@ -388,7 +452,7 @@ describe('decryption', () => { ]) onEncryptionMessageErr.resolve() // will be ignored if errored await onEncryptionMessageErr - }, 2 * TIMEOUT) + }, TIMEOUT) it('client.subscribe can get the group key and decrypt multiple encrypted messages using an RSA key pair', async () => { // subscribe without knowing the group key to decrypt stream messages @@ -412,7 +476,7 @@ describe('decryption', () => { // All good, unsubscribe await client.unsubscribe(sub) - }, 2 * TIMEOUT) + }, TIMEOUT) it('subscribe with changing group key', async () => { // subscribe without knowing the group key to decrypt stream messages @@ -438,7 +502,7 @@ describe('decryption', () => { // All good, unsubscribe await client.unsubscribe(sub) - }, 2 * TIMEOUT) + }, TIMEOUT) it('client.resend last can get the historical keys for previous encrypted messages', async () => { // Publish encrypted messages with different keys @@ -461,7 +525,7 @@ describe('decryption', () => { expect(received).toEqual(published.slice(-2)) await client.unsubscribe(sub) - }, 2 * TIMEOUT) + }, TIMEOUT) it('client.subscribe with resend last can get the historical keys for previous encrypted messages', async () => { // Publish encrypted messages with different keys @@ -490,7 +554,7 @@ describe('decryption', () => { expect(received).toEqual(published.slice(-2)) await client.unsubscribe(sub) - }, 2 * TIMEOUT) + }, TIMEOUT) it('fails gracefully if cannot decrypt', async () => { const MAX_MESSAGES = 10 @@ -552,4 +616,238 @@ describe('decryption', () => { expect(onSubError).toHaveBeenCalledTimes(1) }) + + describe('revoking permissions', () => { + let client2 + + beforeEach(async () => { + client2 = createClient({ id: 'subscriber' }) + await client2.connect() + await client2.session.getSessionToken() + client2.debug('new SUBSCRIBER') + }) + + afterEach(async () => { + await client2.disconnect() + }) + + it('fails gracefully if permission revoked with low cache maxAge', async () => { + await client.disconnect() + await setupClient({ + id: 'publisher', + cache: { + maxAge: 1, + } + }) + + const MAX_MESSAGES = 6 + await client.rotateGroupKey(stream.id) + + const p1 = await stream.grantPermission('stream_get', client2.getPublisherId()) + const p2 = await stream.grantPermission('stream_subscribe', client2.getPublisherId()) + + const sub = await client2.subscribe({ + stream: stream.id, + }) + + const errs = [] + const onSubError = jest.fn((err) => { + errs.push(err) + throw err + }) + + sub.on('error', onSubError) + + const received = [] + // Publish after subscribed + let count = 0 + const REVOKE_AFTER = 3 + const gotMessages = Defer() + // do publish in background otherwise permission is revoked before subscriber starts processing + const publishTask = publishTestMessages(MAX_MESSAGES, { + timestamp: 1111111, + async afterEach() { + count += 1 + if (count === REVOKE_AFTER) { + await gotMessages + await stream.revokePermission(p1.id) + await stream.revokePermission(p2.id) + await client.rekey(stream.id) + } + } + }) + let t + await expect(async () => { + for await (const m of sub) { + received.push(m.getParsedContent()) + if (received.length === REVOKE_AFTER) { + gotMessages.resolve() + clearTimeout(t) + t = setTimeout(() => { + sub.cancel() + }, 10000) + } + + if (received.length === MAX_MESSAGES) { + clearTimeout(t) + break + } + } + }).rejects.toThrow('not a subscriber') + clearTimeout(t) + const published = await publishTask + + expect(received).toEqual([ + ...published.slice(0, REVOKE_AFTER), + ]) + + expect(onSubError).toHaveBeenCalledTimes(1) + }) + + it('fails gracefully if permission revoked with low cache maxAge fail first message', async () => { + await client.disconnect() + await setupClient({ + id: 'publisher', + cache: { + maxAge: 1, + } + }) + const MAX_MESSAGES = 3 + await client.rotateGroupKey(stream.id) + + const p1 = await stream.grantPermission('stream_get', client2.getPublisherId()) + const p2 = await stream.grantPermission('stream_subscribe', client2.getPublisherId()) + + const sub = await client2.subscribe({ + stream: stream.id, + }) + sub.debug('sub', sub.id) + + const errs = [] + const onSubError = jest.fn((err) => { + errs.push(err) + throw err + }) + + sub.on('error', onSubError) + + const received = [] + // Publish after subscribed + let count = 0 + const REVOKE_AFTER = 1 + const gotMessages = Defer() + // do publish in background otherwise permission is revoked before subscriber starts processing + const publishTask = publishTestMessages(MAX_MESSAGES, { + timestamp: 1111111, + async afterEach() { + count += 1 + if (count === REVOKE_AFTER) { + await gotMessages + await stream.revokePermission(p1.id) + await stream.revokePermission(p2.id) + await client.rekey(stream.id) + } + } + }) + let t + await expect(async () => { + for await (const m of sub) { + received.push(m.getParsedContent()) + if (received.length === REVOKE_AFTER) { + gotMessages.resolve() + clearTimeout(t) + t = setTimeout(() => { + sub.cancel() + }, 10000) + } + + if (received.length === MAX_MESSAGES) { + clearTimeout(t) + break + } + } + }).rejects.toThrow('not a subscriber') + clearTimeout(t) + const published = await publishTask + + expect(received).toEqual([ + ...published.slice(0, REVOKE_AFTER), + ]) + + expect(onSubError).toHaveBeenCalledTimes(1) + }) + + it('fails gracefully if permission revoked with high cache maxAge', async () => { + await client.disconnect() + await setupClient({ + cache: { + maxAge: 99999999, + } + }) + + const MAX_MESSAGES = 10 + await client.rotateGroupKey(stream.id) + + const p1 = await stream.grantPermission('stream_get', client2.getPublisherId()) + const p2 = await stream.grantPermission('stream_subscribe', client2.getPublisherId()) + + const sub = await client2.subscribe({ + stream: stream.id, + }) + + const errs = [] + const onSubError = jest.fn((err) => { + errs.push(err) + throw err + }) + + sub.on('error', onSubError) + + const received = [] + // Publish after subscribed + let count = 0 + const REVOKE_AFTER = 6 + const gotMessages = Defer() + // do publish in background otherwise permission is revoked before subscriber starts processing + const publishTask = publishTestMessages(MAX_MESSAGES, { + timestamp: 1111111, + async afterEach() { + count += 1 + if (count === REVOKE_AFTER) { + await gotMessages + await stream.revokePermission(p1.id) + await stream.revokePermission(p2.id) + await client.rekey(stream.id) + } + } + }) + + let t + await expect(async () => { + for await (const m of sub) { + received.push(m.getParsedContent()) + if (received.length === REVOKE_AFTER) { + gotMessages.resolve() + clearTimeout(t) + t = setTimeout(() => { + sub.cancel() + }, 2000) + } + + if (received.length === MAX_MESSAGES) { + clearTimeout(t) + break + } + } + }).rejects.toThrow('decrypt') + clearTimeout(t) + const published = await publishTask + + expect(received).toEqual([ + ...published.slice(0, REVOKE_AFTER), + ]) + + expect(onSubError).toHaveBeenCalledTimes(1) + }) + }) }) diff --git a/test/integration/MessagePipeline.test.js b/test/integration/MessagePipeline.test.js index 54f807498..2e6aa6403 100644 --- a/test/integration/MessagePipeline.test.js +++ b/test/integration/MessagePipeline.test.js @@ -177,8 +177,7 @@ describe('MessagePipeline', () => { const onPipelineError = jest.fn((err) => { throw err }) - const msgStream = new PushQueue([ - ]) + const msgStream = new PushQueue([]) const p = MessagePipeline(client, { ...MOCK_INVALID_GROUP_KEY_MESSAGE.messageId, msgStream, diff --git a/test/integration/MultipleClients.test.js b/test/integration/MultipleClients.test.js index 101f0f5b6..6c55c5e72 100644 --- a/test/integration/MultipleClients.test.js +++ b/test/integration/MultipleClients.test.js @@ -419,9 +419,10 @@ describeRepeats('PubSub with multiple clients', () => { const mainSub = await mainClient.subscribe({ stream: stream.id, }, (msg, streamMessage) => { - const msgs = receivedMessagesMain[streamMessage.getPublisherId().toLowerCase()] || [] + const key = streamMessage.getPublisherId().toLowerCase() + const msgs = receivedMessagesMain[key] || [] msgs.push(msg) - receivedMessagesMain[streamMessage.getPublisherId().toLowerCase()] = msgs + receivedMessagesMain[key] = msgs if (Object.values(receivedMessagesMain).every((m) => m.length === MAX_MESSAGES)) { mainSub.unsubscribe() } @@ -449,6 +450,7 @@ describeRepeats('PubSub with multiple clients', () => { addAfter(() => { counterId.clear(publisherId) // prevent overflows in counter }) + const publishTestMessages = getPublishTestMessages(pubClient, { stream, waitForLast: true, @@ -468,9 +470,10 @@ describeRepeats('PubSub with multiple clients', () => { last: 1000, } }, (msg, streamMessage) => { - const msgs = receivedMessagesOther[streamMessage.getPublisherId().toLowerCase()] || [] + const key = streamMessage.getPublisherId().toLowerCase() + const msgs = receivedMessagesOther[key] || [] msgs.push(msg) - receivedMessagesOther[streamMessage.getPublisherId().toLowerCase()] = msgs + receivedMessagesOther[key] = msgs }) addAfter(async () => { @@ -478,11 +481,9 @@ describeRepeats('PubSub with multiple clients', () => { }) } - await publishTestMessages(MAX_MESSAGES, { + published[publisherId] = await publishTestMessages(MAX_MESSAGES, { waitForLast: true, - async afterEach(pubMsg, req) { - published[publisherId] = published[publisherId] || [] - published[publisherId].push(pubMsg) + async afterEach(_pubMsg, req) { counter += 1 if (counter === 3) { await waitForStorage(req) // make sure lastest message has hit storage @@ -500,17 +501,194 @@ describeRepeats('PubSub with multiple clients', () => { } catch (err) { return false } - }, 15000).catch((err) => { + }, 15000, 300).catch((err) => { // convert timeout to actual error checkMessages(published, receivedMessagesMain) checkMessages(published, receivedMessagesOther) throw err }) + }, 60000) + }) + + test('works with multiple publishers on one stream', async () => { + await mainClient.session.getSessionToken() + await mainClient.connect() + + otherClient = createClient({ + auth: { + privateKey + } + }) + otherClient.on('error', getOnError(errors)) + await otherClient.session.getSessionToken() + const otherUser = await otherClient.getUserInfo() + await stream.grantPermission('stream_get', otherUser.username) + await stream.grantPermission('stream_subscribe', otherUser.username) + await otherClient.connect() + + const receivedMessagesOther = {} + const receivedMessagesMain = {} + // subscribe to stream from other client instance + await otherClient.subscribe({ + stream: stream.id, + }, (msg, streamMessage) => { + const key = streamMessage.getPublisherId().toLowerCase() + const msgs = receivedMessagesOther[key] || [] + msgs.push(msg) + receivedMessagesOther[key] = msgs + }) + // subscribe to stream from main client instance + await mainClient.subscribe({ + stream: stream.id, + }, (msg, streamMessage) => { + const key = streamMessage.getPublisherId().toLowerCase() + const msgs = receivedMessagesMain[key] || [] + msgs.push(msg) + receivedMessagesMain[key] = msgs + }) + + /* eslint-disable no-await-in-loop */ + const publishers = [] + for (let i = 0; i < 3; i++) { + publishers.push(await createPublisher()) + } + + /* eslint-enable no-await-in-loop */ + const published = {} + await Promise.all(publishers.map(async (pubClient) => { + const publisherId = (await pubClient.getPublisherId()).toLowerCase() + const publishTestMessages = getPublishTestMessages(pubClient, { + stream, + waitForLast: true, + }) + await publishTestMessages(10, { + delay: 500 + Math.random() * 1500, + afterEach(msg) { + published[publisherId] = published[publisherId] || [] + published[publisherId].push(msg) + } + }) + })) + + await waitForCondition(() => { + try { + checkMessages(published, receivedMessagesMain) + checkMessages(published, receivedMessagesOther) + return true + } catch (err) { + return false + } + }, 5000).catch(() => { checkMessages(published, receivedMessagesMain) checkMessages(published, receivedMessagesOther) - }, 60000) - }) + }) + + }, 40000) + + test('works with multiple publishers on one stream with late subscriber', async () => { + const published = {} + await mainClient.session.getSessionToken() + await mainClient.connect() + + otherClient = createClient({ + auth: { + privateKey + } + }) + otherClient.on('error', getOnError(errors)) + await otherClient.session.getSessionToken() + const otherUser = await otherClient.getUserInfo() + await stream.grantPermission('stream_get', otherUser.username) + await stream.grantPermission('stream_subscribe', otherUser.username) + await otherClient.connect() + + const receivedMessagesOther = {} + const receivedMessagesMain = {} + + // subscribe to stream from main client instance + const mainSub = await mainClient.subscribe({ + stream: stream.id, + }, (msg, streamMessage) => { + const key = streamMessage.getPublisherId().toLowerCase() + const msgs = receivedMessagesMain[key] || [] + msgs.push(msg) + receivedMessagesMain[key] = msgs + if (Object.values(receivedMessagesMain).every((m) => m.length === MAX_MESSAGES)) { + mainSub.cancel() + } + }) + + /* eslint-disable no-await-in-loop */ + const publishers = [] + for (let i = 0; i < 3; i++) { + publishers.push(await createPublisher()) + } + + let counter = 0 + /* eslint-enable no-await-in-loop */ + await Promise.all(publishers.map(async (pubClient) => { + const waitForStorage = getWaitForStorage(pubClient, { + stream, + timeout: 10000, + count: MAX_MESSAGES * publishers.length, + }) + + const publisherId = (await pubClient.getPublisherId()).toString().toLowerCase() + const publishTestMessages = getPublishTestMessages(pubClient, { + stream, + waitForLast: true, + waitForLastTimeout: 10000, + waitForLastCount: MAX_MESSAGES * publishers.length, + delay: 500 + Math.random() * 1500, + }) + + async function addLateSubscriber() { + // late subscribe to stream from other client instance + const lateSub = await otherClient.subscribe({ + stream: stream.id, + resend: { + last: 1000, + } + }, (msg, streamMessage) => { + const key = streamMessage.getPublisherId().toLowerCase() + const msgs = receivedMessagesOther[key] || [] + msgs.push(msg) + receivedMessagesOther[key] = msgs + }) + + addAfter(async () => { + await lateSub.unsubscribe() + }) + } + + await publishTestMessages(MAX_MESSAGES, { + async afterEach(pubMsg, req) { + published[publisherId] = published[publisherId] || [] + published[publisherId].push(pubMsg) + counter += 1 + if (counter === 3) { + await waitForStorage(req) // make sure lastest message has hit storage + // late subscribe to stream from other client instance + await addLateSubscriber() + } + } + }) + })) + + await waitForCondition(() => { + try { + checkMessages(published, receivedMessagesMain) + checkMessages(published, receivedMessagesOther) + return true + } catch (err) { + return false + } + }, 15000, 300).catch(() => { + checkMessages(published, receivedMessagesMain) + checkMessages(published, receivedMessagesOther) + }) + }, 60000) test('disconnecting one client does not disconnect the other', async () => { otherClient = createClient({