From d58d3b68e3d945ee28a361884567d01ab9ffefe0 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Tue, 15 Sep 2020 18:36:36 -0400 Subject: [PATCH 1/8] Refactor & co-locate publish code. --- src/MessageCreationUtil.js | 178 ----------------------- src/Publisher.js | 192 +++++++++++++++++++++++-- src/rest/StreamEndpoints.js | 1 + src/utils.js | 49 +++++++ test/integration/StreamrClient.test.js | 9 +- test/unit/MessageCreationUtil.test.js | 46 +++--- test/unit/StreamrClient.test.js | 2 +- 7 files changed, 264 insertions(+), 213 deletions(-) delete mode 100644 src/MessageCreationUtil.js diff --git a/src/MessageCreationUtil.js b/src/MessageCreationUtil.js deleted file mode 100644 index 17e7c2567..000000000 --- a/src/MessageCreationUtil.js +++ /dev/null @@ -1,178 +0,0 @@ -import crypto from 'crypto' - -import Receptacle from 'receptacle' -import randomstring from 'randomstring' -import { MessageLayer } from 'streamr-client-protocol' -import { ethers } from 'ethers' - -import Stream from './rest/domain/Stream' -import InvalidMessageTypeError from './errors/InvalidMessageTypeError' - -const { StreamMessage, MessageID, MessageRef } = MessageLayer - -export default class MessageCreationUtil { - constructor(auth, signer, getUserInfo, getStreamFunction) { - this.auth = auth - this._signer = signer - this.getUserInfo = getUserInfo - this.getStreamFunction = getStreamFunction - this.cachedStreams = new Receptacle({ - max: 10000, - }) - this.publishedStreams = {} - this.msgChainId = randomstring.generate(20) - this.cachedHashes = {} - } - - stop() { - this.cachedStreams.clear() - } - - async getUsername() { - if (!this.usernamePromise) { - // In the edge case where StreamrClient.auth.apiKey is an anonymous key, userInfo.id is that anonymous key - this.usernamePromise = this.getUserInfo().then((userInfo) => userInfo.username || userInfo.id) - } - return this.usernamePromise - } - - async getStream(streamId) { - if (!this.cachedStreams.get(streamId)) { - const streamPromise = this.getStreamFunction(streamId).then((stream) => ({ - id: stream.id, - partitions: stream.partitions, - })) - const success = this.cachedStreams.set(streamId, streamPromise, { - ttl: 30 * 60 * 1000, // 30 minutes - refresh: true, // reset ttl on access - }) - if (!success) { - console.warn(`Could not store stream with id ${streamId} in local cache.`) - return streamPromise - } - } - return this.cachedStreams.get(streamId) - } - - async getPublisherId() { - if (!this.publisherId) { - if (this.auth.privateKey !== undefined) { - this.publisherId = ethers.utils.computeAddress(this.auth.privateKey).toLowerCase() - } else if (this.auth.provider !== undefined) { - const provider = new ethers.providers.Web3Provider(this.auth.provider) - this.publisherId = provider.getSigner().address.toLowerCase() - } else if (this.auth.apiKey !== undefined) { - const hexString = ethers.utils.hexlify(Buffer.from(await this.getUsername(), 'utf8')) - this.publisherId = ethers.utils.sha256(hexString) - } else if (this.auth.username !== undefined) { - const hexString = ethers.utils.hexlify(Buffer.from(this.auth.username, 'utf8')) - this.publisherId = ethers.utils.sha256(hexString) - } else if (this.auth.sessionToken !== undefined) { - const hexString = ethers.utils.hexlify(Buffer.from(await this.getUsername(), 'utf8')) - this.publisherId = ethers.utils.sha256(hexString) - } else { - throw new Error('Need either "privateKey", "provider", "apiKey", "username"+"password" or "sessionToken" to derive the publisher Id.') - } - } - return this.publisherId - } - - 
getNextSequenceNumber(key, timestamp) { - if (timestamp !== this.getPrevTimestamp(key)) { - return 0 - } - return this.getPrevSequenceNumber(key) + 1 - } - - getPrevMsgRef(key) { - const prevTimestamp = this.getPrevTimestamp(key) - if (!prevTimestamp) { - return null - } - const prevSequenceNumber = this.getPrevSequenceNumber(key) - return new MessageRef(prevTimestamp, prevSequenceNumber) - } - - getPrevTimestamp(key) { - return this.publishedStreams[key].prevTimestamp - } - - getPrevSequenceNumber(key) { - return this.publishedStreams[key].prevSequenceNumber - } - - async createStreamMessage(streamObjectOrId, data, timestamp = Date.now(), partitionKey = null) { - // Validate data - if (typeof data !== 'object') { - throw new Error(`Message data must be an object! Was: ${data}`) - } - - const stream = (streamObjectOrId instanceof Stream) ? streamObjectOrId : await this.getStream(streamObjectOrId) - const streamPartition = this.computeStreamPartition(stream.partitions, partitionKey) - const publisherId = await this.getPublisherId() - const [messageId, prevMsgRef] = this.createMsgIdAndPrevRef(stream.id, streamPartition, timestamp, publisherId) - - const streamMessage = new StreamMessage({ - messageId, - prevMsgRef, - content: data, - messageType: StreamMessage.MESSAGE_TYPES.MESSAGE, - }) - - if (this._signer) { - await this._signer.signStreamMessage(streamMessage) - } - return streamMessage - } - - createMsgIdAndPrevRef(streamId, streamPartition, timestamp, publisherId) { - const key = streamId + streamPartition - if (!this.publishedStreams[key]) { - this.publishedStreams[key] = { - prevTimestamp: null, - prevSequenceNumber: 0, - } - } - - const sequenceNumber = this.getNextSequenceNumber(key, timestamp) - const messageId = new MessageID(streamId, streamPartition, timestamp, sequenceNumber, publisherId, this.msgChainId) - const prevMsgRef = this.getPrevMsgRef(key) - this.publishedStreams[key].prevTimestamp = timestamp - this.publishedStreams[key].prevSequenceNumber = sequenceNumber - return [messageId, prevMsgRef] - } - - createDefaultMsgIdAndPrevRef(streamId, publisherId) { - return this.createMsgIdAndPrevRef(streamId, 0, Date.now(), publisherId) - } - - static getErrorCodeFromError(error) { - if (error instanceof InvalidMessageTypeError) { - return 'INVALID_MESSAGE_TYPE' - } - return 'UNEXPECTED_ERROR' - } - - hash(stringToHash) { - if (this.cachedHashes[stringToHash] === undefined) { - this.cachedHashes[stringToHash] = crypto.createHash('md5').update(stringToHash).digest() - } - return this.cachedHashes[stringToHash] - } - - computeStreamPartition(partitionCount, partitionKey) { - if (!partitionCount) { - throw new Error('partitionCount is falsey!') - } else if (partitionCount === 1) { - // Fast common case - return 0 - } else if (partitionKey) { - const buffer = this.hash(partitionKey) - const intHash = buffer.readInt32LE() - return Math.abs(intHash) % partitionCount - } else { - // Fallback to random partition if no key - return Math.floor(Math.random() * partitionCount) - } - } -} diff --git a/src/Publisher.js b/src/Publisher.js index a13610454..3551736f1 100644 --- a/src/Publisher.js +++ b/src/Publisher.js @@ -1,10 +1,15 @@ -import once from 'once' -import { ControlLayer } from 'streamr-client-protocol' +import crypto from 'crypto' + +import { ControlLayer, MessageLayer } from 'streamr-client-protocol' +import randomstring from 'randomstring' +import { ethers } from 'ethers' import Signer from './Signer' import Stream from './rest/domain/Stream' import FailedToPublishError from 
'./errors/FailedToPublishError' -import MessageCreationUtil from './MessageCreationUtil' +import { AsyncCacheMap, AsyncCacheFn } from './utils' + +const { StreamMessage, MessageID, MessageRef } = MessageLayer function getStreamId(streamObjectOrId) { if (streamObjectOrId instanceof Stream) { @@ -18,12 +23,168 @@ function getStreamId(streamObjectOrId) { throw new Error(`First argument must be a Stream object or the stream id! Was: ${streamObjectOrId}`) } +class MessageChainer { + constructor() { + this.msgChainId = randomstring.generate(20) + this.publishedStreams = {} + } + + create(streamId, streamPartition, timestamp, publisherId) { + const key = streamId + streamPartition + if (!this.publishedStreams[key]) { + this.publishedStreams[key] = { + prevTimestamp: null, + prevSequenceNumber: 0, + } + } + + const sequenceNumber = this.getNextSequenceNumber(key, timestamp) + const messageId = new MessageID(streamId, streamPartition, timestamp, sequenceNumber, publisherId, this.msgChainId) + const prevMsgRef = this.getPrevMsgRef(key) + this.publishedStreams[key].prevTimestamp = timestamp + this.publishedStreams[key].prevSequenceNumber = sequenceNumber + return [messageId, prevMsgRef] + } + + getPrevMsgRef(key) { + const prevTimestamp = this.getPrevTimestamp(key) + if (!prevTimestamp) { + return null + } + const prevSequenceNumber = this.getPrevSequenceNumber(key) + return new MessageRef(prevTimestamp, prevSequenceNumber) + } + + getNextSequenceNumber(key, timestamp) { + if (timestamp !== this.getPrevTimestamp(key)) { + return 0 + } + return this.getPrevSequenceNumber(key) + 1 + } + + getPrevTimestamp(key) { + return this.publishedStreams[key] && this.publishedStreams[key].prevTimestamp + } + + getPrevSequenceNumber(key) { + return this.publishedStreams[key].prevSequenceNumber + } +} + +export class MessageCreationUtil { + constructor(client) { + this.client = client + this.msgChainer = new MessageChainer() + + this.streamPartitionCache = new AsyncCacheMap(async (streamId) => { + const { partitions } = await this.client.getStream(streamId) + return partitions + }) + this.getUserInfo = AsyncCacheFn(this.getUserInfo.bind(this)) + + this.getPublisherId = AsyncCacheFn(this.getPublisherId.bind(this)) + this.cachedHashes = {} + } + + stop() { + this.msgChainer = new MessageChainer() + this.getUserInfo.stop() + this.getPublisherId.stop() + this.streamPartitionCache.stop() + } + + async getPublisherId() { + const { auth = {} } = this.client.options + if (auth.privateKey !== undefined) { + return ethers.utils.computeAddress(auth.privateKey).toLowerCase() + } + + if (auth.provider !== undefined) { + const provider = new ethers.providers.Web3Provider(auth.provider) + return provider.getSigner().address.toLowerCase() + } + + const username = auth.username || await this.getUsername() + + if (username !== undefined) { + const hexString = ethers.utils.hexlify(Buffer.from(username, 'utf8')) + return ethers.utils.sha256(hexString) + } + + throw new Error('Need either "privateKey", "provider", "apiKey", "username"+"password" or "sessionToken" to derive the publisher Id.') + } + + /* cached remote call */ + async getUserInfo() { + return this.client.getUserInfo() + } + + async getUsername() { + return this.getUserInfo().then((userInfo) => ( + userInfo.username + || userInfo.id // In the edge case where StreamrClient.auth.apiKey is an anonymous key, userInfo.id is that anonymous key + )) + } + + async getStreamPartitions(streamObjectOrId) { + if (streamObjectOrId && streamObjectOrId.partitions != null) { + return 
streamObjectOrId.partitions + } + const streamId = getStreamId(streamObjectOrId) + return this.streamPartitionCache.load(streamId) + } + + hash(stringToHash) { + if (this.cachedHashes[stringToHash] === undefined) { + this.cachedHashes[stringToHash] = crypto.createHash('md5').update(stringToHash).digest() + } + return this.cachedHashes[stringToHash] + } + + computeStreamPartition(partitionCount, partitionKey) { + if (!partitionCount) { + throw new Error('partitionCount is falsey!') + } else if (partitionCount === 1) { + // Fast common case + return 0 + } else if (partitionKey) { + const buffer = this.hash(partitionKey) + const intHash = buffer.readInt32LE() + return Math.abs(intHash) % partitionCount + } else { + // Fallback to random partition if no key + return Math.floor(Math.random() * partitionCount) + } + } + + async createStreamMessage(streamObjectOrId, { data, timestamp, partitionKey } = {}) { + // Validate data + if (typeof data !== 'object') { + throw new Error(`Message data must be an object! Was: ${data}`) + } + + const streamId = getStreamId(streamObjectOrId) + const [streamPartitions, publisherId] = await Promise.all([ + this.getStreamPartitions(streamObjectOrId), + this.getPublisherId(), + ]) + + const streamPartition = this.computeStreamPartition(streamPartitions, partitionKey) + const [messageId, prevMsgRef] = this.msgChainer.create(streamId, streamPartition, timestamp, publisherId) + + return new StreamMessage({ + messageId, + prevMsgRef, + content: data, + }) + } +} + export default class Publisher { constructor(client) { this.client = client this.debug = client.debug.extend('Publisher') - this.publishQueue = [] this.signer = Signer.createSigner({ ...client.options.auth, debug: client.debug, @@ -34,15 +195,13 @@ export default class Publisher { if (client.session.isUnauthenticated()) { this.msgCreationUtil = null } else { - this.msgCreationUtil = new MessageCreationUtil( - client.options.auth, this.signer, once(() => client.getUserInfo()), - (streamId) => client.getStream(streamId).catch(this.onErrorEmit) - ) + this.msgCreationUtil = new MessageCreationUtil(this.client) } } async publish(...args) { this.debug('publish()') + return this._publish(...args).catch((err) => { this.onErrorEmit(err) throw err @@ -53,15 +212,21 @@ export default class Publisher { if (this.client.session.isUnauthenticated()) { throw new Error('Need to be authenticated to publish.') } - // Validate streamObjectOrId - const streamId = getStreamId(streamObjectOrId) const timestampAsNumber = timestamp instanceof Date ? 
timestamp.getTime() : new Date(timestamp).getTime() const [sessionToken, streamMessage] = await Promise.all([ this.client.session.getSessionToken(), - this.msgCreationUtil.createStreamMessage(streamObjectOrId, data, timestampAsNumber, partitionKey), + this.msgCreationUtil.createStreamMessage(streamObjectOrId, { + data, + timestamp: timestampAsNumber, + partitionKey + }), ]) + if (this.signer) { + await this.signer.signStreamMessage(streamMessage) + } + const requestId = this.client.resender.resendUtil.generateRequestId() const request = new ControlLayer.PublishRequest({ streamMessage, @@ -71,6 +236,7 @@ export default class Publisher { try { await this.client.send(request) } catch (err) { + const streamId = getStreamId(streamObjectOrId) throw new FailedToPublishError( streamId, data, @@ -85,8 +251,6 @@ export default class Publisher { } stop() { - if (this.msgCreationUtil) { - this.msgCreationUtil.stop() - } + return this.msgCreationUtil.stop() } } diff --git a/src/rest/StreamEndpoints.js b/src/rest/StreamEndpoints.js index 6d4c45827..37c5d7533 100644 --- a/src/rest/StreamEndpoints.js +++ b/src/rest/StreamEndpoints.js @@ -127,6 +127,7 @@ export async function isStreamPublisher(streamId, ethAddress) { await authFetch(url, this.session) return true } catch (e) { + this.debug(e) if (e.response && e.response.status === 404) { return false } diff --git a/src/utils.js b/src/utils.js index 7c778ca1c..b779861f3 100644 --- a/src/utils.js +++ b/src/utils.js @@ -1,3 +1,4 @@ +import Receptacle from 'receptacle' import { v4 as uuidv4 } from 'uuid' import uniqueId from 'lodash.uniqueid' @@ -35,3 +36,51 @@ export function waitFor(emitter, event) { emitter.once('error', onError) }) } + +export class AsyncCacheMap { + /* eslint-disable object-curly-newline */ + constructor(fn, { + max = 10000, + ttl = 30 * 60 * 1000, // 30 minutes + refresh = true, // reset ttl on access + } = {}) { + /* eslint-disable-next-line object-curly-newline */ + this.ttl = ttl + this.refresh = refresh + this.fn = fn + this.cache = new Receptacle({ + max, + }) + } + + load(id, { ttl = this.ttl, refresh = this.refresh, } = {}) { + if (!this.cache.get(id)) { + const promise = this.fn(id) + const success = this.cache.set(id, promise, { + ttl, + refresh, + }) + if (!success) { + console.warn(`Could not store ${id} in local cache.`) + return promise + } + } + return this.cache.get(id) + } + + stop() { + this.cache.clear() + } +} + +export function AsyncCacheFn(fn, options) { + const cache = new AsyncCacheMap(fn, options) + const cacheFn = async (opts) => { + return cache.load('value', opts) + } + cacheFn.cache = cache + cacheFn.stop = () => { + return cache.stop() + } + return cacheFn +} diff --git a/test/integration/StreamrClient.test.js b/test/integration/StreamrClient.test.js index 55d68e9e6..5d8873bad 100644 --- a/test/integration/StreamrClient.test.js +++ b/test/integration/StreamrClient.test.js @@ -759,9 +759,6 @@ describe('StreamrClient', () => { client = createClient() await client.connect() stream = await createStream() - const publisherId = await client.getPublisherId() - const res = await client.isStreamPublisher(stream.id, publisherId.toLowerCase()) - expect(res).toBe(true) expect(onError).toHaveBeenCalledTimes(0) }) @@ -785,6 +782,12 @@ describe('StreamrClient', () => { } }) + it('is stream publisher', async () => { + const publisherId = await client.getPublisherId() + const res = await client.isStreamPublisher(stream.id, publisherId) + expect(res).toBe(true) + }) + describe('Pub/Sub', () => { it('client.publish does not 
error', async (done) => { await client.publish(stream.id, { diff --git a/test/unit/MessageCreationUtil.test.js b/test/unit/MessageCreationUtil.test.js index 0616e8787..82eb18f46 100644 --- a/test/unit/MessageCreationUtil.test.js +++ b/test/unit/MessageCreationUtil.test.js @@ -2,7 +2,7 @@ import sinon from 'sinon' import { ethers } from 'ethers' import { MessageLayer } from 'streamr-client-protocol' -import MessageCreationUtil from '../../src/MessageCreationUtil' +import { MessageCreationUtil } from '../../src/Publisher' import Stream from '../../src/rest/domain/Stream' const { StreamMessage, MessageID, MessageRef } = MessageLayer @@ -23,7 +23,7 @@ describe('MessageCreationUtil', () => { username: 'username', }), } - const msgCreationUtil = new MessageCreationUtil(client.options.auth, undefined, client.getUserInfo) + const msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(wallet.address.toLowerCase()) }) @@ -39,7 +39,7 @@ describe('MessageCreationUtil', () => { username: 'username', }), } - const msgCreationUtil = new MessageCreationUtil(client.options.auth, undefined, client.getUserInfo) + const msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(hashedUsername) }) @@ -55,7 +55,7 @@ describe('MessageCreationUtil', () => { username: 'username', }), } - const msgCreationUtil = new MessageCreationUtil(client.options.auth, undefined, client.getUserInfo) + const msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(hashedUsername) }) @@ -71,7 +71,7 @@ describe('MessageCreationUtil', () => { username: 'username', }), } - const msgCreationUtil = new MessageCreationUtil(client.options.auth, undefined, client.getUserInfo) + const msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(hashedUsername) }) @@ -144,7 +144,7 @@ describe('MessageCreationUtil', () => { }), getStream: sinon.stub().resolves(stream), } - msgCreationUtil = new MessageCreationUtil(client.options.auth, client.signer, client.getUserInfo(), client.getStream) + msgCreationUtil = new MessageCreationUtil(client) }) afterAll(() => { @@ -153,13 +153,11 @@ describe('MessageCreationUtil', () => { function getStreamMessage(streamId, timestamp, sequenceNumber, prevMsgRef) { return new StreamMessage({ - messageId: new MessageID(streamId, 0, timestamp, sequenceNumber, hashedUsername, msgCreationUtil.msgChainId), + messageId: new MessageID(streamId, 0, timestamp, sequenceNumber, hashedUsername, msgCreationUtil.msgChainer.msgChainId), prevMesssageRef: prevMsgRef, content: pubMsg, messageType: StreamMessage.MESSAGE_TYPES.MESSAGE, encryptionType: StreamMessage.ENCRYPTION_TYPES.NONE, - signatureType: StreamMessage.SIGNATURE_TYPES.ETH, - signature: 'signature', }) } @@ -171,7 +169,9 @@ describe('MessageCreationUtil', () => { /* eslint-disable no-loop-func */ prevMsgRef = new MessageRef(ts, i) promises.push(async () => { - const streamMessage = await msgCreationUtil.createStreamMessage(stream, pubMsg, ts) + const streamMessage = await msgCreationUtil.createStreamMessage(stream, { + data: pubMsg, timestamp: ts + }) expect(streamMessage).toStrictEqual(getStreamMessage('streamId', ts, i, prevMsgRef)) }) /* eslint-enable no-loop-func */ @@ -187,7 +187,9 @@ describe('MessageCreationUtil', () => { prevMsgRef = new MessageRef(ts + i, i) 
/* eslint-disable no-loop-func */ promises.push(async () => { - const streamMessage = await msgCreationUtil.createStreamMessage(stream, pubMsg, ts + i) + const streamMessage = await msgCreationUtil.createStreamMessage(stream, { + data: pubMsg, timestamp: ts + i + }) expect(streamMessage).toStrictEqual(getStreamMessage('streamId', ts + i, 0, prevMsgRef)) }) /* eslint-enable no-loop-func */ @@ -206,23 +208,33 @@ describe('MessageCreationUtil', () => { partitions: 1, }) - const msg1 = await msgCreationUtil.createStreamMessage(stream, pubMsg, ts) - const msg2 = await msgCreationUtil.createStreamMessage(stream2, pubMsg, ts) - const msg3 = await msgCreationUtil.createStreamMessage(stream3, pubMsg, ts) + const msg1 = await msgCreationUtil.createStreamMessage(stream, { + data: pubMsg, timestamp: ts + }) + const msg2 = await msgCreationUtil.createStreamMessage(stream2, { + data: pubMsg, timestamp: ts + }) + const msg3 = await msgCreationUtil.createStreamMessage(stream3, { + data: pubMsg, timestamp: ts + }) expect(msg1).toEqual(getStreamMessage('streamId', ts, 0, null)) expect(msg2).toEqual(getStreamMessage('streamId2', ts, 0, null)) expect(msg3).toEqual(getStreamMessage('streamId3', ts, 0, null)) }) - it('should sign messages if signer is defined', async () => { - const msg1 = await msgCreationUtil.createStreamMessage(stream, pubMsg, Date.now()) + it.skip('should sign messages if signer is defined', async () => { + const msg1 = await msgCreationUtil.createStreamMessage(stream, { + data: pubMsg, timestamp: Date.now() + }) expect(msg1.signature).toBe('signature') }) it('should create message from a stream id by fetching the stream', async () => { const ts = Date.now() - const streamMessage = await msgCreationUtil.createStreamMessage(stream.id, pubMsg, ts) + const streamMessage = await msgCreationUtil.createStreamMessage(stream.id, { + data: pubMsg, timestamp: ts + }) expect(streamMessage).toEqual(getStreamMessage(stream.id, ts, 0, null)) }) }) diff --git a/test/unit/StreamrClient.test.js b/test/unit/StreamrClient.test.js index 53825e652..24014b52a 100644 --- a/test/unit/StreamrClient.test.js +++ b/test/unit/StreamrClient.test.js @@ -1233,7 +1233,7 @@ describe('StreamrClient', () => { describe('publish', () => { function getPublishRequest(content, streamId, timestamp, seqNum, prevMsgRef, requestId) { const { hashedUsername } = StubbedStreamrClient - const { msgChainId } = client.publisher.msgCreationUtil + const { msgChainId } = client.publisher.msgCreationUtil.msgChainer const messageId = new MessageID(streamId, 0, timestamp, seqNum, hashedUsername, msgChainId) const streamMessage = new StreamMessage({ messageId, From 80076e14e9b2096e906118ba94d93eb9e2f3cd92 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 16 Sep 2020 14:56:37 -0400 Subject: [PATCH 2/8] Avoid using expensive ethers.Wallet.createRandom() calls in test. e.g. 1000x calls with ethers: 14s, randomBytes: 3.5ms. 
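For reference, a rough sketch of the micro-benchmark behind those numbers (not part of this patch; it assumes ethers v5 and Node's built-in crypto, mirrors the fakePrivateKey helper added to test/utils.js below, and exact timings will vary by machine):

    const crypto = require('crypto')
    const { ethers } = require('ethers')

    // Same helper as the one added in test/utils.js: 32 random bytes, hex-encoded.
    function fakePrivateKey() {
        return crypto.randomBytes(32).toString('hex')
    }

    console.time('ethers.Wallet.createRandom() x1000')
    for (let i = 0; i < 1000; i++) { ethers.Wallet.createRandom() }
    console.timeEnd('ethers.Wallet.createRandom() x1000')

    console.time('fakePrivateKey() x1000')
    for (let i = 0; i < 1000; i++) { fakePrivateKey() }
    console.timeEnd('fakePrivateKey() x1000')

The tests only need keys that act as unique identities per run, which is why the cheaper helper is sufficient here.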
--- test/integration/MultipleClients.test.js | 26 ++--- test/integration/ResendReconnect.test.js | 8 +- test/integration/Session.test.js | 5 +- test/integration/StreamEndpoints.test.js | 2 +- test/integration/StreamrClient.test.js | 5 +- test/integration/Subscription.test.js | 5 +- test/integration/authFetch.test.js | 4 +- test/unit/MessageCreationUtil.test.js | 126 +++++++++++------------ test/unit/StreamrClient.test.js | 1 - test/utils.js | 6 ++ 10 files changed, 87 insertions(+), 101 deletions(-) diff --git a/test/integration/MultipleClients.test.js b/test/integration/MultipleClients.test.js index 8dbf29912..7ce858c25 100644 --- a/test/integration/MultipleClients.test.js +++ b/test/integration/MultipleClients.test.js @@ -1,14 +1,14 @@ -import { ethers } from 'ethers' import { wait } from 'streamr-test-utils' -import { uid } from '../utils' +import { uid, fakePrivateKey } from '../utils' import StreamrClient from '../../src' +import Connection from '../../src/Connection' import config from './config' const createClient = (opts = {}) => new StreamrClient({ auth: { - privateKey: ethers.Wallet.createRandom().privateKey, + privateKey: fakePrivateKey() }, autoConnect: false, autoDisconnect: false, @@ -24,8 +24,8 @@ describe('PubSub with multiple clients', () => { let otherClient let privateKey - async function setup() { - privateKey = ethers.Wallet.createRandom().privateKey + beforeEach(async () => { + privateKey = fakePrivateKey() mainClient = createClient({ auth: { @@ -36,12 +36,11 @@ describe('PubSub with multiple clients', () => { stream = await mainClient.createStream({ name: uid('stream') }) - } + }) - async function teardown() { + afterEach(async () => { if (stream) { await stream.delete() - stream = undefined // eslint-disable-line require-atomic-updates } if (mainClient) { @@ -51,14 +50,11 @@ describe('PubSub with multiple clients', () => { if (otherClient) { await otherClient.ensureDisconnected() } - } - - beforeEach(async () => { - await setup() - }) - afterEach(async () => { - await teardown() + const openSockets = Connection.getOpen() + if (openSockets !== 0) { + throw new Error(`sockets not closed: ${openSockets}`) + } }) test('can get messages published from other client', async (done) => { diff --git a/test/integration/ResendReconnect.test.js b/test/integration/ResendReconnect.test.js index c324dac11..c79cd9e86 100644 --- a/test/integration/ResendReconnect.test.js +++ b/test/integration/ResendReconnect.test.js @@ -1,15 +1,13 @@ -import { ethers } from 'ethers' +import { wait } from 'streamr-test-utils' -import { uid } from '../utils' +import { uid, fakePrivateKey } from '../utils' import StreamrClient from '../../src' import config from './config' -const { wait } = require('streamr-test-utils') - const createClient = (opts = {}) => new StreamrClient({ auth: { - privateKey: ethers.Wallet.createRandom().privateKey, + privateKey: fakePrivateKey(), }, autoConnect: false, autoDisconnect: false, diff --git a/test/integration/Session.test.js b/test/integration/Session.test.js index 041ac0243..3a9c2af0e 100644 --- a/test/integration/Session.test.js +++ b/test/integration/Session.test.js @@ -1,6 +1,5 @@ -import { ethers } from 'ethers' - import StreamrClient from '../../src' +import { fakePrivateKey } from '../utils' import config from './config' @@ -37,7 +36,7 @@ describe('Session', () => { expect.assertions(1) await expect(createClient({ auth: { - privateKey: ethers.Wallet.createRandom().privateKey, + privateKey: fakePrivateKey(), }, 
}).session.getSessionToken()).resolves.toBeTruthy() }) diff --git a/test/integration/StreamEndpoints.test.js b/test/integration/StreamEndpoints.test.js index 6fdcf405f..3e383cfb2 100644 --- a/test/integration/StreamEndpoints.test.js +++ b/test/integration/StreamEndpoints.test.js @@ -117,7 +117,7 @@ describe('StreamEndpoints', () => { }) describe('Stream configuration', () => { - it.skip('Stream.detectFields', async () => { + it('Stream.detectFields', async () => { await client.ensureConnected() await client.publish(createdStream.id, { foo: 'bar', diff --git a/test/integration/StreamrClient.test.js b/test/integration/StreamrClient.test.js index 5d8873bad..e71eb6993 100644 --- a/test/integration/StreamrClient.test.js +++ b/test/integration/StreamrClient.test.js @@ -4,9 +4,8 @@ import path from 'path' import fetch from 'node-fetch' import { ControlLayer, MessageLayer } from 'streamr-client-protocol' import { wait, waitForEvent } from 'streamr-test-utils' -import { ethers } from 'ethers' -import { uid } from '../utils' +import { uid, fakePrivateKey } from '../utils' import StreamrClient from '../../src' import Connection from '../../src/Connection' @@ -25,7 +24,7 @@ describe('StreamrClient', () => { const createClient = (opts = {}) => { const c = new StreamrClient({ auth: { - privateKey: ethers.Wallet.createRandom().privateKey, + privateKey: fakePrivateKey(), }, autoConnect: false, autoDisconnect: false, diff --git a/test/integration/Subscription.test.js b/test/integration/Subscription.test.js index 17c7d6a5b..7f9284fb6 100644 --- a/test/integration/Subscription.test.js +++ b/test/integration/Subscription.test.js @@ -1,14 +1,13 @@ -import { ethers } from 'ethers' import { wait, waitForEvent } from 'streamr-test-utils' -import { uid } from '../utils' +import { uid, fakePrivateKey } from '../utils' import StreamrClient from '../../src' import config from './config' const createClient = (opts = {}) => new StreamrClient({ auth: { - privateKey: ethers.Wallet.createRandom().privateKey, + privateKey: fakePrivateKey(), }, autoConnect: false, autoDisconnect: false, diff --git a/test/integration/authFetch.test.js b/test/integration/authFetch.test.js index 12caec830..1c8749049 100644 --- a/test/integration/authFetch.test.js +++ b/test/integration/authFetch.test.js @@ -1,9 +1,9 @@ jest.mock('node-fetch') -import { ethers } from 'ethers' import fetch from 'node-fetch' import StreamrClient from '../../src' +import { fakePrivateKey } from '../utils' import config from './config' @@ -27,7 +27,7 @@ describe('authFetch', () => { fetch.mockImplementation(realFetch) client = new StreamrClient({ auth: { - privateKey: ethers.Wallet.createRandom().privateKey, + privateKey: fakePrivateKey() }, autoConnect: false, autoDisconnect: false, diff --git a/test/unit/MessageCreationUtil.test.js b/test/unit/MessageCreationUtil.test.js index 82eb18f46..1e5252273 100644 --- a/test/unit/MessageCreationUtil.test.js +++ b/test/unit/MessageCreationUtil.test.js @@ -5,72 +5,67 @@ import { MessageLayer } from 'streamr-client-protocol' import { MessageCreationUtil } from '../../src/Publisher' import Stream from '../../src/rest/domain/Stream' +// eslint-disable-next-line import/no-named-as-default-member +import StubbedStreamrClient from './StubbedStreamrClient' + const { StreamMessage, MessageID, MessageRef } = MessageLayer describe('MessageCreationUtil', () => { const hashedUsername = '0x16F78A7D6317F102BBD95FC9A4F3FF2E3249287690B8BDAD6B7810F82B34ACE3'.toLowerCase() + const createClient = (opts = {}) => { + return new 
StubbedStreamrClient({ + auth: { + username: 'username', + }, + autoConnect: false, + autoDisconnect: false, + maxRetries: 2, + ...opts, + }) + } + describe('getPublisherId', () => { - it('uses address', async () => { + it('uses address for privateKey auth', async () => { const wallet = ethers.Wallet.createRandom() - const client = { - options: { - auth: { - privateKey: wallet.privateKey, - }, + const client = createClient({ + auth: { + privateKey: wallet.privateKey, }, - getUserInfo: sinon.stub().resolves({ - username: 'username', - }), - } + }) const msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(wallet.address.toLowerCase()) }) - it('uses hash of username', async () => { - const client = { - options: { - auth: { - apiKey: 'apiKey', - }, + it('uses hash of username for apiKey auth', async () => { + const client = createClient({ + auth: { + apiKey: 'apiKey', }, - getUserInfo: sinon.stub().resolves({ - username: 'username', - }), - } + }) const msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(hashedUsername) }) - it('uses hash of username', async () => { - const client = { - options: { - auth: { - username: 'username', - }, - }, - getUserInfo: sinon.stub().resolves({ + it('uses hash of username for username auth', async () => { + const client = createClient({ + auth: { username: 'username', - }), - } + }, + }) const msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(hashedUsername) }) - it('uses hash of username', async () => { - const client = { - options: { - auth: { - sessionToken: 'session-token', - }, + it('uses hash of username for sessionToken auth', async () => { + const client = createClient({ + auth: { + sessionToken: 'session-token', }, - getUserInfo: sinon.stub().resolves({ - username: 'username', - }), - } + }) const msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(hashedUsername) @@ -78,15 +73,21 @@ describe('MessageCreationUtil', () => { }) describe('partitioner', () => { + let client + + beforeAll(() => { + client = createClient() + }) + it('should throw if partition count is not defined', () => { expect(() => { - new MessageCreationUtil().computeStreamPartition(undefined, 'foo') + new MessageCreationUtil(client).computeStreamPartition(undefined, 'foo') }).toThrow() }) it('should always return partition 0 for all keys if partition count is 1', () => { for (let i = 0; i < 100; i++) { - expect(new MessageCreationUtil().computeStreamPartition(1, `foo${i}`)).toEqual(0) + expect(new MessageCreationUtil(client).computeStreamPartition(1, `foo${i}`)).toEqual(0) } }) @@ -104,7 +105,7 @@ describe('MessageCreationUtil', () => { expect(correctResults.length).toEqual(keys.length) for (let i = 0; i < keys.length; i++) { - const partition = new MessageCreationUtil().computeStreamPartition(10, keys[i]) + const partition = new MessageCreationUtil(client).computeStreamPartition(10, keys[i]) expect(correctResults[i]).toStrictEqual(partition) } }) @@ -115,35 +116,24 @@ describe('MessageCreationUtil', () => { foo: 'bar', } - const stream = new Stream(null, { - id: 'streamId', - partitions: 1, - }) - let client let msgCreationUtil + let stream - beforeEach(() => { - client = { - options: { - auth: { - username: 'username', - }, - }, - signer: { - 
signStreamMessage: (streamMessage) => { - /* eslint-disable no-param-reassign */ - streamMessage.signatureType = StreamMessage.SIGNATURE_TYPES.ETH - streamMessage.signature = 'signature' - /* eslint-enable no-param-reassign */ - return Promise.resolve() - }, - }, - getUserInfo: () => Promise.resolve({ + beforeAll(() => { + client = createClient({ + auth: { username: 'username', - }), - getStream: sinon.stub().resolves(stream), - } + }, + }) + }) + + beforeEach(() => { + stream = new Stream(null, { + id: 'streamId', + partitions: 1, + }) + client.getStream = sinon.stub().resolves(stream) msgCreationUtil = new MessageCreationUtil(client) }) diff --git a/test/unit/StreamrClient.test.js b/test/unit/StreamrClient.test.js index 24014b52a..492d6bbec 100644 --- a/test/unit/StreamrClient.test.js +++ b/test/unit/StreamrClient.test.js @@ -6,7 +6,6 @@ import { wait, waitForEvent } from 'streamr-test-utils' import FailedToPublishError from '../../src/errors/FailedToPublishError' import Subscription from '../../src/Subscription' import Connection from '../../src/Connection' -// import StreamrClient from '../../src/StreamrClient' import { uid } from '../utils' // eslint-disable-next-line import/no-named-as-default-member diff --git a/test/utils.js b/test/utils.js index 88ae9bee9..524b90c3d 100644 --- a/test/utils.js +++ b/test/utils.js @@ -1,3 +1,9 @@ +const crypto = require('crypto') + const uniqueId = require('lodash.uniqueid') export const uid = (prefix) => uniqueId(`p${process.pid}${prefix ? '-' + prefix : ''}`) + +export function fakePrivateKey() { + return crypto.randomBytes(32).toString('hex') +} From b2d79a03ead43d4c183de3b6b54818fe5cc52a39 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 16 Sep 2020 14:57:32 -0400 Subject: [PATCH 3/8] Ensure messageCreationUtil is cleaned up after test. 
--- src/Publisher.js | 7 +++-- test/unit/MessageCreationUtil.test.js | 41 ++++++++++++++++----------- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/src/Publisher.js b/src/Publisher.js index 3551736f1..92b734c6e 100644 --- a/src/Publisher.js +++ b/src/Publisher.js @@ -81,7 +81,6 @@ export class MessageCreationUtil { return partitions }) this.getUserInfo = AsyncCacheFn(this.getUserInfo.bind(this)) - this.getPublisherId = AsyncCacheFn(this.getPublisherId.bind(this)) this.cachedHashes = {} } @@ -247,10 +246,14 @@ export default class Publisher { } getPublisherId() { + if (this.client.session.isUnauthenticated()) { + throw new Error('Need to be authenticated to getPublisherId.') + } return this.msgCreationUtil.getPublisherId() } stop() { - return this.msgCreationUtil.stop() + if (!this.msgCreationUtil) { return } + this.msgCreationUtil.stop() } } diff --git a/test/unit/MessageCreationUtil.test.js b/test/unit/MessageCreationUtil.test.js index 1e5252273..0a5f9820b 100644 --- a/test/unit/MessageCreationUtil.test.js +++ b/test/unit/MessageCreationUtil.test.js @@ -11,6 +11,8 @@ import StubbedStreamrClient from './StubbedStreamrClient' const { StreamMessage, MessageID, MessageRef } = MessageLayer describe('MessageCreationUtil', () => { + let client + let msgCreationUtil const hashedUsername = '0x16F78A7D6317F102BBD95FC9A4F3FF2E3249287690B8BDAD6B7810F82B34ACE3'.toLowerCase() const createClient = (opts = {}) => { @@ -25,69 +27,78 @@ describe('MessageCreationUtil', () => { }) } + afterEach(async () => { + msgCreationUtil.stop() + if (client) { + await client.disconnect() + } + }) + describe('getPublisherId', () => { it('uses address for privateKey auth', async () => { const wallet = ethers.Wallet.createRandom() - const client = createClient({ + client = createClient({ auth: { privateKey: wallet.privateKey, }, }) - const msgCreationUtil = new MessageCreationUtil(client) + msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(wallet.address.toLowerCase()) }) it('uses hash of username for apiKey auth', async () => { - const client = createClient({ + client = createClient({ auth: { apiKey: 'apiKey', }, }) - const msgCreationUtil = new MessageCreationUtil(client) + msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(hashedUsername) }) it('uses hash of username for username auth', async () => { - const client = createClient({ + client = createClient({ auth: { username: 'username', }, }) - const msgCreationUtil = new MessageCreationUtil(client) + msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(hashedUsername) }) it('uses hash of username for sessionToken auth', async () => { - const client = createClient({ + client = createClient({ auth: { sessionToken: 'session-token', }, }) - const msgCreationUtil = new MessageCreationUtil(client) + msgCreationUtil = new MessageCreationUtil(client) const publisherId = await msgCreationUtil.getPublisherId() expect(publisherId).toBe(hashedUsername) }) }) describe('partitioner', () => { - let client - beforeAll(() => { client = createClient() }) + beforeEach(() => { + msgCreationUtil = new MessageCreationUtil(client) + }) + it('should throw if partition count is not defined', () => { expect(() => { - new MessageCreationUtil(client).computeStreamPartition(undefined, 'foo') + 
msgCreationUtil.computeStreamPartition(undefined, 'foo') }).toThrow() }) it('should always return partition 0 for all keys if partition count is 1', () => { for (let i = 0; i < 100; i++) { - expect(new MessageCreationUtil(client).computeStreamPartition(1, `foo${i}`)).toEqual(0) + expect(msgCreationUtil.computeStreamPartition(1, `foo${i}`)).toEqual(0) } }) @@ -105,7 +116,7 @@ describe('MessageCreationUtil', () => { expect(correctResults.length).toEqual(keys.length) for (let i = 0; i < keys.length; i++) { - const partition = new MessageCreationUtil(client).computeStreamPartition(10, keys[i]) + const partition = msgCreationUtil.computeStreamPartition(10, keys[i]) expect(correctResults[i]).toStrictEqual(partition) } }) @@ -116,8 +127,6 @@ describe('MessageCreationUtil', () => { foo: 'bar', } - let client - let msgCreationUtil let stream beforeAll(() => { @@ -137,7 +146,7 @@ describe('MessageCreationUtil', () => { msgCreationUtil = new MessageCreationUtil(client) }) - afterAll(() => { + afterEach(() => { msgCreationUtil.stop() }) From 8b896b8f48ba0081b49c7b8e46f2de8f7121269a Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 16 Sep 2020 15:47:05 -0400 Subject: [PATCH 4/8] Fix non-functional MessageCreationUtil test. --- test/unit/MessageCreationUtil.test.js | 34 ++++++++++----------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/test/unit/MessageCreationUtil.test.js b/test/unit/MessageCreationUtil.test.js index 0a5f9820b..05ac05754 100644 --- a/test/unit/MessageCreationUtil.test.js +++ b/test/unit/MessageCreationUtil.test.js @@ -153,7 +153,7 @@ describe('MessageCreationUtil', () => { function getStreamMessage(streamId, timestamp, sequenceNumber, prevMsgRef) { return new StreamMessage({ messageId: new MessageID(streamId, 0, timestamp, sequenceNumber, hashedUsername, msgCreationUtil.msgChainer.msgChainId), - prevMesssageRef: prevMsgRef, + prevMsgRef, content: pubMsg, messageType: StreamMessage.MESSAGE_TYPES.MESSAGE, encryptionType: StreamMessage.ENCRYPTION_TYPES.NONE, @@ -162,38 +162,28 @@ describe('MessageCreationUtil', () => { it('should create messages with increasing sequence numbers', async () => { const ts = Date.now() - const promises = [] let prevMsgRef = null for (let i = 0; i < 10; i++) { - /* eslint-disable no-loop-func */ - prevMsgRef = new MessageRef(ts, i) - promises.push(async () => { - const streamMessage = await msgCreationUtil.createStreamMessage(stream, { - data: pubMsg, timestamp: ts - }) - expect(streamMessage).toStrictEqual(getStreamMessage('streamId', ts, i, prevMsgRef)) + // eslint-disable-next-line no-await-in-loop + const streamMessage = await msgCreationUtil.createStreamMessage(stream, { + data: pubMsg, timestamp: ts, }) - /* eslint-enable no-loop-func */ + expect(streamMessage).toStrictEqual(getStreamMessage('streamId', ts, i, prevMsgRef)) + prevMsgRef = new MessageRef(ts, i) } - await Promise.all(promises) }) - it('should create messages with sequence number 0', async () => { + it('should create messages with sequence number 0 if different timestamp', async () => { const ts = Date.now() - const promises = [] let prevMsgRef = null for (let i = 0; i < 10; i++) { - prevMsgRef = new MessageRef(ts + i, i) - /* eslint-disable no-loop-func */ - promises.push(async () => { - const streamMessage = await msgCreationUtil.createStreamMessage(stream, { - data: pubMsg, timestamp: ts + i - }) - expect(streamMessage).toStrictEqual(getStreamMessage('streamId', ts + i, 0, prevMsgRef)) + // eslint-disable-next-line no-await-in-loop + const streamMessage = await 
msgCreationUtil.createStreamMessage(stream, { + data: pubMsg, timestamp: ts + i, }) - /* eslint-enable no-loop-func */ + expect(streamMessage).toStrictEqual(getStreamMessage('streamId', ts + i, 0, prevMsgRef)) + prevMsgRef = new MessageRef(ts + i, 0) } - await Promise.all(promises) }) it('should publish messages with sequence number 0 (different streams)', async () => { From eed310c38c8f0879cc59d0c513570de985af7f50 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 16 Sep 2020 15:57:00 -0400 Subject: [PATCH 5/8] Swap out receptacle for more flexible mem/p-memoize/quick-lru. --- package-lock.json | 58 ++++++++++++++--- package.json | 4 +- src/Publisher.js | 94 ++++++++++++--------------- src/utils.js | 74 +++++++++------------ test/unit/MessageCreationUtil.test.js | 9 ++- 5 files changed, 129 insertions(+), 110 deletions(-) diff --git a/package-lock.json b/package-lock.json index ad08949e7..262031ea2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11039,6 +11039,14 @@ "tmpl": "1.0.x" } }, + "map-age-cleaner": { + "version": "0.1.3", + "resolved": "https://registry.npmjs.org/map-age-cleaner/-/map-age-cleaner-0.1.3.tgz", + "integrity": "sha512-bJzx6nMoP6PDLPBFmg7+xRKeFZvFboMrGlxmNj9ClvX53KrmvM5bXFXEWjbz4cz1AFn+jWJ9z/DJSz7hrs0w3w==", + "requires": { + "p-defer": "^1.0.0" + } + }, "map-cache": { "version": "0.2.2", "resolved": "https://registry.npmjs.org/map-cache/-/map-cache-0.2.2.tgz", @@ -11071,6 +11079,22 @@ "integrity": "sha1-hxDXrwqmJvj/+hzgAWhUUmMlV0g=", "dev": true }, + "mem": { + "version": "6.1.1", + "resolved": "https://registry.npmjs.org/mem/-/mem-6.1.1.tgz", + "integrity": "sha512-Ci6bIfq/UgcxPTYa8dQQ5FY3BzKkT894bwXWXxC/zqs0XgMO2cT20CGkOqda7gZNkmK5VP4x89IGZ6K7hfbn3Q==", + "requires": { + "map-age-cleaner": "^0.1.3", + "mimic-fn": "^3.0.0" + }, + "dependencies": { + "mimic-fn": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/mimic-fn/-/mimic-fn-3.1.0.tgz", + "integrity": "sha512-Ysbi9uYW9hFyfrThdDEQuykN4Ey6BuwPD2kpI5ES/nFTDn/98yxYNLZJcgUAKPT/mcrLLKaGzJR9YVxJrIdASQ==" + } + } + }, "memory-fs": { "version": "0.4.1", "resolved": "https://registry.npmjs.org/memory-fs/-/memory-fs-0.4.1.tgz", @@ -12120,6 +12144,11 @@ "integrity": "sha1-hUNzx/XCMVkU/Jv8a9gjj92h7Cc=", "dev": true }, + "p-defer": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/p-defer/-/p-defer-1.0.0.tgz", + "integrity": "sha1-n26xgvbJqozXQwBKfU+WsZaw+ww=" + }, "p-each-series": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/p-each-series/-/p-each-series-2.1.0.tgz", @@ -12168,6 +12197,22 @@ "aggregate-error": "^3.0.0" } }, + "p-memoize": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/p-memoize/-/p-memoize-4.0.0.tgz", + "integrity": "sha512-oMxCJKVS75Bf2RWtXJNQNaX2K1G0FYpllOh2iTsPXZqnf9dWMcis3BL+pRdLeQY8lIdwwL01k/UV5LBdcVhZzg==", + "requires": { + "mem": "^6.0.1", + "mimic-fn": "^3.0.0" + }, + "dependencies": { + "mimic-fn": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/mimic-fn/-/mimic-fn-3.1.0.tgz", + "integrity": "sha512-Ysbi9uYW9hFyfrThdDEQuykN4Ey6BuwPD2kpI5ES/nFTDn/98yxYNLZJcgUAKPT/mcrLLKaGzJR9YVxJrIdASQ==" + } + } + }, "p-timeout": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-2.0.1.tgz", @@ -12691,6 +12736,11 @@ "integrity": "sha1-nsYfeQSYdXB9aUFFlv2Qek1xHnM=", "dev": true }, + "quick-lru": { + "version": "5.1.1", + "resolved": "https://registry.npmjs.org/quick-lru/-/quick-lru-5.1.1.tgz", + "integrity": 
"sha512-WuyALRjWPDGtt/wzJiadO5AXY+8hZ80hVpe6MyivgraREW751X3SbhRvG3eLKOYN+8VEvqLcf3wdnt44Z4S4SA==" + }, "randombytes": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/randombytes/-/randombytes-2.1.0.tgz", @@ -13181,14 +13231,6 @@ } } }, - "receptacle": { - "version": "1.3.2", - "resolved": "https://registry.npmjs.org/receptacle/-/receptacle-1.3.2.tgz", - "integrity": "sha512-HrsFvqZZheusncQRiEE7GatOAETrARKV/lnfYicIm8lbvp/JQOdADOfhjBd2DajvoszEyxSM6RlAAIZgEoeu/A==", - "requires": { - "ms": "^2.1.1" - } - }, "regenerate": { "version": "1.4.1", "resolved": "https://registry.npmjs.org/regenerate/-/regenerate-1.4.1.tgz", diff --git a/package.json b/package.json index 7c16e816a..5aaa18963 100644 --- a/package.json +++ b/package.json @@ -70,13 +70,15 @@ "ethers": "^5.0.12", "eventemitter3": "^4.0.7", "lodash.uniqueid": "^4.0.1", + "mem": "^6.1.1", "node-fetch": "^2.6.1", "node-webcrypto-ossl": "^2.1.1", "once": "^1.4.0", + "p-memoize": "^4.0.0", "promise-memoize": "^1.2.1", "qs": "^6.9.4", + "quick-lru": "^5.1.1", "randomstring": "^1.1.5", - "receptacle": "^1.3.2", "streamr-client-protocol": "^5.1.0", "uuid": "^8.3.0", "webpack-node-externals": "^2.5.2", diff --git a/src/Publisher.js b/src/Publisher.js index 92b734c6e..939c94c58 100644 --- a/src/Publisher.js +++ b/src/Publisher.js @@ -7,7 +7,7 @@ import { ethers } from 'ethers' import Signer from './Signer' import Stream from './rest/domain/Stream' import FailedToPublishError from './errors/FailedToPublishError' -import { AsyncCacheMap, AsyncCacheFn } from './utils' +import { CacheAsyncFn, CacheFn } from './utils' const { StreamMessage, MessageID, MessageRef } = MessageLayer @@ -23,77 +23,62 @@ function getStreamId(streamObjectOrId) { throw new Error(`First argument must be a Stream object or the stream id! 
Was: ${streamObjectOrId}`) } -class MessageChainer { +function hash(stringToHash) { + return crypto.createHash('md5').update(stringToHash).digest() +} + +class OrderedMessageChainCreator { constructor() { this.msgChainId = randomstring.generate(20) - this.publishedStreams = {} + this.messageRefs = new Map() } - create(streamId, streamPartition, timestamp, publisherId) { - const key = streamId + streamPartition - if (!this.publishedStreams[key]) { - this.publishedStreams[key] = { - prevTimestamp: null, - prevSequenceNumber: 0, - } - } - + create({ streamId, streamPartition, timestamp, publisherId, }) { + const key = `${streamId}|${streamPartition}` + const prevMsgRef = this.messageRefs.get(key) const sequenceNumber = this.getNextSequenceNumber(key, timestamp) const messageId = new MessageID(streamId, streamPartition, timestamp, sequenceNumber, publisherId, this.msgChainId) - const prevMsgRef = this.getPrevMsgRef(key) - this.publishedStreams[key].prevTimestamp = timestamp - this.publishedStreams[key].prevSequenceNumber = sequenceNumber + this.messageRefs.set(key, new MessageRef(timestamp, sequenceNumber)) return [messageId, prevMsgRef] } - getPrevMsgRef(key) { - const prevTimestamp = this.getPrevTimestamp(key) - if (!prevTimestamp) { - return null - } - const prevSequenceNumber = this.getPrevSequenceNumber(key) - return new MessageRef(prevTimestamp, prevSequenceNumber) - } - getNextSequenceNumber(key, timestamp) { - if (timestamp !== this.getPrevTimestamp(key)) { + if (!this.messageRefs.has(key)) { return 0 } + const prev = this.messageRefs.get(key) + if (timestamp !== prev.timestamp) { return 0 } - return this.getPrevSequenceNumber(key) + 1 + return prev.sequenceNumber + 1 } - getPrevTimestamp(key) { - return this.publishedStreams[key] && this.publishedStreams[key].prevTimestamp - } - - getPrevSequenceNumber(key) { - return this.publishedStreams[key].prevSequenceNumber + clear() { + this.messageRefs.clear() } } export class MessageCreationUtil { constructor(client) { this.client = client - this.msgChainer = new MessageChainer() + const cacheOptions = client.options.cache + this.msgChainer = new OrderedMessageChainCreator(cacheOptions) - this.streamPartitionCache = new AsyncCacheMap(async (streamId) => { - const { partitions } = await this.client.getStream(streamId) - return partitions - }) - this.getUserInfo = AsyncCacheFn(this.getUserInfo.bind(this)) - this.getPublisherId = AsyncCacheFn(this.getPublisherId.bind(this)) - this.cachedHashes = {} + this._getStreamPartitions = CacheAsyncFn(this._getStreamPartitions.bind(this), cacheOptions) + this.getUserInfo = CacheAsyncFn(this.getUserInfo.bind(this), cacheOptions) + this.getPublisherId = CacheAsyncFn(this.getPublisherId.bind(this), cacheOptions) + this.hash = CacheFn(hash, cacheOptions) } stop() { - this.msgChainer = new MessageChainer() - this.getUserInfo.stop() - this.getPublisherId.stop() - this.streamPartitionCache.stop() + this.msgChainer.clear() + this.msgChainer = new OrderedMessageChainCreator() + this.getUserInfo.clear() + this.getPublisherId.clear() + this._getStreamPartitions.clear() + this.hash.clear() } async getPublisherId() { - const { auth = {} } = this.client.options + const { options: { auth = {} } = {} } = this.client if (auth.privateKey !== undefined) { return ethers.utils.computeAddress(auth.privateKey).toLowerCase() } @@ -129,15 +114,15 @@ export class MessageCreationUtil { if (streamObjectOrId && streamObjectOrId.partitions != null) { return streamObjectOrId.partitions } + + // get streamId here so caching based on id 
works const streamId = getStreamId(streamObjectOrId) - return this.streamPartitionCache.load(streamId) + return this._getStreamPartitions(streamId) } - hash(stringToHash) { - if (this.cachedHashes[stringToHash] === undefined) { - this.cachedHashes[stringToHash] = crypto.createHash('md5').update(stringToHash).digest() - } - return this.cachedHashes[stringToHash] + async _getStreamPartitions(streamId) { + const { partitions } = await this.client.getStream(streamId) + return partitions } computeStreamPartition(partitionCount, partitionKey) { @@ -169,7 +154,12 @@ export class MessageCreationUtil { ]) const streamPartition = this.computeStreamPartition(streamPartitions, partitionKey) - const [messageId, prevMsgRef] = this.msgChainer.create(streamId, streamPartition, timestamp, publisherId) + const [messageId, prevMsgRef] = this.msgChainer.create({ + streamId, + streamPartition, + timestamp, + publisherId, + }) return new StreamMessage({ messageId, diff --git a/src/utils.js b/src/utils.js index b779861f3..dd388706f 100644 --- a/src/utils.js +++ b/src/utils.js @@ -1,6 +1,8 @@ -import Receptacle from 'receptacle' import { v4 as uuidv4 } from 'uuid' import uniqueId from 'lodash.uniqueid' +import LRU from 'quick-lru' +import pMemoize from 'p-memoize' +import mem from 'mem' import pkg from '../package.json' @@ -37,50 +39,34 @@ export function waitFor(emitter, event) { }) } -export class AsyncCacheMap { - /* eslint-disable object-curly-newline */ - constructor(fn, { - max = 10000, - ttl = 30 * 60 * 1000, // 30 minutes - refresh = true, // reset ttl on access - } = {}) { - /* eslint-disable-next-line object-curly-newline */ - this.ttl = ttl - this.refresh = refresh - this.fn = fn - this.cache = new Receptacle({ - max, +/* eslint-disable object-curly-newline */ +export function CacheAsyncFn(fn, { + maxSize = 10000, + maxAge = 30 * 60 * 1000, // 30 minutes + cachePromiseRejection = false, +} = {}) { + const cachedFn = pMemoize(fn, { + maxAge, + cachePromiseRejection, + cache: new LRU({ + maxSize, }) - } - - load(id, { ttl = this.ttl, refresh = this.refresh, } = {}) { - if (!this.cache.get(id)) { - const promise = this.fn(id) - const success = this.cache.set(id, promise, { - ttl, - refresh, - }) - if (!success) { - console.warn(`Could not store ${id} in local cache.`) - return promise - } - } - return this.cache.get(id) - } - - stop() { - this.cache.clear() - } + }) + cachedFn.clear = () => pMemoize.clear(cachedFn) + return cachedFn } -export function AsyncCacheFn(fn, options) { - const cache = new AsyncCacheMap(fn, options) - const cacheFn = async (opts) => { - return cache.load('value', opts) - } - cacheFn.cache = cache - cacheFn.stop = () => { - return cache.stop() - } - return cacheFn +export function CacheFn(fn, { + maxSize = 10000, + maxAge = 30 * 60 * 1000, // 30 minutes +} = {}) { + const cachedFn = mem(fn, { + maxAge, + cache: new LRU({ + maxSize, + }) + }) + cachedFn.clear = () => mem.clear(cachedFn) + return cachedFn } +/* eslint-enable object-curly-newline */ diff --git a/test/unit/MessageCreationUtil.test.js b/test/unit/MessageCreationUtil.test.js index 05ac05754..e3fb43f5c 100644 --- a/test/unit/MessageCreationUtil.test.js +++ b/test/unit/MessageCreationUtil.test.js @@ -28,7 +28,10 @@ describe('MessageCreationUtil', () => { } afterEach(async () => { - msgCreationUtil.stop() + if (msgCreationUtil) { + msgCreationUtil.stop() + } + if (client) { await client.disconnect() } @@ -146,10 +149,6 @@ describe('MessageCreationUtil', () => { msgCreationUtil = new MessageCreationUtil(client) }) - 
afterEach(() => { - msgCreationUtil.stop() - }) - function getStreamMessage(streamId, timestamp, sequenceNumber, prevMsgRef) { return new StreamMessage({ messageId: new MessageID(streamId, 0, timestamp, sequenceNumber, hashedUsername, msgCreationUtil.msgChainer.msgChainId), From ffbb08017bfe136b617c38eddce68548eb73d1a2 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 16 Sep 2020 16:08:39 -0400 Subject: [PATCH 6/8] Convert LoginEndpoints test to async/await. --- test/integration/LoginEndpoints.test.js | 112 +++++++++++------------- 1 file changed, 50 insertions(+), 62 deletions(-) diff --git a/test/integration/LoginEndpoints.test.js b/test/integration/LoginEndpoints.test.js index fb654e2ba..e7ab91c30 100644 --- a/test/integration/LoginEndpoints.test.js +++ b/test/integration/LoginEndpoints.test.js @@ -22,98 +22,86 @@ describe('LoginEndpoints', () => { }) afterAll(async (done) => { - await client.ensureDisconnected() + await client.disconnect() done() }) describe('Challenge generation', () => { - it('should retrieve a challenge', () => client.getChallenge('some-address') - .then((challenge) => { - assert(challenge) - assert(challenge.id) - assert(challenge.challenge) - assert(challenge.expires) - })) + it('should retrieve a challenge', async () => { + const challenge = await client.getChallenge('some-address') + assert(challenge) + assert(challenge.id) + assert(challenge.challenge) + assert(challenge.expires) + }) }) - async function assertThrowsAsync(fn, regExp) { - let f = () => {} - try { - await fn() - } catch (e) { - f = () => { - throw e - } - } finally { - assert.throws(f, regExp) - } - } - describe('Challenge response', () => { it('should fail to get a session token', async () => { - await assertThrowsAsync(async () => client.sendChallengeResponse( - { + await expect(async () => { + await client.sendChallengeResponse({ id: 'some-id', challenge: 'some-challenge', - }, - 'some-sig', - 'some-address', - ), /Error/) + }, 'some-sig', 'some-address') + }).rejects.toThrow() }) - it('should get a session token', () => { + + it('should get a session token', async () => { const wallet = ethers.Wallet.createRandom() - return client.getChallenge(wallet.address) - .then(async (challenge) => { - assert(challenge.challenge) - const signature = await wallet.signMessage(challenge.challenge) - return client.sendChallengeResponse(challenge, signature, wallet.address) - .then((sessionToken) => { - assert(sessionToken) - assert(sessionToken.token) - assert(sessionToken.expires) - }) - }) + const challenge = await client.getChallenge(wallet.address) + assert(challenge.challenge) + const signature = await wallet.signMessage(challenge.challenge) + const sessionToken = await client.sendChallengeResponse(challenge, signature, wallet.address) + assert(sessionToken) + assert(sessionToken.token) + assert(sessionToken.expires) }) - it('should get a session token with combined function', () => { + + it('should get a session token with combined function', async () => { const wallet = ethers.Wallet.createRandom() - return client.loginWithChallengeResponse((d) => wallet.signMessage(d), wallet.address) - .then((sessionToken) => { - assert(sessionToken) - assert(sessionToken.token) - assert(sessionToken.expires) - }) + const sessionToken = await client.loginWithChallengeResponse((d) => wallet.signMessage(d), wallet.address) + assert(sessionToken) + assert(sessionToken.token) + assert(sessionToken.expires) }) }) describe('API key login', () => { it('should fail to get a session token', async () => { - await 
assertThrowsAsync(async () => client.loginWithApiKey('apikey'), /Error/) + await expect(async () => { + await client.loginWithApiKey('apikey') + }).rejects.toThrow() + }) + + it('should get a session token', async () => { + const sessionToken = await client.loginWithApiKey('tester1-api-key') + assert(sessionToken) + assert(sessionToken.token) + assert(sessionToken.expires) }) - it('should get a session token', () => client.loginWithApiKey('tester1-api-key') - .then((sessionToken) => { - assert(sessionToken) - assert(sessionToken.token) - assert(sessionToken.expires) - })) }) describe('Username/password login', () => { it('should fail to get a session token', async () => { - await assertThrowsAsync(async () => client.loginWithUsernamePassword('username', 'password'), /Error/) + await expect(async () => { + await client.loginWithUsernamePassword('username', 'password') + }).rejects.toThrow() + }) + + it('should get a session token', async () => { + const sessionToken = await client.loginWithUsernamePassword('tester2@streamr.com', 'tester2') + assert(sessionToken) + assert(sessionToken.token) + assert(sessionToken.expires) }) - it('should get a session token', () => client.loginWithUsernamePassword('tester2@streamr.com', 'tester2') - .then((sessionToken) => { - assert(sessionToken) - assert(sessionToken.token) - assert(sessionToken.expires) - })) }) describe('UserInfo', () => { - it('should get user info', () => client.getUserInfo().then((userInfo) => { + it('should get user info', async () => { + const userInfo = await client.getUserInfo() assert(userInfo.name) assert(userInfo.username) - })) + }) }) describe('logout', () => { From 7ae593cdf732d73bfee4ca352b47c726dee90762 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 16 Sep 2020 16:09:13 -0400 Subject: [PATCH 7/8] Remove calls to ensureConnected/ensureDisconnected in test. 
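[Editor's note, not part of the patch] This change swaps the ensureConnected()/ensureDisconnected() helpers for plain connect()/disconnect() in test setup and teardown. A minimal sketch of the resulting pattern, assuming a Jest environment and a createClient() helper like the one used in these integration tests:

    let client

    beforeEach(async () => {
        client = createClient()
        await client.connect() // was: client.ensureConnected()
    })

    afterEach(async () => {
        if (client) {
            await client.disconnect() // was: client.ensureDisconnected()
        }
    })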
--- test/benchmarks/publish.js | 4 ++-- test/flakey/DataUnionEndpoints.test.js | 10 +++++----- test/integration/MultipleClients.test.js | 8 ++++---- test/integration/ResendReconnect.test.js | 4 ++-- test/integration/Resends.test.js | 6 +++--- test/integration/StreamEndpoints.test.js | 4 ++-- 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/test/benchmarks/publish.js b/test/benchmarks/publish.js index 3da517663..4a00e9ebc 100644 --- a/test/benchmarks/publish.js +++ b/test/benchmarks/publish.js @@ -63,8 +63,8 @@ async function run() { // eslint-disable-next-line no-console console.log('Disconnecting clients') await Promise.all([ - client1.ensureDisconnected(), - client2.ensureDisconnected(), + client1.disconnect(), + client2.disconnect(), ]) }) diff --git a/test/flakey/DataUnionEndpoints.test.js b/test/flakey/DataUnionEndpoints.test.js index 094168f6e..d39f1155b 100644 --- a/test/flakey/DataUnionEndpoints.test.js +++ b/test/flakey/DataUnionEndpoints.test.js @@ -38,7 +38,7 @@ describe('DataUnionEndPoints', () => { }, 10000) beforeEach(async () => { - await adminClient.ensureConnected() + await adminClient.connect() dataUnion = await adminClient.deployDataUnion({ provider: testProvider, }) @@ -51,7 +51,7 @@ describe('DataUnionEndPoints', () => { afterAll(async () => { if (!adminClient) { return } - await adminClient.ensureDisconnected() + await adminClient.disconnect() }) afterAll(async () => { @@ -100,12 +100,12 @@ describe('DataUnionEndPoints', () => { autoDisconnect: false, ...config.clientOptions, }) - await memberClient.ensureConnected() + await memberClient.connect() }) afterAll(async () => { if (!memberClient) { return } - await memberClient.ensureDisconnected() + await memberClient.disconnect() }) it('can join the dataUnion, and get their balances and stats, and check proof, and withdraw', async () => { @@ -201,7 +201,7 @@ describe('DataUnionEndPoints', () => { }) afterAll(async () => { if (!client) { return } - await client.ensureDisconnected() + await client.disconnect() }) it('can get dataUnion stats, member list, and member stats', async () => { diff --git a/test/integration/MultipleClients.test.js b/test/integration/MultipleClients.test.js index 7ce858c25..05b6c294f 100644 --- a/test/integration/MultipleClients.test.js +++ b/test/integration/MultipleClients.test.js @@ -44,11 +44,11 @@ describe('PubSub with multiple clients', () => { } if (mainClient) { - await mainClient.ensureDisconnected() + await mainClient.disconnect() } if (otherClient) { - await otherClient.ensureDisconnected() + await otherClient.disconnect() } const openSockets = Connection.getOpen() @@ -65,8 +65,8 @@ describe('PubSub with multiple clients', () => { }) otherClient.once('error', done) mainClient.once('error', done) - await otherClient.ensureConnected() - await mainClient.ensureConnected() + await otherClient.connect() + await mainClient.connect() const receivedMessagesOther = [] const receivedMessagesMain = [] diff --git a/test/integration/ResendReconnect.test.js b/test/integration/ResendReconnect.test.js index c79cd9e86..b1a524a9d 100644 --- a/test/integration/ResendReconnect.test.js +++ b/test/integration/ResendReconnect.test.js @@ -27,7 +27,7 @@ describe('resend/reconnect', () => { beforeEach(async () => { client = createClient() - await client.ensureConnected() + await client.connect() publishedMessages = [] @@ -49,7 +49,7 @@ describe('resend/reconnect', () => { }, 10 * 1000) afterEach(async () => { - await client.ensureDisconnected() + await client.disconnect() }) describe('reconnect 
after resend', () => { diff --git a/test/integration/Resends.test.js b/test/integration/Resends.test.js index e980fc0f0..d7f2d2cc9 100644 --- a/test/integration/Resends.test.js +++ b/test/integration/Resends.test.js @@ -47,7 +47,7 @@ describe('StreamrClient resends', () => { }, 10 * 1000) afterEach(async () => { - await client.ensureDisconnected() + await client.disconnect() }) describe('issue resend and subscribe at the same time', () => { @@ -313,10 +313,10 @@ describe('StreamrClient resends', () => { } await wait(30000) - await client.ensureDisconnected() + await client.disconnect() // resend from LONG_RESEND messages - await client.ensureConnected() + await client.connect() const receivedMessages = [] const sub = await client.resend({ diff --git a/test/integration/StreamEndpoints.test.js b/test/integration/StreamEndpoints.test.js index 3e383cfb2..59dad5841 100644 --- a/test/integration/StreamEndpoints.test.js +++ b/test/integration/StreamEndpoints.test.js @@ -118,7 +118,7 @@ describe('StreamEndpoints', () => { describe('Stream configuration', () => { it('Stream.detectFields', async () => { - await client.ensureConnected() + await client.connect() await client.publish(createdStream.id, { foo: 'bar', count: 0, @@ -139,7 +139,7 @@ describe('StreamEndpoints', () => { }, ], ) - await client.ensureDisconnected() + await client.disconnect() }, 15000) }) From f4d60b15ef1f448cda412d07cb0823912a456666 Mon Sep 17 00:00:00 2001 From: Tim Kevin Oxley Date: Tue, 24 Nov 2020 10:50:09 -0500 Subject: [PATCH 8/8] =?UTF-8?q?5.=20Message=20Sequencing=20=E2=80=93=20Gua?= =?UTF-8?q?rantee=20sequence=20follows=20publish=20order=20&=20Prevent=20b?= =?UTF-8?q?ackdated=20messages=20silently=20breaking=20future=20publishes?= =?UTF-8?q?=20(#166)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Improve authFetch logging. * Update message sequencer to strictly enforce message order. * Queue publishes per-stream otherwise can skip forward then back even when messages published in correct sequence. * Add partial solution to broken backdated messages, at least doesn't break regular sequential publishes. * Tidy up, add some comments. * Move publish queue fn into utils. 
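[Editor's note, not part of the patch] The sequencing guarantee means that messages published in call order receive monotonically increasing (timestamp, sequenceNumber) pairs, with each message's prevMsgRef pointing at the previous message, even when the publish calls are issued concurrently. A minimal sketch, assuming a connected StreamrClient instance `client` and an existing `streamId`; the values in the comments mirror the expectations in the new Sequencing integration test:

    const ts = Date.now()
    await Promise.all([
        client.publish(streamId, { n: 1 }, ts),     // MessageID (ts, 0),     prevMsgRef null
        client.publish(streamId, { n: 2 }, ts),     // MessageID (ts, 1),     prevMsgRef (ts, 0)
        client.publish(streamId, { n: 3 }, ts + 1), // MessageID (ts + 1, 0), prevMsgRef (ts, 1)
    ])

Internally this relies on queuing the async publish dependencies (publisher id + stream partition lookup) per stream via the new LimitAsyncFnByKey helper, so slow or out-of-order resolution of those lookups cannot reorder the generated sequence numbers.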
--- package-lock.json | 65 +++++- package.json | 1 + src/Publisher.js | 233 ++++++++++++++------- src/rest/authFetch.js | 26 ++- src/utils.js | 68 +++++- test/integration/Sequencing.test.js | 290 ++++++++++++++++++++++++++ test/unit/MessageCreationUtil.test.js | 76 +++++-- test/unit/utils.test.js | 4 +- 8 files changed, 660 insertions(+), 103 deletions(-) create mode 100644 test/integration/Sequencing.test.js diff --git a/package-lock.json b/package-lock.json index 262031ea2..6bbd95df9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8526,6 +8526,17 @@ "dev": true, "requires": { "p-limit": "^2.0.0" + }, + "dependencies": { + "p-limit": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-2.3.0.tgz", + "integrity": "sha512-//88mFWSJx8lxCzwdAABTJL2MyWB12+eIY7MDL2SqLmAkeKU9qxRvWuSyTjm3FUmpBEMuFfckAIqEaVGUDxb6w==", + "dev": true, + "requires": { + "p-try": "^2.0.0" + } + } } }, "pkg-dir": { @@ -11427,6 +11438,17 @@ "dev": true, "requires": { "p-limit": "^2.0.0" + }, + "dependencies": { + "p-limit": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-2.3.0.tgz", + "integrity": "sha512-//88mFWSJx8lxCzwdAABTJL2MyWB12+eIY7MDL2SqLmAkeKU9qxRvWuSyTjm3FUmpBEMuFfckAIqEaVGUDxb6w==", + "dev": true, + "requires": { + "p-try": "^2.0.0" + } + } } }, "strip-json-comments": { @@ -12171,10 +12193,9 @@ "dev": true }, "p-limit": { - "version": "2.3.0", - "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-2.3.0.tgz", - "integrity": "sha512-//88mFWSJx8lxCzwdAABTJL2MyWB12+eIY7MDL2SqLmAkeKU9qxRvWuSyTjm3FUmpBEMuFfckAIqEaVGUDxb6w==", - "dev": true, + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-3.0.2.tgz", + "integrity": "sha512-iwqZSOoWIW+Ew4kAGUlN16J4M7OB3ysMLSZtnhmqx7njIHFPlxWBX8xo3lVTyFVq6mI/lL9qt2IsN1sHwaxJkg==", "requires": { "p-try": "^2.0.0" } @@ -12186,6 +12207,17 @@ "dev": true, "requires": { "p-limit": "^2.2.0" + }, + "dependencies": { + "p-limit": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-2.3.0.tgz", + "integrity": "sha512-//88mFWSJx8lxCzwdAABTJL2MyWB12+eIY7MDL2SqLmAkeKU9qxRvWuSyTjm3FUmpBEMuFfckAIqEaVGUDxb6w==", + "dev": true, + "requires": { + "p-try": "^2.0.0" + } + } } }, "p-map": { @@ -12225,8 +12257,7 @@ "p-try": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz", - "integrity": "sha512-R4nPAVTAU0B9D35/Gk3uJf/7XYbQcyohSKdvAxIRSNghFl4e71hVoGnBNQz9cWaXxO2I10KTC+3jMdvvoKw6dQ==", - "dev": true + "integrity": "sha512-R4nPAVTAU0B9D35/Gk3uJf/7XYbQcyohSKdvAxIRSNghFl4e71hVoGnBNQz9cWaXxO2I10KTC+3jMdvvoKw6dQ==" }, "pac-proxy-agent": { "version": "3.0.1", @@ -15548,6 +15579,17 @@ "dev": true, "requires": { "p-limit": "^2.0.0" + }, + "dependencies": { + "p-limit": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-2.3.0.tgz", + "integrity": "sha512-//88mFWSJx8lxCzwdAABTJL2MyWB12+eIY7MDL2SqLmAkeKU9qxRvWuSyTjm3FUmpBEMuFfckAIqEaVGUDxb6w==", + "dev": true, + "requires": { + "p-try": "^2.0.0" + } + } } }, "supports-color": { @@ -15964,6 +16006,17 @@ "dev": true, "requires": { "p-limit": "^2.0.0" + }, + "dependencies": { + "p-limit": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-2.3.0.tgz", + "integrity": "sha512-//88mFWSJx8lxCzwdAABTJL2MyWB12+eIY7MDL2SqLmAkeKU9qxRvWuSyTjm3FUmpBEMuFfckAIqEaVGUDxb6w==", + "dev": true, + "requires": { + "p-try": "^2.0.0" + } + } } }, "yargs": { diff --git a/package.json b/package.json index 5aaa18963..77d823622 100644 --- 
a/package.json +++ b/package.json @@ -74,6 +74,7 @@ "node-fetch": "^2.6.1", "node-webcrypto-ossl": "^2.1.1", "once": "^1.4.0", + "p-limit": "^3.0.2", "p-memoize": "^4.0.0", "promise-memoize": "^1.2.1", "qs": "^6.9.4", diff --git a/src/Publisher.js b/src/Publisher.js index 939c94c58..94dd67749 100644 --- a/src/Publisher.js +++ b/src/Publisher.js @@ -2,12 +2,13 @@ import crypto from 'crypto' import { ControlLayer, MessageLayer } from 'streamr-client-protocol' import randomstring from 'randomstring' +import LRU from 'quick-lru' import { ethers } from 'ethers' import Signer from './Signer' import Stream from './rest/domain/Stream' import FailedToPublishError from './errors/FailedToPublishError' -import { CacheAsyncFn, CacheFn } from './utils' +import { CacheAsyncFn, CacheFn, LimitAsyncFnByKey } from './utils' const { StreamMessage, MessageID, MessageRef } = MessageLayer @@ -27,32 +28,112 @@ function hash(stringToHash) { return crypto.createHash('md5').update(stringToHash).digest() } -class OrderedMessageChainCreator { - constructor() { +/** + * Message Chain Sequencing + */ + +class MessageChainSequence { + constructor({ maxSize = 10000 } = {}) { this.msgChainId = randomstring.generate(20) - this.messageRefs = new Map() + // tracks previous timestamp+sequence for stream+partition + this.messageRefs = new LRU({ + maxSize, // should never exceed this except in pathological cases + }) } - create({ streamId, streamPartition, timestamp, publisherId, }) { + /** + * Generate the next message MessageID + previous MessageRef for this message chain. + * Messages with same timestamp get incremented sequence numbers. + */ + + add({ streamId, streamPartition, timestamp, publisherId, }) { + // NOTE: publishing back-dated (i.e. non-sequentially timestamped) messages will 'break' sequencing. + // i.e. we lose track of biggest sequence number whenever timestamp changes for stream id+partition combo + // so backdated messages will start at sequence 0 again, regardless of the sequencing of existing messages. + // storage considers timestamp+sequence number unique, so the newer messages will clobber the older messages + // Not feasible to keep greatest sequence number for every millisecond timestamp so not sure a good way around this. + // Possible we should keep a global sequence number const key = `${streamId}|${streamPartition}` const prevMsgRef = this.messageRefs.get(key) - const sequenceNumber = this.getNextSequenceNumber(key, timestamp) - const messageId = new MessageID(streamId, streamPartition, timestamp, sequenceNumber, publisherId, this.msgChainId) - this.messageRefs.set(key, new MessageRef(timestamp, sequenceNumber)) + const isSameTimestamp = prevMsgRef && prevMsgRef.timestamp === timestamp + const isBackdated = prevMsgRef && prevMsgRef.timestamp > timestamp + // increment if timestamp the same, otherwise 0 + const nextSequenceNumber = isSameTimestamp ? 
prevMsgRef.sequenceNumber + 1 : 0 + const messageId = new MessageID(streamId, streamPartition, timestamp, nextSequenceNumber, publisherId, this.msgChainId) + // update latest timestamp + sequence for this streamId+partition + // (see note above about clobbering sequencing) + // don't update latest if timestamp < previous timestamp + // this "fixes" the sequence breaking issue above, but this message will silently disappear + if (!isBackdated) { + this.messageRefs.set(key, new MessageRef(timestamp, nextSequenceNumber)) + } return [messageId, prevMsgRef] } - getNextSequenceNumber(key, timestamp) { - if (!this.messageRefs.has(key)) { return 0 } - const prev = this.messageRefs.get(key) - if (timestamp !== prev.timestamp) { - return 0 - } - return prev.sequenceNumber + 1 + clear() { + this.messageRefs.clear() + } +} + +/** + * Computes appropriate stream partition + */ + +export class StreamPartitioner { + constructor(client) { + this.client = client + const cacheOptions = client.options.cache + this._getStreamPartitions = CacheAsyncFn(this._getStreamPartitions.bind(this), cacheOptions) + this.hash = CacheFn(hash, cacheOptions) } clear() { - this.messageRefs.clear() + this._getStreamPartitions.clear() + this.hash.clear() + } + + /** + * Get partition for given stream/streamId + partitionKey + */ + + async get(streamObjectOrId, partitionKey) { + const streamPartitions = await this.getStreamPartitions(streamObjectOrId) + return this.computeStreamPartition(streamPartitions, partitionKey) + } + + async getStreamPartitions(streamObjectOrId) { + if (streamObjectOrId && streamObjectOrId.partitions != null) { + return streamObjectOrId.partitions + } + + // get streamId here so caching based on id works + const streamId = getStreamId(streamObjectOrId) + return this._getStreamPartitions(streamId) + } + + async _getStreamPartitions(streamId) { + const { partitions } = await this.client.getStream(streamId) + return partitions + } + + computeStreamPartition(partitionCount, partitionKey) { + if (!(Number.isSafeInteger(partitionCount) && partitionCount > 0)) { + throw new Error(`partitionCount is not a safe positive integer! 
${partitionCount}`) + } + + if (partitionCount === 1) { + // Fast common case + return 0 + } + + if (!partitionKey) { + // Fallback to random partition if no key + return Math.floor(Math.random() * partitionCount) + } + + const buffer = this.hash(partitionKey) + const intHash = buffer.readInt32LE() + return Math.abs(intHash) % partitionCount } } @@ -60,21 +141,19 @@ export class MessageCreationUtil { constructor(client) { this.client = client const cacheOptions = client.options.cache - this.msgChainer = new OrderedMessageChainCreator(cacheOptions) - - this._getStreamPartitions = CacheAsyncFn(this._getStreamPartitions.bind(this), cacheOptions) + this.msgChainer = new MessageChainSequence(cacheOptions) + this.partitioner = new StreamPartitioner(client) this.getUserInfo = CacheAsyncFn(this.getUserInfo.bind(this), cacheOptions) this.getPublisherId = CacheAsyncFn(this.getPublisherId.bind(this), cacheOptions) - this.hash = CacheFn(hash, cacheOptions) + this.queue = LimitAsyncFnByKey(1) // an async queue for each stream's async deps } stop() { this.msgChainer.clear() - this.msgChainer = new OrderedMessageChainCreator() this.getUserInfo.clear() this.getPublisherId.clear() - this._getStreamPartitions.clear() - this.hash.clear() + this.partitioner.clear() + this.queue.clear() } async getPublisherId() { @@ -104,57 +183,59 @@ export class MessageCreationUtil { } async getUsername() { - return this.getUserInfo().then((userInfo) => ( - userInfo.username - || userInfo.id // In the edge case where StreamrClient.auth.apiKey is an anonymous key, userInfo.id is that anonymous key - )) + const { username, id } = await this.client.getUserInfo() + return ( + username + // edge case: if auth.apiKey is an anonymous key, userInfo.id is that anonymous key + || id + ) } - async getStreamPartitions(streamObjectOrId) { - if (streamObjectOrId && streamObjectOrId.partitions != null) { - return streamObjectOrId.partitions + async createStreamMessage(streamObjectOrId, options = {}) { + const { content } = options + // Validate content + if (typeof content !== 'object') { + throw new Error(`Message content must be an object! Was: ${content}`) } - // get streamId here so caching based on id works - const streamId = getStreamId(streamObjectOrId) - return this._getStreamPartitions(streamId) + // queued depdendencies fetching + const [publisherId, streamPartition] = await this._getDependencies(streamObjectOrId, options) + return this._createStreamMessage(getStreamId(streamObjectOrId), { + publisherId, + streamPartition, + ...options + }) } - async _getStreamPartitions(streamId) { - const { partitions } = await this.client.getStream(streamId) - return partitions - } + /** + * Fetch async dependencies for publishing. + * Should resolve in call-order per-stream to guarantee correct sequencing. + */ - computeStreamPartition(partitionCount, partitionKey) { - if (!partitionCount) { - throw new Error('partitionCount is falsey!') - } else if (partitionCount === 1) { - // Fast common case - return 0 - } else if (partitionKey) { - const buffer = this.hash(partitionKey) - const intHash = buffer.readInt32LE() - return Math.abs(intHash) % partitionCount - } else { - // Fallback to random partition if no key - return Math.floor(Math.random() * partitionCount) - } + async _getDependencies(streamObjectOrId, { partitionKey }) { + // This queue guarantees stream messages for the same timestamp are sequenced in-order + // regardless of the async resolution order. 
+ // otherwise, if async calls happen to resolve in a different order + // than they were issued we will end up generating the wrong sequence numbers + const streamId = getStreamId(streamObjectOrId) + return this.queue(streamId, async () => ( + Promise.all([ + this.getPublisherId(), + this.partitioner.get(streamObjectOrId, partitionKey), + ]) + )) } - async createStreamMessage(streamObjectOrId, { data, timestamp, partitionKey } = {}) { - // Validate data - if (typeof data !== 'object') { - throw new Error(`Message data must be an object! Was: ${data}`) - } + /** + * Synchronously generate chain sequence + stream message after async deps resolved. + */ - const streamId = getStreamId(streamObjectOrId) - const [streamPartitions, publisherId] = await Promise.all([ - this.getStreamPartitions(streamObjectOrId), - this.getPublisherId(), - ]) + _createStreamMessage(streamId, options = {}) { + const { + content, streamPartition, timestamp, publisherId, ...opts + } = options - const streamPartition = this.computeStreamPartition(streamPartitions, partitionKey) - const [messageId, prevMsgRef] = this.msgChainer.create({ + const [messageId, prevMsgRef] = this.msgChainer.add({ streamId, streamPartition, timestamp, @@ -164,7 +245,8 @@ export class MessageCreationUtil { return new StreamMessage({ messageId, prevMsgRef, - content: data, + content, + ...opts }) } } @@ -189,30 +271,35 @@ export default class Publisher { } async publish(...args) { - this.debug('publish()') - + // wrap publish in error emitter return this._publish(...args).catch((err) => { this.onErrorEmit(err) throw err }) } - async _publish(streamObjectOrId, data, timestamp = new Date(), partitionKey = null) { + async _publish(streamObjectOrId, content, timestamp = new Date(), partitionKey = null) { + this.debug('publish()') if (this.client.session.isUnauthenticated()) { throw new Error('Need to be authenticated to publish.') } const timestampAsNumber = timestamp instanceof Date ? timestamp.getTime() : new Date(timestamp).getTime() - const [sessionToken, streamMessage] = await Promise.all([ - this.client.session.getSessionToken(), + // get session + generate stream message + // important: stream message call must be executed in publish() call order + // or sequencing will be broken. + // i.e. 
do not put async work before call to createStreamMessage + const [streamMessage, sessionToken] = await Promise.all([ this.msgCreationUtil.createStreamMessage(streamObjectOrId, { - data, + content, timestamp: timestampAsNumber, partitionKey }), + this.client.session.getSessionToken(), // fetch in parallel ]) if (this.signer) { + // optional await this.signer.signStreamMessage(streamMessage) } @@ -222,20 +309,22 @@ export default class Publisher { requestId, sessionToken, }) + try { await this.client.send(request) } catch (err) { const streamId = getStreamId(streamObjectOrId) throw new FailedToPublishError( streamId, - data, + content, err ) } + return request } - getPublisherId() { + async getPublisherId() { if (this.client.session.isUnauthenticated()) { throw new Error('Need to be authenticated to getPublisherId.') } diff --git a/src/rest/authFetch.js b/src/rest/authFetch.js index 8d92c34d9..0ce5b6e35 100644 --- a/src/rest/authFetch.js +++ b/src/rest/authFetch.js @@ -1,5 +1,5 @@ import fetch from 'node-fetch' -import debugFactory from 'debug' +import Debug from 'debug' import AuthFetchError from '../errors/AuthFetchError' import { getVersionString } from '../utils' @@ -8,23 +8,30 @@ export const DEFAULT_HEADERS = { 'Streamr-Client': `streamr-client-javascript/${getVersionString()}`, } -const debug = debugFactory('StreamrClient:utils') +const debug = Debug('StreamrClient:utils:authfetch') + +let ID = 0 + +export default async function authFetch(url, session, opts, requireNewToken = false) { + ID += 1 + const timeStart = Date.now() + const id = ID -const authFetch = async (url, session, opts = {}, requireNewToken = false) => { const options = { ...opts, headers: { ...DEFAULT_HEADERS, - ...opts.headers, + ...(opts && opts.headers), } } + // add default 'Content-Type: application/json' header for all requests // including 0 body length POST calls if (!options.headers['Content-Type']) { options.headers['Content-Type'] = 'application/json' } - debug('authFetch: ', url, opts) + debug('%d %s >> %o', id, url, opts) const response = await fetch(url, { ...opts, @@ -35,6 +42,8 @@ const authFetch = async (url, session, opts = {}, requireNewToken = false) => { ...options.headers, }, }) + const timeEnd = Date.now() + debug('%d %s << %d %s %s %s', id, url, response.status, response.statusText, Debug.humanize(timeEnd - timeStart)) const body = await response.text() @@ -42,13 +51,14 @@ const authFetch = async (url, session, opts = {}, requireNewToken = false) => { try { return JSON.parse(body || '{}') } catch (e) { + debug('%d %s – failed to parse body: %s', id, url, e.stack) throw new AuthFetchError(e.message, response, body) } } else if ([400, 401].includes(response.status) && !requireNewToken) { + debug('%d %s – revalidating session') return authFetch(url, session, options, true) } else { - throw new AuthFetchError(`Request to ${url} returned with error code ${response.status}.`, response, body) + debug('%d %s – failed', id, url) + throw new AuthFetchError(`Request ${id} to ${url} returned with error code ${response.status}.`, response, body) } } - -export default authFetch diff --git a/src/utils.js b/src/utils.js index dd388706f..3a9491bdb 100644 --- a/src/utils.js +++ b/src/utils.js @@ -2,6 +2,7 @@ import { v4 as uuidv4 } from 'uuid' import uniqueId from 'lodash.uniqueid' import LRU from 'quick-lru' import pMemoize from 'p-memoize' +import pLimit from 'p-limit' import mem from 'mem' import pkg from '../package.json' @@ -40,12 +41,27 @@ export function waitFor(emitter, event) { } /* eslint-disable 
object-curly-newline */ -export function CacheAsyncFn(fn, { + +/** + * Returns a cached async fn, cached keyed on first argument passed. See documentation for mem/p-memoize. + * Caches into a LRU cache capped at options.maxSize + * Won't call asyncFn again until options.maxAge or options.maxSize exceeded, or cachedAsyncFn.clear() is called. + * Won't cache rejections by default. Override with options.cachePromiseRejection = true. + * + * ```js + * const cachedAsyncFn = CacheAsyncFn(asyncFn, options) + * await cachedAsyncFn(key) + * await cachedAsyncFn(key) + * cachedAsyncFn.clear() + * ``` + */ + +export function CacheAsyncFn(asyncFn, { maxSize = 10000, maxAge = 30 * 60 * 1000, // 30 minutes cachePromiseRejection = false, } = {}) { - const cachedFn = pMemoize(fn, { + const cachedFn = pMemoize(asyncFn, { maxAge, cachePromiseRejection, cache: new LRU({ @@ -56,6 +72,20 @@ export function CacheAsyncFn(fn, { return cachedFn } +/** + * Returns a cached fn, cached keyed on first argument passed. See documentation for mem. + * Caches into a LRU cache capped at options.maxSize + * Won't call fn again until options.maxAge or options.maxSize exceeded, or cachedFn.clear() is called. + * + * ```js + * const cachedFn = CacheFn(fn, options) + * cachedFn(key) + * cachedFn(key) + * cachedFn(...args) + * cachedFn.clear() + * ``` + */ + export function CacheFn(fn, { maxSize = 10000, maxAge = 30 * 60 * 1000, // 30 minutes @@ -69,4 +99,38 @@ export function CacheFn(fn, { cachedFn.clear = () => mem.clear(cachedFn) return cachedFn } + /* eslint-enable object-curly-newline */ + +/** + * Returns a limit function that limits concurrency per-key. + * + * ```js + * const limit = LimitAsyncFnByKey(1) + * limit('channel1', fn) + * limit('channel2', fn) + * limit('channel2', fn) + * ``` + */ + +export function LimitAsyncFnByKey(limit) { + const pending = new Map() + const f = async (id, fn) => { + const limitFn = pending.get(id) || pending.set(id, pLimit(limit)).get(id) + try { + return await limitFn(fn) + } finally { + if (!limitFn.activeCount && !limitFn.pendingCount && pending.get(id) === limitFn) { + // clean up if no more active entries (if not cleared) + pending.delete(id) + } + } + } + f.clear = () => { + // note: does not cancel promises + pending.forEach((p) => p.clearQueue()) + pending.clear() + } + return f +} + diff --git a/test/integration/Sequencing.test.js b/test/integration/Sequencing.test.js new file mode 100644 index 000000000..93a448eb9 --- /dev/null +++ b/test/integration/Sequencing.test.js @@ -0,0 +1,290 @@ +import { wait, waitForCondition, waitForEvent } from 'streamr-test-utils' + +import { uid, fakePrivateKey } from '../utils' +import StreamrClient from '../../src' +import Connection from '../../src/Connection' + +import config from './config' + +const Msg = (opts) => ({ + value: uid('msg'), + ...opts, +}) + +function toSeq(requests, ts = Date.now()) { + return requests.map((m) => { + const { prevMsgRef } = m.streamMessage + return [ + [m.streamMessage.getTimestamp() - ts, m.streamMessage.getSequenceNumber()], + prevMsgRef ? 
[prevMsgRef.timestamp - ts, prevMsgRef.sequenceNumber] : null + ] + }) +} + +describe('Sequencing', () => { + let expectErrors = 0 // check no errors by default + let onError = jest.fn() + let client + let stream + + const createClient = (opts = {}) => { + const c = new StreamrClient({ + auth: { + privateKey: fakePrivateKey(), + }, + autoConnect: false, + autoDisconnect: false, + maxRetries: 2, + ...config.clientOptions, + ...opts, + }) + c.onError = jest.fn() + c.on('error', onError) + return c + } + + beforeEach(async () => { + expectErrors = 0 + onError = jest.fn() + client = createClient() + await client.connect() + + stream = await client.createStream({ + name: uid('stream') + }) + }) + + afterEach(async () => { + await wait() + // ensure no unexpected errors + expect(onError).toHaveBeenCalledTimes(expectErrors) + if (client) { + expect(client.onError).toHaveBeenCalledTimes(expectErrors) + } + }) + + afterEach(async () => { + await wait() + if (client) { + client.debug('disconnecting after test') + await client.disconnect() + } + + const openSockets = Connection.getOpen() + if (openSockets !== 0) { + throw new Error(`sockets not closed: ${openSockets}`) + } + }) + + it('should sequence in order', async () => { + const ts = Date.now() + const msgsPublished = [] + const msgsReceieved = [] + + await client.subscribe(stream.id, (m) => msgsReceieved.push(m)) + + const nextMsg = () => { + const msg = Msg() + msgsPublished.push(msg) + return msg + } + + const requests = await Promise.all([ + // first 2 messages at ts + 0 + client.publish(stream, nextMsg(), ts), + client.publish(stream, nextMsg(), ts), + // next two messages at ts + 1 + client.publish(stream, nextMsg(), ts + 1), + client.publish(stream, nextMsg(), ts + 1), + ]) + const seq = toSeq(requests, ts) + expect(seq).toEqual([ + [[0, 0], null], + [[0, 1], [0, 0]], + [[1, 0], [0, 1]], + [[1, 1], [1, 0]], + ]) + + await waitForCondition(() => ( + msgsReceieved.length === msgsPublished.length + ), 5000).catch(() => {}) // ignore, tests will fail anyway + + expect(msgsReceieved).toEqual(msgsPublished) + }, 10000) + + it('should sequence in order even if some calls delayed', async () => { + const ts = Date.now() + const msgsPublished = [] + const msgsReceieved = [] + + let calls = 0 + const { clear } = client.publisher.msgCreationUtil.getPublisherId + const getPublisherId = client.publisher.msgCreationUtil.getPublisherId.bind(client.publisher.msgCreationUtil) + client.publisher.msgCreationUtil.getPublisherId = async (...args) => { + // delay getPublisher call + calls += 1 + if (calls === 2) { + const result = await getPublisherId(...args) + // delay resolving this call + await wait(100) + return result + } + return getPublisherId(...args) + } + client.publisher.msgCreationUtil.getPublisherId.clear = clear + + const nextMsg = () => { + const msg = Msg() + msgsPublished.push(msg) + return msg + } + + await client.subscribe(stream.id, (m) => msgsReceieved.push(m)) + const requests = await Promise.all([ + // first 2 messages at ts + 0 + client.publish(stream, nextMsg(), ts), + client.publish(stream, nextMsg(), ts), + // next two messages at ts + 1 + client.publish(stream, nextMsg(), ts + 1), + client.publish(stream, nextMsg(), ts + 1), + ]) + const seq = toSeq(requests, ts) + expect(seq).toEqual([ + [[0, 0], null], + [[0, 1], [0, 0]], + [[1, 0], [0, 1]], + [[1, 1], [1, 0]], + ]) + + await waitForCondition(() => ( + msgsReceieved.length === msgsPublished.length + ), 5000).catch(() => {}) // ignore, tests will fail anyway + + 
expect(msgsReceieved).toEqual(msgsPublished) + }, 10000) + + it('should sequence in order even if publish requests backdated', async () => { + const ts = Date.now() + const msgsPublished = [] + const msgsReceieved = [] + + await client.subscribe(stream.id, (m) => msgsReceieved.push(m)) + + const nextMsg = (...args) => { + const msg = Msg(...args) + msgsPublished.push(msg) + return msg + } + + const requests = await Promise.all([ + // publish at ts + 0 + client.publish(stream, nextMsg(), ts), + // publish at ts + 1 + client.publish(stream, nextMsg(), ts + 1), + // backdate at ts + 0 + client.publish(stream, nextMsg({ + backdated: true, + }), ts), + // resume at ts + 2 + client.publish(stream, nextMsg(), ts + 2), + client.publish(stream, nextMsg(), ts + 2), + client.publish(stream, nextMsg(), ts + 3), + ]) + + await waitForCondition(() => ( + msgsReceieved.length === msgsPublished.length + ), 2000).catch(() => {}) // ignore, tests will fail anyway + + const msgsResent = [] + const sub = await client.resend({ + stream: stream.id, + resend: { + from: { + timestamp: 0 + }, + }, + }, (m) => msgsResent.push(m)) + await waitForEvent(sub, 'resent') + + expect(msgsReceieved).toEqual(msgsResent) + // backdated messages disappear + expect(msgsReceieved).toEqual(msgsPublished.filter(({ backdated }) => !backdated)) + + const seq = toSeq(requests, ts) + client.debug(seq) + expect(seq).toEqual([ + [[0, 0], null], + [[1, 0], [0, 0]], + [[0, 0], [1, 0]], // bad message + [[2, 0], [1, 0]], + [[2, 1], [2, 0]], + [[3, 0], [2, 1]], + ]) + }, 10000) + + it('should sequence in order even if publish requests backdated in sequence', async () => { + const ts = Date.now() + const msgsPublished = [] + const msgsReceieved = [] + + await client.subscribe(stream.id, (m) => msgsReceieved.push(m)) + + const nextMsg = (...args) => { + const msg = Msg(...args) + msgsPublished.push(msg) + return msg + } + + const requests = await Promise.all([ + // first 3 messages at ts + 0 + client.publish(stream, nextMsg(), ts), + client.publish(stream, nextMsg(), ts), + client.publish(stream, nextMsg(), ts), + // next two messages at ts + 1 + client.publish(stream, nextMsg(), ts + 1), + client.publish(stream, nextMsg(), ts + 1), + // backdate at ts + 0 + client.publish(stream, nextMsg({ + backdated: true, + }), ts), + // resume publishing at ts + 1 + client.publish(stream, nextMsg(), ts + 1), + client.publish(stream, nextMsg(), ts + 1), + client.publish(stream, nextMsg(), ts + 2), + client.publish(stream, nextMsg(), ts + 2), + ]) + + await waitForCondition(() => ( + msgsReceieved.length === msgsPublished.length + ), 2000).catch(() => {}) // ignore, tests will fail anyway + + const msgsResent = [] + const sub = await client.resend({ + stream: stream.id, + resend: { + from: { + timestamp: 0 + }, + }, + }, (m) => msgsResent.push(m)) + await waitForEvent(sub, 'resent') + + expect(msgsReceieved).toEqual(msgsResent) + // backdated messages disappear + expect(msgsReceieved).toEqual(msgsPublished.filter(({ backdated }) => !backdated)) + + const seq = toSeq(requests, ts) + expect(seq).toEqual([ + [[0, 0], null], + [[0, 1], [0, 0]], + [[0, 2], [0, 1]], + [[1, 0], [0, 2]], + [[1, 1], [1, 0]], + [[0, 0], [1, 1]], // bad message + [[1, 2], [1, 1]], + [[1, 3], [1, 2]], + [[2, 0], [1, 3]], + [[2, 1], [2, 0]], + ]) + }, 10000) +}) diff --git a/test/unit/MessageCreationUtil.test.js b/test/unit/MessageCreationUtil.test.js index e3fb43f5c..dfb46b3ba 100644 --- a/test/unit/MessageCreationUtil.test.js +++ b/test/unit/MessageCreationUtil.test.js @@ -1,8 +1,9 
@@ import sinon from 'sinon' import { ethers } from 'ethers' +import { wait } from 'streamr-test-utils' import { MessageLayer } from 'streamr-client-protocol' -import { MessageCreationUtil } from '../../src/Publisher' +import { MessageCreationUtil, StreamPartitioner } from '../../src/Publisher' import Stream from '../../src/rest/domain/Stream' // eslint-disable-next-line import/no-named-as-default-member @@ -84,24 +85,26 @@ describe('MessageCreationUtil', () => { }) }) - describe('partitioner', () => { + describe('StreamPartitioner', () => { + let streamPartitioner + beforeAll(() => { client = createClient() }) beforeEach(() => { - msgCreationUtil = new MessageCreationUtil(client) + streamPartitioner = new StreamPartitioner(client) }) it('should throw if partition count is not defined', () => { expect(() => { - msgCreationUtil.computeStreamPartition(undefined, 'foo') + streamPartitioner.computeStreamPartition(undefined, 'foo') }).toThrow() }) it('should always return partition 0 for all keys if partition count is 1', () => { for (let i = 0; i < 100; i++) { - expect(msgCreationUtil.computeStreamPartition(1, `foo${i}`)).toEqual(0) + expect(streamPartitioner.computeStreamPartition(1, `foo${i}`)).toEqual(0) } }) @@ -119,7 +122,7 @@ describe('MessageCreationUtil', () => { expect(correctResults.length).toEqual(keys.length) for (let i = 0; i < keys.length; i++) { - const partition = msgCreationUtil.computeStreamPartition(10, keys[i]) + const partition = streamPartitioner.computeStreamPartition(10, keys[i]) expect(correctResults[i]).toStrictEqual(partition) } }) @@ -165,7 +168,7 @@ describe('MessageCreationUtil', () => { for (let i = 0; i < 10; i++) { // eslint-disable-next-line no-await-in-loop const streamMessage = await msgCreationUtil.createStreamMessage(stream, { - data: pubMsg, timestamp: ts, + content: pubMsg, timestamp: ts, }) expect(streamMessage).toStrictEqual(getStreamMessage('streamId', ts, i, prevMsgRef)) prevMsgRef = new MessageRef(ts, i) @@ -178,13 +181,60 @@ describe('MessageCreationUtil', () => { for (let i = 0; i < 10; i++) { // eslint-disable-next-line no-await-in-loop const streamMessage = await msgCreationUtil.createStreamMessage(stream, { - data: pubMsg, timestamp: ts + i, + content: pubMsg, timestamp: ts + i, }) expect(streamMessage).toStrictEqual(getStreamMessage('streamId', ts + i, 0, prevMsgRef)) prevMsgRef = new MessageRef(ts + i, 0) } }) + it('should sequence in order even if async dependencies resolve out of order', async () => { + const ts = Date.now() + let calls = 0 + const getStreamPartitions = msgCreationUtil.partitioner.getStreamPartitions.bind(msgCreationUtil.partitioner) + msgCreationUtil.partitioner.getStreamPartitions = async (...args) => { + calls += 1 + if (calls === 2) { + const result = await getStreamPartitions(...args) + // delay resolving this call + await wait(100) + return result + } + return getStreamPartitions(...args) + } + + // simultaneously publish 3 messages, but the second item's dependencies will resolve late. 
+ // this shouldn't affect the sequencing + const messages = await Promise.all([ + msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: ts, + }), + msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: ts, + }), + msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: ts, + }) + ]) + const sequenceNumbers = messages.map((m) => m.getSequenceNumber()) + expect(sequenceNumbers).toEqual([0, 1, 2]) + }) + + it('should handle old timestamps', async () => { + const ts = Date.now() + const streamMessage1 = await msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: ts, + }) + const streamMessage2 = await msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: ts + 1, + }) + const streamMessage3 = await msgCreationUtil.createStreamMessage(stream, { + content: pubMsg, timestamp: ts, + }) + const sequenceNumbers = [streamMessage1, streamMessage2, streamMessage3].map((m) => m.getSequenceNumber()) + expect(sequenceNumbers).toEqual([0, 0, 0]) + }) + it('should publish messages with sequence number 0 (different streams)', async () => { const ts = Date.now() const stream2 = new Stream(null, { @@ -197,13 +247,13 @@ describe('MessageCreationUtil', () => { }) const msg1 = await msgCreationUtil.createStreamMessage(stream, { - data: pubMsg, timestamp: ts + content: pubMsg, timestamp: ts }) const msg2 = await msgCreationUtil.createStreamMessage(stream2, { - data: pubMsg, timestamp: ts + content: pubMsg, timestamp: ts }) const msg3 = await msgCreationUtil.createStreamMessage(stream3, { - data: pubMsg, timestamp: ts + content: pubMsg, timestamp: ts }) expect(msg1).toEqual(getStreamMessage('streamId', ts, 0, null)) @@ -213,7 +263,7 @@ describe('MessageCreationUtil', () => { it.skip('should sign messages if signer is defined', async () => { const msg1 = await msgCreationUtil.createStreamMessage(stream, { - data: pubMsg, timestamp: Date.now() + content: pubMsg, timestamp: Date.now() }) expect(msg1.signature).toBe('signature') }) @@ -221,7 +271,7 @@ describe('MessageCreationUtil', () => { it('should create message from a stream id by fetching the stream', async () => { const ts = Date.now() const streamMessage = await msgCreationUtil.createStreamMessage(stream.id, { - data: pubMsg, timestamp: ts + content: pubMsg, timestamp: ts }) expect(streamMessage).toEqual(getStreamMessage(stream.id, ts, 0, null)) }) diff --git a/test/unit/utils.test.js b/test/unit/utils.test.js index 3ff7b47ff..e0079f127 100644 --- a/test/unit/utils.test.js +++ b/test/unit/utils.test.js @@ -53,8 +53,8 @@ describe('utils', () => { session.getSessionToken = sinon.stub().resolves('invalid token') return authFetch(baseUrl + testUrl, session).catch((err) => { expect(session.getSessionToken.calledTwice).toBeTruthy() - expect(err.toString()).toEqual( - `Error: Request to ${baseUrl + testUrl} returned with error code 401. Unauthorized` + expect(err.toString()).toMatch( + `${baseUrl + testUrl} returned with error code 401. Unauthorized` ) expect(err.body).toEqual('Unauthorized') done()