Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
"@types/qs": "^6.9.5",
"@types/sinon": "^9.0.10",
"@types/uuid": "^8.3.0",
"@types/ws": "^7.4.0",
"@typescript-eslint/eslint-plugin": "^4.15.1",
"@typescript-eslint/parser": "^4.15.1",
"babel-loader": "^8.2.2",
Expand Down
102 changes: 66 additions & 36 deletions src/Connection.js → src/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,31 @@ import Debug from 'debug'
import WebSocket from 'ws'

import { Scaffold, counterId, pLimitFn, pOne } from './utils'
import { Todo } from './types'

const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms))
const wait = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))

// add global support for pretty millisecond formatting with %n
// @ts-expect-error
Debug.formatters.n = (v) => Debug.humanize(v)

export class ConnectionError extends Error {
constructor(err, ...args) {
reason?: Todo

constructor(err: Todo, ...args: Todo[]) {
if (err instanceof ConnectionError) {
return err
}

if (err && err.stack) {
const { message, stack } = err
// @ts-expect-error
super(message, ...args)
Object.assign(this, err)
this.stack = stack
this.reason = err
} else {
// @ts-expect-error
super(err, ...args)
if (Error.captureStackTrace) {
Error.captureStackTrace(this, this.constructor)
Expand All @@ -35,7 +41,7 @@ export class ConnectionError extends Error {
const openSockets = new Set()
const FORCE_CLOSED = Symbol('FORCE_CLOSED')

async function OpenWebSocket(url, opts, ...args) {
async function OpenWebSocket(url: string, opts: Todo, ...args: Todo[]) {
return new Promise((resolve, reject) => {
try {
if (!url) {
Expand All @@ -44,8 +50,9 @@ async function OpenWebSocket(url, opts, ...args) {
throw err
}

// @ts-expect-error
const socket = process.browser ? new WebSocket(url) : new WebSocket(url, opts, ...args)
let error
let error: Todo
Object.assign(socket, {
id: counterId('ws'),
binaryType: 'arraybuffer',
Expand All @@ -57,31 +64,34 @@ async function OpenWebSocket(url, opts, ...args) {
openSockets.delete(socket)
reject(new ConnectionError(error || 'socket closed'))
},
onerror(event) {
onerror(event: Todo) {
error = new ConnectionError(event.error || event)
},
})

// attach debug
// @ts-expect-error
socket.debug = opts.debug.extend(socket.id)
// @ts-expect-error
socket.debug.color = opts.debug.color // use existing colour
} catch (err) {
reject(err)
}
})
}

async function CloseWebSocket(socket) {
async function CloseWebSocket(socket: Todo) {
return new Promise((resolve, reject) => {
if (!socket || socket.readyState === WebSocket.CLOSED) {
resolve()
resolve(undefined)
return
}

const waitThenClose = () => (
resolve(CloseWebSocket(socket))
)

// @ts-expect-error
if (socket.readyState === WebSocket.OPENING) {
socket.addEventListener('error', waitThenClose)
socket.addEventListener('open', waitThenClose)
Expand Down Expand Up @@ -110,17 +120,17 @@ const STATE = {
}

/* eslint-disable no-underscore-dangle, no-param-reassign */
function SocketConnector(connection) {
let next
let socket
function SocketConnector(connection: Todo) {
let next: Todo
let socket: Todo
let startedConnecting = false
let didCloseUnexpectedly = false

const onClose = () => {
didCloseUnexpectedly = true
if (!next.pendingCount && !next.activeCount) {
// if no pending actions run next & emit any errors
next().catch((err) => {
next().catch((err: Todo) => {
connection.emit('error', err)
})
}
Expand Down Expand Up @@ -207,7 +217,7 @@ function SocketConnector(connection) {
},
// attach message handler
() => {
const onMessage = (messageEvent, ...args) => {
const onMessage = (messageEvent: Todo, ...args: Todo|[]) => {
connection.emit('message', messageEvent, ...args)
}
socket.addEventListener('message', onMessage)
Expand Down Expand Up @@ -275,22 +285,36 @@ const DEFAULT_MAX_RETRIES = 10
*/

export default class Connection extends EventEmitter {

_debug: Todo
options: Todo
retryCount: Todo
wantsState: Todo
connectionHandles: Todo
step: Todo
socket?: Todo
didDisableAutoConnect?: Todo
isWaiting?: Todo
_isReconnecting: Todo
_backoffTimeout: Todo
sendID: Todo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you try removing :Todo, some of these might "just work" as of TS 4.0: https://devblogs.microsoft.com/typescript/announcing-typescript-4-0/#class-property-inference

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, onReconnecting and onMessage work also ok without type definition. Marked the fields as Todo so that we'll define the actual type in the future (and the full benefit of the type system).


static getOpen() {
return openSockets.size
}

static async closeOpen() {
return Promise.all([...openSockets].map(async (socket) => {
return Promise.all([...openSockets].map(async (socket: Todo) => {
socket[FORCE_CLOSED] = true // eslint-disable-line no-param-reassign
return CloseWebSocket(socket).catch((err) => {
socket.debug(err) // ignore error
})
}))
}

constructor(options = {}, client) {
constructor(options = {}, debug?: Debug.Debugger) {
super()
this._debug = client.debug.extend(counterId(this.constructor.name))
this._debug = (debug !== undefined) ? debug.extend(counterId(this.constructor.name)) : Debug(`StreamrClient::${counterId(this.constructor.name)}`)

this.options = options
this.options.autoConnect = !!this.options.autoConnect
Expand All @@ -303,19 +327,20 @@ export default class Connection extends EventEmitter {
this.backoffWait = pLimitFn(this.backoffWait.bind(this))
this.step = SocketConnector(this)
this.debug = this.debug.bind(this)
// @ts-expect-error
this.maybeConnect = pOne(this.maybeConnect.bind(this))
this.nextConnection = pOne(this.nextConnection.bind(this))
this.nextDisconnection = pOne(this.nextDisconnection.bind(this))
}

debug(...args) {
debug(...args: Todo[]) {
if (this.socket) {
return this.socket.debug(...args)
}
return this._debug(...args)
}

emit(event, ...args) {
emit(event: Todo, ...args: Todo[]) {
if (event === 'error') {
let [err] = args
const [, ...rest] = args
Expand All @@ -342,7 +367,7 @@ export default class Connection extends EventEmitter {
return result
}

emitTransition(event, ...args) {
emitTransition(event: Todo, ...args: Todo[]) {
const prevWantsState = this.wantsState
if (prevWantsState === STATE.AUTO) {
return this.emit(event, ...args)
Expand Down Expand Up @@ -426,25 +451,25 @@ export default class Connection extends EventEmitter {

this.isWaiting = true
return new Promise((resolve, reject) => {
let onError
let onDone
let onError: Todo
let onDone: Todo
const onConnected = () => {
this.off('done', onDone)
this.off('error', onError)
this.off('_error', onError)
resolve()
resolve(undefined)
}
onDone = (err) => {
onDone = (err: Todo) => {
this.off('error', onError)
this.off('_error', onError)
this.off('connected', onConnected)
if (err) {
reject(err)
} else {
resolve()
resolve(undefined)
}
}
onError = (err) => {
onError = (err: Todo) => {
this.off('done', onDone)
this.off('connected', onConnected)
reject(err)
Expand Down Expand Up @@ -504,7 +529,7 @@ export default class Connection extends EventEmitter {
await this.step()
}

async needsConnection(msg) {
async needsConnection(msg?: Todo) {
await this.maybeConnect()
if (!this.isConnected()) {
const { autoConnect, autoDisconnect } = this.options
Expand Down Expand Up @@ -550,12 +575,12 @@ export default class Connection extends EventEmitter {
}

return new Promise((resolve, reject) => {
let onError
let onError: Todo
const onDisconnected = () => {
this.off('error', onError)
resolve()
resolve(undefined)
}
onError = (err) => {
onError = (err: Todo) => {
this.off('disconnected', onDisconnected)
reject(err)
}
Expand All @@ -576,7 +601,7 @@ export default class Connection extends EventEmitter {
debug('waiting %n', timeout)
this._backoffTimeout = setTimeout(() => {
debug('waited %n', timeout)
resolve()
resolve(undefined)
}, timeout)
})
}
Expand All @@ -585,7 +610,7 @@ export default class Connection extends EventEmitter {
* Auto Connect/Disconnect counters.
*/

async addHandle(id) {
async addHandle(id: Todo) {
if (
this.connectionHandles.has(id)
&& this.isConnected()
Expand All @@ -602,7 +627,7 @@ export default class Connection extends EventEmitter {
* When no more handles and autoDisconnect is true, disconnect.
*/

async removeHandle(id) {
async removeHandle(id: Todo) {
const hadConnection = this.connectionHandles.has(id)
this.connectionHandles.delete(id)
if (hadConnection && this._couldAutoDisconnect()) {
Expand All @@ -619,7 +644,7 @@ export default class Connection extends EventEmitter {
)
}

async send(msg) {
async send(msg: Todo) {
this.sendID = this.sendID + 1 || 1
const handle = `send${this.sendID}`
this.debug('(%s) send()', this.getState())
Expand All @@ -638,19 +663,20 @@ export default class Connection extends EventEmitter {
}
}

async _send(msg) {
async _send(msg: Todo) {
return new Promise((resolve, reject) => {
this.debug('(%s) >> %o', this.getState(), msg)
// promisify send
const data = typeof msg.serialize === 'function' ? msg.serialize() : msg
// send callback doesn't exist with browser websockets, just resolve
/* istanbul ignore next */
this.emit('_send', msg) // for informational purposes
// @ts-expect-error
if (process.browser) {
this.socket.send(data)
resolve(data)
} else {
this.socket.send(data, (err) => {
this.socket.send(data, (err: Todo) => {
/* istanbul ignore next */
if (err) {
reject(new ConnectionError(err))
Expand Down Expand Up @@ -727,9 +753,10 @@ export default class Connection extends EventEmitter {
onDisconnecting = () => {},
onDisconnected = () => {},
onDone = () => {},
// @ts-expect-error
onError,
}) {
let onDoneHandler
let onDoneHandler: Todo
const cleanUp = async () => {
this
.off('connecting', onConnecting)
Expand All @@ -742,8 +769,10 @@ export default class Connection extends EventEmitter {
}
}

onDoneHandler = async (...args) => {
onDoneHandler = async (...args: Todo[]) => {
// @ts-expect-error
cleanUp(...args)
// @ts-expect-error
return onDone(...args)
}

Expand All @@ -762,4 +791,5 @@ export default class Connection extends EventEmitter {
}
}

// @ts-expect-error
Connection.ConnectionError = ConnectionError
6 changes: 3 additions & 3 deletions src/StreamrClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ export { StreamrClientOptions }

class StreamrConnection extends Connection {
// TODO define args type when we convert Connection class to TypeScript
constructor(options: Todo, client: StreamrClient) {
super(options, client)
constructor(options: Todo, debug?: Debug.Debugger) {
super(options, debug)
this.on('message', this.onConnectionMessage)
}

Expand Down Expand Up @@ -179,7 +179,7 @@ class StreamrClient extends EventEmitter {
this.on('error', this._onError) // attach before creating sub-components incase they fire error events

this.session = new Session(this, this.options.auth)
this.connection = connection || new StreamrConnection(this.options, this)
this.connection = connection || new StreamrConnection(this.options, this.debug)

this.connection
.on('connected', this.onConnectionConnected)
Expand Down
Loading