diff --git a/src/Connection.ts b/src/Connection.ts index 3b2a6977e..92c3fdb52 100644 --- a/src/Connection.ts +++ b/src/Connection.ts @@ -13,6 +13,16 @@ const wait = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) // @ts-expect-error Debug.formatters.n = (v) => Debug.humanize(v) +export interface ConnectionOptions { + url?: string, + autoConnect?: boolean + autoDisconnect?: boolean + disconnectDelay?: number + maxRetries?: number + retryBackoffFactor?: number + maxRetryWait?: number +} + export class ConnectionError extends Error { reason?: Todo @@ -120,7 +130,7 @@ const STATE = { } /* eslint-disable no-underscore-dangle, no-param-reassign */ -function SocketConnector(connection: Todo) { +function SocketConnector(connection: Connection) { let next: Todo let socket: Todo let startedConnecting = false @@ -189,7 +199,7 @@ function SocketConnector(connection: Todo) { // connect async () => { startedConnecting = true - socket = await OpenWebSocket(connection.options.url, { + socket = await OpenWebSocket(connection.options.url!, { perMessageDeflate: false, debug: connection._debug, }) @@ -287,7 +297,7 @@ const DEFAULT_MAX_RETRIES = 10 export default class Connection extends EventEmitter { _debug: Todo - options: Todo + options: ConnectionOptions retryCount: Todo wantsState: Todo connectionHandles: Todo @@ -312,7 +322,7 @@ export default class Connection extends EventEmitter { })) } - constructor(options = {}, debug?: Debug.Debugger) { + constructor(options: ConnectionOptions = {}, debug?: Debug.Debugger) { super() this._debug = debug !== undefined ? debug.extend(counterId(this.constructor.name)) diff --git a/src/StreamrClient.ts b/src/StreamrClient.ts index 687614d94..2157b733f 100644 --- a/src/StreamrClient.ts +++ b/src/StreamrClient.ts @@ -7,7 +7,7 @@ import { validateOptions } from './stream/utils' import Config, { StreamrClientOptions, StrictStreamrClientOptions } from './Config' import StreamrEthereum from './Ethereum' import Session from './Session' -import Connection, { ConnectionError } from './Connection' +import Connection, { ConnectionError, ConnectionOptions } from './Connection' import Publisher from './publish' import { Subscriber, Subscription } from './subscribe' import { getUserId } from './user' @@ -18,10 +18,21 @@ import { DataUnion, DataUnionDeployOptions } from './dataunion/DataUnion' import { BigNumber } from '@ethersproject/bignumber' import { getAddress } from '@ethersproject/address' import { Contract } from '@ethersproject/contracts' +import { StreamPartDefinition } from './stream' // TODO get metadata type from streamr-protocol-js project (it doesn't export the type definitions yet) export type OnMessageCallback = MaybeAsync<(message: any, metadata: any) => void> +export type ResendOptions = { + from?: { timestamp: number, sequenceNumber?: number } + to?: { timestamp: number, sequenceNumber?: number } + last?: number +} + +export type SubscribeOptions = { + resend?: ResendOptions +} & ResendOptions + interface MessageEvent { data: any } @@ -29,10 +40,9 @@ interface MessageEvent { /** * Wrap connection message events with message parsing. */ - class StreamrConnection extends Connection { // TODO define args type when we convert Connection class to TypeScript - constructor(options: Todo, debug?: Debug.Debugger) { + constructor(options: ConnectionOptions, debug?: Debug.Debugger) { super(options, debug) this.on('message', this.onConnectionMessage) } @@ -160,6 +170,7 @@ export class StreamrClient extends EventEmitter { /** @internal */ ethereum: StreamrEthereum + // TODO annotate connection parameter as internal parameter if possible? constructor(options: StreamrClientOptions = {}, connection?: StreamrConnection) { super() this.id = counterId(`${this.constructor.name}:${uid}`) @@ -280,13 +291,13 @@ export class StreamrClient extends EventEmitter { ]) } - getSubscriptions(...args: Todo) { - return this.subscriber.getAll(...args) + getSubscriptions(): Subscription[] { + return this.subscriber.getAll() } - getSubscription(...args: Todo) { + getSubscription(definition: StreamPartDefinition) { // @ts-expect-error - return this.subscriber.get(...args) + return this.subscriber.get(definition) } async ensureConnected() { @@ -301,8 +312,8 @@ export class StreamrClient extends EventEmitter { return this.session.logout() } - async publish(...args: Todo) { - return this.publisher.publish(...args) + async publish(streamObjectOrId: StreamPartDefinition, content: object, timestamp?: number|string|Date, partitionKey?: string) { + return this.publisher.publish(streamObjectOrId, content, timestamp, partitionKey) } async getUserId() { @@ -319,7 +330,7 @@ export class StreamrClient extends EventEmitter { return this.publisher.rotateGroupKey(...args) } - async subscribe(opts: Todo, onMessage?: OnMessageCallback) { + async subscribe(opts: SubscribeOptions & StreamPartDefinition, onMessage?: OnMessageCallback) { let subTask: Todo let sub: Todo const hasResend = !!(opts.resend || opts.from || opts.to || opts.last) @@ -350,10 +361,11 @@ export class StreamrClient extends EventEmitter { return subTask } - async unsubscribe(opts: Todo) { - await this.subscriber.unsubscribe(opts) + async unsubscribe(subscription: Subscription) { + await this.subscriber.unsubscribe(subscription) } + /** @internal */ async resend(opts: Todo, onMessage?: OnMessageCallback): Promise { const task = this.subscriber.resend(opts) if (typeof onMessage !== 'function') { @@ -373,12 +385,12 @@ export class StreamrClient extends EventEmitter { return task } - enableAutoConnect(...args: Todo) { - return this.connection.enableAutoConnect(...args) + enableAutoConnect(autoConnect?: boolean) { + return this.connection.enableAutoConnect(autoConnect) } - enableAutoDisconnect(...args: Todo) { - return this.connection.enableAutoDisconnect(...args) + enableAutoDisconnect(autoDisconnect?: boolean) { + return this.connection.enableAutoDisconnect(autoDisconnect) } getAddress(): EthereumAddress { diff --git a/src/rest/StreamEndpoints.ts b/src/rest/StreamEndpoints.ts index 7005f47f5..cd5a0ca08 100644 --- a/src/rest/StreamEndpoints.ts +++ b/src/rest/StreamEndpoints.ts @@ -11,7 +11,7 @@ import StreamPart from '../stream/StreamPart' import { isKeyExchangeStream } from '../stream/KeyExchange' import authFetch, { ErrorCode, NotFoundError } from './authFetch' -import { EthereumAddress, Todo } from '../types' +import { EthereumAddress } from '../types' import { StreamrClient } from '../StreamrClient' // TODO change this import when streamr-client-protocol exports StreamMessage type or the enums types directly import { ContentType, EncryptionType, SignatureType, StreamMessageType } from 'streamr-client-protocol/dist/src/protocol/message_layer/StreamMessage' @@ -226,6 +226,7 @@ export class StreamEndpoints { } async getStreamLast(streamObjectOrId: Stream|string): Promise { + // @ts-expect-error const { streamId, streamPartition = 0, count = 1 } = validateOptions(streamObjectOrId) this.client.debug('getStreamLast %o', { streamId, @@ -234,6 +235,7 @@ export class StreamEndpoints { }) const url = ( + // @ts-expect-error getEndpointUrl(this.client.options.restUrl, 'streams', streamId, 'data', 'partitions', streamPartition, 'last') + `?${qs.stringify({ count })}` ) @@ -252,7 +254,7 @@ export class StreamEndpoints { return result } - async publishHttp(streamObjectOrId: Stream|string, data: Todo, requestOptions: Todo = {}, keepAlive: boolean = true) { + async publishHttp(streamObjectOrId: Stream|string, data: any, requestOptions: any = {}, keepAlive: boolean = true) { let streamId if (streamObjectOrId instanceof Stream) { streamId = streamObjectOrId.id diff --git a/src/stream/index.ts b/src/stream/index.ts index 45c45465c..4476be363 100644 --- a/src/stream/index.ts +++ b/src/stream/index.ts @@ -3,7 +3,11 @@ import authFetch from '../rest/authFetch' import StorageNode from './StorageNode' import { StreamrClient } from '../StreamrClient' -import { Todo } from '../types' + +// TODO explicit types: e.g. we never provide both streamId and id, or both streamPartition and partition +export type StreamPartDefinition = string | { streamId?: string, streamPartition?: number, id?: string, partition?: number, stream?: Stream } + +export type ValidatedStreamPartDefinition = { streamId: string, streamPartition: number, key: string} interface StreamPermisionBase { id: number @@ -247,7 +251,7 @@ export class Stream { return json.map((item: any) => new StorageNode(item.storageNodeAddress)) } - async publish(...theArgs: Todo) { - return this._client.publish(this.id, ...theArgs) + async publish(content: object, timestamp?: number|string|Date, partitionKey?: string) { + return this._client.publish(this.id, content, timestamp, partitionKey) } } diff --git a/src/stream/utils.js b/src/stream/utils.ts similarity index 84% rename from src/stream/utils.js rename to src/stream/utils.ts index 03a3ad5df..91c716b7d 100644 --- a/src/stream/utils.js +++ b/src/stream/utils.ts @@ -7,8 +7,11 @@ import { inspect } from 'util' import { ControlLayer } from 'streamr-client-protocol' import { pTimeout } from '../utils' +import { Todo } from '../types' +import { StreamrClient } from '../StreamrClient' +import { StreamPartDefinition, ValidatedStreamPartDefinition } from '.' -export function StreamKey({ streamId, streamPartition = 0 }) { +export function StreamKey({ streamId, streamPartition = 0 }: Todo) { if (streamId == null) { throw new Error(`StreamKey: invalid streamId (${typeof streamId}): ${streamId}`) } if (!Number.isInteger(streamPartition) || streamPartition < 0) { @@ -17,13 +20,13 @@ export function StreamKey({ streamId, streamPartition = 0 }) { return `${streamId}::${streamPartition}` } -export function validateOptions(optionsOrStreamId) { +export function validateOptions(optionsOrStreamId: StreamPartDefinition): ValidatedStreamPartDefinition { if (!optionsOrStreamId) { throw new Error('streamId is required!') } // Backwards compatibility for giving a streamId as first argument - let options = {} + let options: Todo = {} if (typeof optionsOrStreamId === 'string') { options = { streamId: optionsOrStreamId, @@ -42,7 +45,7 @@ export function validateOptions(optionsOrStreamId) { options.streamId = optionsOrStreamId.id } - if (optionsOrStreamId.partition == null && optionsOrStreamId.streamPartition == null) { + if (optionsOrStreamId.partition != null && optionsOrStreamId.streamPartition == null) { options.streamPartition = optionsOrStreamId.partition } @@ -89,7 +92,7 @@ export async function waitForMatchingMessage({ rejectOnTimeout = true, timeoutMessage, cancelTask, -}) { +}: Todo) { if (typeof matchFn !== 'function') { throw new Error(`matchFn required, got: (${typeof matchFn}) ${matchFn}`) } @@ -98,7 +101,7 @@ export async function waitForMatchingMessage({ let cleanup = () => {} const matchTask = new Promise((resolve, reject) => { - const tryMatch = (...args) => { + const tryMatch = (...args: Todo[]) => { try { return matchFn(...args) } catch (err) { @@ -107,19 +110,20 @@ export async function waitForMatchingMessage({ return false } } - let onDisconnected - const onResponse = (res) => { + let onDisconnected: Todo + const onResponse = (res: Todo) => { if (!tryMatch(res)) { return } // clean up err handler cleanup() resolve(res) } - const onErrorResponse = (res) => { + const onErrorResponse = (res: Todo) => { if (!tryMatch(res)) { return } // clean up success handler cleanup() const error = new Error(res.errorMessage) + // @ts-expect-error error.code = res.errorCode reject(error) } @@ -128,12 +132,12 @@ export async function waitForMatchingMessage({ if (cancelTask) { cancelTask.catch(() => {}) } // ignore connection.off('disconnected', onDisconnected) connection.off(ControlMessage.TYPES.ErrorResponse, onErrorResponse) - types.forEach((type) => { + types.forEach((type: Todo) => { connection.off(type, onResponse) }) } - types.forEach((type) => { + types.forEach((type: Todo) => { connection.on(type, onResponse) }) @@ -141,6 +145,7 @@ export async function waitForMatchingMessage({ onDisconnected = () => { cleanup() + // @ts-expect-error resolve() // noop } @@ -171,7 +176,7 @@ export async function waitForMatchingMessage({ * Wait for matching response types to requestId, or ErrorResponse. */ -export async function waitForResponse({ requestId, timeoutMessage = `Waiting for response to: ${requestId}.`, ...opts }) { +export async function waitForResponse({ requestId, timeoutMessage = `Waiting for response to: ${requestId}.`, ...opts }: Todo) { if (requestId == null) { throw new Error(`requestId required, got: (${typeof requestId}) ${requestId}`) } @@ -180,13 +185,13 @@ export async function waitForResponse({ requestId, timeoutMessage = `Waiting for ...opts, requestId, timeoutMessage, - matchFn(res) { + matchFn(res: Todo) { return res.requestId === requestId } }) } -export async function waitForRequestResponse(client, request, opts = {}) { +export async function waitForRequestResponse(client: StreamrClient, request: Todo, opts: Todo = {}) { return waitForResponse({ connection: client.connection, types: PAIRS.get(request.type), diff --git a/src/subscribe/index.ts b/src/subscribe/index.ts index b7ce2e205..7cb1320f4 100644 --- a/src/subscribe/index.ts +++ b/src/subscribe/index.ts @@ -11,7 +11,7 @@ import Validator from './Validator' import messageStream from './messageStream' import resendStream from './resendStream' import { Todo } from '../types' -import StreamrClient from '..' +import StreamrClient, { StreamPartDefinition, SubscribeOptions } from '..' export class Subscription extends Emitter { @@ -84,7 +84,7 @@ export class Subscription extends Emitter { * Collect all messages into an array. * Returns array when subscription is ended. */ - async collect(n?: Todo) { + async collect(n?: number) { const msgs = [] for await (const msg of this) { if (n === 0) { @@ -126,8 +126,9 @@ export class Subscription extends Emitter { return this.pipeline.throw(...args) } - async unsubscribe(...args: Todo[]) { - return this.cancel(...args) + // TODO should we expose this to the user as no-args method? + async unsubscribe() { + return this.cancel() } } @@ -375,7 +376,7 @@ class Subscriptions { this.subSessions = new Map() } - async add(opts: Todo, onFinally: Todo = async () => {}) { + async add(opts: StreamPartDefinition, onFinally: Todo = async () => {}) { const options = validateOptions(opts) const { key } = options @@ -441,7 +442,7 @@ class Subscriptions { /** * Remove all subscriptions, optionally only those matching options. */ - async removeAll(options?: Todo) { + async removeAll(options?: StreamPartDefinition) { const subs = this.get(options) return allSettledValues(subs.map((sub: Todo) => ( this.remove(sub) @@ -464,7 +465,7 @@ class Subscriptions { * Count all matching subscriptions. */ - count(options: Todo) { + count(options?: StreamPartDefinition) { if (options === undefined) { return this.countAll() } return this.get(options).length } @@ -493,7 +494,7 @@ class Subscriptions { * Get all subscriptions matching options. */ - get(options: Todo) { + get(options?: StreamPartDefinition) { if (options === undefined) { return this.getAll() } const { key } = validateOptions(options) @@ -522,30 +523,31 @@ export class Subscriber { return this.subscriptions.getSubscriptionSession(...args) } - getAll(...args: Todo[]) { - // @ts-expect-error - return this.subscriptions.getAll(...args) + getAll() { + return this.subscriptions.getAll() } - count(options: Todo[]) { + count(options?: StreamPartDefinition) { return this.subscriptions.count(options) } - async subscribe(...args: Todo[]) { - // @ts-expect-error - return this.subscriptions.add(...args) + async subscribe(opts: StreamPartDefinition, onFinally?: Todo) { + return this.subscriptions.add(opts, onFinally) } - async unsubscribe(options: Todo): Promise { + async unsubscribe(options: Subscription | StreamPartDefinition | { options: Subscription|StreamPartDefinition }): Promise { if (options instanceof Subscription) { const sub = options return sub.cancel() } + // @ts-expect-error if (options && options.options) { + // @ts-expect-error return this.unsubscribe(options.options) } + // @ts-expect-error return this.subscriptions.removeAll(options) } @@ -564,12 +566,13 @@ export class Subscriber { return sub } - async resendSubscribe(opts: Todo, onMessage: Todo) { + async resendSubscribe(opts: SubscribeOptions & StreamPartDefinition, onMessage: Todo) { // This works by passing a custom message stream to a subscription // the custom message stream iterates resends, then iterates realtime const options = validateOptions(opts) const resendMessageStream = resendStream(this.client, options) + // @ts-expect-error const realtimeMessageStream = messageStream(this.client.connection, options) // cancel both streams on end @@ -644,6 +647,7 @@ export class Subscriber { const resendTask = resendMessageStream.subscribe() const realtimeTask = this.subscribe({ ...options, + // @ts-expect-error msgStream: it, afterSteps: [ async function* detectEndOfResend(src: Todo) { diff --git a/test/unit/StreamUtils.test.ts b/test/unit/StreamUtils.test.ts new file mode 100644 index 000000000..3cb5360aa --- /dev/null +++ b/test/unit/StreamUtils.test.ts @@ -0,0 +1,50 @@ +import { Stream } from '../../src/stream' +import { validateOptions } from '../../src/stream/utils' + +describe('Stream utils', () => { + + it('no definition', () => { + expect(() => validateOptions(undefined as any)).toThrow() + expect(() => validateOptions(null as any)).toThrow() + expect(() => validateOptions({})).toThrow() + }) + + it('string', () => { + expect(validateOptions('foo')).toMatchObject({ + streamId: 'foo', + streamPartition: 0, + key: 'foo::0' + }) + }) + + it('object', () => { + expect(validateOptions({ streamId: 'foo' })).toMatchObject({ + streamId: 'foo', + streamPartition: 0, + key: 'foo::0' + }) + expect(validateOptions({ streamId: 'foo', streamPartition: 123 })).toMatchObject({ + streamId: 'foo', + streamPartition: 123, + key: 'foo::123' + }) + expect(validateOptions({ id: 'foo', partition: 123 })).toMatchObject({ + streamId: 'foo', + streamPartition: 123, + key: 'foo::123' + }) + }) + + it('stream', () => { + const stream = new Stream(undefined as any, { + id: 'foo', + name: 'bar' + }) + expect(validateOptions({ stream })).toMatchObject({ + streamId: 'foo', + streamPartition: 0, + key: 'foo::0' + }) + }) + +}) diff --git a/test/utils.ts b/test/utils.ts index b691ff5ac..860c5fa9b 100644 --- a/test/utils.ts +++ b/test/utils.ts @@ -76,6 +76,7 @@ export function getWaitForStorage(client: StreamrClient, defaultOpts = {}) { /* eslint-disable no-await-in-loop */ return async (publishRequest: any, opts = {}) => { const { + // @ts-expect-error streamId, streamPartition = 0, interval = 500, timeout = 5000, count = 100, messageMatchFn = defaultMessageMatchFn } = validateOptions({ ...defaultOpts, @@ -145,15 +146,25 @@ export function getPublishTestMessages(client: StreamrClient, defaultOpts = {}) const { streamId, streamPartition = 0, + // @ts-expect-error delay = 100, + // @ts-expect-error timeout = 3500, + // @ts-expect-error waitForLast = false, // wait for message to hit storage + // @ts-expect-error waitForLastCount, + // @ts-expect-error waitForLastTimeout, + // @ts-expect-error beforeEach = (m: any) => m, + // @ts-expect-error afterEach = () => {}, + // @ts-expect-error timestamp, + // @ts-expect-error partitionKey, + // @ts-expect-error createMessage = () => { msgCount += 1 return {