From c2050597b6afdae26c112d1ffcbe3c74b3df006c Mon Sep 17 00:00:00 2001 From: Tuomas Koponen Date: Wed, 3 Feb 2021 15:09:00 +0200 Subject: [PATCH 1/7] Add detectFields implementation to Stream --- src/stream/index.ts | 24 +++++-- src/utils/FieldDetector.js | 33 ++++++++++ src/utils/index.js | 3 +- test/integration/Stream.test.js | 107 ++++++++++++++++++++++++++++++++ 4 files changed, 161 insertions(+), 6 deletions(-) create mode 100644 src/utils/FieldDetector.js create mode 100644 test/integration/Stream.test.js diff --git a/src/stream/index.ts b/src/stream/index.ts index 7f9c4ac9f..11a8d879e 100644 --- a/src/stream/index.ts +++ b/src/stream/index.ts @@ -1,4 +1,4 @@ -import { getEndpointUrl } from '../utils' +import { getEndpointUrl, FieldDetector } from '../utils' import authFetch from '../rest/authFetch' import StorageNode from './StorageNode' @@ -125,10 +125,24 @@ export default class Stream { } async detectFields() { - return authFetch( - getEndpointUrl(this._client.options.restUrl, 'streams', this.id, 'detectFields'), - this._client.session, - ) + // Get last message of the stream to be used for field detecting + const sub = await this._client.resend({ + stream: this.id, + resend: { + last: 1, + }, + }) + const receivedMsgs = await sub.collect() + + if (receivedMsgs.length > 0) { + const lastMessage = receivedMsgs[0] + const fd = new FieldDetector(lastMessage) + const fields = fd.detect() + + // Save field config back to the stream + this.config.fields = fields + await this.update() + } } async addToStorageNode(address: string) { diff --git a/src/utils/FieldDetector.js b/src/utils/FieldDetector.js new file mode 100644 index 000000000..69ba45d13 --- /dev/null +++ b/src/utils/FieldDetector.js @@ -0,0 +1,33 @@ +export default class FieldDetector { + message = null + + constructor(message) { + this.message = message + } + + detect() { + if (this.message == null) { + throw new Error('Invalid message provided to FieldDetector constructor') + } + + const content = this.message + const fields = [] + + Object.keys(content).forEach((key) => { + let type + if (Array.isArray(content[key])) { + type = 'list' + } else if ((typeof content[key]) === 'object') { + type = 'map' + } else { + type = typeof content[key] + } + fields.push({ + name: key, + type, + }) + }) + + return fields + } +} diff --git a/src/utils/index.js b/src/utils/index.js index f0cff05e9..51c258b2c 100644 --- a/src/utils/index.js +++ b/src/utils/index.js @@ -11,8 +11,9 @@ import pkg from '../../package.json' import AggregatedError from './AggregatedError' import Scaffold from './Scaffold' +import FieldDetector from './FieldDetector' -export { AggregatedError, Scaffold } +export { AggregatedError, Scaffold, FieldDetector } const UUID = uuidv4() diff --git a/test/integration/Stream.test.js b/test/integration/Stream.test.js new file mode 100644 index 000000000..a8506e71b --- /dev/null +++ b/test/integration/Stream.test.js @@ -0,0 +1,107 @@ +import StreamrClient from '../../src' +import { uid, fakePrivateKey, getPublishTestMessages } from '../utils' + +import config from './config' + +const createClient = (opts = {}) => new StreamrClient({ + ...config.clientOptions, + auth: { + privateKey: fakePrivateKey(), + }, + autoConnect: false, + autoDisconnect: false, + ...opts, +}) + +describe('Stream', () => { + let client + let stream + + beforeEach(async () => { + client = createClient() + await client.connect() + + stream = await client.createStream({ + name: uid('stream-integration-test') + }) + }) + + afterEach(async () => { + await client.disconnect() + }) + + describe('detectFields()', () => { + it('does detect primitive types', async () => { + const msg = { + number: 123, + boolean: true, + object: { + k: 1, + v: 2, + }, + array: [1, 2, 3], + string: 'test', + } + const publishTestMessages = getPublishTestMessages(client, { + streamId: stream.id, + waitForLast: true, + createMessage: () => msg, + }) + await publishTestMessages(1) + + expect(stream.config.fields).toEqual([]) + await stream.detectFields() + expect(stream.config.fields).toEqual([ + { + name: 'number', + type: 'number', + }, + { + name: 'boolean', + type: 'boolean', + }, + { + name: 'object', + type: 'map', + }, + { + name: 'array', + type: 'list', + }, + { + name: 'string', + type: 'string', + }, + ]) + }) + + it('skips unsupported types', async () => { + const msg = { + null: null, + empty: {}, + func: () => null, + nonexistent: undefined, + symbol: Symbol('test'), + } + const publishTestMessages = getPublishTestMessages(client, { + streamId: stream.id, + waitForLast: true, + createMessage: () => msg, + }) + await publishTestMessages(1) + + expect(stream.config.fields).toEqual([]) + await stream.detectFields() + expect(stream.config.fields).toEqual([ + { + name: 'null', + type: 'map', + }, + { + name: 'empty', + type: 'map', + }, + ]) + }) + }) +}) From b0849615ea6a52f8974ee5c2c341812aeb2ac22b Mon Sep 17 00:00:00 2001 From: Tuomas Koponen Date: Fri, 12 Feb 2021 08:52:52 +0200 Subject: [PATCH 2/7] Remove FieldDetector class --- src/stream/index.ts | 20 +++++++++++++++++--- src/utils/FieldDetector.js | 33 --------------------------------- src/utils/index.js | 3 +-- 3 files changed, 18 insertions(+), 38 deletions(-) delete mode 100644 src/utils/FieldDetector.js diff --git a/src/stream/index.ts b/src/stream/index.ts index 11a8d879e..1b4130601 100644 --- a/src/stream/index.ts +++ b/src/stream/index.ts @@ -1,4 +1,4 @@ -import { getEndpointUrl, FieldDetector } from '../utils' +import { getEndpointUrl } from '../utils' import authFetch from '../rest/authFetch' import StorageNode from './StorageNode' @@ -136,8 +136,22 @@ export default class Stream { if (receivedMsgs.length > 0) { const lastMessage = receivedMsgs[0] - const fd = new FieldDetector(lastMessage) - const fields = fd.detect() + const fields = [] + + Object.keys(lastMessage).forEach((key) => { + let type + if (Array.isArray(lastMessage[key])) { + type = 'list' + } else if ((typeof lastMessage[key]) === 'object') { + type = 'map' + } else { + type = typeof lastMessage[key] + } + fields.push({ + name: key, + type, + }) + }) // Save field config back to the stream this.config.fields = fields diff --git a/src/utils/FieldDetector.js b/src/utils/FieldDetector.js deleted file mode 100644 index 69ba45d13..000000000 --- a/src/utils/FieldDetector.js +++ /dev/null @@ -1,33 +0,0 @@ -export default class FieldDetector { - message = null - - constructor(message) { - this.message = message - } - - detect() { - if (this.message == null) { - throw new Error('Invalid message provided to FieldDetector constructor') - } - - const content = this.message - const fields = [] - - Object.keys(content).forEach((key) => { - let type - if (Array.isArray(content[key])) { - type = 'list' - } else if ((typeof content[key]) === 'object') { - type = 'map' - } else { - type = typeof content[key] - } - fields.push({ - name: key, - type, - }) - }) - - return fields - } -} diff --git a/src/utils/index.js b/src/utils/index.js index 51c258b2c..f0cff05e9 100644 --- a/src/utils/index.js +++ b/src/utils/index.js @@ -11,9 +11,8 @@ import pkg from '../../package.json' import AggregatedError from './AggregatedError' import Scaffold from './Scaffold' -import FieldDetector from './FieldDetector' -export { AggregatedError, Scaffold, FieldDetector } +export { AggregatedError, Scaffold } const UUID = uuidv4() From b918b3d4069103374ddcbdb643dcb8a38e37dcc9 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 17 Feb 2021 14:44:48 -0500 Subject: [PATCH 3/7] Update resend/subscribe signature so onMessage is optional. --- src/StreamrClient.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/StreamrClient.ts b/src/StreamrClient.ts index a5a045960..30d521ec0 100644 --- a/src/StreamrClient.ts +++ b/src/StreamrClient.ts @@ -337,7 +337,7 @@ export default class StreamrClient extends EventEmitter { return this.publisher.rotateGroupKey(...args) } - async subscribe(opts: Todo, onMessage: OnMessageCallback) { + async subscribe(opts: Todo, onMessage?: OnMessageCallback) { let subTask: Todo let sub: Todo const hasResend = !!(opts.resend || opts.from || opts.to || opts.last) @@ -372,7 +372,7 @@ export default class StreamrClient extends EventEmitter { await this.subscriber.unsubscribe(opts) } - async resend(opts: Todo, onMessage: OnMessageCallback) { + async resend(opts: Todo, onMessage?: OnMessageCallback) { const task = this.subscriber.resend(opts) if (typeof onMessage !== 'function') { return task From e67d00a4929aae90b743356b82c362529a049739 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 17 Feb 2021 14:45:41 -0500 Subject: [PATCH 4/7] Remove old Stream.detectFields test. --- test/integration/StreamEndpoints.test.js | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/test/integration/StreamEndpoints.test.js b/test/integration/StreamEndpoints.test.js index 7f50b741f..c5ccb1a74 100644 --- a/test/integration/StreamEndpoints.test.js +++ b/test/integration/StreamEndpoints.test.js @@ -176,30 +176,6 @@ function TestStreamEndpoints(getName) { }) }) - describe.skip('Stream configuration', () => { - it('Stream.detectFields', async () => { - await client.connect() - await client.publish(createdStream.id, { - foo: 'bar', - count: 0, - }) - // Need time to propagate to storage - await wait(10000) - const stream = await createdStream.detectFields() - expect(stream.config.fields).toEqual([ - { - name: 'foo', - type: 'string', - }, - { - name: 'count', - type: 'number', - }, - ]) - await client.disconnect() - }, 15000) - }) - describe('Stream permissions', () => { it('Stream.getPermissions', async () => { const permissions = await createdStream.getPermissions() From 061bbd89d202a81c8405f3bd961796c84d1618e9 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 17 Feb 2021 14:46:39 -0500 Subject: [PATCH 5/7] Fix client import in test, also verify config works after reloading stream info from server. --- test/integration/Stream.test.js | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/test/integration/Stream.test.js b/test/integration/Stream.test.js index a8506e71b..56f112f1b 100644 --- a/test/integration/Stream.test.js +++ b/test/integration/Stream.test.js @@ -1,4 +1,4 @@ -import StreamrClient from '../../src' +import StreamrClient from '../../src/StreamrClient' import { uid, fakePrivateKey, getPublishTestMessages } from '../utils' import config from './config' @@ -51,7 +51,7 @@ describe('Stream', () => { expect(stream.config.fields).toEqual([]) await stream.detectFields() - expect(stream.config.fields).toEqual([ + const expectedFields = [ { name: 'number', type: 'number', @@ -72,7 +72,11 @@ describe('Stream', () => { name: 'string', type: 'string', }, - ]) + ] + + expect(stream.config.fields).toEqual(expectedFields) + const loadedStream = await client.getStream(stream.id) + expect(loadedStream.config.fields).toEqual(expectedFields) }) it('skips unsupported types', async () => { @@ -82,6 +86,7 @@ describe('Stream', () => { func: () => null, nonexistent: undefined, symbol: Symbol('test'), + // TODO: bigint: 10n, } const publishTestMessages = getPublishTestMessages(client, { streamId: stream.id, @@ -92,7 +97,7 @@ describe('Stream', () => { expect(stream.config.fields).toEqual([]) await stream.detectFields() - expect(stream.config.fields).toEqual([ + const expectedFields = [ { name: 'null', type: 'map', @@ -101,7 +106,12 @@ describe('Stream', () => { name: 'empty', type: 'map', }, - ]) + ] + + expect(stream.config.fields).toEqual(expectedFields) + + const loadedStream = await client.getStream(stream.id) + expect(loadedStream.config.fields).toEqual(expectedFields) }) }) }) From 2ce215789dba24799f614a4afad0c31a6a24f195 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 17 Feb 2021 14:47:41 -0500 Subject: [PATCH 6/7] Add types to stream fields, refactor detectFields to fix typing. --- src/stream/index.ts | 67 +++++++++++++++++++++++++++++---------------- 1 file changed, 44 insertions(+), 23 deletions(-) diff --git a/src/stream/index.ts b/src/stream/index.ts index 1b4130601..29245468a 100644 --- a/src/stream/index.ts +++ b/src/stream/index.ts @@ -16,11 +16,39 @@ export enum StreamOperation { export type StreamProperties = Todo -export default class Stream { +const VALID_FIELD_TYPES = ['number', 'string', 'boolean', 'list', 'map'] as const + +type Field = { + name: string; + type: typeof VALID_FIELD_TYPES[number]; +} +function getFieldType(value: any): (Field['type'] | undefined) { + const type = typeof value + switch (true) { + case Array.isArray(value): { + return 'list' + } + case type === 'object': { + return 'map' + } + case (VALID_FIELD_TYPES as ReadonlyArray).includes(type): { + // see https://github.com/microsoft/TypeScript/issues/36275 + return type as Field['type'] + } + default: { + return undefined + } + } +} + +export default class Stream { // TODO add field definitions for all fields // @ts-expect-error id: string + config: { + fields: Field[]; + } = { fields: [] } _client: StreamrClient constructor(client: StreamrClient, props: StreamProperties) { @@ -132,31 +160,24 @@ export default class Stream { last: 1, }, }) + const receivedMsgs = await sub.collect() - if (receivedMsgs.length > 0) { - const lastMessage = receivedMsgs[0] - const fields = [] - - Object.keys(lastMessage).forEach((key) => { - let type - if (Array.isArray(lastMessage[key])) { - type = 'list' - } else if ((typeof lastMessage[key]) === 'object') { - type = 'map' - } else { - type = typeof lastMessage[key] - } - fields.push({ - name: key, - type, - }) - }) + if (!receivedMsgs.length) { return } - // Save field config back to the stream - this.config.fields = fields - await this.update() - } + const [lastMessage] = receivedMsgs + + const fields = Object.entries(lastMessage).map(([name, value]) => { + const type = getFieldType(value) + return !!type && { + name, + type, + } + }).filter(Boolean) as Field[] // see https://github.com/microsoft/TypeScript/issues/30621 + + // Save field config back to the stream + this.config.fields = fields + await this.update() } async addToStorageNode(address: string) { From ba222508db1b2e576da9cc5435a74250565bb815 Mon Sep 17 00:00:00 2001 From: Tim Oxley Date: Wed, 17 Feb 2021 16:21:43 -0500 Subject: [PATCH 7/7] Tidy unused. --- test/integration/StreamEndpoints.test.js | 1 - 1 file changed, 1 deletion(-) diff --git a/test/integration/StreamEndpoints.test.js b/test/integration/StreamEndpoints.test.js index c5ccb1a74..6be2ee3c9 100644 --- a/test/integration/StreamEndpoints.test.js +++ b/test/integration/StreamEndpoints.test.js @@ -1,5 +1,4 @@ import { ethers } from 'ethers' -import { wait } from 'streamr-test-utils' import StreamrClient from '../../src/StreamrClient' import { uid } from '../utils'