From 7f36061e4587707541c9b2ae3a0aa2490160c662 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Fri, 12 Mar 2021 13:30:25 +0200 Subject: [PATCH 01/11] Connection --- src/Connection.ts | 18 ++++++++++++++---- src/StreamrClient.ts | 6 +++--- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/Connection.ts b/src/Connection.ts index 3b2a6977e..92c3fdb52 100644 --- a/src/Connection.ts +++ b/src/Connection.ts @@ -13,6 +13,16 @@ const wait = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) // @ts-expect-error Debug.formatters.n = (v) => Debug.humanize(v) +export interface ConnectionOptions { + url?: string, + autoConnect?: boolean + autoDisconnect?: boolean + disconnectDelay?: number + maxRetries?: number + retryBackoffFactor?: number + maxRetryWait?: number +} + export class ConnectionError extends Error { reason?: Todo @@ -120,7 +130,7 @@ const STATE = { } /* eslint-disable no-underscore-dangle, no-param-reassign */ -function SocketConnector(connection: Todo) { +function SocketConnector(connection: Connection) { let next: Todo let socket: Todo let startedConnecting = false @@ -189,7 +199,7 @@ function SocketConnector(connection: Todo) { // connect async () => { startedConnecting = true - socket = await OpenWebSocket(connection.options.url, { + socket = await OpenWebSocket(connection.options.url!, { perMessageDeflate: false, debug: connection._debug, }) @@ -287,7 +297,7 @@ const DEFAULT_MAX_RETRIES = 10 export default class Connection extends EventEmitter { _debug: Todo - options: Todo + options: ConnectionOptions retryCount: Todo wantsState: Todo connectionHandles: Todo @@ -312,7 +322,7 @@ export default class Connection extends EventEmitter { })) } - constructor(options = {}, debug?: Debug.Debugger) { + constructor(options: ConnectionOptions = {}, debug?: Debug.Debugger) { super() this._debug = debug !== undefined ? debug.extend(counterId(this.constructor.name)) diff --git a/src/StreamrClient.ts b/src/StreamrClient.ts index 687614d94..ae9c1ede7 100644 --- a/src/StreamrClient.ts +++ b/src/StreamrClient.ts @@ -7,7 +7,7 @@ import { validateOptions } from './stream/utils' import Config, { StreamrClientOptions, StrictStreamrClientOptions } from './Config' import StreamrEthereum from './Ethereum' import Session from './Session' -import Connection, { ConnectionError } from './Connection' +import Connection, { ConnectionError, ConnectionOptions } from './Connection' import Publisher from './publish' import { Subscriber, Subscription } from './subscribe' import { getUserId } from './user' @@ -29,10 +29,9 @@ interface MessageEvent { /** * Wrap connection message events with message parsing. */ - class StreamrConnection extends Connection { // TODO define args type when we convert Connection class to TypeScript - constructor(options: Todo, debug?: Debug.Debugger) { + constructor(options: ConnectionOptions, debug?: Debug.Debugger) { super(options, debug) this.on('message', this.onConnectionMessage) } @@ -160,6 +159,7 @@ export class StreamrClient extends EventEmitter { /** @internal */ ethereum: StreamrEthereum + // TODO annotate connection parameter as internal parameter if possible? constructor(options: StreamrClientOptions = {}, connection?: StreamrConnection) { super() this.id = counterId(`${this.constructor.name}:${uid}`) From 0c027e010244e81949530edad0a107455ce585b5 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Fri, 12 Mar 2021 13:45:45 +0200 Subject: [PATCH 02/11] stream utils to TypeScript --- src/stream/{utils.js => utils.ts} | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) rename src/stream/{utils.js => utils.ts} (87%) diff --git a/src/stream/utils.js b/src/stream/utils.ts similarity index 87% rename from src/stream/utils.js rename to src/stream/utils.ts index 03a3ad5df..cd872ce9a 100644 --- a/src/stream/utils.js +++ b/src/stream/utils.ts @@ -7,8 +7,10 @@ import { inspect } from 'util' import { ControlLayer } from 'streamr-client-protocol' import { pTimeout } from '../utils' +import { Todo } from '../types' +import { StreamrClient } from '../StreamrClient' -export function StreamKey({ streamId, streamPartition = 0 }) { +export function StreamKey({ streamId, streamPartition = 0 }: Todo) { if (streamId == null) { throw new Error(`StreamKey: invalid streamId (${typeof streamId}): ${streamId}`) } if (!Number.isInteger(streamPartition) || streamPartition < 0) { @@ -17,13 +19,13 @@ export function StreamKey({ streamId, streamPartition = 0 }) { return `${streamId}::${streamPartition}` } -export function validateOptions(optionsOrStreamId) { +export function validateOptions(optionsOrStreamId: Todo): Todo { if (!optionsOrStreamId) { throw new Error('streamId is required!') } // Backwards compatibility for giving a streamId as first argument - let options = {} + let options: Todo = {} if (typeof optionsOrStreamId === 'string') { options = { streamId: optionsOrStreamId, @@ -89,7 +91,7 @@ export async function waitForMatchingMessage({ rejectOnTimeout = true, timeoutMessage, cancelTask, -}) { +}: Todo) { if (typeof matchFn !== 'function') { throw new Error(`matchFn required, got: (${typeof matchFn}) ${matchFn}`) } @@ -98,7 +100,7 @@ export async function waitForMatchingMessage({ let cleanup = () => {} const matchTask = new Promise((resolve, reject) => { - const tryMatch = (...args) => { + const tryMatch = (...args: Todo[]) => { try { return matchFn(...args) } catch (err) { @@ -107,19 +109,20 @@ export async function waitForMatchingMessage({ return false } } - let onDisconnected - const onResponse = (res) => { + let onDisconnected: Todo + const onResponse = (res: Todo) => { if (!tryMatch(res)) { return } // clean up err handler cleanup() resolve(res) } - const onErrorResponse = (res) => { + const onErrorResponse = (res: Todo) => { if (!tryMatch(res)) { return } // clean up success handler cleanup() const error = new Error(res.errorMessage) + // @ts-expect-error error.code = res.errorCode reject(error) } @@ -128,12 +131,12 @@ export async function waitForMatchingMessage({ if (cancelTask) { cancelTask.catch(() => {}) } // ignore connection.off('disconnected', onDisconnected) connection.off(ControlMessage.TYPES.ErrorResponse, onErrorResponse) - types.forEach((type) => { + types.forEach((type: Todo) => { connection.off(type, onResponse) }) } - types.forEach((type) => { + types.forEach((type: Todo) => { connection.on(type, onResponse) }) @@ -141,6 +144,7 @@ export async function waitForMatchingMessage({ onDisconnected = () => { cleanup() + // @ts-expect-error resolve() // noop } @@ -171,7 +175,7 @@ export async function waitForMatchingMessage({ * Wait for matching response types to requestId, or ErrorResponse. */ -export async function waitForResponse({ requestId, timeoutMessage = `Waiting for response to: ${requestId}.`, ...opts }) { +export async function waitForResponse({ requestId, timeoutMessage = `Waiting for response to: ${requestId}.`, ...opts }: Todo) { if (requestId == null) { throw new Error(`requestId required, got: (${typeof requestId}) ${requestId}`) } @@ -180,13 +184,13 @@ export async function waitForResponse({ requestId, timeoutMessage = `Waiting for ...opts, requestId, timeoutMessage, - matchFn(res) { + matchFn(res: Todo) { return res.requestId === requestId } }) } -export async function waitForRequestResponse(client, request, opts = {}) { +export async function waitForRequestResponse(client: StreamrClient, request: Todo, opts: Todo = {}) { return waitForResponse({ connection: client.connection, types: PAIRS.get(request.type), From 6a49d7ece4bc9226b037ea9f138f886ef191fec0 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Fri, 12 Mar 2021 14:57:42 +0200 Subject: [PATCH 03/11] StreamPartDefinition --- src/StreamrClient.ts | 9 ++++--- src/rest/StreamEndpoints.ts | 2 ++ src/stream/index.ts | 5 ++++ src/stream/utils.ts | 3 ++- src/subscribe/index.ts | 16 +++++------ test/unit/StreamUtils.test.ts | 50 +++++++++++++++++++++++++++++++++++ test/utils.ts | 11 ++++++++ 7 files changed, 83 insertions(+), 13 deletions(-) create mode 100644 test/unit/StreamUtils.test.ts diff --git a/src/StreamrClient.ts b/src/StreamrClient.ts index ae9c1ede7..0327915ee 100644 --- a/src/StreamrClient.ts +++ b/src/StreamrClient.ts @@ -18,6 +18,7 @@ import { DataUnion, DataUnionDeployOptions } from './dataunion/DataUnion' import { BigNumber } from '@ethersproject/bignumber' import { getAddress } from '@ethersproject/address' import { Contract } from '@ethersproject/contracts' +import { StreamPartDefinition } from './stream' // TODO get metadata type from streamr-protocol-js project (it doesn't export the type definitions yet) export type OnMessageCallback = MaybeAsync<(message: any, metadata: any) => void> @@ -280,13 +281,13 @@ export class StreamrClient extends EventEmitter { ]) } - getSubscriptions(...args: Todo) { - return this.subscriber.getAll(...args) + getSubscriptions(): Subscription[] { + return this.subscriber.getAll() } - getSubscription(...args: Todo) { + getSubscription(definition: StreamPartDefinition) { // @ts-expect-error - return this.subscriber.get(...args) + return this.subscriber.get(definition) } async ensureConnected() { diff --git a/src/rest/StreamEndpoints.ts b/src/rest/StreamEndpoints.ts index 7005f47f5..1391319ab 100644 --- a/src/rest/StreamEndpoints.ts +++ b/src/rest/StreamEndpoints.ts @@ -226,6 +226,7 @@ export class StreamEndpoints { } async getStreamLast(streamObjectOrId: Stream|string): Promise { + // @ts-expect-error const { streamId, streamPartition = 0, count = 1 } = validateOptions(streamObjectOrId) this.client.debug('getStreamLast %o', { streamId, @@ -234,6 +235,7 @@ export class StreamEndpoints { }) const url = ( + // @ts-expect-error getEndpointUrl(this.client.options.restUrl, 'streams', streamId, 'data', 'partitions', streamPartition, 'last') + `?${qs.stringify({ count })}` ) diff --git a/src/stream/index.ts b/src/stream/index.ts index 45c45465c..5b7ec979f 100644 --- a/src/stream/index.ts +++ b/src/stream/index.ts @@ -5,6 +5,11 @@ import StorageNode from './StorageNode' import { StreamrClient } from '../StreamrClient' import { Todo } from '../types' +// TODO explicit types: e.g. we never provide both streamId and id, or both streamPartition and partition +export type StreamPartDefinition = string | { streamId?: string, streamPartition?: number, id?: string, partition?: number, stream?: Stream } + +export type ValidatedStreamPartDefinition = { streamId: string, streamPartition: number, key: string} + interface StreamPermisionBase { id: number operation: StreamOperation diff --git a/src/stream/utils.ts b/src/stream/utils.ts index cd872ce9a..bf1021576 100644 --- a/src/stream/utils.ts +++ b/src/stream/utils.ts @@ -9,6 +9,7 @@ import { ControlLayer } from 'streamr-client-protocol' import { pTimeout } from '../utils' import { Todo } from '../types' import { StreamrClient } from '../StreamrClient' +import { StreamPartDefinition, ValidatedStreamPartDefinition } from '.' export function StreamKey({ streamId, streamPartition = 0 }: Todo) { if (streamId == null) { throw new Error(`StreamKey: invalid streamId (${typeof streamId}): ${streamId}`) } @@ -19,7 +20,7 @@ export function StreamKey({ streamId, streamPartition = 0 }: Todo) { return `${streamId}::${streamPartition}` } -export function validateOptions(optionsOrStreamId: Todo): Todo { +export function validateOptions(optionsOrStreamId: StreamPartDefinition): ValidatedStreamPartDefinition { if (!optionsOrStreamId) { throw new Error('streamId is required!') } diff --git a/src/subscribe/index.ts b/src/subscribe/index.ts index b7ce2e205..a13dfc65c 100644 --- a/src/subscribe/index.ts +++ b/src/subscribe/index.ts @@ -11,7 +11,7 @@ import Validator from './Validator' import messageStream from './messageStream' import resendStream from './resendStream' import { Todo } from '../types' -import StreamrClient from '..' +import StreamrClient, { StreamPartDefinition } from '..' export class Subscription extends Emitter { @@ -441,7 +441,7 @@ class Subscriptions { /** * Remove all subscriptions, optionally only those matching options. */ - async removeAll(options?: Todo) { + async removeAll(options?: StreamPartDefinition) { const subs = this.get(options) return allSettledValues(subs.map((sub: Todo) => ( this.remove(sub) @@ -464,7 +464,7 @@ class Subscriptions { * Count all matching subscriptions. */ - count(options: Todo) { + count(options?: StreamPartDefinition) { if (options === undefined) { return this.countAll() } return this.get(options).length } @@ -493,7 +493,7 @@ class Subscriptions { * Get all subscriptions matching options. */ - get(options: Todo) { + get(options?: StreamPartDefinition) { if (options === undefined) { return this.getAll() } const { key } = validateOptions(options) @@ -522,12 +522,11 @@ export class Subscriber { return this.subscriptions.getSubscriptionSession(...args) } - getAll(...args: Todo[]) { - // @ts-expect-error - return this.subscriptions.getAll(...args) + getAll() { + return this.subscriptions.getAll() } - count(options: Todo[]) { + count(options?: StreamPartDefinition) { return this.subscriptions.count(options) } @@ -570,6 +569,7 @@ export class Subscriber { const options = validateOptions(opts) const resendMessageStream = resendStream(this.client, options) + // @ts-expect-error const realtimeMessageStream = messageStream(this.client.connection, options) // cancel both streams on end diff --git a/test/unit/StreamUtils.test.ts b/test/unit/StreamUtils.test.ts new file mode 100644 index 000000000..c167e0d5b --- /dev/null +++ b/test/unit/StreamUtils.test.ts @@ -0,0 +1,50 @@ +import { Stream } from '../../src/stream' +import { validateOptions } from '../../src/stream/utils' + +describe('Stream utils', () => { + + it('no definition', () => { + expect(() => validateOptions(undefined)).toThrow() + expect(() => validateOptions(null)).toThrow() + expect(() => validateOptions({})).toThrow() + }) + + it('string', () => { + expect(validateOptions('foo')).toMatchObject({ + streamId: 'foo', + streamPartition: 0, + key: 'foo::0' + }) + }) + + it('object', () => { + expect(validateOptions({ streamId: 'foo' })).toMatchObject({ + streamId: 'foo', + streamPartition: 0, + key: 'foo::0' + }) + expect(validateOptions({ streamId: 'foo', streamPartition: 123 })).toMatchObject({ + streamId: 'foo', + streamPartition: 123, + key: 'foo::123' + }) + expect(validateOptions({ id: 'foo', partition: 123 })).toMatchObject({ + streamId: 'foo', + streamPartition: 123, + key: 'foo::123' + }) + }) + + it('stream', () => { + const stream = new Stream(undefined as any, { + id: 'foo', + name: 'bar' + }) + expect(validateOptions({ stream })).toMatchObject({ + streamId: 'foo', + streamPartition: 0, + key: 'foo::0' + }) + }) + +}) diff --git a/test/utils.ts b/test/utils.ts index b691ff5ac..860c5fa9b 100644 --- a/test/utils.ts +++ b/test/utils.ts @@ -76,6 +76,7 @@ export function getWaitForStorage(client: StreamrClient, defaultOpts = {}) { /* eslint-disable no-await-in-loop */ return async (publishRequest: any, opts = {}) => { const { + // @ts-expect-error streamId, streamPartition = 0, interval = 500, timeout = 5000, count = 100, messageMatchFn = defaultMessageMatchFn } = validateOptions({ ...defaultOpts, @@ -145,15 +146,25 @@ export function getPublishTestMessages(client: StreamrClient, defaultOpts = {}) const { streamId, streamPartition = 0, + // @ts-expect-error delay = 100, + // @ts-expect-error timeout = 3500, + // @ts-expect-error waitForLast = false, // wait for message to hit storage + // @ts-expect-error waitForLastCount, + // @ts-expect-error waitForLastTimeout, + // @ts-expect-error beforeEach = (m: any) => m, + // @ts-expect-error afterEach = () => {}, + // @ts-expect-error timestamp, + // @ts-expect-error partitionKey, + // @ts-expect-error createMessage = () => { msgCount += 1 return { From 0a755e3653c61d10c58f455788cabda73ae47bfe Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Fri, 12 Mar 2021 14:58:51 +0200 Subject: [PATCH 04/11] Fix: read partition field from stream definition --- src/stream/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/utils.ts b/src/stream/utils.ts index bf1021576..91c716b7d 100644 --- a/src/stream/utils.ts +++ b/src/stream/utils.ts @@ -45,7 +45,7 @@ export function validateOptions(optionsOrStreamId: StreamPartDefinition): Valida options.streamId = optionsOrStreamId.id } - if (optionsOrStreamId.partition == null && optionsOrStreamId.streamPartition == null) { + if (optionsOrStreamId.partition != null && optionsOrStreamId.streamPartition == null) { options.streamPartition = optionsOrStreamId.partition } From 5de2330b3c07c793fb73acee28ee1bf82d316c81 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Fri, 12 Mar 2021 15:04:47 +0200 Subject: [PATCH 05/11] enableAutoConnect, enableAutoDisconnect --- src/StreamrClient.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/StreamrClient.ts b/src/StreamrClient.ts index 0327915ee..236a991c7 100644 --- a/src/StreamrClient.ts +++ b/src/StreamrClient.ts @@ -374,12 +374,12 @@ export class StreamrClient extends EventEmitter { return task } - enableAutoConnect(...args: Todo) { - return this.connection.enableAutoConnect(...args) + enableAutoConnect(autoConnect?: boolean) { + return this.connection.enableAutoConnect(autoConnect) } - enableAutoDisconnect(...args: Todo) { - return this.connection.enableAutoDisconnect(...args) + enableAutoDisconnect(autoDisconnect?: boolean) { + return this.connection.enableAutoDisconnect(autoDisconnect) } getAddress(): EthereumAddress { From acea1fa1e1a0b4698b2ed57e580b50e581df0dd8 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Fri, 12 Mar 2021 15:44:11 +0200 Subject: [PATCH 06/11] subscribe, unsubscribe --- src/StreamrClient.ts | 19 ++++++++++++++----- src/stream/index.ts | 5 ++--- src/subscribe/index.ts | 17 ++++++++++------- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/src/StreamrClient.ts b/src/StreamrClient.ts index 236a991c7..e81dc14ab 100644 --- a/src/StreamrClient.ts +++ b/src/StreamrClient.ts @@ -23,6 +23,14 @@ import { StreamPartDefinition } from './stream' // TODO get metadata type from streamr-protocol-js project (it doesn't export the type definitions yet) export type OnMessageCallback = MaybeAsync<(message: any, metadata: any) => void> +// TODO check if these are correct +export interface SubscribeOptions { + resend?: boolean + from?: { timestamp: number, sequenceNumber?: number } + to?: { timestamp: number, sequenceNumber?: number } + last?: number +} + interface MessageEvent { data: any } @@ -302,8 +310,8 @@ export class StreamrClient extends EventEmitter { return this.session.logout() } - async publish(...args: Todo) { - return this.publisher.publish(...args) + async publish(streamObjectOrId: StreamPartDefinition, content: object, timestamp?: number|string|Date, partitionKey?: string) { + return this.publisher.publish(streamObjectOrId, content, timestamp, partitionKey) } async getUserId() { @@ -320,7 +328,7 @@ export class StreamrClient extends EventEmitter { return this.publisher.rotateGroupKey(...args) } - async subscribe(opts: Todo, onMessage?: OnMessageCallback) { + async subscribe(opts: SubscribeOptions & StreamPartDefinition, onMessage?: OnMessageCallback) { let subTask: Todo let sub: Todo const hasResend = !!(opts.resend || opts.from || opts.to || opts.last) @@ -351,10 +359,11 @@ export class StreamrClient extends EventEmitter { return subTask } - async unsubscribe(opts: Todo) { - await this.subscriber.unsubscribe(opts) + async unsubscribe(subscription: Subscription) { + await this.subscriber.unsubscribe(subscription) } + /** @internal */ async resend(opts: Todo, onMessage?: OnMessageCallback): Promise { const task = this.subscriber.resend(opts) if (typeof onMessage !== 'function') { diff --git a/src/stream/index.ts b/src/stream/index.ts index 5b7ec979f..4476be363 100644 --- a/src/stream/index.ts +++ b/src/stream/index.ts @@ -3,7 +3,6 @@ import authFetch from '../rest/authFetch' import StorageNode from './StorageNode' import { StreamrClient } from '../StreamrClient' -import { Todo } from '../types' // TODO explicit types: e.g. we never provide both streamId and id, or both streamPartition and partition export type StreamPartDefinition = string | { streamId?: string, streamPartition?: number, id?: string, partition?: number, stream?: Stream } @@ -252,7 +251,7 @@ export class Stream { return json.map((item: any) => new StorageNode(item.storageNodeAddress)) } - async publish(...theArgs: Todo) { - return this._client.publish(this.id, ...theArgs) + async publish(content: object, timestamp?: number|string|Date, partitionKey?: string) { + return this._client.publish(this.id, content, timestamp, partitionKey) } } diff --git a/src/subscribe/index.ts b/src/subscribe/index.ts index a13dfc65c..017bd9a84 100644 --- a/src/subscribe/index.ts +++ b/src/subscribe/index.ts @@ -11,7 +11,7 @@ import Validator from './Validator' import messageStream from './messageStream' import resendStream from './resendStream' import { Todo } from '../types' -import StreamrClient, { StreamPartDefinition } from '..' +import StreamrClient, { StreamPartDefinition, SubscribeOptions } from '..' export class Subscription extends Emitter { @@ -375,7 +375,7 @@ class Subscriptions { this.subSessions = new Map() } - async add(opts: Todo, onFinally: Todo = async () => {}) { + async add(opts: StreamPartDefinition, onFinally: Todo = async () => {}) { const options = validateOptions(opts) const { key } = options @@ -530,21 +530,23 @@ export class Subscriber { return this.subscriptions.count(options) } - async subscribe(...args: Todo[]) { - // @ts-expect-error - return this.subscriptions.add(...args) + async subscribe(opts: StreamPartDefinition, onFinally?: Todo) { + return this.subscriptions.add(opts, onFinally) } - async unsubscribe(options: Todo): Promise { + async unsubscribe(options: Subscription | StreamPartDefinition | { options: Subscription|StreamPartDefinition }): Promise { if (options instanceof Subscription) { const sub = options return sub.cancel() } + // @ts-expect-error if (options && options.options) { + // @ts-expect-error return this.unsubscribe(options.options) } + // @ts-expect-error return this.subscriptions.removeAll(options) } @@ -563,7 +565,7 @@ export class Subscriber { return sub } - async resendSubscribe(opts: Todo, onMessage: Todo) { + async resendSubscribe(opts: SubscribeOptions & StreamPartDefinition, onMessage: Todo) { // This works by passing a custom message stream to a subscription // the custom message stream iterates resends, then iterates realtime const options = validateOptions(opts) @@ -644,6 +646,7 @@ export class Subscriber { const resendTask = resendMessageStream.subscribe() const realtimeTask = this.subscribe({ ...options, + // @ts-expect-error msgStream: it, afterSteps: [ async function* detectEndOfResend(src: Todo) { From 14188153faf7c6c258ff51191c4d6c6f5002e236 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Fri, 12 Mar 2021 15:57:47 +0200 Subject: [PATCH 07/11] Subscription --- src/subscribe/index.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/subscribe/index.ts b/src/subscribe/index.ts index 017bd9a84..8cc5576b1 100644 --- a/src/subscribe/index.ts +++ b/src/subscribe/index.ts @@ -84,7 +84,7 @@ export class Subscription extends Emitter { * Collect all messages into an array. * Returns array when subscription is ended. */ - async collect(n?: Todo) { + async collect(n?: number) { const msgs = [] for await (const msg of this) { if (n === 0) { @@ -126,6 +126,7 @@ export class Subscription extends Emitter { return this.pipeline.throw(...args) } + // TODO should we expose this to the user as no-args method? async unsubscribe(...args: Todo[]) { return this.cancel(...args) } From 9d5e27d653f17a2f34e82dc197fde0ecc411bdf1 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Fri, 12 Mar 2021 16:02:18 +0200 Subject: [PATCH 08/11] StreamEndpoints --- src/rest/StreamEndpoints.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rest/StreamEndpoints.ts b/src/rest/StreamEndpoints.ts index 1391319ab..cd5a0ca08 100644 --- a/src/rest/StreamEndpoints.ts +++ b/src/rest/StreamEndpoints.ts @@ -11,7 +11,7 @@ import StreamPart from '../stream/StreamPart' import { isKeyExchangeStream } from '../stream/KeyExchange' import authFetch, { ErrorCode, NotFoundError } from './authFetch' -import { EthereumAddress, Todo } from '../types' +import { EthereumAddress } from '../types' import { StreamrClient } from '../StreamrClient' // TODO change this import when streamr-client-protocol exports StreamMessage type or the enums types directly import { ContentType, EncryptionType, SignatureType, StreamMessageType } from 'streamr-client-protocol/dist/src/protocol/message_layer/StreamMessage' @@ -254,7 +254,7 @@ export class StreamEndpoints { return result } - async publishHttp(streamObjectOrId: Stream|string, data: Todo, requestOptions: Todo = {}, keepAlive: boolean = true) { + async publishHttp(streamObjectOrId: Stream|string, data: any, requestOptions: any = {}, keepAlive: boolean = true) { let streamId if (streamObjectOrId instanceof Stream) { streamId = streamObjectOrId.id From b0f2630b2c70d0d98c1bb5ddf88a758b813e501d Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Fri, 12 Mar 2021 16:26:52 +0200 Subject: [PATCH 09/11] Fix test --- test/unit/StreamUtils.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/unit/StreamUtils.test.ts b/test/unit/StreamUtils.test.ts index c167e0d5b..3cb5360aa 100644 --- a/test/unit/StreamUtils.test.ts +++ b/test/unit/StreamUtils.test.ts @@ -4,8 +4,8 @@ import { validateOptions } from '../../src/stream/utils' describe('Stream utils', () => { it('no definition', () => { - expect(() => validateOptions(undefined)).toThrow() - expect(() => validateOptions(null)).toThrow() + expect(() => validateOptions(undefined as any)).toThrow() + expect(() => validateOptions(null as any)).toThrow() expect(() => validateOptions({})).toThrow() }) From f5a43904603c0618a1a6227e214d6f7e4c7a8edc Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Fri, 12 Mar 2021 17:39:39 +0200 Subject: [PATCH 10/11] Fix resend options --- src/StreamrClient.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/StreamrClient.ts b/src/StreamrClient.ts index e81dc14ab..2157b733f 100644 --- a/src/StreamrClient.ts +++ b/src/StreamrClient.ts @@ -23,14 +23,16 @@ import { StreamPartDefinition } from './stream' // TODO get metadata type from streamr-protocol-js project (it doesn't export the type definitions yet) export type OnMessageCallback = MaybeAsync<(message: any, metadata: any) => void> -// TODO check if these are correct -export interface SubscribeOptions { - resend?: boolean +export type ResendOptions = { from?: { timestamp: number, sequenceNumber?: number } to?: { timestamp: number, sequenceNumber?: number } last?: number } +export type SubscribeOptions = { + resend?: ResendOptions +} & ResendOptions + interface MessageEvent { data: any } From 4ec19df4e50d6f1b82abade1d27cd612623a0213 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Fri, 12 Mar 2021 18:12:20 +0200 Subject: [PATCH 11/11] Unsubscribe doesn't need parameters --- src/subscribe/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/subscribe/index.ts b/src/subscribe/index.ts index 8cc5576b1..7cb1320f4 100644 --- a/src/subscribe/index.ts +++ b/src/subscribe/index.ts @@ -127,8 +127,8 @@ export class Subscription extends Emitter { } // TODO should we expose this to the user as no-args method? - async unsubscribe(...args: Todo[]) { - return this.cancel(...args) + async unsubscribe() { + return this.cancel() } }