Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions src/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ const wait = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
// @ts-expect-error
Debug.formatters.n = (v) => Debug.humanize(v)

export interface ConnectionOptions {
url?: string,
autoConnect?: boolean
autoDisconnect?: boolean
disconnectDelay?: number
maxRetries?: number
retryBackoffFactor?: number
maxRetryWait?: number
}

export class ConnectionError extends Error {
reason?: Todo

Expand Down Expand Up @@ -120,7 +130,7 @@ const STATE = {
}

/* eslint-disable no-underscore-dangle, no-param-reassign */
function SocketConnector(connection: Todo) {
function SocketConnector(connection: Connection) {
let next: Todo
let socket: Todo
let startedConnecting = false
Expand Down Expand Up @@ -189,7 +199,7 @@ function SocketConnector(connection: Todo) {
// connect
async () => {
startedConnecting = true
socket = await OpenWebSocket(connection.options.url, {
socket = await OpenWebSocket(connection.options.url!, {
perMessageDeflate: false,
debug: connection._debug,
})
Expand Down Expand Up @@ -287,7 +297,7 @@ const DEFAULT_MAX_RETRIES = 10
export default class Connection extends EventEmitter {

_debug: Todo
options: Todo
options: ConnectionOptions
retryCount: Todo
wantsState: Todo
connectionHandles: Todo
Expand All @@ -312,7 +322,7 @@ export default class Connection extends EventEmitter {
}))
}

constructor(options = {}, debug?: Debug.Debugger) {
constructor(options: ConnectionOptions = {}, debug?: Debug.Debugger) {
super()
this._debug = debug !== undefined
? debug.extend(counterId(this.constructor.name))
Expand Down
44 changes: 28 additions & 16 deletions src/StreamrClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { validateOptions } from './stream/utils'
import Config, { StreamrClientOptions, StrictStreamrClientOptions } from './Config'
import StreamrEthereum from './Ethereum'
import Session from './Session'
import Connection, { ConnectionError } from './Connection'
import Connection, { ConnectionError, ConnectionOptions } from './Connection'
import Publisher from './publish'
import { Subscriber, Subscription } from './subscribe'
import { getUserId } from './user'
Expand All @@ -18,21 +18,31 @@ import { DataUnion, DataUnionDeployOptions } from './dataunion/DataUnion'
import { BigNumber } from '@ethersproject/bignumber'
import { getAddress } from '@ethersproject/address'
import { Contract } from '@ethersproject/contracts'
import { StreamPartDefinition } from './stream'

// TODO get metadata type from streamr-protocol-js project (it doesn't export the type definitions yet)
export type OnMessageCallback = MaybeAsync<(message: any, metadata: any) => void>

export type ResendOptions = {
from?: { timestamp: number, sequenceNumber?: number }
to?: { timestamp: number, sequenceNumber?: number }
last?: number
}

export type SubscribeOptions = {
resend?: ResendOptions
} & ResendOptions

interface MessageEvent {
data: any
}

/**
* Wrap connection message events with message parsing.
*/

class StreamrConnection extends Connection {
// TODO define args type when we convert Connection class to TypeScript
constructor(options: Todo, debug?: Debug.Debugger) {
constructor(options: ConnectionOptions, debug?: Debug.Debugger) {
super(options, debug)
this.on('message', this.onConnectionMessage)
}
Expand Down Expand Up @@ -160,6 +170,7 @@ export class StreamrClient extends EventEmitter {
/** @internal */
ethereum: StreamrEthereum

// TODO annotate connection parameter as internal parameter if possible?
constructor(options: StreamrClientOptions = {}, connection?: StreamrConnection) {
super()
this.id = counterId(`${this.constructor.name}:${uid}`)
Expand Down Expand Up @@ -280,13 +291,13 @@ export class StreamrClient extends EventEmitter {
])
}

getSubscriptions(...args: Todo) {
return this.subscriber.getAll(...args)
getSubscriptions(): Subscription[] {
return this.subscriber.getAll()
}

getSubscription(...args: Todo) {
getSubscription(definition: StreamPartDefinition) {
// @ts-expect-error
return this.subscriber.get(...args)
return this.subscriber.get(definition)
}

async ensureConnected() {
Expand All @@ -301,8 +312,8 @@ export class StreamrClient extends EventEmitter {
return this.session.logout()
}

async publish(...args: Todo) {
return this.publisher.publish(...args)
async publish(streamObjectOrId: StreamPartDefinition, content: object, timestamp?: number|string|Date, partitionKey?: string) {
return this.publisher.publish(streamObjectOrId, content, timestamp, partitionKey)
}

async getUserId() {
Expand All @@ -319,7 +330,7 @@ export class StreamrClient extends EventEmitter {
return this.publisher.rotateGroupKey(...args)
}

async subscribe(opts: Todo, onMessage?: OnMessageCallback) {
async subscribe(opts: SubscribeOptions & StreamPartDefinition, onMessage?: OnMessageCallback) {
let subTask: Todo
let sub: Todo
const hasResend = !!(opts.resend || opts.from || opts.to || opts.last)
Expand Down Expand Up @@ -350,10 +361,11 @@ export class StreamrClient extends EventEmitter {
return subTask
}

async unsubscribe(opts: Todo) {
await this.subscriber.unsubscribe(opts)
async unsubscribe(subscription: Subscription) {
await this.subscriber.unsubscribe(subscription)
}

/** @internal */
async resend(opts: Todo, onMessage?: OnMessageCallback): Promise<Subscription> {
const task = this.subscriber.resend(opts)
if (typeof onMessage !== 'function') {
Expand All @@ -373,12 +385,12 @@ export class StreamrClient extends EventEmitter {
return task
}

enableAutoConnect(...args: Todo) {
return this.connection.enableAutoConnect(...args)
enableAutoConnect(autoConnect?: boolean) {
return this.connection.enableAutoConnect(autoConnect)
}

enableAutoDisconnect(...args: Todo) {
return this.connection.enableAutoDisconnect(...args)
enableAutoDisconnect(autoDisconnect?: boolean) {
return this.connection.enableAutoDisconnect(autoDisconnect)
}

getAddress(): EthereumAddress {
Expand Down
6 changes: 4 additions & 2 deletions src/rest/StreamEndpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import StreamPart from '../stream/StreamPart'
import { isKeyExchangeStream } from '../stream/KeyExchange'

import authFetch, { ErrorCode, NotFoundError } from './authFetch'
import { EthereumAddress, Todo } from '../types'
import { EthereumAddress } from '../types'
import { StreamrClient } from '../StreamrClient'
// TODO change this import when streamr-client-protocol exports StreamMessage type or the enums types directly
import { ContentType, EncryptionType, SignatureType, StreamMessageType } from 'streamr-client-protocol/dist/src/protocol/message_layer/StreamMessage'
Expand Down Expand Up @@ -226,6 +226,7 @@ export class StreamEndpoints {
}

async getStreamLast(streamObjectOrId: Stream|string): Promise<StreamMessageAsObject> {
// @ts-expect-error
const { streamId, streamPartition = 0, count = 1 } = validateOptions(streamObjectOrId)
this.client.debug('getStreamLast %o', {
streamId,
Expand All @@ -234,6 +235,7 @@ export class StreamEndpoints {
})

const url = (
// @ts-expect-error
getEndpointUrl(this.client.options.restUrl, 'streams', streamId, 'data', 'partitions', streamPartition, 'last')
+ `?${qs.stringify({ count })}`
)
Expand All @@ -252,7 +254,7 @@ export class StreamEndpoints {
return result
}

async publishHttp(streamObjectOrId: Stream|string, data: Todo, requestOptions: Todo = {}, keepAlive: boolean = true) {
async publishHttp(streamObjectOrId: Stream|string, data: any, requestOptions: any = {}, keepAlive: boolean = true) {
let streamId
if (streamObjectOrId instanceof Stream) {
streamId = streamObjectOrId.id
Expand Down
10 changes: 7 additions & 3 deletions src/stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ import authFetch from '../rest/authFetch'

import StorageNode from './StorageNode'
import { StreamrClient } from '../StreamrClient'
import { Todo } from '../types'

// TODO explicit types: e.g. we never provide both streamId and id, or both streamPartition and partition
export type StreamPartDefinition = string | { streamId?: string, streamPartition?: number, id?: string, partition?: number, stream?: Stream }

export type ValidatedStreamPartDefinition = { streamId: string, streamPartition: number, key: string}

interface StreamPermisionBase {
id: number
Expand Down Expand Up @@ -247,7 +251,7 @@ export class Stream {
return json.map((item: any) => new StorageNode(item.storageNodeAddress))
}

async publish(...theArgs: Todo) {
return this._client.publish(this.id, ...theArgs)
async publish(content: object, timestamp?: number|string|Date, partitionKey?: string) {
return this._client.publish(this.id, content, timestamp, partitionKey)
}
}
33 changes: 19 additions & 14 deletions src/stream/utils.js → src/stream/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import { inspect } from 'util'
import { ControlLayer } from 'streamr-client-protocol'

import { pTimeout } from '../utils'
import { Todo } from '../types'
import { StreamrClient } from '../StreamrClient'
import { StreamPartDefinition, ValidatedStreamPartDefinition } from '.'

export function StreamKey({ streamId, streamPartition = 0 }) {
export function StreamKey({ streamId, streamPartition = 0 }: Todo) {
if (streamId == null) { throw new Error(`StreamKey: invalid streamId (${typeof streamId}): ${streamId}`) }

if (!Number.isInteger(streamPartition) || streamPartition < 0) {
Expand All @@ -17,13 +20,13 @@ export function StreamKey({ streamId, streamPartition = 0 }) {
return `${streamId}::${streamPartition}`
}

export function validateOptions(optionsOrStreamId) {
export function validateOptions(optionsOrStreamId: StreamPartDefinition): ValidatedStreamPartDefinition {
if (!optionsOrStreamId) {
throw new Error('streamId is required!')
}

// Backwards compatibility for giving a streamId as first argument
let options = {}
let options: Todo = {}
if (typeof optionsOrStreamId === 'string') {
options = {
streamId: optionsOrStreamId,
Expand All @@ -42,7 +45,7 @@ export function validateOptions(optionsOrStreamId) {
options.streamId = optionsOrStreamId.id
}

if (optionsOrStreamId.partition == null && optionsOrStreamId.streamPartition == null) {
if (optionsOrStreamId.partition != null && optionsOrStreamId.streamPartition == null) {
options.streamPartition = optionsOrStreamId.partition
}

Expand Down Expand Up @@ -89,7 +92,7 @@ export async function waitForMatchingMessage({
rejectOnTimeout = true,
timeoutMessage,
cancelTask,
}) {
}: Todo) {
if (typeof matchFn !== 'function') {
throw new Error(`matchFn required, got: (${typeof matchFn}) ${matchFn}`)
}
Expand All @@ -98,7 +101,7 @@ export async function waitForMatchingMessage({
let cleanup = () => {}

const matchTask = new Promise((resolve, reject) => {
const tryMatch = (...args) => {
const tryMatch = (...args: Todo[]) => {
try {
return matchFn(...args)
} catch (err) {
Expand All @@ -107,19 +110,20 @@ export async function waitForMatchingMessage({
return false
}
}
let onDisconnected
const onResponse = (res) => {
let onDisconnected: Todo
const onResponse = (res: Todo) => {
if (!tryMatch(res)) { return }
// clean up err handler
cleanup()
resolve(res)
}

const onErrorResponse = (res) => {
const onErrorResponse = (res: Todo) => {
if (!tryMatch(res)) { return }
// clean up success handler
cleanup()
const error = new Error(res.errorMessage)
// @ts-expect-error
error.code = res.errorCode
reject(error)
}
Expand All @@ -128,19 +132,20 @@ export async function waitForMatchingMessage({
if (cancelTask) { cancelTask.catch(() => {}) } // ignore
connection.off('disconnected', onDisconnected)
connection.off(ControlMessage.TYPES.ErrorResponse, onErrorResponse)
types.forEach((type) => {
types.forEach((type: Todo) => {
connection.off(type, onResponse)
})
}

types.forEach((type) => {
types.forEach((type: Todo) => {
connection.on(type, onResponse)
})

connection.on(ControlMessage.TYPES.ErrorResponse, onErrorResponse)

onDisconnected = () => {
cleanup()
// @ts-expect-error
resolve() // noop
}

Expand Down Expand Up @@ -171,7 +176,7 @@ export async function waitForMatchingMessage({
* Wait for matching response types to requestId, or ErrorResponse.
*/

export async function waitForResponse({ requestId, timeoutMessage = `Waiting for response to: ${requestId}.`, ...opts }) {
export async function waitForResponse({ requestId, timeoutMessage = `Waiting for response to: ${requestId}.`, ...opts }: Todo) {
if (requestId == null) {
throw new Error(`requestId required, got: (${typeof requestId}) ${requestId}`)
}
Expand All @@ -180,13 +185,13 @@ export async function waitForResponse({ requestId, timeoutMessage = `Waiting for
...opts,
requestId,
timeoutMessage,
matchFn(res) {
matchFn(res: Todo) {
return res.requestId === requestId
}
})
}

export async function waitForRequestResponse(client, request, opts = {}) {
export async function waitForRequestResponse(client: StreamrClient, request: Todo, opts: Todo = {}) {
return waitForResponse({
connection: client.connection,
types: PAIRS.get(request.type),
Expand Down
Loading