Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
8cf5ef7
Set up client.ethereum before setting up client.publisher, publisher …
timoxley Feb 23, 2021
7f9a204
Use : as id component separator. Avoid conflicts with tools that supp…
timoxley Feb 23, 2021
35bc409
Pass value through GroupKey.from before adding/returning from store.
timoxley Mar 11, 2021
a396a93
Fix Session tests. Add client.canEncrypt(). Check can encrypt before …
timoxley Mar 11, 2021
41d8eb7
Tweak error checking in validation test.
timoxley Feb 25, 2021
b3103e7
Add default onFinally handler that throws on error.
timoxley Feb 25, 2021
639d7e9
Convert stream/PushQueue.js -> stream/PushQueue.ts.
timoxley Feb 26, 2021
81b6fba
Add own cancellation to PushQueue.
timoxley Feb 26, 2021
14e0a57
Convert stream/Encryption.js -> stream/Encryption.ts.
timoxley Feb 26, 2021
a98cf41
WIP fix error propagation & caching.
timoxley Mar 3, 2021
424e4bb
Add tests around iteratorFinally & throw.
timoxley Mar 3, 2021
71de1e4
Test errors in pipeline.
timoxley Mar 3, 2021
8dff646
Share generic iterator tests with PushQueue.
timoxley Jan 11, 2021
eb52fc1
Use WeakMap for generator end caching.
timoxley Jan 11, 2021
20bef0d
Clean up async in iterator/PushQueue tests.
timoxley Jan 11, 2021
a8a8783
Add more iterator tests around error handling.
timoxley Jan 11, 2021
aa327ca
WIP encryption/subscription error handling.
timoxley Mar 4, 2021
4272c90
Cleaned up, but still WIP encryption/subscription error handling.
timoxley Mar 4, 2021
782acda
Further cleaning on WIP encryption/subscription error handling.
timoxley Mar 4, 2021
8131cd7
Tracking down unhandled error. Decrypt in series.
timoxley Mar 5, 2021
ebbac17
Test multiple failures in Subscription pipeline.
timoxley Mar 5, 2021
93bc9be
Allow mocking sub/unsub calls in Subscription.
timoxley Mar 10, 2021
644cc0f
Add MessagePipeline.test.js.
timoxley Mar 8, 2021
630cf66
Get better stack traces & errors out of Scaffold.
timoxley Mar 10, 2021
4b50aef
Convert CancelableGenerator to return object with cancel function.
timoxley Mar 10, 2021
96ab8ac
Don't ignore PushQueue throw until finished.
timoxley Mar 10, 2021
ed4a732
Tweak iterators.
timoxley Mar 10, 2021
76e3d9c
Fix unhandled rejection in keyexchange. Improve subscription error ha…
timoxley Mar 10, 2021
7b8c6cb
Fix AggregatedError stack not including self.
timoxley Mar 11, 2021
48f9abe
Fix missing optional id in Config.
timoxley Mar 11, 2021
9245a52
Fix blocked pipeline on error.
timoxley Mar 11, 2021
468ab5d
Fix incorrect canEncrypt() result.
timoxley Mar 11, 2021
9ae1578
Linting.
timoxley Mar 11, 2021
0a98488
Prevent suppressing of rethrown error from subscription pipeline onEr…
timoxley Mar 12, 2021
1841741
fix(subscribe): Clean up type errors.
timoxley Mar 16, 2021
47e2c81
style(config): Clean up whitespace.
timoxley Mar 16, 2021
41fe830
test: Fix SubscriberResends uncontrolled setTimeout.
timoxley Mar 16, 2021
6ee58f9
test(subscribe, iterators): Fix error handling.
timoxley Mar 17, 2021
ce637a5
test(iterators): Fix error handling.
timoxley Mar 17, 2021
20825eb
fix(encryption): Give GroupKey same interface as protocol's Encrypted…
timoxley Mar 18, 2021
9f0dbb1
types(test): Fix setTimeout typing in SubscriberResends tests.
timoxley Mar 19, 2021
21cf74a
build(client-testing): Retry client-testing once, reduce timeout from…
timoxley Mar 19, 2021
e11ebd5
style: Add comments, tidy types.
timoxley Mar 19, 2021
cb576fd
fix(keyexchange): Remove bad call to cancelTask.
timoxley Mar 22, 2021
9bd0bae
refactor(encryption): Remove static validateGroupKey from Encryption …
timoxley Mar 22, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions .github/workflows/test-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,11 @@ jobs:
npm install
npm link streamr-client
./gradlew fatjar
- name: run-client-testing
timeout-minutes: 10
- uses: nick-invision/retry@v2
name: run-client-testing
working-directory: streamr-client-testing
run: java -jar build/libs/client_testing-1.0-SNAPSHOT.jar -s $TEST_NAME -c config/$CONFIG_NAME.conf -n $NUM_MESSAGES
with:
max_attempts: 2
timeout_minutes: 3
retry_on: error
command: java -jar build/libs/client_testing-1.0-SNAPSHOT.jar -s $TEST_NAME -c config/$CONFIG_NAME.conf -n $NUM_MESSAGES
4 changes: 3 additions & 1 deletion src/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ export type EthereumConfig = ExternalProvider|JsonRpcFetchFunc
* @category Important
*/
export type StrictStreamrClientOptions = {
/** Custom human-readable debug id for client. Used in logging. Unique id will be generated regardless. */
id?: string,
/**
* Authentication: identity used by this StreamrClient instance.
* Can contain member privateKey or (window.)ethereum
Expand Down Expand Up @@ -169,8 +171,8 @@ export default function ClientConfig(opts: StreamrClientOptions = {}) {
...opts.dataUnion
},
cache: {
...opts.cache,
...STREAM_CLIENT_DEFAULTS.cache,
...opts.cache,
}
// NOTE: sidechain is not merged with the defaults
}
Expand Down
8 changes: 6 additions & 2 deletions src/Ethereum.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ export default class StreamrEthereum {
const key = auth.privateKey
const address = getAddress(computeAddress(key))
this._getAddress = async () => address
this.getSigner = () => new Wallet(key, this.getMainnetProvider())
this.getSidechainSigner = async () => new Wallet(key, this.getSidechainProvider())
this._getSigner = () => new Wallet(key, this.getMainnetProvider())
this._getSidechainSigner = async () => new Wallet(key, this.getSidechainProvider())
} else if (auth.ethereum) {
this._getAddress = async () => {
try {
Expand Down Expand Up @@ -61,6 +61,10 @@ export default class StreamrEthereum {
}
}

canEncrypt() {
return !!(this._getAddress && this._getSigner)
}

async getAddress() {
if (!this._getAddress) {
// _getAddress is assigned in constructor
Expand Down
17 changes: 14 additions & 3 deletions src/StreamrClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ export class StreamrClient extends EventEmitter { // eslint-disable-line no-rede
// TODO annotate connection parameter as internal parameter if possible?
constructor(options: StreamrClientOptions = {}, connection?: StreamrConnection) {
super()
this.id = counterId(`${this.constructor.name}:${uid}`)
this.id = counterId(`${this.constructor.name}:${uid}${options.id || ''}`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the use provides the id, it is maybe unique enough to distinguish the instances? This counterId(${this.constructor.name}:${uid}) could be just a fallback if no explicit id is provided.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, though the wrapper around the id here was added to ensure I get unique results across tests, but I can give the tests known "ids":
e.g.

const publisher = new Client({id: 'publisher'})
const subscriber = new Client({id: 'subscriber'})

Doesn't matter so much as this is 100% for debugging/logging purposes currently, but maybe id isn't the right option property, perhaps it should be name or something.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, name would describe it better 👍

There is the same situation in Subscription class (line 53)

this.debug = Debug(this.id)

this.options = Config(options)
Expand Down Expand Up @@ -217,9 +217,9 @@ export class StreamrClient extends EventEmitter { // eslint-disable-line no-rede
.on('disconnected', this.onConnectionDisconnected)
.on('error', this.onConnectionError)

this.ethereum = new StreamrEthereum(this)
this.publisher = Publisher(this)
this.subscriber = new Subscriber(this)
this.ethereum = new StreamrEthereum(this)

Plugin(this, new StreamEndpoints(this))
Plugin(this, new LoginEndpoints(this))
Expand Down Expand Up @@ -357,10 +357,14 @@ export class StreamrClient extends EventEmitter { // eslint-disable-line no-rede
let subTask: Todo
let sub: Todo
const hasResend = !!(opts.resend || opts.from || opts.to || opts.last)
const onEnd = () => {
const onEnd = (err?: Error) => {
if (sub && typeof onMessage === 'function') {
sub.off('message', onMessage)
}

if (err) {
throw err
}
}

if (hasResend) {
Expand Down Expand Up @@ -429,6 +433,13 @@ export class StreamrClient extends EventEmitter { // eslint-disable-line no-rede
return this.getAddress()
}

/**
* True if authenticated with private key/ethereum provider
*/
canEncrypt() {
return this.ethereum.canEncrypt()
}

/**
* Get token balance in "wei" (10^-18 parts) for given address
*/
Expand Down
36 changes: 26 additions & 10 deletions src/publish/Encrypt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,27 @@ const { StreamMessage } = MessageLayer
type PublisherKeyExhangeAPI = ReturnType<typeof PublisherKeyExhange>

export default function Encrypt(client: StreamrClient) {
const publisherKeyExchange = PublisherKeyExhange(client, {
groupKeys: {
...client.options.groupKeys,
let publisherKeyExchange: ReturnType<typeof PublisherKeyExhange>

function getPublisherKeyExchange() {
if (!publisherKeyExchange) {
publisherKeyExchange = PublisherKeyExhange(client, {
groupKeys: {
...client.options.groupKeys,
}
})
}
})
return publisherKeyExchange
}

async function encrypt(streamMessage: MessageLayer.StreamMessage, stream: Stream) {
if (!client.canEncrypt()) {
return
}

if (
!publisherKeyExchange.hasAnyGroupKey(stream.id)
&& !stream.requireEncryptedData
!stream.requireEncryptedData
&& !getPublisherKeyExchange().hasAnyGroupKey(stream.id)
) {
// not needed
return
Expand All @@ -27,19 +39,23 @@ export default function Encrypt(client: StreamrClient) {
if (streamMessage.messageType !== StreamMessage.MESSAGE_TYPES.MESSAGE) {
return
}
const groupKey = await publisherKeyExchange.useGroupKey(stream.id)
const groupKey = await getPublisherKeyExchange().useGroupKey(stream.id)
await EncryptionUtil.encryptStreamMessage(streamMessage, groupKey)
}

return Object.assign(encrypt, {
setNextGroupKey(...args: Parameters<PublisherKeyExhangeAPI['setNextGroupKey']>) {
return publisherKeyExchange.setNextGroupKey(...args)
return getPublisherKeyExchange().setNextGroupKey(...args)
},
rotateGroupKey(...args: Parameters<PublisherKeyExhangeAPI['rotateGroupKey']>) {
return publisherKeyExchange.rotateGroupKey(...args)
return getPublisherKeyExchange().rotateGroupKey(...args)
},
start() {
return getPublisherKeyExchange().start()
},
stop() {
return publisherKeyExchange.stop()
if (!publisherKeyExchange) { return Promise.resolve() }
return getPublisherKeyExchange().stop()
}
})
}
7 changes: 7 additions & 0 deletions src/publish/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ function getCreateStreamMessage(client) {
[streamId, streamPartition, publisherId, msgChainId].join('|')
),
...cacheOptions,
maxAge: undefined
}), {
clear() {
mem.clear(getMsgChainer)
Expand Down Expand Up @@ -188,6 +189,9 @@ function getCreateStreamMessage(client) {
rotateGroupKey(maybeStreamId) {
return encrypt.rotateGroupKey(maybeStreamId)
},
startKeyExchange() {
return encrypt.start()
},
clear() {
computeStreamPartition.clear()
getMsgChainer.clear()
Expand Down Expand Up @@ -300,6 +304,9 @@ export default function Publisher(client) {
throw error
}
},
async startKeyExchange() {
return createStreamMessage.startKeyExchange()
},
async stop() {
sendQueue.clear()
createStreamMessage.clear()
Expand Down
Loading