Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
537062d
Add end parameter to PushQueue.from & pipeline. Prevents premature cl…
timoxley Feb 1, 2021
1d95da2
Add tests for PushQueue.from end parameter. PushQueue.from should buf…
timoxley Feb 1, 2021
e5857ff
Add autoEnd option to PushQueue.
timoxley Feb 1, 2021
468eb55
Use streamr-client-protocol@8.0.0-beta.1
timoxley Feb 1, 2021
08f9280
Fix decryption transform.
timoxley Feb 1, 2021
9f13e02
Add resend gapfill test. WIP.
timoxley Feb 8, 2021
d334d58
Add runtime-corejs3 to dependencies, update babel runtime deps.
timoxley Feb 10, 2021
942f7c4
Fix jest module resolution with npm linked deps.
timoxley Feb 11, 2021
5be869d
Default onFinally is async for type hinting.
timoxley Feb 11, 2021
2898004
Fix broken OrderMessages output.
timoxley Feb 11, 2021
7d1a91f
Convert jest.setup to ESM.
timoxley Feb 11, 2021
1147ee5
Refresh package-lock.
timoxley Feb 11, 2021
3ac9c57
Fix issues with gaps in resends. Test gap failure mode.
timoxley Feb 11, 2021
2ec2963
Ignore failed messages explicitly rather than treat them as unfillabl…
timoxley Feb 11, 2021
02c4d03
Fix handling for failed gapfills.
timoxley Feb 12, 2021
45d6174
Add onError option to Scaffold.
timoxley Feb 12, 2021
3e04753
Give each test message a count out of total so it's clear what it is.
timoxley Feb 12, 2021
d177205
Exclude gap related tests from 'npm run test-integration-no-resend'
timoxley Feb 12, 2021
a6f1055
Concatenate error messages with passed-in message in AggregatedError.
timoxley Feb 12, 2021
9b2cf39
Tune collecting of subscription errors.
timoxley Feb 12, 2021
f523e57
Ignore connection errors if should be disconnected during subscriptio…
timoxley Feb 12, 2021
5354b9e
Add another variant of flakey disconnect/reconnect subscribe test.
timoxley Feb 12, 2021
46c3a35
Make test/utils getPublishTestMessages timestamp option into optional…
timoxley Feb 16, 2021
106d57b
Rename pipeline steps.
timoxley Feb 16, 2021
3517ef0
Update flakey disconnect/reconnect subscribe test.
timoxley Feb 16, 2021
55c2b51
Use streamr-client-protocol@8.0.0-beta.2.
timoxley Feb 16, 2021
cb878f7
Linting
timoxley Feb 16, 2021
cd45f51
Line-length linting.
timoxley Feb 17, 2021
421d3ee
Remove superfluous @ts-expect-error directives preventing build.
timoxley Feb 17, 2021
f3f986a
Add client.options.maxGapRequests = 5 to client config, use to config…
timoxley Feb 18, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ module.exports = {
// globals: {},

// An array of directory names to be searched recursively up from the requiring module's location
// moduleDirectories: [
// "node_modules"
// ],

moduleDirectories: [
'node_modules',
path.resolve('./node_modules'), // makes npm link work.
],
// An array of file extensions your modules use
// moduleFileExtensions: [
// "js",
Expand Down
18 changes: 9 additions & 9 deletions jest.setup.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
const Debug = require('debug')
const GitRevisionPlugin = require('git-revision-webpack-plugin')
import GitRevisionPlugin from 'git-revision-webpack-plugin'
import Debug from 'debug'

const pkg = require('./package.json')

if (process.env.DEBUG_CONSOLE) {
// Use debug as console log
// This prevents jest messing with console output
// Ensuring debug messages are printed alongside console messages, in the correct order
console.log = Debug('Streamr::CONSOLE') // eslint-disable-line no-console
}
export default async () => {
if (process.env.DEBUG_CONSOLE) {
// Use debug as console log
// This prevents jest messing with console output
// Ensuring debug messages are printed alongside console messages, in the correct order
console.log = Debug('Streamr::CONSOLE') // eslint-disable-line no-console
}

module.exports = async () => {
if (!process.env.GIT_VERSION) {
const gitRevisionPlugin = new GitRevisionPlugin()
const [GIT_VERSION, GIT_COMMITHASH, GIT_BRANCH] = await Promise.all([
Expand Down
1,438 changes: 704 additions & 734 deletions package-lock.json

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"test-unit": "jest test/unit --detectOpenHandles",
"coverage": "jest --coverage",
"test-integration": "jest --forceExit test/integration",
"test-integration-no-resend": "jest --testTimeout=10000 --testPathIgnorePatterns='resend|Resend' --testNamePattern='^((?!(resend|Resend|resent|Resent)).)*$' test/integration/*.test.js",
"test-integration-no-resend": "jest --testTimeout=10000 --testPathIgnorePatterns='resend|Resend' --testNamePattern='^((?!(resend|Resend|resent|Resent|gap|Gap)).)*$' test/integration/*.test.js",
"test-integration-resend": "jest --testTimeout=15000 --testNamePattern='(resend|Resend|resent|Resent)' test/integration/*.test.js",
"test-integration-dataunions": "jest --testTimeout=15000 --runInBand test/integration/DataUnionEndpoints",
"test-flakey": "jest --forceExit test/flakey/*",
Expand All @@ -49,7 +49,7 @@
"@babel/plugin-proposal-class-properties": "^7.12.1",
"@babel/plugin-transform-classes": "^7.12.1",
"@babel/plugin-transform-modules-commonjs": "^7.12.1",
"@babel/plugin-transform-runtime": "^7.12.10",
"@babel/plugin-transform-runtime": "^7.12.15",
"@babel/preset-env": "^7.12.11",
"@babel/preset-typescript": "^7.12.13",
"@types/debug": "^4.1.5",
Expand Down Expand Up @@ -87,7 +87,8 @@
},
"#IMPORTANT": "babel-runtime must be in dependencies, not devDependencies",
"dependencies": {
"@babel/runtime": "^7.12.5",
"@babel/runtime": "^7.12.13",
"@babel/runtime-corejs3": "^7.12.13",
"@ethersproject/address": "^5.0.9",
"@ethersproject/bignumber": "^5.0.13",
"@ethersproject/bytes": "^5.0.9",
Expand All @@ -112,7 +113,7 @@
"qs": "^6.9.6",
"quick-lru": "^5.1.1",
"readable-stream": "^3.6.0",
"streamr-client-protocol": "^7.1.2",
"streamr-client-protocol": "^8.0.0-beta.2",
"typescript": "^4.1.4",
"uuid": "^8.3.2",
"webpack-node-externals": "^2.5.2",
Expand Down
2 changes: 1 addition & 1 deletion src/Config.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import qs from 'qs'
// @ts-expect-error
import { ControlLayer, MessageLayer } from 'streamr-client-protocol'
import Debug from 'debug'

Expand Down Expand Up @@ -28,6 +27,7 @@ export default function ClientConfig(opts: StreamrClientOptions = {}) {
orderMessages: true,
retryResendAfter: 5000,
gapFillTimeout: 5000,
maxGapRequests: 5,
maxPublishQueueSize: 10000,

// Encryption options
Expand Down
2 changes: 1 addition & 1 deletion src/Connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms))
// add global support for pretty millisecond formatting with %n
Debug.formatters.n = (v) => Debug.humanize(v)

class ConnectionError extends Error {
export class ConnectionError extends Error {
constructor(err, ...args) {
if (err instanceof ConnectionError) {
return err
Expand Down
16 changes: 12 additions & 4 deletions src/Session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,23 @@ export default class Session extends EventEmitter {
// TODO: move loginFunction to StreamrClient constructor where "auth type" is checked
if (typeof this.options.privateKey !== 'undefined') {
const wallet = new Wallet(this.options.privateKey)
this.loginFunction = async () => this._client.loginEndpoints.loginWithChallengeResponse((d: string) => wallet.signMessage(d), wallet.address)
this.loginFunction = async () => (
this._client.loginEndpoints.loginWithChallengeResponse((d: string) => wallet.signMessage(d), wallet.address)
)
} else if (typeof this.options.ethereum !== 'undefined') {
const provider = new Web3Provider(this.options.ethereum)
const signer = provider.getSigner()
this.loginFunction = async () => this._client.loginEndpoints.loginWithChallengeResponse((d: string) => signer.signMessage(d), await signer.getAddress())
this.loginFunction = async () => (
this._client.loginEndpoints.loginWithChallengeResponse((d: string) => signer.signMessage(d), await signer.getAddress())
)
} else if (typeof this.options.apiKey !== 'undefined') {
this.loginFunction = async () => this._client.loginEndpoints.loginWithApiKey(this.options.apiKey!)
this.loginFunction = async () => (
this._client.loginEndpoints.loginWithApiKey(this.options.apiKey!)
)
} else if (typeof this.options.username !== 'undefined' && typeof this.options.password !== 'undefined') {
this.loginFunction = async () => this._client.loginEndpoints.loginWithUsernamePassword(this.options.username!, this.options.password!)
this.loginFunction = async () => (
this._client.loginEndpoints.loginWithUsernamePassword(this.options.username!, this.options.password!)
)
} else {
if (!this.options.sessionToken) {
this.options.unauthenticated = true
Expand Down
7 changes: 3 additions & 4 deletions src/StreamrClient.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import EventEmitter from 'eventemitter3'
// @ts-expect-error
import { ControlLayer } from 'streamr-client-protocol'
import Debug from 'debug'

Expand All @@ -8,7 +7,7 @@ import { validateOptions } from './stream/utils'
import Config from './Config'
import StreamrEthereum from './Ethereum'
import Session from './Session'
import Connection from './Connection'
import Connection, { ConnectionError } from './Connection'
import Publisher from './publish'
import Subscriber from './subscribe'
import { getUserId } from './user'
Expand Down Expand Up @@ -237,12 +236,12 @@ export default class StreamrClient extends EventEmitter {
}

onConnectionError(err: Todo) {
this.emit('error', new Connection.ConnectionError(err))
this.emit('error', new ConnectionError(err))
}

getErrorEmitter(source: Todo) {
return (err: Todo) => {
if (!(err instanceof Connection.ConnectionError || err.reason instanceof Connection.ConnectionError)) {
if (!(err instanceof ConnectionError || err.reason instanceof ConnectionError)) {
// emit non-connection errors
this.emit('error', err)
} else {
Expand Down
7 changes: 6 additions & 1 deletion src/rest/DataUnionEndpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,12 @@ async function transportSignatures(client: StreamrClient, messageHash: Todo, opt

// template for withdraw functions
// client could be replaced with AMB (mainnet and sidechain)
async function untilWithdrawIsComplete(client: StreamrClient, getWithdrawTxFunc: (options: DataUnionOptions) => Todo, getBalanceFunc: (options: DataUnionOptions) => Todo, options: DataUnionOptions = {}) {
async function untilWithdrawIsComplete(
client: StreamrClient,
getWithdrawTxFunc: (options: DataUnionOptions) => Todo,
getBalanceFunc: (options: DataUnionOptions) => Todo,
options: DataUnionOptions = {}
) {
const {
pollingIntervalMs = 1000,
retryTimeoutMs = 60000,
Expand Down
9 changes: 5 additions & 4 deletions src/rest/StreamEndpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,12 @@ export class StreamEndpoints {
streamPartition,
count,
})
const query = {
count,
}

const url = getEndpointUrl(this.client.options.restUrl, 'streams', streamId, 'data', 'partitions', streamPartition, 'last') + `?${qs.stringify(query)}`
const url = (
getEndpointUrl(this.client.options.restUrl, 'streams', streamId, 'data', 'partitions', streamPartition, 'last')
+ `?${qs.stringify({ count })}`
)

const json = await authFetch(url, this.client.session)
return json
}
Expand Down
2 changes: 1 addition & 1 deletion src/stream/Encryption.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { MessageLayer } from 'streamr-client-protocol'

import { uuid } from '../utils'

class UnableToDecryptError extends Error {
export class UnableToDecryptError extends Error {
constructor(message = '', streamMessage) {
super(`Unable to decrypt. ${message} ${util.inspect(streamMessage)}`)
this.streamMessage = streamMessage
Expand Down
17 changes: 13 additions & 4 deletions src/subscribe/Decrypt.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { MessageLayer } from 'streamr-client-protocol'

import PushQueue from '../utils/PushQueue'
import EncryptionUtil from '../stream/Encryption'
import EncryptionUtil, { UnableToDecryptError } from '../stream/Encryption'
import { SubscriberKeyExchange } from '../stream/KeyExchange'

const { StreamMessage } = MessageLayer
Expand All @@ -26,7 +26,7 @@ export default function Decrypt(client, options = {}) {
}
})

async function* decrypt(src) {
async function* decrypt(src, onError = async (err) => { throw err }) {
yield* PushQueue.transform(src, async (streamMessage) => {
if (!streamMessage.groupKeyId) {
return streamMessage
Expand All @@ -36,8 +36,17 @@ export default function Decrypt(client, options = {}) {
return streamMessage
}

const groupKey = await requestKey(streamMessage)
await EncryptionUtil.decryptStreamMessage(streamMessage, groupKey)
try {
const groupKey = await requestKey(streamMessage)
if (!groupKey) {
throw new UnableToDecryptError(`Group key not found: ${streamMessage.groupKeyId}`, streamMessage)
}
await EncryptionUtil.decryptStreamMessage(streamMessage, groupKey)
return streamMessage
} catch (err) {
await onError(err, streamMessage)
}

return streamMessage
})
}
Expand Down
48 changes: 38 additions & 10 deletions src/subscribe/OrderMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,23 @@ import resendStream from './resendStream'

const { OrderingUtil } = Utils

let ID = 0

/**
* Wraps OrderingUtil into a pipeline.
* Implements gap filling
*/

export default function OrderMessages(client, options = {}) {
const { gapFillTimeout, retryResendAfter, gapFill = true } = client.options
const { streamId, streamPartition } = validateOptions(options)
const { gapFillTimeout, retryResendAfter, maxGapRequests } = client.options
const { streamId, streamPartition, gapFill = true } = validateOptions(options)
const debug = client.debug.extend(`OrderMessages::${ID}`)
ID += 1

const outStream = new PushQueue() // output buffer
// output buffer
const outStream = new PushQueue([], {
autoEnd: false,
})

let done = false
const resendStreams = new Set() // holds outstanding resends for cleanup
Expand All @@ -26,11 +33,10 @@ export default function OrderMessages(client, options = {}) {
if (!outStream.isWritable() || done) {
return
}

outStream.push(orderedMessage)
}, async (from, to, publisherId, msgChainId) => {
if (done || !gapFill) { return }
client.debug('gap %o', {
debug('gap %o', {
streamId, streamPartition, publisherId, msgChainId, from, to,
})

Expand All @@ -53,22 +59,44 @@ export default function OrderMessages(client, options = {}) {
resendStreams.delete(resendMessageStream)
await resendMessageStream.cancel()
}
}, gapFillTimeout, retryResendAfter)
}, gapFillTimeout, retryResendAfter, gapFill ? maxGapRequests : 0)

const markMessageExplicitly = orderingUtil.markMessageExplicitly.bind(orderingUtil)

let inputClosed = false

function maybeClose() {
// we can close when:
// input has closed (i.e. all messages sent)
// AND
// no gaps are pending
// AND
// gaps have been filled or failed
// NOTE ordering util cannot have gaps if queue is empty
if (inputClosed && orderingUtil.isEmpty()) {
outStream.end()
}
}

orderingUtil.on('drain', () => {
maybeClose()
})

orderingUtil.on('error', () => {
// TODO: handle gapfill errors without closing stream or logging
maybeClose() // probably noop
})

return Object.assign(pipeline([
// eslint-disable-next-line require-yield
async function* WriteToOrderingUtil(src) {
for await (const msg of src) {
if (!gapFill) {
orderingUtil.markMessageExplicitly(msg)
}

orderingUtil.add(msg)
// note no yield
// orderingUtil writes to outStream itself
}
inputClosed = true
maybeClose()
},
outStream, // consumer gets outStream
], async (err) => {
Expand Down
Loading