Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Conversation

@timoxley
Copy link
Contributor

@timoxley timoxley commented Sep 12, 2020

Changes subscribe/unsubscribe to resolve when the subscribed/unsubscribed events fire respectively.

Simplifies consumer API significantly:

After

const subscription = await client.subscribe('stream1', onMessage)
// subscribed
await client.unsubscribe(subscription)
// unsubscribed

Before

const subscription = client.subscribe('stream1')
subscription.once('subscribed', () => {
  // subscribed
  subscription.once('unsubscribed', () => {
    // unsubscribed
  })
  client.unsubscribe(subscription)
})

I've got something similar as a WIP for resends.

Also should make the error handling story a bit clearer, since subscribe/unsubscribe could throw synchronously or maybe emit an error event.

e.g. to catch all possible errors with the existing streamr-client, you basically have to do something like the below,

client.on('error', onError)
let subscription
try {
    subscription = client.subscribe('stream1', onMessage)
    subscription.on('error', onError)
    subscription.once('subscribed', () => {
        // subscribed
        subscription.once('unsubscribed', () => {
            // unsubscribed
        })
        try {
            client.unsubscribe(subscription)
        } catch (err) {
            onError(err)
        }
    })
} catch (err) {
    onError(err)
}

And it gets even more hairy if you want to robustly detect which call was in progress when the error was thrown/emitted:

let subscription
try {
    subscription = client.subscribe('stream1', onMessage)
} catch (err) {
    onSubscribeError(err) // catch sync errors
}

if (subscription) {
    let onSubscribed

    const onSubscribeErrorAsync = (err) => {
        subscription.off('subscribed', onSubscribed) // remember to remove the success handler if you have an error
        onUnsubscribeError(err)
    }

    onSubscribed = () => {
        subscription.off('error', onSubscribeErrorAsync) // stop catching async errors after success
        // subscribed

        let success = false
        try {
            client.unsubscribe(subscription)
            success = true
        } catch (err) {
            onUnsubscribeError(err)
        }

        // need to detect sync error above so we don't attach spurrious listeners
        if (success) {
            subscription.once('error', onSubscribeErrorAsync) // catch async errors
            let onUnsubscribed
            // same err pattern for unsubscribed
            const onUnsubscribeErrorAsync = (err) => {
                subscription.off('unsubscribed', onUnsubscribed)
                onUnsubscribeError(err)
            }

            onUnsubscribed = () => {
                subscription.off('error', onUnsubscribeErrorAsync)
                // unsubscribed
            }
            subscription.once('unsubscribed', onUnsubscribed)
            subscription.once('error', onUnsubscribeError)
        }
    }

    subscription.once('subscribed', onSubscribed)
}

With these new changes, the above pattern is equivalent to:

const subscription = await client.subscribe('stream1', onMessage).catch(onSubscribeError)

if (subscription) {
  await client.unsubscribe(subscription).catch(onUnsubscribeError)
}

Also related to error handling, this should probably be updated to leverage the new requestId property so it can more accurately detect & reject errors, currently it always rejects if it sees any error event event on the subscription during a subscribe/unsubscribe, meaning it's not impossible for an unrelated error to cause rejection.

@timoxley timoxley force-pushed the promisify-subscription-methods branch from 2146b5f to b294a03 Compare September 14, 2020 19:16
@timoxley timoxley changed the base branch from master to refactor-connection September 14, 2020 19:22
@timoxley timoxley force-pushed the promisify-subscription-methods branch 4 times, most recently from d009473 to 78b52a7 Compare September 15, 2020 18:40
@timoxley timoxley force-pushed the promisify-subscription-methods branch from 78b52a7 to 2615d55 Compare September 15, 2020 18:45
@timoxley timoxley force-pushed the promisify-subscription-methods branch from 2615d55 to d73804d Compare September 18, 2020 16:58
@timoxley timoxley changed the title Promisify subscribe/unsubscribe methods 3. Promisify subscribe/unsubscribe methods Sep 21, 2020
return Promise.resolve()
}

return Promise.all([
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, how does this work if we are in the middle of awaiting method subscribe above?

await Promise.all([
            sub.waitForSubscribed(),
            this._resendAndSubscribe(sub),
        ])

this.once('subscribed', onSubscribed)
this.once('unsubscribed', onUnsubscribed)
this.once('error', reject)
}).then(() => this).finally(() => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this .then(() => this) needed for?

* Refactor & co-locate publish code.

* Avoid using expensive ethers.Wallet.createRandom() calls in test. e.g. 1000x calls with ethers: 14s, randomBytes: 3.5ms.

* Ensure messageCreationUtil is cleaned up after test.

* Fix non-functional MessageCreationUtil test.

* Swap out receptacle for more flexible mem/p-memoize/quick-lru.

* Convert LoginEndpoints test to async/await.

* Remove calls to ensureConnected/ensureDisconnected in test.

* 5. Message Sequencing – Guarantee sequence follows publish order & Prevent backdated messages silently breaking future publishes (#166)

* Improve authFetch logging.

* Update message sequencer to strictly enforce message order.

* Queue publishes per-stream otherwise can skip forward then back even when messages published in correct sequence.

* Add partial solution to broken backdated messages, at least doesn't break regular sequential publishes.

* Tidy up, add some comments.

* Move publish queue fn into utils.
@timoxley timoxley merged commit 05064cc into refactor-connection Nov 24, 2020
@timoxley timoxley deleted the promisify-subscription-methods branch November 24, 2020 15:51
@timoxley timoxley mentioned this pull request Nov 24, 2020
@timoxley timoxley mentioned this pull request Jan 20, 2021
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants