From 981f6b583058b86b082c60927072a4a0766c4ec4 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Sat, 12 Sep 2020 11:36:51 -0400 Subject: [PATCH 1/6] Promisify subscribe/unsubscribe. --- src/Subscriber.js | 42 +- src/Subscription.js | 39 ++ test/integration/MultipleClients.test.js | 20 +- test/integration/ResendReconnect.test.js | 4 +- test/integration/Resends.test.js | 35 +- test/integration/StreamrClient.test.js | 152 ++---- test/integration/Subscription.test.js | 122 ++--- test/unit/StreamrClient.test.js | 656 +++++++++++------------ 8 files changed, 521 insertions(+), 549 deletions(-) diff --git a/src/Subscriber.js b/src/Subscriber.js index 5eb760ee4..43d62b822 100644 --- a/src/Subscriber.js +++ b/src/Subscriber.js @@ -111,7 +111,16 @@ export default class Subscriber { } } - subscribe(optionsOrStreamId, callback, legacyOptions) { + async subscribe(...args) { + const sub = this.createSubscription(...args) + await Promise.all([ + sub.waitForSubscribed(), + this._resendAndSubscribe(sub), + ]) + return sub + } + + createSubscription(optionsOrStreamId, callback, legacyOptions) { const options = this._validateParameters(optionsOrStreamId, callback) // Backwards compatibility for giving an options object as third argument @@ -176,13 +185,6 @@ export default class Subscriber { // Add to lookups this._addSubscription(sub) - // If connected, emit a subscribe request - if (this.client.isConnected()) { - this._resendAndSubscribe(sub).catch(this.onErrorEmit) - } else if (this.client.options.autoConnect) { - this.client.ensureConnected() - } - return sub } @@ -191,6 +193,21 @@ export default class Subscriber { throw new Error('unsubscribe: please give a Subscription object as an argument!') } + if (sub.getState() === Subscription.State.unsubscribed) { + return Promise.resolve() + } + + return Promise.all([ + new Promise((resolve) => sub.once('unsubscribed', resolve)), + this._sendUnsubscribe(sub) + ]).then(() => Promise.resolve()) + } + + async _sendUnsubscribe(sub) { + if (!sub || !sub.streamId) { + throw new Error('unsubscribe: please give a Subscription object as an argument!') + } + const { streamId, streamPartition, id } = sub const info = { id, @@ -323,16 +340,16 @@ export default class Subscriber { if (subscribedSubs.length) { // If there already is a subscribed subscription for this stream, this new one will just join it immediately this.debug('_requestSubscribe: another subscription for same stream: %s, insta-subscribing', sub.streamId) - - setTimeout(() => { - sub.setState(Subscription.State.subscribed) - }) + await true // wait a tick + sub.setState(Subscription.State.subscribed) return } } const sessionToken = await this.client.session.getSessionToken() + // this should come after an async call e.g. getSessionToken + // so only one parallel call will send the subscription request if (sp.isSubscribing()) { return } @@ -344,6 +361,7 @@ export default class Subscriber { sessionToken, requestId: this.client.resender.resendUtil.generateRequestId(), }) + sp.setSubscribing(true) this.debug('_requestSubscribe: subscribing client: %o', request) await this.client.send(request).catch((err) => { diff --git a/src/Subscription.js b/src/Subscription.js index 0f2d9ec98..126b67029 100644 --- a/src/Subscription.js +++ b/src/Subscription.js @@ -41,6 +41,45 @@ export default class Subscription extends EventEmitter { this.state = Subscription.State.unsubscribed } + async waitForSubscribed() { + if (this._subscribedPromise) { + return this._subscribedPromise + } + + const subscribedPromise = new Promise((resolve, reject) => { + if (this.state === Subscription.State.subscribed) { + resolve() + return + } + let onError + const onSubscribed = () => { + this.off('error', onError) + resolve() + } + onError = (err) => { + this.off('subscribed', onSubscribed) + reject(err) + } + + const onUnsubscribed = () => { + if (this._subscribedPromise === subscribedPromise) { + this._subscribedPromise = undefined + } + } + + this.once('subscribed', onSubscribed) + this.once('unsubscribed', onUnsubscribed) + this.once('error', reject) + }).then(() => this).finally(() => { + if (this._subscribedPromise === subscribedPromise) { + this._subscribedPromise = undefined + } + }) + + this._subscribedPromise = subscribedPromise + return this._subscribedPromise + } + emit(event, ...args) { this.debug('emit', event) return super.emit(event, ...args) diff --git a/test/integration/MultipleClients.test.js b/test/integration/MultipleClients.test.js index 92b30d8ec..8dbf29912 100644 --- a/test/integration/MultipleClients.test.js +++ b/test/integration/MultipleClients.test.js @@ -75,20 +75,16 @@ describe('PubSub with multiple clients', () => { const receivedMessagesOther = [] const receivedMessagesMain = [] // subscribe to stream from other client instance - await new Promise((resolve) => { - otherClient.subscribe({ - stream: stream.id, - }, (msg) => { - receivedMessagesOther.push(msg) - }).once('subscribed', resolve) + await otherClient.subscribe({ + stream: stream.id, + }, (msg) => { + receivedMessagesOther.push(msg) }) // subscribe to stream from main client instance - await new Promise((resolve) => { - mainClient.subscribe({ - stream: stream.id, - }, (msg) => { - receivedMessagesMain.push(msg) - }).once('subscribed', resolve) + await mainClient.subscribe({ + stream: stream.id, + }, (msg) => { + receivedMessagesMain.push(msg) }) const message = { msg: uid('message'), diff --git a/test/integration/ResendReconnect.test.js b/test/integration/ResendReconnect.test.js index 0d79942a5..c324dac11 100644 --- a/test/integration/ResendReconnect.test.js +++ b/test/integration/ResendReconnect.test.js @@ -57,8 +57,8 @@ describe('resend/reconnect', () => { describe('reconnect after resend', () => { let sub let messages = [] - beforeEach((done) => { - sub = client.subscribe({ + beforeEach(async (done) => { + sub = await client.subscribe({ stream: stream.id, resend: { last: MAX_MESSAGES, diff --git a/test/integration/Resends.test.js b/test/integration/Resends.test.js index 1bba64859..7e84cf595 100644 --- a/test/integration/Resends.test.js +++ b/test/integration/Resends.test.js @@ -26,7 +26,7 @@ describe('StreamrClient resends', () => { beforeEach(async () => { client = createClient() - await client.ensureConnected() + await client.connect() publishedMessages = [] @@ -69,7 +69,7 @@ describe('StreamrClient resends', () => { resentMessages.push(message) }) - client.subscribe({ + await client.subscribe({ stream: stream.id, }, (message) => { realtimeMessages.push(message) @@ -92,7 +92,7 @@ describe('StreamrClient resends', () => { msg: uid('realtimeMessage'), } - client.subscribe({ + await client.subscribe({ stream: stream.id, }, (message) => { realtimeMessages.push(message) @@ -217,32 +217,25 @@ describe('StreamrClient resends', () => { const receivedMessages = [] // eslint-disable-next-line no-await-in-loop - const sub = client.subscribe( - { - stream: stream.id, - resend: { - last: MAX_MESSAGES, - }, - }, - (message) => { - receivedMessages.push(message) + const sub = await client.subscribe({ + stream: stream.id, + resend: { + last: MAX_MESSAGES, }, - ) - - // eslint-disable-next-line no-loop-func - sub.once('resent', () => { - expect(receivedMessages).toStrictEqual(publishedMessages) + }, (message) => { + receivedMessages.push(message) }) - // eslint-disable-next-line no-await-in-loop - await waitForCondition(() => receivedMessages.length === MAX_MESSAGES, 10000) + // eslint-disable-next-line no-loop-func + await waitForEvent(sub, 'resent') + expect(receivedMessages).toStrictEqual(publishedMessages) }, 10000) } it('resend last using subscribe and publish messages after resend', async () => { const receivedMessages = [] - client.subscribe({ + await client.subscribe({ stream: stream.id, resend: { last: MAX_MESSAGES, @@ -273,7 +266,7 @@ describe('StreamrClient resends', () => { it('resend last using subscribe and publish realtime messages', async () => { const receivedMessages = [] - const sub = client.subscribe({ + const sub = await client.subscribe({ stream: stream.id, resend: { last: MAX_MESSAGES, diff --git a/test/integration/StreamrClient.test.js b/test/integration/StreamrClient.test.js index f074b14c6..55d68e9e6 100644 --- a/test/integration/StreamrClient.test.js +++ b/test/integration/StreamrClient.test.js @@ -505,7 +505,7 @@ describe('StreamrClient', () => { }).rejects.toThrow() }, 5000) - it('should not subscribe to unsubscribed streams on reconnect', async (done) => { + it('should not subscribe to unsubscribed streams on reconnect', async () => { client = createClient() await client.connect() const sessionToken = await client.session.getSessionToken() @@ -515,37 +515,29 @@ describe('StreamrClient', () => { }) const connectionEventSpy = jest.spyOn(client.connection, '_send') - const sub = client.subscribe(stream.id, () => {}) - - sub.once('subscribed', async () => { - await wait(100) - await client.unsubscribe(sub) - }) - - sub.once('unsubscribed', async () => { - await client.disconnect() - await client.connect() - await client.disconnect() - // key exchange stream subscription should not have been sent yet - expect(connectionEventSpy.mock.calls).toHaveLength(2) - - // check whole list of calls after reconnect and disconnect - expect(connectionEventSpy.mock.calls[0]).toEqual([new SubscribeRequest({ - streamId: stream.id, - streamPartition: 0, - sessionToken, - requestId: connectionEventSpy.mock.calls[0][0].requestId, - })]) - - expect(connectionEventSpy.mock.calls[1]).toEqual([new UnsubscribeRequest({ - streamId: stream.id, - streamPartition: 0, - sessionToken, - requestId: connectionEventSpy.mock.calls[1][0].requestId, - })]) - - done() - }) + const sub = await client.subscribe(stream.id, () => {}) + await wait(100) + await client.unsubscribe(sub) + await client.disconnect() + await client.connect() + await client.disconnect() + // key exchange stream subscription should not have been sent yet + expect(connectionEventSpy.mock.calls).toHaveLength(2) + + // check whole list of calls after reconnect and disconnect + expect(connectionEventSpy.mock.calls[0]).toEqual([new SubscribeRequest({ + streamId: stream.id, + streamPartition: 0, + sessionToken, + requestId: connectionEventSpy.mock.calls[0][0].requestId, + })]) + + expect(connectionEventSpy.mock.calls[1]).toEqual([new UnsubscribeRequest({ + streamId: stream.id, + streamPartition: 0, + sessionToken, + requestId: connectionEventSpy.mock.calls[1][0].requestId, + })]) }) it('should not subscribe after resend() on reconnect', async (done) => { @@ -699,18 +691,16 @@ describe('StreamrClient', () => { name: uid('stream') }) - const sub = client.subscribe({ + await client.subscribe({ stream: stream.id, }, () => {}) - sub.once('subscribed', async () => { - await client.disconnect() - // wait in case of delayed errors - setTimeout(() => { - expect(client.onError).not.toHaveBeenCalled() - done() - }, 100) - }) + await client.disconnect() + // wait in case of delayed errors + setTimeout(() => { + expect(client.onError).not.toHaveBeenCalled() + done() + }, 100) }) it('does not error if disconnect after subscribe with resend', async (done) => { @@ -724,7 +714,7 @@ describe('StreamrClient', () => { name: uid('stream') }) - const sub = client.subscribe({ + await client.subscribe({ stream: stream.id, resend: { from: { @@ -733,14 +723,12 @@ describe('StreamrClient', () => { }, }, () => {}) - sub.once('subscribed', async () => { - await client.disconnect() - // wait in case of delayed errors - setTimeout(() => { - expect(client.onError).not.toHaveBeenCalled() - done() - }, 100) - }) + await client.disconnect() + // wait in case of delayed errors + setTimeout(() => { + expect(client.onError).not.toHaveBeenCalled() + done() + }, 100) }) }) }) @@ -823,27 +811,21 @@ describe('StreamrClient', () => { it('client.subscribe then unsubscribe after subscribed without resend', async () => { expect(client.getSubscriptions(stream.id)).toHaveLength(0) - const sub = client.subscribe({ + const sub = await client.subscribe({ stream: stream.id, }, () => {}) - const onSubscribed = jest.fn() - sub.on('subscribed', onSubscribed) const onUnsubscribed = jest.fn() sub.on('unsubscribed', onUnsubscribed) expect(client.getSubscriptions(stream.id)).toHaveLength(1) // has subscription immediately - await new Promise((resolve) => sub.once('subscribed', resolve)) expect(client.getSubscriptions(stream.id)).toHaveLength(1) - const t = new Promise((resolve) => sub.once('unsubscribed', resolve)) await client.unsubscribe(sub) - await t expect(client.getSubscriptions(stream.id)).toHaveLength(0) - expect(onSubscribed).toHaveBeenCalledTimes(1) expect(onUnsubscribed).toHaveBeenCalledTimes(1) }, TIMEOUT) - it('client.subscribe then unsubscribe before subscribed without resend', async () => { + it.skip('client.subscribe then unsubscribe before subscribed without resend', async () => { expect(client.getSubscriptions(stream.id)).toHaveLength(0) const sub = client.subscribe({ @@ -863,7 +845,7 @@ describe('StreamrClient', () => { expect(onUnsubscribed).toHaveBeenCalledTimes(1) }, TIMEOUT) - it('client.subscribe then unsubscribe before subscribed with resend', async () => { + it.skip('client.subscribe then unsubscribe before subscribed with resend', async () => { expect(client.getSubscriptions(stream.id)).toHaveLength(0) const sub = client.subscribe({ @@ -888,7 +870,7 @@ describe('StreamrClient', () => { expect(onUnsubscribed).toHaveBeenCalledTimes(1) }, TIMEOUT) - it('client.subscribe then unsubscribe before subscribed with resend', async () => { + it.skip('client.subscribe then unsubscribe before subscribed with resend', async () => { expect(client.getSubscriptions(stream.id)).toHaveLength(0) const sub = client.subscribe({ @@ -923,7 +905,7 @@ describe('StreamrClient', () => { expect(client.getSubscriptions(stream.id)).toHaveLength(0) const onMessage = jest.fn() - const sub = client.subscribe({ + const sub = await client.subscribe({ stream: stream.id, }, onMessage) @@ -936,7 +918,6 @@ describe('StreamrClient', () => { sub.on('no_resend', onNoResend) const onUnsubscribed = jest.fn() sub.on('unsubscribed', onUnsubscribed) - await new Promise((resolve) => sub.once('subscribed', resolve)) const msg = { name: uid('msg') } @@ -948,7 +929,7 @@ describe('StreamrClient', () => { expect(onResent).toHaveBeenCalledTimes(0) expect(onMessage).toHaveBeenCalledTimes(0) expect(onNoResend).toHaveBeenCalledTimes(0) - expect(onSubscribed).toHaveBeenCalledTimes(1) + expect(onSubscribed).toHaveBeenCalledTimes(0) expect(onUnsubscribed).toHaveBeenCalledTimes(1) }, TIMEOUT) @@ -960,7 +941,7 @@ describe('StreamrClient', () => { await wait(TIMEOUT * 0.5) const onMessage = jest.fn() - const sub = client.subscribe({ + const sub = await client.subscribe({ stream: stream.id, resend: { from: { @@ -978,26 +959,19 @@ describe('StreamrClient', () => { sub.on('no_resend', onNoResend) const onUnsubscribed = jest.fn() sub.on('unsubscribed', onUnsubscribed) - client.debug(1) - await new Promise((resolve) => sub.once('subscribed', resolve)) - client.debug(2) - const t = new Promise((resolve) => sub.once('unsubscribed', resolve)) await client.unsubscribe(sub) - client.debug(3) - await t - client.debug(4) expect(client.getSubscriptions(stream.id)).toHaveLength(0) // lost subscription immediately expect(onResent).toHaveBeenCalledTimes(0) expect(onMessage).toHaveBeenCalledTimes(0) expect(onNoResend).toHaveBeenCalledTimes(0) - expect(onSubscribed).toHaveBeenCalledTimes(1) + expect(onSubscribed).toHaveBeenCalledTimes(0) expect(onUnsubscribed).toHaveBeenCalledTimes(1) }, TIMEOUT * 2) }) it('client.subscribe (realtime)', async (done) => { const id = Date.now() - const sub = client.subscribe({ + const sub = await client.subscribe({ stream: stream.id, }, async (parsedContent, streamMessage) => { expect(parsedContent.id).toBe(id) @@ -1013,10 +987,8 @@ describe('StreamrClient', () => { }) // Publish after subscribed - sub.once('subscribed', () => { - stream.publish({ - id, - }) + await stream.publish({ + id, }) }) @@ -1025,7 +997,7 @@ describe('StreamrClient', () => { const nbMessages = 3 const intervalMs = 100 let counter = 0 - const sub = client.subscribe({ + const sub = await client.subscribe({ stream: stream.id, }, async (parsedContent, streamMessage) => { expect(parsedContent.i).toBe(counter) @@ -1051,7 +1023,6 @@ describe('StreamrClient', () => { }) // Publish after subscribed - await new Promise((resolve) => sub.once('subscribed', resolve)) for (let i = 0; i < nbMessages; i++) { // eslint-disable-next-line no-await-in-loop await wait(intervalMs) @@ -1073,7 +1044,7 @@ describe('StreamrClient', () => { // Add delay: this test needs some time to allow the message to be written to Cassandra await wait(TIMEOUT * 0.8) - const sub = client.subscribe({ + const sub = await client.subscribe({ stream: stream.id, resend: { from: { @@ -1096,9 +1067,7 @@ describe('StreamrClient', () => { expect(streamMessage.signature).toBeTruthy() // All good, unsubscribe - const t = new Promise((resolve) => sub.once('unsubscribed', resolve)) await client.unsubscribe(sub) - await t expect(client.getSubscriptions(stream.id)).toHaveLength(0) done() }) @@ -1116,7 +1085,7 @@ describe('StreamrClient', () => { // Add delay: this test needs some time to allow the message to be written to Cassandra await wait(TIMEOUT * 0.7) - const sub = client.subscribe({ + const sub = await client.subscribe({ stream: stream.id, resend: { last: 1, @@ -1137,17 +1106,15 @@ describe('StreamrClient', () => { expect(streamMessage.signature).toBeTruthy() // All good, unsubscribe - const t = new Promise((resolve) => sub.once('unsubscribed', resolve)) await client.unsubscribe(sub) - await t expect(client.getSubscriptions(stream.id)).toHaveLength(0) done() }) }, TIMEOUT) - it('client.subscribe (realtime with resend)', (done) => { + it('client.subscribe (realtime with resend)', async (done) => { const id = Date.now() - const sub = client.subscribe({ + const sub = await client.subscribe({ stream: stream.id, resend: { last: 1, @@ -1166,10 +1133,8 @@ describe('StreamrClient', () => { }) // Publish after subscribed - sub.once('subscribed', () => { - stream.publish({ - id, - }) + await stream.publish({ + id, }) }, 30000) }) @@ -1181,12 +1146,11 @@ describe('StreamrClient', () => { it('decodes realtime messages correctly', async (done) => { client.once('error', done) - client.subscribe(stream.id, (msg) => { + await client.subscribe(stream.id, (msg) => { expect(msg).toStrictEqual(publishedMessage) done() - }).once('subscribed', () => { - client.publish(stream.id, publishedMessage) }) + await client.publish(stream.id, publishedMessage) }) it('decodes resent messages correctly', async (done) => { diff --git a/test/integration/Subscription.test.js b/test/integration/Subscription.test.js index 7482dff1e..17c7d6a5b 100644 --- a/test/integration/Subscription.test.js +++ b/test/integration/Subscription.test.js @@ -1,5 +1,5 @@ import { ethers } from 'ethers' -import { wait } from 'streamr-test-utils' +import { wait, waitForEvent } from 'streamr-test-utils' import { uid } from '../utils' import StreamrClient from '../../src' @@ -16,8 +16,6 @@ const createClient = (opts = {}) => new StreamrClient({ ...opts, }) -const throwError = (error) => { throw error } - const RESEND_ALL = { from: { timestamp: 0, @@ -28,31 +26,11 @@ describe('Subscription', () => { let stream let client let subscription + let errors = [] + let expectedErrors = 0 - async function setup() { - client = createClient() - client.on('error', throwError) - stream = await client.createStream({ - name: uid('stream') - }) - } - - async function teardown() { - if (subscription) { - await client.unsubscribe(subscription) - subscription = undefined - } - - if (stream) { - await stream.delete() - stream = undefined - } - - if (client) { - client.off('error', throwError) - await client.disconnect() - client = undefined - } + function onError(err) { + errors.push(err) } /** @@ -60,10 +38,10 @@ describe('Subscription', () => { * Needs to create subscription at same time in order to track message events. */ - function createMonitoredSubscription(opts = {}) { + async function createMonitoredSubscription(opts = {}) { if (!client) { throw new Error('No client') } const events = [] - subscription = client.subscribe({ + subscription = await client.subscribe({ stream: stream.id, resend: RESEND_ALL, ...opts, @@ -88,69 +66,63 @@ describe('Subscription', () => { } beforeEach(async () => { - await teardown() - await setup() + errors = [] + expectedErrors = 0 + client = createClient() + client.on('error', onError) + stream = await client.createStream({ + name: uid('stream') + }) + await client.connect() }) afterEach(async () => { - await teardown() + expect(errors).toHaveLength(expectedErrors) }) - describe('subscribe/unsubscribe events', () => { - it('fires events in correct order', async (done) => { - const subscriptionEvents = createMonitoredSubscription() - subscription.on('subscribed', async () => { - subscription.on('unsubscribed', () => { - expect(subscriptionEvents).toEqual([ - 'subscribed', - 'unsubscribed', - ]) - done() - }) - await client.unsubscribe(subscription) - }) + afterEach(async () => { + if (!client) { return } + client.off('error', onError) + client.debug('disconnecting after test') + await client.disconnect() + }) - await client.connect() + describe('subscribe/unsubscribe events', () => { + it('fires events in correct order 1', async () => { + const subscriptionEvents = await createMonitoredSubscription() + await waitForEvent(subscription, 'no_resend') + await client.unsubscribe(subscription) + expect(subscriptionEvents).toEqual([ + 'no_resend', + 'unsubscribed', + ]) }) }) describe('resending/no_resend events', () => { - it('fires events in correct order', async (done) => { - const subscriptionEvents = createMonitoredSubscription() - subscription.on('no_resend', async () => { - await wait(0) - expect(subscriptionEvents).toEqual([ - 'subscribed', - 'no_resend', - ]) - done() - }) - - await client.connect() + it('fires events in correct order 2', async () => { + const subscriptionEvents = await createMonitoredSubscription() + await waitForEvent(subscription, 'no_resend') + expect(subscriptionEvents).toEqual([ + 'no_resend', + ]) }) }) describe('resending/resent events', () => { - it('fires events in correct order', async (done) => { - await client.connect() + it('fires events in correct order 3', async () => { const message1 = await publishMessage() const message2 = await publishMessage() await wait(5000) // wait for messages to (probably) land in storage - const subscriptionEvents = createMonitoredSubscription() - subscription.on('resent', async () => { - await wait(500) // wait in case messages appear after resent event - expect(subscriptionEvents).toEqual([ - 'subscribed', - 'resending', - message1, - message2, - 'resent', - ]) - done() - }) - subscription.on('no_resend', () => { - done('error: got no_resend, expected: resent') - }) + const subscriptionEvents = await createMonitoredSubscription() + await waitForEvent(subscription, 'resent') + await wait(500) // wait in case messages appear after resent event + expect(subscriptionEvents).toEqual([ + 'resending', + message1, + message2, + 'resent', + ]) }, 20 * 1000) }) }) diff --git a/test/unit/StreamrClient.test.js b/test/unit/StreamrClient.test.js index 8d73f834d..75bfd352a 100644 --- a/test/unit/StreamrClient.test.js +++ b/test/unit/StreamrClient.test.js @@ -1,7 +1,7 @@ import sinon from 'sinon' import { Wallet } from 'ethers' import { ControlLayer, MessageLayer, Errors } from 'streamr-client-protocol' -import { wait } from 'streamr-test-utils' +import { wait, waitForEvent } from 'streamr-test-utils' import FailedToPublishError from '../../src/errors/FailedToPublishError' import Subscription from '../../src/Subscription' @@ -106,14 +106,13 @@ describe('StreamrClient', () => { errors.push(error) } - function mockSubscription(...opts) { - let sub + async function mockSubscription(...opts) { connection._send = jest.fn(async (request) => { requests.push(request) await wait() if (request.type === ControlMessage.TYPES.SubscribeRequest) { connection.emitMessage(new SubscribeResponse({ - streamId: sub.streamId, + streamId: request.streamId, requestId: request.requestId, streamPartition: request.streamPartition, })) @@ -121,14 +120,13 @@ describe('StreamrClient', () => { if (request.type === ControlMessage.TYPES.UnsubscribeRequest) { connection.emitMessage(new UnsubscribeResponse({ - streamId: sub.streamId, + streamId: request.streamId, requestId: request.requestId, streamPartition: request.streamPartition, })) } }) - sub = client.subscribe(...opts).on('error', onError) - return sub + return client.subscribe(...opts) } const STORAGE_DELAY = 2000 @@ -171,23 +169,24 @@ describe('StreamrClient', () => { }) describe('connecting behaviour', () => { - it('connected event should emit an event on client', (done) => { + it('connected event should emit an event on client', async (done) => { client.once('connected', () => { done() }) - client.connect() + await client.connect() }) it('should not send anything if not subscribed to anything', async () => { - await client.ensureConnected() + await client.connect() expect(connection._send).not.toHaveBeenCalled() }) it('should send pending subscribes', async () => { - client.subscribe('stream1', () => {}).on('error', onError) + const t = mockSubscription('stream1', () => {}) - await client.ensureConnected() + await client.connect() await wait() + await t expect(connection._send.mock.calls).toHaveLength(1) expect(connection._send.mock.calls[0][0]).toMatchObject({ streamId: 'stream1', @@ -196,12 +195,39 @@ describe('StreamrClient', () => { }) }) - it.skip('should send pending subscribes when disconnected and then reconnected', async () => { - client.subscribe('stream1', () => {}).on('error', onError) - await client.ensureConnected() + it('should reconnect subscriptions when connection disconnected before subscribed & reconnected', async () => { + await client.connect() + let subscribed = false + const t = mockSubscription('stream1', () => {}).then((v) => { + subscribed = true + return v + }) connection.socket.close() - await client.ensureConnected() - await wait(100) + expect(subscribed).toBe(false) // shouldn't have subscribed yet + // no connect necessary should connect and subscribe + await t + expect(connection._send.mock.calls).toHaveLength(2) + // On connect + expect(connection._send.mock.calls[0][0]).toMatchObject({ + streamId: 'stream1', + streamPartition, + sessionToken, + }) + + // On reconnect + expect(connection._send.mock.calls[1][0]).toMatchObject({ + streamId: 'stream1', + streamPartition, + sessionToken, + }) + }) + + it('should re-subscribe when subscribed then reconnected', async () => { + await client.connect() + await mockSubscription('stream1', () => {}) + connection.socket.close() + await client.nextConnection() + // no connect necessary should auto-reconnect and subscribe expect(connection._send.mock.calls).toHaveLength(2) // On connect expect(connection._send.mock.calls[0][0]).toMatchObject({ @@ -220,138 +246,123 @@ describe('StreamrClient', () => { // TODO convert and move all super mocked tests to integration }) + describe('promise subscribe behaviour', () => { + beforeEach(async () => client.connect()) + + it('works', async () => { + const sub = await mockSubscription('stream1', () => {}) + expect(sub).toBeTruthy() + expect(sub.streamId).toBe('stream1') + await client.unsubscribe(sub) + expect(client.getSubscriptions(sub.streamId)).toEqual([]) + }) + }) + describe('disconnection behaviour', () => { beforeEach(async () => client.connect()) it('emits disconnected event on client', async (done) => { - client.once('disconnected', done) + client.once('disconnected', () => done()) await connection.disconnect() }) it('removes subscriptions', async () => { - const sub = mockSubscription('stream1', () => {}) - await new Promise((resolve) => sub.once('subscribed', resolve)) + const sub = await mockSubscription('stream1', () => {}) await client.disconnect() expect(client.getSubscriptions(sub.streamId)).toEqual([]) }) it('does not remove subscriptions if disconnected accidentally', async () => { - const sub = client.subscribe('stream1', () => {}) + const sub = await mockSubscription('stream1', () => {}) client.connection.socket.close() - await new Promise((resolve) => client.once('disconnected', resolve)) + await waitForEvent(client, 'disconnected') expect(client.getSubscriptions(sub.streamId)).toEqual([sub]) expect(sub.getState()).toEqual(Subscription.State.unsubscribed) await client.connect() expect(client.getSubscriptions(sub.streamId)).toEqual([sub]) + // re-subscribes + expect(sub.getState()).toEqual(Subscription.State.subscribing) }) it('sets subscription state to unsubscribed', async () => { - const sub = mockSubscription('stream1', () => {}) - await new Promise((resolve) => sub.once('subscribed', resolve)) + const sub = await mockSubscription('stream1', () => {}) await connection.disconnect() expect(sub.getState()).toEqual(Subscription.State.unsubscribed) }) }) describe('SubscribeResponse', () => { - beforeEach(async () => client.ensureConnected()) + beforeEach(async () => client.connect()) - it('marks Subscriptions as subscribed', async (done) => { - const sub = mockSubscription('stream1', () => {}) - sub.once('subscribed', () => { - expect(sub.getState()).toEqual(Subscription.State.subscribed) - done() - }) + it('marks Subscriptions as subscribed', async () => { + const sub = await mockSubscription('stream1', () => {}) + expect(sub.getState()).toEqual(Subscription.State.subscribed) }) - it('generates a requestId without resend', (done) => { - const sub = mockSubscription({ + it('generates a requestId without resend', async () => { + await mockSubscription({ stream: 'stream1', }, () => {}) - sub.once('subscribed', () => { - const { requestId } = requests[0] - expect(requestId).toBeTruthy() - done() - }) + const { requestId } = requests[0] + expect(requestId).toBeTruthy() }) - it('emits a resend request if resend options were given. No second resend if a message is received.', (done) => { - const sub = mockSubscription({ + it('emits a resend request if resend options were given. No second resend if a message is received.', async () => { + const sub = await mockSubscription({ stream: 'stream1', resend: { last: 1, }, }, () => {}) - sub.once('subscribed', async () => { - await wait(100) - const { requestId, type } = requests[requests.length - 1] - expect(type).toEqual(ControlMessage.TYPES.ResendLastRequest) - const streamMessage = getStreamMessage(sub.streamId, {}) - connection.emitMessage(new UnicastMessage({ - requestId, - streamMessage, - })) - await wait(STORAGE_DELAY) - sub.stop() - await wait() - expect(connection._send.mock.calls).toHaveLength(2) // sub + resend - expect(connection._send.mock.calls[1][0]).toMatchObject({ - type: ControlMessage.TYPES.ResendLastRequest, - streamId: sub.streamId, - streamPartition: sub.streamPartition, - requestId, - numberLast: 1, - sessionToken: 'session-token' - }) - done() + await wait(100) + const { requestId, type } = requests[requests.length - 1] + expect(type).toEqual(ControlMessage.TYPES.ResendLastRequest) + const streamMessage = getStreamMessage(sub.streamId, {}) + connection.emitMessage(new UnicastMessage({ + requestId, + streamMessage, + })) + await wait(STORAGE_DELAY) + sub.stop() + await wait() + expect(connection._send.mock.calls).toHaveLength(2) // sub + resend + expect(connection._send.mock.calls[1][0]).toMatchObject({ + type: ControlMessage.TYPES.ResendLastRequest, + streamId: sub.streamId, + streamPartition: sub.streamPartition, + requestId, + numberLast: 1, + sessionToken: 'session-token' }) }, STORAGE_DELAY + 1000) - it('emits multiple resend requests as per multiple subscriptions. No second resends if messages are received.', async (done) => { - const sub1 = mockSubscription({ - stream: 'stream1', - resend: { - last: 2, - }, - }, () => {}) - const sub2 = mockSubscription({ - stream: 'stream1', - resend: { - last: 1, - }, - }, () => {}) - - let requestId1 - let requestId2 - - await Promise.all([ - new Promise((resolve) => { - sub1.once('subscribed', async () => { - await wait(200) - requestId1 = requests[requests.length - 2].requestId - const streamMessage = getStreamMessage(sub1.streamId, {}) - connection.emitMessage(new UnicastMessage({ - requestId: requestId1, - streamMessage, - })) - resolve() - }) - }), - new Promise((resolve) => { - sub2.once('subscribed', async () => { - await wait(200) - requestId2 = requests[requests.length - 1].requestId - const streamMessage = getStreamMessage(sub2.streamId, {}) - connection.emitMessage(new UnicastMessage({ - requestId: requestId2, - streamMessage, - })) - resolve() - }) - }) + it('emits multiple resend requests as per multiple subscriptions. No second resends if messages are received.', async () => { + const [sub1, sub2] = await Promise.all([ + mockSubscription({ + stream: 'stream1', + resend: { + last: 2, + }, + }, () => {}), + mockSubscription({ + stream: 'stream1', + resend: { + last: 1, + }, + }, () => {}) ]) + const requestId1 = requests.find((r) => r.numberLast === 2).requestId + connection.emitMessage(new UnicastMessage({ + requestId: requestId1, + streamMessage: getStreamMessage(sub1.streamId, {}) + })) + + const requestId2 = requests.find((r) => r.numberLast === 1).requestId + connection.emitMessage(new UnicastMessage({ + requestId: requestId2, + streamMessage: getStreamMessage(sub2.streamId, {}) + })) - await wait(STORAGE_DELAY + 400) sub1.stop() sub2.stop() @@ -371,8 +382,10 @@ describe('StreamrClient', () => { sessionToken: 'session-token', }) ] - // eslint-disable-next-line semi-style - ;[connection._send.mock.calls[1][0], connection._send.mock.calls[2][0]].forEach((actual, index) => { + + const calls = connection._send.mock.calls.filter(([o]) => [requestId1, requestId2].includes(o.requestId)) + expect(calls).toHaveLength(2) + calls.forEach(([actual], index) => { const expected = expectedResponses[index] expect(actual).toMatchObject({ requestId: expected.requestId, @@ -382,17 +395,15 @@ describe('StreamrClient', () => { sessionToken: expected.sessionToken, }) }) - done() }, STORAGE_DELAY + 1000) }) describe('UnsubscribeResponse', () => { // Before each test, client is connected, subscribed, and unsubscribe() is called let sub - beforeEach(async (done) => { - await client.ensureConnected() - sub = mockSubscription('stream1', () => {}) - sub.once('subscribed', () => done()) + beforeEach(async () => { + await client.connect() + sub = await mockSubscription('stream1', () => {}) }) it('removes the subscription', async () => { @@ -435,13 +446,13 @@ describe('StreamrClient', () => { beforeEach(async () => { await client.connect() - sub = mockSubscription('stream1', () => {}) + sub = await mockSubscription('stream1', () => {}) }) - it('should call the message handler of each subscription', () => { + it('should call the message handler of each subscription', async () => { sub.handleBroadcastMessage = jest.fn() - const sub2 = setupSubscription('stream1') + const sub2 = await mockSubscription('stream1', () => {}) sub2.handleBroadcastMessage = jest.fn() const requestId = uid('broadcastMessage') const msg1 = new BroadcastMessage({ @@ -451,6 +462,7 @@ describe('StreamrClient', () => { connection.emitMessage(msg1) expect(sub.handleBroadcastMessage).toHaveBeenCalledWith(msg1.streamMessage, expect.any(Function)) + expect(sub2.handleBroadcastMessage).toHaveBeenCalledWith(msg1.streamMessage, expect.any(Function)) }) it('should not crash if messages are received for unknown streams', () => { @@ -462,14 +474,14 @@ describe('StreamrClient', () => { connection.emitMessage(msg1) }) - it('should ensure that the promise returned by the verification function is cached and returned for all handlers', (done) => { + it('should ensure that the promise returned by the verification function is cached and returned for all handlers', async (done) => { let firstResult sub.handleBroadcastMessage = (message, verifyFn) => { firstResult = verifyFn() expect(firstResult).toBeInstanceOf(Promise) expect(verifyFn()).toBe(firstResult) } - const sub2 = mockSubscription('stream1', () => {}) + const sub2 = await mockSubscription('stream1', () => {}) sub2.handleBroadcastMessage = (message, verifyFn) => { firstResult = verifyFn() expect(firstResult).toBeInstanceOf(Promise) @@ -492,17 +504,14 @@ describe('StreamrClient', () => { describe('UnicastMessage', () => { let sub - beforeEach(async (done) => { + beforeEach(async () => { await client.connect() - sub = mockSubscription({ + sub = await mockSubscription({ stream: 'stream1', resend: { last: 5, }, }, () => {}) - .once('subscribed', () => { - done() - }) }) it('should call the message handler of specified Subscription', async () => { @@ -563,15 +572,14 @@ describe('StreamrClient', () => { describe('ResendResponseResending', () => { let sub - beforeEach(async (done) => { + beforeEach(async () => { await client.connect() - sub = mockSubscription({ + sub = await mockSubscription({ stream: 'stream1', resend: { last: 5, }, }, () => {}) - .once('subscribed', () => done()) }) it('emits event on associated subscription', async () => { @@ -609,14 +617,14 @@ describe('StreamrClient', () => { describe('ResendResponseNoResend', () => { let sub - beforeEach(async (done) => { + beforeEach(async () => { await client.connect() - sub = mockSubscription({ + sub = await mockSubscription({ stream: 'stream1', resend: { last: 5, }, - }, () => {}).once('subscribed', () => done()) + }, () => {}) }) it('calls event handler on subscription', () => { @@ -653,14 +661,14 @@ describe('StreamrClient', () => { describe('ResendResponseResent', () => { let sub - beforeEach(async (done) => { + beforeEach(async () => { await client.connect() - sub = mockSubscription({ + sub = await mockSubscription({ stream: 'stream1', resend: { last: 5, }, - }, () => {}).once('subscribed', () => done()) + }, () => {}) }) it('calls event handler on subscription', () => { @@ -695,14 +703,14 @@ describe('StreamrClient', () => { }) describe('ErrorResponse', () => { - beforeEach(async (done) => { + beforeEach(async () => { await client.connect() - mockSubscription({ + await mockSubscription({ stream: 'stream1', resend: { last: 5, } - }, () => {}).once('subscribed', () => done()) + }, () => {}) }) it('emits an error event on client', (done) => { @@ -714,7 +722,7 @@ describe('StreamrClient', () => { errorCode: 'error code' }) - client.once('error', async (err) => { + client.once('error', (err) => { errors.pop() expect(err.message).toEqual(errorResponse.errorMessage) expect(client.onError).toHaveBeenCalled() @@ -727,9 +735,9 @@ describe('StreamrClient', () => { describe('error', () => { let sub - beforeEach(async (done) => { + beforeEach(async () => { await client.connect() - sub = mockSubscription('stream1', () => {}).once('subscribed', () => done()) + sub = await mockSubscription('stream1', () => {}) }) it('reports InvalidJsonErrors to subscriptions', (done) => { @@ -751,14 +759,12 @@ describe('StreamrClient', () => { client.onError = jest.fn() const testError = new Error('This is a test error message, ignore') - client.once('error', async (err) => { + client.once('error', (err) => { + errors.pop() expect(err.message).toMatch(testError.message) expect(client.onError).toHaveBeenCalled() done() }) - client.once('error', () => { - errors.pop() - }) connection.emit('error', testError) }) }) @@ -847,81 +853,71 @@ describe('StreamrClient', () => { requestId, }) connection.emitMessage(resendResponse) - client.connection.socket.close() + connection.socket.close() await client.connect() expect(requests.filter((req) => req.type === ControlMessage.TYPES.SubscribeRequest)).toHaveLength(0) }) }) describe('subscribe()', () => { - it('should call client.connect() if autoConnect is set to true', (done) => { + it('should connect if autoConnect is set to true', async () => { client.options.autoConnect = true - client.once('connected', done) - - client.subscribe('stream1', () => {}) + await mockSubscription('stream1', () => {}) }) describe('when connected', () => { beforeEach(() => client.connect()) it('throws an error if no options are given', () => { - expect(() => { + expect(() => ( client.subscribe(undefined, () => {}) - }).toThrow() + )).rejects.toThrow() }) it('throws an error if options is wrong type', () => { - expect(() => { + expect(() => ( client.subscribe(['streamId'], () => {}) - }).toThrow() + )).rejects.toThrow() }) it('throws an error if no callback is given', () => { - expect(() => { + expect(() => ( client.subscribe('stream1') - }).toThrow() + )).rejects.toThrow() }) - it('sends a subscribe request', (done) => { - const sub = mockSubscription('stream1', () => {}) - sub.once('subscribed', () => { - const lastRequest = requests[requests.length - 1] - expect(lastRequest).toEqual(new SubscribeRequest({ - streamId: sub.streamId, - streamPartition: sub.streamPartition, - requestId: lastRequest.requestId, - sessionToken: 'session-token' - })) - done() - }) + it('sends a subscribe request', async () => { + const sub = await mockSubscription('stream1', () => {}) + const lastRequest = requests[requests.length - 1] + expect(lastRequest).toEqual(new SubscribeRequest({ + streamId: sub.streamId, + streamPartition: sub.streamPartition, + requestId: lastRequest.requestId, + sessionToken: 'session-token' + })) }) - it('sends a subscribe request for a given partition', (done) => { - const sub = mockSubscription({ + it('sends a subscribe request for a given partition', async () => { + const sub = await mockSubscription({ stream: 'stream1', partition: 5, - }, () => {}).once('subscribed', () => { - const lastRequest = requests[requests.length - 1] - expect(lastRequest).toEqual(new SubscribeRequest({ - streamId: sub.streamId, - streamPartition: 5, - requestId: lastRequest.requestId, - sessionToken, - })) - done() - }) + }, () => {}) + const lastRequest = requests[requests.length - 1] + expect(lastRequest).toEqual(new SubscribeRequest({ + streamId: sub.streamId, + streamPartition: 5, + requestId: lastRequest.requestId, + sessionToken, + })) }) it('sends subscribe request for each subscribed partition', async () => { const tasks = [] for (let i = 0; i < 3; i++) { - tasks.push(new Promise((resolve) => { - const s = mockSubscription({ - stream: 'stream1', - partition: i, - }, () => {}) - .once('subscribed', () => resolve(s)) - })) + tasks.push(mockSubscription({ + stream: 'stream1', + partition: i, + }, () => {})) } const subs = await Promise.all(tasks) @@ -949,11 +945,9 @@ describe('StreamrClient', () => { }) it('sends just one subscribe request to server even if there are multiple subscriptions for same stream', async () => { - const sub = mockSubscription('stream1', () => {}) - const sub2 = mockSubscription('stream1', () => {}) - await Promise.all([ - new Promise((resolve) => sub.once('subscribed', resolve)), - new Promise((resolve) => sub2.once('subscribed', resolve)) + const [sub, sub2] = await Promise.all([ + mockSubscription('stream1', () => {}), + mockSubscription('stream1', () => {}) ]) expect(requests).toHaveLength(1) const request = requests[0] @@ -969,9 +963,9 @@ describe('StreamrClient', () => { }) describe('with resend options', () => { - it('supports resend.from', (done) => { + it('supports resend.from', async () => { const ref = new MessageRef(5, 0) - const sub = mockSubscription({ + const sub = await mockSubscription({ stream: 'stream1', resend: { from: { @@ -981,60 +975,54 @@ describe('StreamrClient', () => { publisherId: 'publisherId', }, }, () => {}) - sub.once('subscribed', async () => { - await wait(200) - const lastRequest = requests[requests.length - 1] - expect(lastRequest).toEqual(new ResendFromRequest({ - streamId: sub.streamId, - streamPartition: sub.streamPartition, - requestId: lastRequest.requestId, - publisherId: 'publisherId', - fromMsgRef: ref, - sessionToken, - })) - const streamMessage = getStreamMessage(sub.streamId, {}) - connection.emitMessage(new UnicastMessage({ - requestId: lastRequest.requestId, - streamMessage, - })) - // TODO validate message - await wait(STORAGE_DELAY + 200) - sub.stop() - done() - }) + await wait(200) + const lastRequest = requests[requests.length - 1] + expect(lastRequest).toEqual(new ResendFromRequest({ + streamId: sub.streamId, + streamPartition: sub.streamPartition, + requestId: lastRequest.requestId, + publisherId: 'publisherId', + fromMsgRef: ref, + sessionToken, + })) + const streamMessage = getStreamMessage(sub.streamId, {}) + connection.emitMessage(new UnicastMessage({ + requestId: lastRequest.requestId, + streamMessage, + })) + // TODO validate message + await wait(STORAGE_DELAY + 200) + sub.stop() }, STORAGE_DELAY + 1000) - it('supports resend.last', (done) => { - const sub = mockSubscription({ + it('supports resend.last', async () => { + const sub = await mockSubscription({ stream: 'stream1', resend: { last: 5, }, }, () => {}) - sub.once('subscribed', async () => { - await wait(200) - const lastRequest = requests[requests.length - 1] - expect(lastRequest).toEqual(new ResendLastRequest({ - streamId: sub.streamId, - streamPartition: sub.streamPartition, - requestId: lastRequest.requestId, - numberLast: 5, - sessionToken, - })) - const streamMessage = getStreamMessage(sub.streamId, {}) - connection.emitMessage(new UnicastMessage({ - requestId: lastRequest.requestId, - streamMessage, - })) - // TODO validate message - await wait(STORAGE_DELAY + 200) - sub.stop() - done() - }) + await wait(200) + const lastRequest = requests[requests.length - 1] + expect(lastRequest).toEqual(new ResendLastRequest({ + streamId: sub.streamId, + streamPartition: sub.streamPartition, + requestId: lastRequest.requestId, + numberLast: 5, + sessionToken, + })) + const streamMessage = getStreamMessage(sub.streamId, {}) + connection.emitMessage(new UnicastMessage({ + requestId: lastRequest.requestId, + streamMessage, + })) + // TODO validate message + await wait(STORAGE_DELAY + 200) + sub.stop() }, STORAGE_DELAY + 1000) it('sends a ResendLastRequest if no StreamMessage received and a ResendResponseNoResend received', async () => { - const sub = client.subscribe({ + const t = client.subscribe({ stream: 'stream1', resend: { last: 5, @@ -1045,7 +1033,7 @@ describe('StreamrClient', () => { await wait() if (request.type === ControlMessage.TYPES.SubscribeRequest) { connection.emitMessage(new SubscribeResponse({ - streamId: sub.streamId, + streamId: request.streamId, requestId: request.requestId, streamPartition: request.streamPartition, })) @@ -1053,8 +1041,8 @@ describe('StreamrClient', () => { if (request.type === ControlMessage.TYPES.ResendLastRequest) { const resendResponse = new ResendResponseNoResend({ - streamId: sub.streamId, - streamPartition: sub.streamPartition, + streamId: request.streamId, + streamPartition: request.streamPartition, requestId: request.requestId }) connection.emitMessage(resendResponse) @@ -1062,6 +1050,7 @@ describe('StreamrClient', () => { } await wait(STORAGE_DELAY + 200) + const sub = await t sub.stop() expect(requests).toHaveLength(2) const lastRequest = requests[requests.length - 1] @@ -1075,7 +1064,7 @@ describe('StreamrClient', () => { }, STORAGE_DELAY + 1000) it('throws if multiple resend options are given', () => { - expect(() => { + expect(() => ( client.subscribe({ stream: 'stream1', resend: { @@ -1086,95 +1075,85 @@ describe('StreamrClient', () => { last: 5, }, }, () => {}) - }).toThrow() + )).rejects.toThrow() }) }) describe('Subscription event handling', () => { describe('gap', () => { - it('sends resend request', (done) => { - const sub = mockSubscription('streamId', () => {}) - sub.once('subscribed', async () => { - await wait() - const fromRef = new MessageRef(1, 0) - const toRef = new MessageRef(5, 0) - - const fromRefObject = { - timestamp: fromRef.timestamp, - sequenceNumber: fromRef.sequenceNumber, - } - const toRefObject = { - timestamp: toRef.timestamp, - sequenceNumber: toRef.sequenceNumber, - } - sub.emit('gap', fromRefObject, toRefObject, 'publisherId', 'msgChainId') - await wait(100) - - expect(requests).toHaveLength(2) - const lastRequest = requests[requests.length - 1] - expect(lastRequest).toEqual(new ResendRangeRequest({ - streamId: sub.streamId, - streamPartition: sub.streamPartition, - requestId: lastRequest.requestId, - fromMsgRef: fromRef, - toMsgRef: toRef, - msgChainId: lastRequest.msgChainId, - publisherId: lastRequest.publisherId, - sessionToken, - })) - done() - }) + it('sends resend request', async () => { + const sub = await mockSubscription('streamId', () => {}) + const fromRef = new MessageRef(1, 0) + const toRef = new MessageRef(5, 0) + + const fromRefObject = { + timestamp: fromRef.timestamp, + sequenceNumber: fromRef.sequenceNumber, + } + const toRefObject = { + timestamp: toRef.timestamp, + sequenceNumber: toRef.sequenceNumber, + } + sub.emit('gap', fromRefObject, toRefObject, 'publisherId', 'msgChainId') + await wait(100) + + expect(requests).toHaveLength(2) + const lastRequest = requests[requests.length - 1] + expect(lastRequest).toEqual(new ResendRangeRequest({ + streamId: sub.streamId, + streamPartition: sub.streamPartition, + requestId: lastRequest.requestId, + fromMsgRef: fromRef, + toMsgRef: toRef, + msgChainId: lastRequest.msgChainId, + publisherId: lastRequest.publisherId, + sessionToken, + })) }) - it('does not send another resend request while resend is in progress', (done) => { - const sub = mockSubscription('streamId', () => {}) - sub.once('subscribed', async () => { - await wait() - const fromRef = new MessageRef(1, 0) - const toRef = new MessageRef(5, 0) - const fromRefObject = { - timestamp: fromRef.timestamp, - sequenceNumber: fromRef.sequenceNumber, - } - const toRefObject = { - timestamp: toRef.timestamp, - sequenceNumber: toRef.sequenceNumber, - } - sub.emit('gap', fromRefObject, toRefObject, 'publisherId', 'msgChainId') - sub.emit('gap', fromRefObject, { - timestamp: 10, - sequenceNumber: 0, - }, 'publisherId', 'msgChainId') - await wait() - expect(requests).toHaveLength(2) - const lastRequest = requests[requests.length - 1] - expect(lastRequest).toEqual(new ResendRangeRequest({ - streamId: sub.streamId, - streamPartition: sub.streamPartition, - requestId: lastRequest.requestId, - fromMsgRef: fromRef, - toMsgRef: toRef, - msgChainId: lastRequest.msgChainId, - publisherId: lastRequest.publisherId, - sessionToken, - })) - done() - }) + it('does not send another resend request while resend is in progress', async () => { + const sub = await mockSubscription('streamId', () => {}) + const fromRef = new MessageRef(1, 0) + const toRef = new MessageRef(5, 0) + const fromRefObject = { + timestamp: fromRef.timestamp, + sequenceNumber: fromRef.sequenceNumber, + } + const toRefObject = { + timestamp: toRef.timestamp, + sequenceNumber: toRef.sequenceNumber, + } + sub.emit('gap', fromRefObject, toRefObject, 'publisherId', 'msgChainId') + sub.emit('gap', fromRefObject, { + timestamp: 10, + sequenceNumber: 0, + }, 'publisherId', 'msgChainId') + await wait() + expect(requests).toHaveLength(2) + const lastRequest = requests[requests.length - 1] + expect(lastRequest).toEqual(new ResendRangeRequest({ + streamId: sub.streamId, + streamPartition: sub.streamPartition, + requestId: lastRequest.requestId, + fromMsgRef: fromRef, + toMsgRef: toRef, + msgChainId: lastRequest.msgChainId, + publisherId: lastRequest.publisherId, + sessionToken, + })) }) }) describe('done', () => { - it('unsubscribes', (done) => { - const sub = mockSubscription('stream1', () => {}) + it('unsubscribes', async (done) => { + const sub = await mockSubscription('stream1', () => {}) client.subscriber.unsubscribe = async (unsub) => { expect(sub).toBe(unsub) done() } - sub.once('subscribed', async () => { - await wait() - sub.emit('done') - }) + await wait() + sub.emit('done') }) }) }) @@ -1184,12 +1163,11 @@ describe('StreamrClient', () => { describe('unsubscribe()', () => { // Before each, client is connected and subscribed let sub - beforeEach(async (done) => { + beforeEach(async () => { await client.connect() - sub = mockSubscription('stream1', () => { + sub = await mockSubscription('stream1', () => { errors.push(new Error('should not fire message handler')) }) - sub.once('subscribed', () => done()) }) it('sends an unsubscribe request', async () => { @@ -1205,7 +1183,7 @@ describe('StreamrClient', () => { }) it('does not send unsubscribe request if there are other subs remaining for the stream', async () => { - client.subscribe({ + await mockSubscription({ stream: sub.streamId, }, () => {}) @@ -1213,23 +1191,36 @@ describe('StreamrClient', () => { expect(requests).toHaveLength(1) }) - it('sends unsubscribe request when the last subscription is unsubscribed', (done) => { - const sub2 = client.subscribe({ - stream: sub.streamId, - }, () => {}) + it('sends unsubscribe request when the last subscription is unsubscribed', async () => { + const sub2 = await mockSubscription(sub.streamId, () => {}) - sub2.once('subscribed', async () => { - await client.unsubscribe(sub) - await client.unsubscribe(sub2) - const lastRequest = requests[requests.length - 1] - expect(lastRequest).toEqual(new UnsubscribeRequest({ - streamId: sub.streamId, - streamPartition: sub.streamPartition, - requestId: lastRequest.requestId, - sessionToken, - })) - done() - }) + await client.unsubscribe(sub) + await client.unsubscribe(sub2) + const lastRequest = requests[requests.length - 1] + expect(lastRequest).toEqual(new UnsubscribeRequest({ + streamId: sub.streamId, + streamPartition: sub.streamPartition, + requestId: lastRequest.requestId, + sessionToken, + })) + }) + + it('sends only a single unsubscribe request when the last subscription is unsubscribed', async () => { + const sub2 = await mockSubscription(sub.streamId, () => {}) + requests = [] + await Promise.all([ + client.unsubscribe(sub), + client.unsubscribe(sub2) + ]) + expect(requests).toHaveLength(1) + const lastRequest = requests[requests.length - 1] + + expect(lastRequest).toEqual(new UnsubscribeRequest({ + streamId: sub.streamId, + streamPartition: sub.streamPartition, + requestId: lastRequest.requestId, + sessionToken, + })) }) it('does not send an unsubscribe request again if unsubscribe is called multiple times', async () => { @@ -1399,8 +1390,7 @@ describe('StreamrClient', () => { }) it('does not reset subscriptions', async () => { - const sub = mockSubscription('stream1', () => {}) - await new Promise((resolve) => sub.once('subscribed', resolve)) + const sub = await mockSubscription('stream1', () => {}) await client.pause() expect(client.getSubscriptions(sub.streamId)).toEqual([sub]) }) From a8c6be33a26544b0f95877ad949f01d2e285c748 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Sat, 12 Sep 2020 13:12:45 -0400 Subject: [PATCH 2/6] Fail _request call if unsubscribe/subscribe request fails to send. --- src/Subscriber.js | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/Subscriber.js b/src/Subscriber.js index 43d62b822..11fb83c30 100644 --- a/src/Subscriber.js +++ b/src/Subscriber.js @@ -366,7 +366,7 @@ export default class Subscriber { this.debug('_requestSubscribe: subscribing client: %o', request) await this.client.send(request).catch((err) => { sub.setState(Subscription.State.unsubscribed) - const error = new Error(`Failed to sendnsubscribe request: ${err.stack}`) + const error = new Error(`Failed to send subscribe request: ${err.stack}`) this.onErrorEmit(error) throw error }) @@ -381,9 +381,8 @@ export default class Subscriber { }) await this.client.connection.send(unsubscribeRequest).catch((err) => { sub.setState(Subscription.State.subscribed) - const error = new Error(`Failed to send unsubscribe request: ${err.stack}`) - this.onErrorEmit(error) - throw error + this.client.emit(new Error(`Failed to send unsubscribe request: ${err.stack}`)) + throw err }) return this._checkAutoDisconnect() } From 27deda44b228e7c8cfb3db0cf0d3a3bff4e05021 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Sat, 12 Sep 2020 13:51:01 -0400 Subject: [PATCH 3/6] Fix missing import in resend test. --- test/integration/Resends.test.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/integration/Resends.test.js b/test/integration/Resends.test.js index 7e84cf595..e980fc0f0 100644 --- a/test/integration/Resends.test.js +++ b/test/integration/Resends.test.js @@ -1,3 +1,4 @@ +import { wait, waitForCondition, waitForEvent } from 'streamr-test-utils' import Debug from 'debug' import { uid } from '../utils' @@ -5,8 +6,6 @@ import StreamrClient from '../../src' import config from './config' -const { wait, waitForCondition } = require('streamr-test-utils') - const createClient = (opts = {}) => new StreamrClient({ apiKey: 'tester1-api-key', autoConnect: false, From 6b096bd6c9fdeaede4b6f3bbae3bf372e3c22dea Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Tue, 15 Sep 2020 11:16:15 -0400 Subject: [PATCH 4/6] Fix browser tests. --- test/browser/browser.html | 48 +++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/test/browser/browser.html b/test/browser/browser.html index bdc409676..22793a880 100644 --- a/test/browser/browser.html +++ b/test/browser/browser.html @@ -33,32 +33,33 @@ const resetResults = () => $('#result').html('') +client.on('error', (err) => { + console.error(err) + $('#result').html('Error: ' + err) +}) + $('#connect').on('click', async () => { resetResults() await client.connect() $('#result').html(client.connection.getState()) }) -$('#create').on('click', () => { +$('#create').on('click', async () => { resetResults() - client.createStream({ + stream = await client.createStream({ name: streamName - }).then((newStream) => { - stream = newStream - $('#result').html(stream.name) }) + $('#result').html(stream.name) }) -$('#subscribe').on('click', () => { +$('#subscribe').on('click', async () => { resetResults() - const sub = client.subscribe({ - stream: stream.id - }, - (message, metadata) => { - messages.push(message) - } - ) - sub.on('subscribed', () => $('#result').html('subscribed')) + await client.subscribe({ + stream: stream.id + }, (message, metadata) => { + messages.push(message) + }) + $('#result').html('subscribed') }) $('#publish').on('click', async () => { @@ -77,15 +78,13 @@ messages = [] const sub = await client.resend({ - stream: stream.id, - resend: { - last: 10, - }, + stream: stream.id, + resend: { + last: 10, }, - (message) => { - messages.push(message) - } - ) + }, (message) => { + messages.push(message) + }) sub.on('resent', () => { $('#result').html('Resend: ' + JSON.stringify(messages)) @@ -96,10 +95,5 @@ await client.disconnect() $('#result').html(client.connection.getState()) }) - -client.on('error', (err) => { - console.error(err) - $('#result').html('Error: ' + err) -}) From d73804d306f7272d8eda696a49101b235c74fba1 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Tue, 15 Sep 2020 14:45:24 -0400 Subject: [PATCH 5/6] Clean up unused function in test. --- test/unit/StreamrClient.test.js | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/test/unit/StreamrClient.test.js b/test/unit/StreamrClient.test.js index 75bfd352a..53825e652 100644 --- a/test/unit/StreamrClient.test.js +++ b/test/unit/StreamrClient.test.js @@ -41,36 +41,6 @@ describe('StreamrClient', () => { const streamPartition = 0 const sessionToken = 'session-token' - function setupSubscription( - streamId, emitSubscribed = true, subscribeOptions = {}, handler = sinon.stub(), - expectSubscribeRequest = !client.getSubscriptions(streamId).length, - ) { - expect(client.isConnected()).toBeTruthy() - const requestId = uid('request') - - if (expectSubscribeRequest) { - connection.expect(new SubscribeRequest({ - requestId, - streamId, - streamPartition, - sessionToken, - })) - } - const sub = client.subscribe({ - stream: streamId, - ...subscribeOptions, - }, handler) - - if (emitSubscribed) { - connection.emitMessage(new SubscribeResponse({ - streamId: sub.streamId, - requestId, - streamPartition, - })) - } - return sub - } - function getStreamMessage(streamId = 'stream1', content = {}, publisherId = '') { const timestamp = Date.now() return new StreamMessage({ From 0fdcba332d03b056defc7c0e6ba18a77acd25a10 Mon Sep 17 00:00:00 2001 From: Tim Kevin Oxley Date: Tue, 24 Nov 2020 10:51:01 -0500 Subject: [PATCH 6/6] 4. Refactor Publish (#164) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Refactor & co-locate publish code. * Avoid using expensive ethers.Wallet.createRandom() calls in test. e.g. 1000x calls with ethers: 14s, randomBytes: 3.5ms. * Ensure messageCreationUtil is cleaned up after test. * Fix non-functional MessageCreationUtil test. * Swap out receptacle for more flexible mem/p-memoize/quick-lru. * Convert LoginEndpoints test to async/await. * Remove calls to ensureConnected/ensureDisconnected in test. * 5. Message Sequencing – Guarantee sequence follows publish order & Prevent backdated messages silently breaking future publishes (#166) * Improve authFetch logging. * Update message sequencer to strictly enforce message order. * Queue publishes per-stream otherwise can skip forward then back even when messages published in correct sequence. * Add partial solution to broken backdated messages, at least doesn't break regular sequential publishes. * Tidy up, add some comments. * Move publish queue fn into utils. --- package-lock.json | 123 ++++++++-- package.json | 5 +- src/MessageCreationUtil.js | 178 -------------- src/Publisher.js | 286 ++++++++++++++++++++-- src/rest/StreamEndpoints.js | 1 + src/rest/authFetch.js | 26 +- src/utils.js | 99 ++++++++ test/benchmarks/publish.js | 4 +- test/flakey/DataUnionEndpoints.test.js | 10 +- test/integration/LoginEndpoints.test.js | 112 ++++----- test/integration/MultipleClients.test.js | 34 ++- test/integration/ResendReconnect.test.js | 12 +- test/integration/Resends.test.js | 6 +- test/integration/Sequencing.test.js | 290 +++++++++++++++++++++++ test/integration/Session.test.js | 5 +- test/integration/StreamEndpoints.test.js | 6 +- test/integration/StreamrClient.test.js | 14 +- test/integration/Subscription.test.js | 5 +- test/integration/authFetch.test.js | 4 +- test/unit/MessageCreationUtil.test.js | 262 +++++++++++--------- test/unit/StreamrClient.test.js | 3 +- test/unit/utils.test.js | 4 +- test/utils.js | 6 + 23 files changed, 1049 insertions(+), 446 deletions(-) delete mode 100644 src/MessageCreationUtil.js create mode 100644 test/integration/Sequencing.test.js diff --git a/package-lock.json b/package-lock.json index ad08949e7..6bbd95df9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8526,6 +8526,17 @@ "dev": true, "requires": { "p-limit": "^2.0.0" + }, + "dependencies": { + "p-limit": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-2.3.0.tgz", + "integrity": "sha512-//88mFWSJx8lxCzwdAABTJL2MyWB12+eIY7MDL2SqLmAkeKU9qxRvWuSyTjm3FUmpBEMuFfckAIqEaVGUDxb6w==", + "dev": true, + "requires": { + "p-try": "^2.0.0" + } + } } }, "pkg-dir": { @@ -11039,6 +11050,14 @@ "tmpl": "1.0.x" } }, + "map-age-cleaner": { + "version": "0.1.3", + "resolved": "https://registry.npmjs.org/map-age-cleaner/-/map-age-cleaner-0.1.3.tgz", + "integrity": "sha512-bJzx6nMoP6PDLPBFmg7+xRKeFZvFboMrGlxmNj9ClvX53KrmvM5bXFXEWjbz4cz1AFn+jWJ9z/DJSz7hrs0w3w==", + "requires": { + "p-defer": "^1.0.0" + } + }, "map-cache": { "version": "0.2.2", "resolved": "https://registry.npmjs.org/map-cache/-/map-cache-0.2.2.tgz", @@ -11071,6 +11090,22 @@ "integrity": "sha1-hxDXrwqmJvj/+hzgAWhUUmMlV0g=", "dev": true }, + "mem": { + "version": "6.1.1", + "resolved": "https://registry.npmjs.org/mem/-/mem-6.1.1.tgz", + "integrity": "sha512-Ci6bIfq/UgcxPTYa8dQQ5FY3BzKkT894bwXWXxC/zqs0XgMO2cT20CGkOqda7gZNkmK5VP4x89IGZ6K7hfbn3Q==", + "requires": { + "map-age-cleaner": "^0.1.3", + "mimic-fn": "^3.0.0" + }, + "dependencies": { + "mimic-fn": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/mimic-fn/-/mimic-fn-3.1.0.tgz", + "integrity": "sha512-Ysbi9uYW9hFyfrThdDEQuykN4Ey6BuwPD2kpI5ES/nFTDn/98yxYNLZJcgUAKPT/mcrLLKaGzJR9YVxJrIdASQ==" + } + } + }, "memory-fs": { "version": "0.4.1", "resolved": "https://registry.npmjs.org/memory-fs/-/memory-fs-0.4.1.tgz", @@ -11403,6 +11438,17 @@ "dev": true, "requires": { "p-limit": "^2.0.0" + }, + "dependencies": { + "p-limit": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-2.3.0.tgz", + "integrity": "sha512-//88mFWSJx8lxCzwdAABTJL2MyWB12+eIY7MDL2SqLmAkeKU9qxRvWuSyTjm3FUmpBEMuFfckAIqEaVGUDxb6w==", + "dev": true, + "requires": { + "p-try": "^2.0.0" + } + } } }, "strip-json-comments": { @@ -12120,6 +12166,11 @@ "integrity": "sha1-hUNzx/XCMVkU/Jv8a9gjj92h7Cc=", "dev": true }, + "p-defer": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/p-defer/-/p-defer-1.0.0.tgz", + "integrity": "sha1-n26xgvbJqozXQwBKfU+WsZaw+ww=" + }, "p-each-series": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/p-each-series/-/p-each-series-2.1.0.tgz", @@ -12142,10 +12193,9 @@ "dev": true }, "p-limit": { - "version": "2.3.0", - "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-2.3.0.tgz", - "integrity": "sha512-//88mFWSJx8lxCzwdAABTJL2MyWB12+eIY7MDL2SqLmAkeKU9qxRvWuSyTjm3FUmpBEMuFfckAIqEaVGUDxb6w==", - "dev": true, + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-3.0.2.tgz", + "integrity": "sha512-iwqZSOoWIW+Ew4kAGUlN16J4M7OB3ysMLSZtnhmqx7njIHFPlxWBX8xo3lVTyFVq6mI/lL9qt2IsN1sHwaxJkg==", "requires": { "p-try": "^2.0.0" } @@ -12157,6 +12207,17 @@ "dev": true, "requires": { "p-limit": "^2.2.0" + }, + "dependencies": { + "p-limit": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-2.3.0.tgz", + "integrity": "sha512-//88mFWSJx8lxCzwdAABTJL2MyWB12+eIY7MDL2SqLmAkeKU9qxRvWuSyTjm3FUmpBEMuFfckAIqEaVGUDxb6w==", + "dev": true, + "requires": { + "p-try": "^2.0.0" + } + } } }, "p-map": { @@ -12168,6 +12229,22 @@ "aggregate-error": "^3.0.0" } }, + "p-memoize": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/p-memoize/-/p-memoize-4.0.0.tgz", + "integrity": "sha512-oMxCJKVS75Bf2RWtXJNQNaX2K1G0FYpllOh2iTsPXZqnf9dWMcis3BL+pRdLeQY8lIdwwL01k/UV5LBdcVhZzg==", + "requires": { + "mem": "^6.0.1", + "mimic-fn": "^3.0.0" + }, + "dependencies": { + "mimic-fn": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/mimic-fn/-/mimic-fn-3.1.0.tgz", + "integrity": "sha512-Ysbi9uYW9hFyfrThdDEQuykN4Ey6BuwPD2kpI5ES/nFTDn/98yxYNLZJcgUAKPT/mcrLLKaGzJR9YVxJrIdASQ==" + } + } + }, "p-timeout": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-2.0.1.tgz", @@ -12180,8 +12257,7 @@ "p-try": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz", - "integrity": "sha512-R4nPAVTAU0B9D35/Gk3uJf/7XYbQcyohSKdvAxIRSNghFl4e71hVoGnBNQz9cWaXxO2I10KTC+3jMdvvoKw6dQ==", - "dev": true + "integrity": "sha512-R4nPAVTAU0B9D35/Gk3uJf/7XYbQcyohSKdvAxIRSNghFl4e71hVoGnBNQz9cWaXxO2I10KTC+3jMdvvoKw6dQ==" }, "pac-proxy-agent": { "version": "3.0.1", @@ -12691,6 +12767,11 @@ "integrity": "sha1-nsYfeQSYdXB9aUFFlv2Qek1xHnM=", "dev": true }, + "quick-lru": { + "version": "5.1.1", + "resolved": "https://registry.npmjs.org/quick-lru/-/quick-lru-5.1.1.tgz", + "integrity": "sha512-WuyALRjWPDGtt/wzJiadO5AXY+8hZ80hVpe6MyivgraREW751X3SbhRvG3eLKOYN+8VEvqLcf3wdnt44Z4S4SA==" + }, "randombytes": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/randombytes/-/randombytes-2.1.0.tgz", @@ -13181,14 +13262,6 @@ } } }, - "receptacle": { - "version": "1.3.2", - "resolved": "https://registry.npmjs.org/receptacle/-/receptacle-1.3.2.tgz", - "integrity": "sha512-HrsFvqZZheusncQRiEE7GatOAETrARKV/lnfYicIm8lbvp/JQOdADOfhjBd2DajvoszEyxSM6RlAAIZgEoeu/A==", - "requires": { - "ms": "^2.1.1" - } - }, "regenerate": { "version": "1.4.1", "resolved": "https://registry.npmjs.org/regenerate/-/regenerate-1.4.1.tgz", @@ -15506,6 +15579,17 @@ "dev": true, "requires": { "p-limit": "^2.0.0" + }, + "dependencies": { + "p-limit": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-2.3.0.tgz", + "integrity": "sha512-//88mFWSJx8lxCzwdAABTJL2MyWB12+eIY7MDL2SqLmAkeKU9qxRvWuSyTjm3FUmpBEMuFfckAIqEaVGUDxb6w==", + "dev": true, + "requires": { + "p-try": "^2.0.0" + } + } } }, "supports-color": { @@ -15922,6 +16006,17 @@ "dev": true, "requires": { "p-limit": "^2.0.0" + }, + "dependencies": { + "p-limit": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-2.3.0.tgz", + "integrity": "sha512-//88mFWSJx8lxCzwdAABTJL2MyWB12+eIY7MDL2SqLmAkeKU9qxRvWuSyTjm3FUmpBEMuFfckAIqEaVGUDxb6w==", + "dev": true, + "requires": { + "p-try": "^2.0.0" + } + } } }, "yargs": { diff --git a/package.json b/package.json index 7c16e816a..77d823622 100644 --- a/package.json +++ b/package.json @@ -70,13 +70,16 @@ "ethers": "^5.0.12", "eventemitter3": "^4.0.7", "lodash.uniqueid": "^4.0.1", + "mem": "^6.1.1", "node-fetch": "^2.6.1", "node-webcrypto-ossl": "^2.1.1", "once": "^1.4.0", + "p-limit": "^3.0.2", + "p-memoize": "^4.0.0", "promise-memoize": "^1.2.1", "qs": "^6.9.4", + "quick-lru": "^5.1.1", "randomstring": "^1.1.5", - "receptacle": "^1.3.2", "streamr-client-protocol": "^5.1.0", "uuid": "^8.3.0", "webpack-node-externals": "^2.5.2", diff --git a/src/MessageCreationUtil.js b/src/MessageCreationUtil.js deleted file mode 100644 index 17e7c2567..000000000 --- a/src/MessageCreationUtil.js +++ /dev/null @@ -1,178 +0,0 @@ -import crypto from 'crypto' - -import Receptacle from 'receptacle' -import randomstring from 'randomstring' -import { MessageLayer } from 'streamr-client-protocol' -import { ethers } from 'ethers' - -import Stream from './rest/domain/Stream' -import InvalidMessageTypeError from './errors/InvalidMessageTypeError' - -const { StreamMessage, MessageID, MessageRef } = MessageLayer - -export default class MessageCreationUtil { - constructor(auth, signer, getUserInfo, getStreamFunction) { - this.auth = auth - this._signer = signer - this.getUserInfo = getUserInfo - this.getStreamFunction = getStreamFunction - this.cachedStreams = new Receptacle({ - max: 10000, - }) - this.publishedStreams = {} - this.msgChainId = randomstring.generate(20) - this.cachedHashes = {} - } - - stop() { - this.cachedStreams.clear() - } - - async getUsername() { - if (!this.usernamePromise) { - // In the edge case where StreamrClient.auth.apiKey is an anonymous key, userInfo.id is that anonymous key - this.usernamePromise = this.getUserInfo().then((userInfo) => userInfo.username || userInfo.id) - } - return this.usernamePromise - } - - async getStream(streamId) { - if (!this.cachedStreams.get(streamId)) { - const streamPromise = this.getStreamFunction(streamId).then((stream) => ({ - id: stream.id, - partitions: stream.partitions, - })) - const success = this.cachedStreams.set(streamId, streamPromise, { - ttl: 30 * 60 * 1000, // 30 minutes - refresh: true, // reset ttl on access - }) - if (!success) { - console.warn(`Could not store stream with id ${streamId} in local cache.`) - return streamPromise - } - } - return this.cachedStreams.get(streamId) - } - - async getPublisherId() { - if (!this.publisherId) { - if (this.auth.privateKey !== undefined) { - this.publisherId = ethers.utils.computeAddress(this.auth.privateKey).toLowerCase() - } else if (this.auth.provider !== undefined) { - const provider = new ethers.providers.Web3Provider(this.auth.provider) - this.publisherId = provider.getSigner().address.toLowerCase() - } else if (this.auth.apiKey !== undefined) { - const hexString = ethers.utils.hexlify(Buffer.from(await this.getUsername(), 'utf8')) - this.publisherId = ethers.utils.sha256(hexString) - } else if (this.auth.username !== undefined) { - const hexString = ethers.utils.hexlify(Buffer.from(this.auth.username, 'utf8')) - this.publisherId = ethers.utils.sha256(hexString) - } else if (this.auth.sessionToken !== undefined) { - const hexString = ethers.utils.hexlify(Buffer.from(await this.getUsername(), 'utf8')) - this.publisherId = ethers.utils.sha256(hexString) - } else { - throw new Error('Need either "privateKey", "provider", "apiKey", "username"+"password" or "sessionToken" to derive the publisher Id.') - } - } - return this.publisherId - } - - getNextSequenceNumber(key, timestamp) { - if (timestamp !== this.getPrevTimestamp(key)) { - return 0 - } - return this.getPrevSequenceNumber(key) + 1 - } - - getPrevMsgRef(key) { - const prevTimestamp = this.getPrevTimestamp(key) - if (!prevTimestamp) { - return null - } - const prevSequenceNumber = this.getPrevSequenceNumber(key) - return new MessageRef(prevTimestamp, prevSequenceNumber) - } - - getPrevTimestamp(key) { - return this.publishedStreams[key].prevTimestamp - } - - getPrevSequenceNumber(key) { - return this.publishedStreams[key].prevSequenceNumber - } - - async createStreamMessage(streamObjectOrId, data, timestamp = Date.now(), partitionKey = null) { - // Validate data - if (typeof data !== 'object') { - throw new Error(`Message data must be an object! Was: ${data}`) - } - - const stream = (streamObjectOrId instanceof Stream) ? streamObjectOrId : await this.getStream(streamObjectOrId) - const streamPartition = this.computeStreamPartition(stream.partitions, partitionKey) - const publisherId = await this.getPublisherId() - const [messageId, prevMsgRef] = this.createMsgIdAndPrevRef(stream.id, streamPartition, timestamp, publisherId) - - const streamMessage = new StreamMessage({ - messageId, - prevMsgRef, - content: data, - messageType: StreamMessage.MESSAGE_TYPES.MESSAGE, - }) - - if (this._signer) { - await this._signer.signStreamMessage(streamMessage) - } - return streamMessage - } - - createMsgIdAndPrevRef(streamId, streamPartition, timestamp, publisherId) { - const key = streamId + streamPartition - if (!this.publishedStreams[key]) { - this.publishedStreams[key] = { - prevTimestamp: null, - prevSequenceNumber: 0, - } - } - - const sequenceNumber = this.getNextSequenceNumber(key, timestamp) - const messageId = new MessageID(streamId, streamPartition, timestamp, sequenceNumber, publisherId, this.msgChainId) - const prevMsgRef = this.getPrevMsgRef(key) - this.publishedStreams[key].prevTimestamp = timestamp - this.publishedStreams[key].prevSequenceNumber = sequenceNumber - return [messageId, prevMsgRef] - } - - createDefaultMsgIdAndPrevRef(streamId, publisherId) { - return this.createMsgIdAndPrevRef(streamId, 0, Date.now(), publisherId) - } - - static getErrorCodeFromError(error) { - if (error instanceof InvalidMessageTypeError) { - return 'INVALID_MESSAGE_TYPE' - } - return 'UNEXPECTED_ERROR' - } - - hash(stringToHash) { - if (this.cachedHashes[stringToHash] === undefined) { - this.cachedHashes[stringToHash] = crypto.createHash('md5').update(stringToHash).digest() - } - return this.cachedHashes[stringToHash] - } - - computeStreamPartition(partitionCount, partitionKey) { - if (!partitionCount) { - throw new Error('partitionCount is falsey!') - } else if (partitionCount === 1) { - // Fast common case - return 0 - } else if (partitionKey) { - const buffer = this.hash(partitionKey) - const intHash = buffer.readInt32LE() - return Math.abs(intHash) % partitionCount - } else { - // Fallback to random partition if no key - return Math.floor(Math.random() * partitionCount) - } - } -} diff --git a/src/Publisher.js b/src/Publisher.js index a13610454..94dd67749 100644 --- a/src/Publisher.js +++ b/src/Publisher.js @@ -1,10 +1,16 @@ -import once from 'once' -import { ControlLayer } from 'streamr-client-protocol' +import crypto from 'crypto' + +import { ControlLayer, MessageLayer } from 'streamr-client-protocol' +import randomstring from 'randomstring' +import LRU from 'quick-lru' +import { ethers } from 'ethers' import Signer from './Signer' import Stream from './rest/domain/Stream' import FailedToPublishError from './errors/FailedToPublishError' -import MessageCreationUtil from './MessageCreationUtil' +import { CacheAsyncFn, CacheFn, LimitAsyncFnByKey } from './utils' + +const { StreamMessage, MessageID, MessageRef } = MessageLayer function getStreamId(streamObjectOrId) { if (streamObjectOrId instanceof Stream) { @@ -18,12 +24,238 @@ function getStreamId(streamObjectOrId) { throw new Error(`First argument must be a Stream object or the stream id! Was: ${streamObjectOrId}`) } +function hash(stringToHash) { + return crypto.createHash('md5').update(stringToHash).digest() +} + +/** + * Message Chain Sequencing + */ + +class MessageChainSequence { + constructor({ maxSize = 10000 } = {}) { + this.msgChainId = randomstring.generate(20) + // tracks previous timestamp+sequence for stream+partition + this.messageRefs = new LRU({ + maxSize, // should never exceed this except in pathological cases + }) + } + + /** + * Generate the next message MessageID + previous MessageRef for this message chain. + * Messages with same timestamp get incremented sequence numbers. + */ + + add({ streamId, streamPartition, timestamp, publisherId, }) { + // NOTE: publishing back-dated (i.e. non-sequentially timestamped) messages will 'break' sequencing. + // i.e. we lose track of biggest sequence number whenever timestamp changes for stream id+partition combo + // so backdated messages will start at sequence 0 again, regardless of the sequencing of existing messages. + // storage considers timestamp+sequence number unique, so the newer messages will clobber the older messages + // Not feasible to keep greatest sequence number for every millisecond timestamp so not sure a good way around this. + // Possible we should keep a global sequence number + const key = `${streamId}|${streamPartition}` + const prevMsgRef = this.messageRefs.get(key) + const isSameTimestamp = prevMsgRef && prevMsgRef.timestamp === timestamp + const isBackdated = prevMsgRef && prevMsgRef.timestamp > timestamp + // increment if timestamp the same, otherwise 0 + const nextSequenceNumber = isSameTimestamp ? prevMsgRef.sequenceNumber + 1 : 0 + const messageId = new MessageID(streamId, streamPartition, timestamp, nextSequenceNumber, publisherId, this.msgChainId) + // update latest timestamp + sequence for this streamId+partition + // (see note above about clobbering sequencing) + // don't update latest if timestamp < previous timestamp + // this "fixes" the sequence breaking issue above, but this message will silently disappear + if (!isBackdated) { + this.messageRefs.set(key, new MessageRef(timestamp, nextSequenceNumber)) + } + return [messageId, prevMsgRef] + } + + clear() { + this.messageRefs.clear() + } +} + +/** + * Computes appropriate stream partition + */ + +export class StreamPartitioner { + constructor(client) { + this.client = client + const cacheOptions = client.options.cache + this._getStreamPartitions = CacheAsyncFn(this._getStreamPartitions.bind(this), cacheOptions) + this.hash = CacheFn(hash, cacheOptions) + } + + clear() { + this._getStreamPartitions.clear() + this.hash.clear() + } + + /** + * Get partition for given stream/streamId + partitionKey + */ + + async get(streamObjectOrId, partitionKey) { + const streamPartitions = await this.getStreamPartitions(streamObjectOrId) + return this.computeStreamPartition(streamPartitions, partitionKey) + } + + async getStreamPartitions(streamObjectOrId) { + if (streamObjectOrId && streamObjectOrId.partitions != null) { + return streamObjectOrId.partitions + } + + // get streamId here so caching based on id works + const streamId = getStreamId(streamObjectOrId) + return this._getStreamPartitions(streamId) + } + + async _getStreamPartitions(streamId) { + const { partitions } = await this.client.getStream(streamId) + return partitions + } + + computeStreamPartition(partitionCount, partitionKey) { + if (!(Number.isSafeInteger(partitionCount) && partitionCount > 0)) { + throw new Error(`partitionCount is not a safe positive integer! ${partitionCount}`) + } + + if (partitionCount === 1) { + // Fast common case + return 0 + } + + if (!partitionKey) { + // Fallback to random partition if no key + return Math.floor(Math.random() * partitionCount) + } + + const buffer = this.hash(partitionKey) + const intHash = buffer.readInt32LE() + return Math.abs(intHash) % partitionCount + } +} + +export class MessageCreationUtil { + constructor(client) { + this.client = client + const cacheOptions = client.options.cache + this.msgChainer = new MessageChainSequence(cacheOptions) + this.partitioner = new StreamPartitioner(client) + this.getUserInfo = CacheAsyncFn(this.getUserInfo.bind(this), cacheOptions) + this.getPublisherId = CacheAsyncFn(this.getPublisherId.bind(this), cacheOptions) + this.queue = LimitAsyncFnByKey(1) // an async queue for each stream's async deps + } + + stop() { + this.msgChainer.clear() + this.getUserInfo.clear() + this.getPublisherId.clear() + this.partitioner.clear() + this.queue.clear() + } + + async getPublisherId() { + const { options: { auth = {} } = {} } = this.client + if (auth.privateKey !== undefined) { + return ethers.utils.computeAddress(auth.privateKey).toLowerCase() + } + + if (auth.provider !== undefined) { + const provider = new ethers.providers.Web3Provider(auth.provider) + return provider.getSigner().address.toLowerCase() + } + + const username = auth.username || await this.getUsername() + + if (username !== undefined) { + const hexString = ethers.utils.hexlify(Buffer.from(username, 'utf8')) + return ethers.utils.sha256(hexString) + } + + throw new Error('Need either "privateKey", "provider", "apiKey", "username"+"password" or "sessionToken" to derive the publisher Id.') + } + + /* cached remote call */ + async getUserInfo() { + return this.client.getUserInfo() + } + + async getUsername() { + const { username, id } = await this.client.getUserInfo() + return ( + username + // edge case: if auth.apiKey is an anonymous key, userInfo.id is that anonymous key + || id + ) + } + + async createStreamMessage(streamObjectOrId, options = {}) { + const { content } = options + // Validate content + if (typeof content !== 'object') { + throw new Error(`Message content must be an object! Was: ${content}`) + } + + // queued depdendencies fetching + const [publisherId, streamPartition] = await this._getDependencies(streamObjectOrId, options) + return this._createStreamMessage(getStreamId(streamObjectOrId), { + publisherId, + streamPartition, + ...options + }) + } + + /** + * Fetch async dependencies for publishing. + * Should resolve in call-order per-stream to guarantee correct sequencing. + */ + + async _getDependencies(streamObjectOrId, { partitionKey }) { + // This queue guarantees stream messages for the same timestamp are sequenced in-order + // regardless of the async resolution order. + // otherwise, if async calls happen to resolve in a different order + // than they were issued we will end up generating the wrong sequence numbers + const streamId = getStreamId(streamObjectOrId) + return this.queue(streamId, async () => ( + Promise.all([ + this.getPublisherId(), + this.partitioner.get(streamObjectOrId, partitionKey), + ]) + )) + } + + /** + * Synchronously generate chain sequence + stream message after async deps resolved. + */ + + _createStreamMessage(streamId, options = {}) { + const { + content, streamPartition, timestamp, publisherId, ...opts + } = options + + const [messageId, prevMsgRef] = this.msgChainer.add({ + streamId, + streamPartition, + timestamp, + publisherId, + }) + + return new StreamMessage({ + messageId, + prevMsgRef, + content, + ...opts + }) + } +} + export default class Publisher { constructor(client) { this.client = client this.debug = client.debug.extend('Publisher') - this.publishQueue = [] this.signer = Signer.createSigner({ ...client.options.auth, debug: client.debug, @@ -34,59 +266,73 @@ export default class Publisher { if (client.session.isUnauthenticated()) { this.msgCreationUtil = null } else { - this.msgCreationUtil = new MessageCreationUtil( - client.options.auth, this.signer, once(() => client.getUserInfo()), - (streamId) => client.getStream(streamId).catch(this.onErrorEmit) - ) + this.msgCreationUtil = new MessageCreationUtil(this.client) } } async publish(...args) { - this.debug('publish()') + // wrap publish in error emitter return this._publish(...args).catch((err) => { this.onErrorEmit(err) throw err }) } - async _publish(streamObjectOrId, data, timestamp = new Date(), partitionKey = null) { + async _publish(streamObjectOrId, content, timestamp = new Date(), partitionKey = null) { + this.debug('publish()') if (this.client.session.isUnauthenticated()) { throw new Error('Need to be authenticated to publish.') } - // Validate streamObjectOrId - const streamId = getStreamId(streamObjectOrId) const timestampAsNumber = timestamp instanceof Date ? timestamp.getTime() : new Date(timestamp).getTime() - const [sessionToken, streamMessage] = await Promise.all([ - this.client.session.getSessionToken(), - this.msgCreationUtil.createStreamMessage(streamObjectOrId, data, timestampAsNumber, partitionKey), + // get session + generate stream message + // important: stream message call must be executed in publish() call order + // or sequencing will be broken. + // i.e. do not put async work before call to createStreamMessage + const [streamMessage, sessionToken] = await Promise.all([ + this.msgCreationUtil.createStreamMessage(streamObjectOrId, { + content, + timestamp: timestampAsNumber, + partitionKey + }), + this.client.session.getSessionToken(), // fetch in parallel ]) + if (this.signer) { + // optional + await this.signer.signStreamMessage(streamMessage) + } + const requestId = this.client.resender.resendUtil.generateRequestId() const request = new ControlLayer.PublishRequest({ streamMessage, requestId, sessionToken, }) + try { await this.client.send(request) } catch (err) { + const streamId = getStreamId(streamObjectOrId) throw new FailedToPublishError( streamId, - data, + content, err ) } + return request } - getPublisherId() { + async getPublisherId() { + if (this.client.session.isUnauthenticated()) { + throw new Error('Need to be authenticated to getPublisherId.') + } return this.msgCreationUtil.getPublisherId() } stop() { - if (this.msgCreationUtil) { - this.msgCreationUtil.stop() - } + if (!this.msgCreationUtil) { return } + this.msgCreationUtil.stop() } } diff --git a/src/rest/StreamEndpoints.js b/src/rest/StreamEndpoints.js index 6d4c45827..37c5d7533 100644 --- a/src/rest/StreamEndpoints.js +++ b/src/rest/StreamEndpoints.js @@ -127,6 +127,7 @@ export async function isStreamPublisher(streamId, ethAddress) { await authFetch(url, this.session) return true } catch (e) { + this.debug(e) if (e.response && e.response.status === 404) { return false } diff --git a/src/rest/authFetch.js b/src/rest/authFetch.js index 8d92c34d9..0ce5b6e35 100644 --- a/src/rest/authFetch.js +++ b/src/rest/authFetch.js @@ -1,5 +1,5 @@ import fetch from 'node-fetch' -import debugFactory from 'debug' +import Debug from 'debug' import AuthFetchError from '../errors/AuthFetchError' import { getVersionString } from '../utils' @@ -8,23 +8,30 @@ export const DEFAULT_HEADERS = { 'Streamr-Client': `streamr-client-javascript/${getVersionString()}`, } -const debug = debugFactory('StreamrClient:utils') +const debug = Debug('StreamrClient:utils:authfetch') + +let ID = 0 + +export default async function authFetch(url, session, opts, requireNewToken = false) { + ID += 1 + const timeStart = Date.now() + const id = ID -const authFetch = async (url, session, opts = {}, requireNewToken = false) => { const options = { ...opts, headers: { ...DEFAULT_HEADERS, - ...opts.headers, + ...(opts && opts.headers), } } + // add default 'Content-Type: application/json' header for all requests // including 0 body length POST calls if (!options.headers['Content-Type']) { options.headers['Content-Type'] = 'application/json' } - debug('authFetch: ', url, opts) + debug('%d %s >> %o', id, url, opts) const response = await fetch(url, { ...opts, @@ -35,6 +42,8 @@ const authFetch = async (url, session, opts = {}, requireNewToken = false) => { ...options.headers, }, }) + const timeEnd = Date.now() + debug('%d %s << %d %s %s %s', id, url, response.status, response.statusText, Debug.humanize(timeEnd - timeStart)) const body = await response.text() @@ -42,13 +51,14 @@ const authFetch = async (url, session, opts = {}, requireNewToken = false) => { try { return JSON.parse(body || '{}') } catch (e) { + debug('%d %s – failed to parse body: %s', id, url, e.stack) throw new AuthFetchError(e.message, response, body) } } else if ([400, 401].includes(response.status) && !requireNewToken) { + debug('%d %s – revalidating session') return authFetch(url, session, options, true) } else { - throw new AuthFetchError(`Request to ${url} returned with error code ${response.status}.`, response, body) + debug('%d %s – failed', id, url) + throw new AuthFetchError(`Request ${id} to ${url} returned with error code ${response.status}.`, response, body) } } - -export default authFetch diff --git a/src/utils.js b/src/utils.js index 7c778ca1c..3a9491bdb 100644 --- a/src/utils.js +++ b/src/utils.js @@ -1,5 +1,9 @@ import { v4 as uuidv4 } from 'uuid' import uniqueId from 'lodash.uniqueid' +import LRU from 'quick-lru' +import pMemoize from 'p-memoize' +import pLimit from 'p-limit' +import mem from 'mem' import pkg from '../package.json' @@ -35,3 +39,98 @@ export function waitFor(emitter, event) { emitter.once('error', onError) }) } + +/* eslint-disable object-curly-newline */ + +/** + * Returns a cached async fn, cached keyed on first argument passed. See documentation for mem/p-memoize. + * Caches into a LRU cache capped at options.maxSize + * Won't call asyncFn again until options.maxAge or options.maxSize exceeded, or cachedAsyncFn.clear() is called. + * Won't cache rejections by default. Override with options.cachePromiseRejection = true. + * + * ```js + * const cachedAsyncFn = CacheAsyncFn(asyncFn, options) + * await cachedAsyncFn(key) + * await cachedAsyncFn(key) + * cachedAsyncFn.clear() + * ``` + */ + +export function CacheAsyncFn(asyncFn, { + maxSize = 10000, + maxAge = 30 * 60 * 1000, // 30 minutes + cachePromiseRejection = false, +} = {}) { + const cachedFn = pMemoize(asyncFn, { + maxAge, + cachePromiseRejection, + cache: new LRU({ + maxSize, + }) + }) + cachedFn.clear = () => pMemoize.clear(cachedFn) + return cachedFn +} + +/** + * Returns a cached fn, cached keyed on first argument passed. See documentation for mem. + * Caches into a LRU cache capped at options.maxSize + * Won't call fn again until options.maxAge or options.maxSize exceeded, or cachedFn.clear() is called. + * + * ```js + * const cachedFn = CacheFn(fn, options) + * cachedFn(key) + * cachedFn(key) + * cachedFn(...args) + * cachedFn.clear() + * ``` + */ + +export function CacheFn(fn, { + maxSize = 10000, + maxAge = 30 * 60 * 1000, // 30 minutes +} = {}) { + const cachedFn = mem(fn, { + maxAge, + cache: new LRU({ + maxSize, + }) + }) + cachedFn.clear = () => mem.clear(cachedFn) + return cachedFn +} + +/* eslint-enable object-curly-newline */ + +/** + * Returns a limit function that limits concurrency per-key. + * + * ```js + * const limit = LimitAsyncFnByKey(1) + * limit('channel1', fn) + * limit('channel2', fn) + * limit('channel2', fn) + * ``` + */ + +export function LimitAsyncFnByKey(limit) { + const pending = new Map() + const f = async (id, fn) => { + const limitFn = pending.get(id) || pending.set(id, pLimit(limit)).get(id) + try { + return await limitFn(fn) + } finally { + if (!limitFn.activeCount && !limitFn.pendingCount && pending.get(id) === limitFn) { + // clean up if no more active entries (if not cleared) + pending.delete(id) + } + } + } + f.clear = () => { + // note: does not cancel promises + pending.forEach((p) => p.clearQueue()) + pending.clear() + } + return f +} + diff --git a/test/benchmarks/publish.js b/test/benchmarks/publish.js index 3da517663..4a00e9ebc 100644 --- a/test/benchmarks/publish.js +++ b/test/benchmarks/publish.js @@ -63,8 +63,8 @@ async function run() { // eslint-disable-next-line no-console console.log('Disconnecting clients') await Promise.all([ - client1.ensureDisconnected(), - client2.ensureDisconnected(), + client1.disconnect(), + client2.disconnect(), ]) }) diff --git a/test/flakey/DataUnionEndpoints.test.js b/test/flakey/DataUnionEndpoints.test.js index 094168f6e..d39f1155b 100644 --- a/test/flakey/DataUnionEndpoints.test.js +++ b/test/flakey/DataUnionEndpoints.test.js @@ -38,7 +38,7 @@ describe('DataUnionEndPoints', () => { }, 10000) beforeEach(async () => { - await adminClient.ensureConnected() + await adminClient.connect() dataUnion = await adminClient.deployDataUnion({ provider: testProvider, }) @@ -51,7 +51,7 @@ describe('DataUnionEndPoints', () => { afterAll(async () => { if (!adminClient) { return } - await adminClient.ensureDisconnected() + await adminClient.disconnect() }) afterAll(async () => { @@ -100,12 +100,12 @@ describe('DataUnionEndPoints', () => { autoDisconnect: false, ...config.clientOptions, }) - await memberClient.ensureConnected() + await memberClient.connect() }) afterAll(async () => { if (!memberClient) { return } - await memberClient.ensureDisconnected() + await memberClient.disconnect() }) it('can join the dataUnion, and get their balances and stats, and check proof, and withdraw', async () => { @@ -201,7 +201,7 @@ describe('DataUnionEndPoints', () => { }) afterAll(async () => { if (!client) { return } - await client.ensureDisconnected() + await client.disconnect() }) it('can get dataUnion stats, member list, and member stats', async () => { diff --git a/test/integration/LoginEndpoints.test.js b/test/integration/LoginEndpoints.test.js index fb654e2ba..e7ab91c30 100644 --- a/test/integration/LoginEndpoints.test.js +++ b/test/integration/LoginEndpoints.test.js @@ -22,98 +22,86 @@ describe('LoginEndpoints', () => { }) afterAll(async (done) => { - await client.ensureDisconnected() + await client.disconnect() done() }) describe('Challenge generation', () => { - it('should retrieve a challenge', () => client.getChallenge('some-address') - .then((challenge) => { - assert(challenge) - assert(challenge.id) - assert(challenge.challenge) - assert(challenge.expires) - })) + it('should retrieve a challenge', async () => { + const challenge = await client.getChallenge('some-address') + assert(challenge) + assert(challenge.id) + assert(challenge.challenge) + assert(challenge.expires) + }) }) - async function assertThrowsAsync(fn, regExp) { - let f = () => {} - try { - await fn() - } catch (e) { - f = () => { - throw e - } - } finally { - assert.throws(f, regExp) - } - } - describe('Challenge response', () => { it('should fail to get a session token', async () => { - await assertThrowsAsync(async () => client.sendChallengeResponse( - { + await expect(async () => { + await client.sendChallengeResponse({ id: 'some-id', challenge: 'some-challenge', - }, - 'some-sig', - 'some-address', - ), /Error/) + }, 'some-sig', 'some-address') + }).rejects.toThrow() }) - it('should get a session token', () => { + + it('should get a session token', async () => { const wallet = ethers.Wallet.createRandom() - return client.getChallenge(wallet.address) - .then(async (challenge) => { - assert(challenge.challenge) - const signature = await wallet.signMessage(challenge.challenge) - return client.sendChallengeResponse(challenge, signature, wallet.address) - .then((sessionToken) => { - assert(sessionToken) - assert(sessionToken.token) - assert(sessionToken.expires) - }) - }) + const challenge = await client.getChallenge(wallet.address) + assert(challenge.challenge) + const signature = await wallet.signMessage(challenge.challenge) + const sessionToken = await client.sendChallengeResponse(challenge, signature, wallet.address) + assert(sessionToken) + assert(sessionToken.token) + assert(sessionToken.expires) }) - it('should get a session token with combined function', () => { + + it('should get a session token with combined function', async () => { const wallet = ethers.Wallet.createRandom() - return client.loginWithChallengeResponse((d) => wallet.signMessage(d), wallet.address) - .then((sessionToken) => { - assert(sessionToken) - assert(sessionToken.token) - assert(sessionToken.expires) - }) + const sessionToken = await client.loginWithChallengeResponse((d) => wallet.signMessage(d), wallet.address) + assert(sessionToken) + assert(sessionToken.token) + assert(sessionToken.expires) }) }) describe('API key login', () => { it('should fail to get a session token', async () => { - await assertThrowsAsync(async () => client.loginWithApiKey('apikey'), /Error/) + await expect(async () => { + await client.loginWithApiKey('apikey') + }).rejects.toThrow() + }) + + it('should get a session token', async () => { + const sessionToken = await client.loginWithApiKey('tester1-api-key') + assert(sessionToken) + assert(sessionToken.token) + assert(sessionToken.expires) }) - it('should get a session token', () => client.loginWithApiKey('tester1-api-key') - .then((sessionToken) => { - assert(sessionToken) - assert(sessionToken.token) - assert(sessionToken.expires) - })) }) describe('Username/password login', () => { it('should fail to get a session token', async () => { - await assertThrowsAsync(async () => client.loginWithUsernamePassword('username', 'password'), /Error/) + await expect(async () => { + await client.loginWithUsernamePassword('username', 'password') + }).rejects.toThrow() + }) + + it('should get a session token', async () => { + const sessionToken = await client.loginWithUsernamePassword('tester2@streamr.com', 'tester2') + assert(sessionToken) + assert(sessionToken.token) + assert(sessionToken.expires) }) - it('should get a session token', () => client.loginWithUsernamePassword('tester2@streamr.com', 'tester2') - .then((sessionToken) => { - assert(sessionToken) - assert(sessionToken.token) - assert(sessionToken.expires) - })) }) describe('UserInfo', () => { - it('should get user info', () => client.getUserInfo().then((userInfo) => { + it('should get user info', async () => { + const userInfo = await client.getUserInfo() assert(userInfo.name) assert(userInfo.username) - })) + }) }) describe('logout', () => { diff --git a/test/integration/MultipleClients.test.js b/test/integration/MultipleClients.test.js index 8dbf29912..05b6c294f 100644 --- a/test/integration/MultipleClients.test.js +++ b/test/integration/MultipleClients.test.js @@ -1,14 +1,14 @@ -import { ethers } from 'ethers' import { wait } from 'streamr-test-utils' -import { uid } from '../utils' +import { uid, fakePrivateKey } from '../utils' import StreamrClient from '../../src' +import Connection from '../../src/Connection' import config from './config' const createClient = (opts = {}) => new StreamrClient({ auth: { - privateKey: ethers.Wallet.createRandom().privateKey, + privateKey: fakePrivateKey() }, autoConnect: false, autoDisconnect: false, @@ -24,8 +24,8 @@ describe('PubSub with multiple clients', () => { let otherClient let privateKey - async function setup() { - privateKey = ethers.Wallet.createRandom().privateKey + beforeEach(async () => { + privateKey = fakePrivateKey() mainClient = createClient({ auth: { @@ -36,29 +36,25 @@ describe('PubSub with multiple clients', () => { stream = await mainClient.createStream({ name: uid('stream') }) - } + }) - async function teardown() { + afterEach(async () => { if (stream) { await stream.delete() - stream = undefined // eslint-disable-line require-atomic-updates } if (mainClient) { - await mainClient.ensureDisconnected() + await mainClient.disconnect() } if (otherClient) { - await otherClient.ensureDisconnected() + await otherClient.disconnect() } - } - - beforeEach(async () => { - await setup() - }) - afterEach(async () => { - await teardown() + const openSockets = Connection.getOpen() + if (openSockets !== 0) { + throw new Error(`sockets not closed: ${openSockets}`) + } }) test('can get messages published from other client', async (done) => { @@ -69,8 +65,8 @@ describe('PubSub with multiple clients', () => { }) otherClient.once('error', done) mainClient.once('error', done) - await otherClient.ensureConnected() - await mainClient.ensureConnected() + await otherClient.connect() + await mainClient.connect() const receivedMessagesOther = [] const receivedMessagesMain = [] diff --git a/test/integration/ResendReconnect.test.js b/test/integration/ResendReconnect.test.js index c324dac11..b1a524a9d 100644 --- a/test/integration/ResendReconnect.test.js +++ b/test/integration/ResendReconnect.test.js @@ -1,15 +1,13 @@ -import { ethers } from 'ethers' +import { wait } from 'streamr-test-utils' -import { uid } from '../utils' +import { uid, fakePrivateKey } from '../utils' import StreamrClient from '../../src' import config from './config' -const { wait } = require('streamr-test-utils') - const createClient = (opts = {}) => new StreamrClient({ auth: { - privateKey: ethers.Wallet.createRandom().privateKey, + privateKey: fakePrivateKey(), }, autoConnect: false, autoDisconnect: false, @@ -29,7 +27,7 @@ describe('resend/reconnect', () => { beforeEach(async () => { client = createClient() - await client.ensureConnected() + await client.connect() publishedMessages = [] @@ -51,7 +49,7 @@ describe('resend/reconnect', () => { }, 10 * 1000) afterEach(async () => { - await client.ensureDisconnected() + await client.disconnect() }) describe('reconnect after resend', () => { diff --git a/test/integration/Resends.test.js b/test/integration/Resends.test.js index e980fc0f0..d7f2d2cc9 100644 --- a/test/integration/Resends.test.js +++ b/test/integration/Resends.test.js @@ -47,7 +47,7 @@ describe('StreamrClient resends', () => { }, 10 * 1000) afterEach(async () => { - await client.ensureDisconnected() + await client.disconnect() }) describe('issue resend and subscribe at the same time', () => { @@ -313,10 +313,10 @@ describe('StreamrClient resends', () => { } await wait(30000) - await client.ensureDisconnected() + await client.disconnect() // resend from LONG_RESEND messages - await client.ensureConnected() + await client.connect() const receivedMessages = [] const sub = await client.resend({ diff --git a/test/integration/Sequencing.test.js b/test/integration/Sequencing.test.js new file mode 100644 index 000000000..93a448eb9 --- /dev/null +++ b/test/integration/Sequencing.test.js @@ -0,0 +1,290 @@ +import { wait, waitForCondition, waitForEvent } from 'streamr-test-utils' + +import { uid, fakePrivateKey } from '../utils' +import StreamrClient from '../../src' +import Connection from '../../src/Connection' + +import config from './config' + +const Msg = (opts) => ({ + value: uid('msg'), + ...opts, +}) + +function toSeq(requests, ts = Date.now()) { + return requests.map((m) => { + const { prevMsgRef } = m.streamMessage + return [ + [m.streamMessage.getTimestamp() - ts, m.streamMessage.getSequenceNumber()], + prevMsgRef ? [prevMsgRef.timestamp - ts, prevMsgRef.sequenceNumber] : null + ] + }) +} + +describe('Sequencing', () => { + let expectErrors = 0 // check no errors by default + let onError = jest.fn() + let client + let stream + + const createClient = (opts = {}) => { + const c = new StreamrClient({ + auth: { + privateKey: fakePrivateKey(), + }, + autoConnect: false, + autoDisconnect: false, + maxRetries: 2, + ...config.clientOptions, + ...opts, + }) + c.onError = jest.fn() + c.on('error', onError) + return c + } + + beforeEach(async () => { + expectErrors = 0 + onError = jest.fn() + client = createClient() + await client.connect() + + stream = await client.createStream({ + name: uid('stream') + }) + }) + + afterEach(async () => { + await wait() + // ensure no unexpected errors + expect(onError).toHaveBeenCalledTimes(expectErrors) + if (client) { + expect(client.onError).toHaveBeenCalledTimes(expectErrors) + } + }) + + afterEach(async () => { + await wait() + if (client) { + client.debug('disconnecting after test') + await client.disconnect() + } + + const openSockets = Connection.getOpen() + if (openSockets !== 0) { + throw new Error(`sockets not closed: ${openSockets}`) + } + }) + + it('should sequence in order', async () => { + const ts = Date.now() + const msgsPublished = [] + const msgsReceieved = [] + + await client.subscribe(stream.id, (m) => msgsReceieved.push(m)) + + const nextMsg = () => { + const msg = Msg() + msgsPublished.push(msg) + return msg + } + + const requests = await Promise.all([ + // first 2 messages at ts + 0 + client.publish(stream, nextMsg(), ts), + client.publish(stream, nextMsg(), ts), + // next two messages at ts + 1 + client.publish(stream, nextMsg(), ts + 1), + client.publish(stream, nextMsg(), ts + 1), + ]) + const seq = toSeq(requests, ts) + expect(seq).toEqual([ + [[0, 0], null], + [[0, 1], [0, 0]], + [[1, 0], [0, 1]], + [[1, 1], [1, 0]], + ]) + + await waitForCondition(() => ( + msgsReceieved.length === msgsPublished.length + ), 5000).catch(() => {}) // ignore, tests will fail anyway + + expect(msgsReceieved).toEqual(msgsPublished) + }, 10000) + + it('should sequence in order even if some calls delayed', async () => { + const ts = Date.now() + const msgsPublished = [] + const msgsReceieved = [] + + let calls = 0 + const { clear } = client.publisher.msgCreationUtil.getPublisherId + const getPublisherId = client.publisher.msgCreationUtil.getPublisherId.bind(client.publisher.msgCreationUtil) + client.publisher.msgCreationUtil.getPublisherId = async (...args) => { + // delay getPublisher call + calls += 1 + if (calls === 2) { + const result = await getPublisherId(...args) + // delay resolving this call + await wait(100) + return result + } + return getPublisherId(...args) + } + client.publisher.msgCreationUtil.getPublisherId.clear = clear + + const nextMsg = () => { + const msg = Msg() + msgsPublished.push(msg) + return msg + } + + await client.subscribe(stream.id, (m) => msgsReceieved.push(m)) + const requests = await Promise.all([ + // first 2 messages at ts + 0 + client.publish(stream, nextMsg(), ts), + client.publish(stream, nextMsg(), ts), + // next two messages at ts + 1 + client.publish(stream, nextMsg(), ts + 1), + client.publish(stream, nextMsg(), ts + 1), + ]) + const seq = toSeq(requests, ts) + expect(seq).toEqual([ + [[0, 0], null], + [[0, 1], [0, 0]], + [[1, 0], [0, 1]], + [[1, 1], [1, 0]], + ]) + + await waitForCondition(() => ( + msgsReceieved.length === msgsPublished.length + ), 5000).catch(() => {}) // ignore, tests will fail anyway + + expect(msgsReceieved).toEqual(msgsPublished) + }, 10000) + + it('should sequence in order even if publish requests backdated', async () => { + const ts = Date.now() + const msgsPublished = [] + const msgsReceieved = [] + + await client.subscribe(stream.id, (m) => msgsReceieved.push(m)) + + const nextMsg = (...args) => { + const msg = Msg(...args) + msgsPublished.push(msg) + return msg + } + + const requests = await Promise.all([ + // publish at ts + 0 + client.publish(stream, nextMsg(), ts), + // publish at ts + 1 + client.publish(stream, nextMsg(), ts + 1), + // backdate at ts + 0 + client.publish(stream, nextMsg({ + backdated: true, + }), ts), + // resume at ts + 2 + client.publish(stream, nextMsg(), ts + 2), + client.publish(stream, nextMsg(), ts + 2), + client.publish(stream, nextMsg(), ts + 3), + ]) + + await waitForCondition(() => ( + msgsReceieved.length === msgsPublished.length + ), 2000).catch(() => {}) // ignore, tests will fail anyway + + const msgsResent = [] + const sub = await client.resend({ + stream: stream.id, + resend: { + from: { + timestamp: 0 + }, + }, + }, (m) => msgsResent.push(m)) + await waitForEvent(sub, 'resent') + + expect(msgsReceieved).toEqual(msgsResent) + // backdated messages disappear + expect(msgsReceieved).toEqual(msgsPublished.filter(({ backdated }) => !backdated)) + + const seq = toSeq(requests, ts) + client.debug(seq) + expect(seq).toEqual([ + [[0, 0], null], + [[1, 0], [0, 0]], + [[0, 0], [1, 0]], // bad message + [[2, 0], [1, 0]], + [[2, 1], [2, 0]], + [[3, 0], [2, 1]], + ]) + }, 10000) + + it('should sequence in order even if publish requests backdated in sequence', async () => { + const ts = Date.now() + const msgsPublished = [] + const msgsReceieved = [] + + await client.subscribe(stream.id, (m) => msgsReceieved.push(m)) + + const nextMsg = (...args) => { + const msg = Msg(...args) + msgsPublished.push(msg) + return msg + } + + const requests = await Promise.all([ + // first 3 messages at ts + 0 + client.publish(stream, nextMsg(), ts), + client.publish(stream, nextMsg(), ts), + client.publish(stream, nextMsg(), ts), + // next two messages at ts + 1 + client.publish(stream, nextMsg(), ts + 1), + client.publish(stream, nextMsg(), ts + 1), + // backdate at ts + 0 + client.publish(stream, nextMsg({ + backdated: true, + }), ts), + // resume publishing at ts + 1 + client.publish(stream, nextMsg(), ts + 1), + client.publish(stream, nextMsg(), ts + 1), + client.publish(stream, nextMsg(), ts + 2), + client.publish(stream, nextMsg(), ts + 2), + ]) + + await waitForCondition(() => ( + msgsReceieved.length === msgsPublished.length + ), 2000).catch(() => {}) // ignore, tests will fail anyway + + const msgsResent = [] + const sub = await client.resend({ + stream: stream.id, + resend: { + from: { + timestamp: 0 + }, + }, + }, (m) => msgsResent.push(m)) + await waitForEvent(sub, 'resent') + + expect(msgsReceieved).toEqual(msgsResent) + // backdated messages disappear + expect(msgsReceieved).toEqual(msgsPublished.filter(({ backdated }) => !backdated)) + + const seq = toSeq(requests, ts) + expect(seq).toEqual([ + [[0, 0], null], + [[0, 1], [0, 0]], + [[0, 2], [0, 1]], + [[1, 0], [0, 2]], + [[1, 1], [1, 0]], + [[0, 0], [1, 1]], // bad message + [[1, 2], [1, 1]], + [[1, 3], [1, 2]], + [[2, 0], [1, 3]], + [[2, 1], [2, 0]], + ]) + }, 10000) +}) diff --git a/test/integration/Session.test.js b/test/integration/Session.test.js index 041ac0243..3a9c2af0e 100644 --- a/test/integration/Session.test.js +++ b/test/integration/Session.test.js @@ -1,6 +1,5 @@ -import { ethers } from 'ethers' - import StreamrClient from '../../src' +import { fakePrivateKey } from '../utils' import config from './config' @@ -37,7 +36,7 @@ describe('Session', () => { expect.assertions(1) await expect(createClient({ auth: { - privateKey: ethers.Wallet.createRandom().privateKey, + privateKey: fakePrivateKey(), }, }).session.getSessionToken()).resolves.toBeTruthy() }) diff --git a/test/integration/StreamEndpoints.test.js b/test/integration/StreamEndpoints.test.js index 6fdcf405f..59dad5841 100644 --- a/test/integration/StreamEndpoints.test.js +++ b/test/integration/StreamEndpoints.test.js @@ -117,8 +117,8 @@ describe('StreamEndpoints', () => { }) describe('Stream configuration', () => { - it.skip('Stream.detectFields', async () => { - await client.ensureConnected() + it('Stream.detectFields', async () => { + await client.connect() await client.publish(createdStream.id, { foo: 'bar', count: 0, @@ -139,7 +139,7 @@ describe('StreamEndpoints', () => { }, ], ) - await client.ensureDisconnected() + await client.disconnect() }, 15000) }) diff --git a/test/integration/StreamrClient.test.js b/test/integration/StreamrClient.test.js index 55d68e9e6..e71eb6993 100644 --- a/test/integration/StreamrClient.test.js +++ b/test/integration/StreamrClient.test.js @@ -4,9 +4,8 @@ import path from 'path' import fetch from 'node-fetch' import { ControlLayer, MessageLayer } from 'streamr-client-protocol' import { wait, waitForEvent } from 'streamr-test-utils' -import { ethers } from 'ethers' -import { uid } from '../utils' +import { uid, fakePrivateKey } from '../utils' import StreamrClient from '../../src' import Connection from '../../src/Connection' @@ -25,7 +24,7 @@ describe('StreamrClient', () => { const createClient = (opts = {}) => { const c = new StreamrClient({ auth: { - privateKey: ethers.Wallet.createRandom().privateKey, + privateKey: fakePrivateKey(), }, autoConnect: false, autoDisconnect: false, @@ -759,9 +758,6 @@ describe('StreamrClient', () => { client = createClient() await client.connect() stream = await createStream() - const publisherId = await client.getPublisherId() - const res = await client.isStreamPublisher(stream.id, publisherId.toLowerCase()) - expect(res).toBe(true) expect(onError).toHaveBeenCalledTimes(0) }) @@ -785,6 +781,12 @@ describe('StreamrClient', () => { } }) + it('is stream publisher', async () => { + const publisherId = await client.getPublisherId() + const res = await client.isStreamPublisher(stream.id, publisherId) + expect(res).toBe(true) + }) + describe('Pub/Sub', () => { it('client.publish does not error', async (done) => { await client.publish(stream.id, { diff --git a/test/integration/Subscription.test.js b/test/integration/Subscription.test.js index 17c7d6a5b..7f9284fb6 100644 --- a/test/integration/Subscription.test.js +++ b/test/integration/Subscription.test.js @@ -1,14 +1,13 @@ -import { ethers } from 'ethers' import { wait, waitForEvent } from 'streamr-test-utils' -import { uid } from '../utils' +import { uid, fakePrivateKey } from '../utils' import StreamrClient from '../../src' import config from './config' const createClient = (opts = {}) => new StreamrClient({ auth: { - privateKey: ethers.Wallet.createRandom().privateKey, + privateKey: fakePrivateKey(), }, autoConnect: false, autoDisconnect: false, diff --git a/test/integration/authFetch.test.js b/test/integration/authFetch.test.js index 12caec830..1c8749049 100644 --- a/test/integration/authFetch.test.js +++ b/test/integration/authFetch.test.js @@ -1,9 +1,9 @@ jest.mock('node-fetch') -import { ethers } from 'ethers' import fetch from 'node-fetch' import StreamrClient from '../../src' +import { fakePrivateKey } from '../utils' import config from './config' @@ -27,7 +27,7 @@ describe('authFetch', () => { fetch.mockImplementation(realFetch) client = new StreamrClient({ auth: { - privateKey: ethers.Wallet.createRandom().privateKey, + privateKey: fakePrivateKey() }, autoConnect: false, autoDisconnect: false, diff --git a/test/unit/MessageCreationUtil.test.js b/test/unit/MessageCreationUtil.test.js index 0616e8787..dfb46b3ba 100644 --- a/test/unit/MessageCreationUtil.test.js +++ b/test/unit/MessageCreationUtil.test.js @@ -1,92 +1,110 @@ import sinon from 'sinon' import { ethers } from 'ethers' +import { wait } from 'streamr-test-utils' import { MessageLayer } from 'streamr-client-protocol' -import MessageCreationUtil from '../../src/MessageCreationUtil' +import { MessageCreationUtil, StreamPartitioner } from '../../src/Publisher' import Stream from '../../src/rest/domain/Stream' +// eslint-disable-next-line import/no-named-as-default-member +import StubbedStreamrClient from './StubbedStreamrClient' + const { StreamMessage, MessageID, MessageRef } = MessageLayer describe('MessageCreationUtil', () => { + let client + let msgCreationUtil const hashedUsername = '0x16F78A7D6317F102BBD95FC9A4F3FF2E3249287690B8BDAD6B7810F82B34ACE3'.toLowerCase() + const createClient = (opts = {}) => { + return new StubbedStreamrClient({ + auth: { + username: 'username', + }, + autoConnect: false, + autoDisconnect: false, + maxRetries: 2, + ...opts, + }) + } + + afterEach(async () => { + if (msgCreationUtil) { + msgCreationUtil.stop() + } + + if (client) { + await client.disconnect() + } + }) + describe('getPublisherId', () => { - it('uses address', async () => { + it('uses address for privateKey auth', async () => { const wallet = ethers.Wallet.createRandom() - const client = { - options: { - auth: { - privateKey: wallet.privateKey, - }, + client = createClient({ + auth: { + privateKey: wallet.privateKey, }, - getUserInfo: sinon.stub().resolves({ - username: 'username', - }), - } - const msgCreationUtil = new MessageCreationUtil(client.options.auth, undefined, client.getUserInfo) + }) + msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(wallet.address.toLowerCase()) }) - it('uses hash of username', async () => { - const client = { - options: { - auth: { - apiKey: 'apiKey', - }, + it('uses hash of username for apiKey auth', async () => { + client = createClient({ + auth: { + apiKey: 'apiKey', }, - getUserInfo: sinon.stub().resolves({ - username: 'username', - }), - } - const msgCreationUtil = new MessageCreationUtil(client.options.auth, undefined, client.getUserInfo) + }) + msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(hashedUsername) }) - it('uses hash of username', async () => { - const client = { - options: { - auth: { - username: 'username', - }, - }, - getUserInfo: sinon.stub().resolves({ + it('uses hash of username for username auth', async () => { + client = createClient({ + auth: { username: 'username', - }), - } - const msgCreationUtil = new MessageCreationUtil(client.options.auth, undefined, client.getUserInfo) + }, + }) + msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(hashedUsername) }) - it('uses hash of username', async () => { - const client = { - options: { - auth: { - sessionToken: 'session-token', - }, + it('uses hash of username for sessionToken auth', async () => { + client = createClient({ + auth: { + sessionToken: 'session-token', }, - getUserInfo: sinon.stub().resolves({ - username: 'username', - }), - } - const msgCreationUtil = new MessageCreationUtil(client.options.auth, undefined, client.getUserInfo) + }) + msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(hashedUsername) }) }) - describe('partitioner', () => { + describe('StreamPartitioner', () => { + let streamPartitioner + + beforeAll(() => { + client = createClient() + }) + + beforeEach(() => { + streamPartitioner = new StreamPartitioner(client) + }) + it('should throw if partition count is not defined', () => { expect(() => { - new MessageCreationUtil().computeStreamPartition(undefined, 'foo') + streamPartitioner.computeStreamPartition(undefined, 'foo') }).toThrow() }) it('should always return partition 0 for all keys if partition count is 1', () => { for (let i = 0; i < 100; i++) { - expect(new MessageCreationUtil().computeStreamPartition(1, `foo${i}`)).toEqual(0) + expect(streamPartitioner.computeStreamPartition(1, `foo${i}`)).toEqual(0) } }) @@ -104,7 +122,7 @@ describe('MessageCreationUtil', () => { expect(correctResults.length).toEqual(keys.length) for (let i = 0; i < keys.length; i++) { - const partition = new MessageCreationUtil().computeStreamPartition(10, keys[i]) + const partition = streamPartitioner.computeStreamPartition(10, keys[i]) expect(correctResults[i]).toStrictEqual(partition) } }) @@ -115,84 +133,106 @@ describe('MessageCreationUtil', () => { foo: 'bar', } - const stream = new Stream(null, { - id: 'streamId', - partitions: 1, - }) - - let client - let msgCreationUtil + let stream - beforeEach(() => { - client = { - options: { - auth: { - username: 'username', - }, - }, - signer: { - signStreamMessage: (streamMessage) => { - /* eslint-disable no-param-reassign */ - streamMessage.signatureType = StreamMessage.SIGNATURE_TYPES.ETH - streamMessage.signature = 'signature' - /* eslint-enable no-param-reassign */ - return Promise.resolve() - }, - }, - getUserInfo: () => Promise.resolve({ + beforeAll(() => { + client = createClient({ + auth: { username: 'username', - }), - getStream: sinon.stub().resolves(stream), - } - msgCreationUtil = new MessageCreationUtil(client.options.auth, client.signer, client.getUserInfo(), client.getStream) + }, + }) }) - afterAll(() => { - msgCreationUtil.stop() + beforeEach(() => { + stream = new Stream(null, { + id: 'streamId', + partitions: 1, + }) + client.getStream = sinon.stub().resolves(stream) + msgCreationUtil = new MessageCreationUtil(client) }) function getStreamMessage(streamId, timestamp, sequenceNumber, prevMsgRef) { return new StreamMessage({ - messageId: new MessageID(streamId, 0, timestamp, sequenceNumber, hashedUsername, msgCreationUtil.msgChainId), - prevMesssageRef: prevMsgRef, + messageId: new MessageID(streamId, 0, timestamp, sequenceNumber, hashedUsername, msgCreationUtil.msgChainer.msgChainId), + prevMsgRef, content: pubMsg, messageType: StreamMessage.MESSAGE_TYPES.MESSAGE, encryptionType: StreamMessage.ENCRYPTION_TYPES.NONE, - signatureType: StreamMessage.SIGNATURE_TYPES.ETH, - signature: 'signature', }) } it('should create messages with increasing sequence numbers', async () => { const ts = Date.now() - const promises = [] let prevMsgRef = null for (let i = 0; i < 10; i++) { - /* eslint-disable no-loop-func */ - prevMsgRef = new MessageRef(ts, i) - promises.push(async () => { - const streamMessage = await msgCreationUtil.createStreamMessage(stream, pubMsg, ts) - expect(streamMessage).toStrictEqual(getStreamMessage('streamId', ts, i, prevMsgRef)) + // eslint-disable-next-line no-await-in-loop + const streamMessage = await msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: ts, }) - /* eslint-enable no-loop-func */ + expect(streamMessage).toStrictEqual(getStreamMessage('streamId', ts, i, prevMsgRef)) + prevMsgRef = new MessageRef(ts, i) } - await Promise.all(promises) }) - it('should create messages with sequence number 0', async () => { + it('should create messages with sequence number 0 if different timestamp', async () => { const ts = Date.now() - const promises = [] let prevMsgRef = null for (let i = 0; i < 10; i++) { - prevMsgRef = new MessageRef(ts + i, i) - /* eslint-disable no-loop-func */ - promises.push(async () => { - const streamMessage = await msgCreationUtil.createStreamMessage(stream, pubMsg, ts + i) - expect(streamMessage).toStrictEqual(getStreamMessage('streamId', ts + i, 0, prevMsgRef)) + // eslint-disable-next-line no-await-in-loop + const streamMessage = await msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: ts + i, }) - /* eslint-enable no-loop-func */ + expect(streamMessage).toStrictEqual(getStreamMessage('streamId', ts + i, 0, prevMsgRef)) + prevMsgRef = new MessageRef(ts + i, 0) } - await Promise.all(promises) + }) + + it('should sequence in order even if async dependencies resolve out of order', async () => { + const ts = Date.now() + let calls = 0 + const getStreamPartitions = msgCreationUtil.partitioner.getStreamPartitions.bind(msgCreationUtil.partitioner) + msgCreationUtil.partitioner.getStreamPartitions = async (...args) => { + calls += 1 + if (calls === 2) { + const result = await getStreamPartitions(...args) + // delay resolving this call + await wait(100) + return result + } + return getStreamPartitions(...args) + } + + // simultaneously publish 3 messages, but the second item's dependencies will resolve late. + // this shouldn't affect the sequencing + const messages = await Promise.all([ + msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: ts, + }), + msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: ts, + }), + msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: ts, + }) + ]) + const sequenceNumbers = messages.map((m) => m.getSequenceNumber()) + expect(sequenceNumbers).toEqual([0, 1, 2]) + }) + + it('should handle old timestamps', async () => { + const ts = Date.now() + const streamMessage1 = await msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: ts, + }) + const streamMessage2 = await msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: ts + 1, + }) + const streamMessage3 = await msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: ts, + }) + const sequenceNumbers = [streamMessage1, streamMessage2, streamMessage3].map((m) => m.getSequenceNumber()) + expect(sequenceNumbers).toEqual([0, 0, 0]) }) it('should publish messages with sequence number 0 (different streams)', async () => { @@ -206,23 +246,33 @@ describe('MessageCreationUtil', () => { partitions: 1, }) - const msg1 = await msgCreationUtil.createStreamMessage(stream, pubMsg, ts) - const msg2 = await msgCreationUtil.createStreamMessage(stream2, pubMsg, ts) - const msg3 = await msgCreationUtil.createStreamMessage(stream3, pubMsg, ts) + const msg1 = await msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: ts + }) + const msg2 = await msgCreationUtil.createStreamMessage(stream2, { + content: pubMsg, timestamp: ts + }) + const msg3 = await msgCreationUtil.createStreamMessage(stream3, { + content: pubMsg, timestamp: ts + }) expect(msg1).toEqual(getStreamMessage('streamId', ts, 0, null)) expect(msg2).toEqual(getStreamMessage('streamId2', ts, 0, null)) expect(msg3).toEqual(getStreamMessage('streamId3', ts, 0, null)) }) - it('should sign messages if signer is defined', async () => { - const msg1 = await msgCreationUtil.createStreamMessage(stream, pubMsg, Date.now()) + it.skip('should sign messages if signer is defined', async () => { + const msg1 = await msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: Date.now() + }) expect(msg1.signature).toBe('signature') }) it('should create message from a stream id by fetching the stream', async () => { const ts = Date.now() - const streamMessage = await msgCreationUtil.createStreamMessage(stream.id, pubMsg, ts) + const streamMessage = await msgCreationUtil.createStreamMessage(stream.id, { + content: pubMsg, timestamp: ts + }) expect(streamMessage).toEqual(getStreamMessage(stream.id, ts, 0, null)) }) }) diff --git a/test/unit/StreamrClient.test.js b/test/unit/StreamrClient.test.js index 53825e652..492d6bbec 100644 --- a/test/unit/StreamrClient.test.js +++ b/test/unit/StreamrClient.test.js @@ -6,7 +6,6 @@ import { wait, waitForEvent } from 'streamr-test-utils' import FailedToPublishError from '../../src/errors/FailedToPublishError' import Subscription from '../../src/Subscription' import Connection from '../../src/Connection' -// import StreamrClient from '../../src/StreamrClient' import { uid } from '../utils' // eslint-disable-next-line import/no-named-as-default-member @@ -1233,7 +1232,7 @@ describe('StreamrClient', () => { describe('publish', () => { function getPublishRequest(content, streamId, timestamp, seqNum, prevMsgRef, requestId) { const { hashedUsername } = StubbedStreamrClient - const { msgChainId } = client.publisher.msgCreationUtil + const { msgChainId } = client.publisher.msgCreationUtil.msgChainer const messageId = new MessageID(streamId, 0, timestamp, seqNum, hashedUsername, msgChainId) const streamMessage = new StreamMessage({ messageId, diff --git a/test/unit/utils.test.js b/test/unit/utils.test.js index 3ff7b47ff..e0079f127 100644 --- a/test/unit/utils.test.js +++ b/test/unit/utils.test.js @@ -53,8 +53,8 @@ describe('utils', () => { session.getSessionToken = sinon.stub().resolves('invalid token') return authFetch(baseUrl + testUrl, session).catch((err) => { expect(session.getSessionToken.calledTwice).toBeTruthy() - expect(err.toString()).toEqual( - `Error: Request to ${baseUrl + testUrl} returned with error code 401. Unauthorized` + expect(err.toString()).toMatch( + `${baseUrl + testUrl} returned with error code 401. Unauthorized` ) expect(err.body).toEqual('Unauthorized') done() diff --git a/test/utils.js b/test/utils.js index 88ae9bee9..524b90c3d 100644 --- a/test/utils.js +++ b/test/utils.js @@ -1,3 +1,9 @@ +const crypto = require('crypto') + const uniqueId = require('lodash.uniqueid') export const uid = (prefix) => uniqueId(`p${process.pid}${prefix ? '-' + prefix : ''}`) + +export function fakePrivateKey() { + return crypto.randomBytes(32).toString('hex') +}