diff --git a/packages/block-brokers/test/trustless-gateway-sessions.spec.ts b/packages/block-brokers/test/trustless-gateway-sessions.spec.ts index f2216ceb0..93dfe9f04 100644 --- a/packages/block-brokers/test/trustless-gateway-sessions.spec.ts +++ b/packages/block-brokers/test/trustless-gateway-sessions.spec.ts @@ -108,4 +108,36 @@ describe('trustless-gateway sessions', () => { await expect(session.retrieve(cid)).to.eventually.deep.equal(block) expect(queryProviderSpy.callCount).to.equal(1) }) + + it('should ignore duplicate providers when unable to retrieve a block', async () => { + const session = createTrustlessGatewaySession(components, { + allowInsecure: true, + allowLocal: true + }) + + // changed the CID to end in `aa` instead of `aq` + const cid = CID.parse('bafkreiefnkxuhnq3536qo2i2w3tazvifek4mbbzb6zlq3ouhprjce5c3aa') + + const queryProviderSpy = Sinon.spy(session, 'queryProvider') + const findNewProvidersSpy = Sinon.spy(session, 'findNewProviders') + const hasProviderSpy = Sinon.spy(session, 'hasProvider') + + const prov = { + id: await createEd25519PeerId(), + multiaddrs: [ + uriToMultiaddr(process.env.TRUSTLESS_GATEWAY ?? '') + ] + } + + components.routing.findProviders.callsFake(async function * () { + yield prov + }) + + await expect(session.retrieve(cid)).to.eventually.be.rejected() + expect(hasProviderSpy.callCount).to.be.greaterThanOrEqual(2) + expect(hasProviderSpy.getCall(0).returnValue).to.be.false() + expect(hasProviderSpy.getCall(1).returnValue).to.be.true() + expect(findNewProvidersSpy.callCount).to.be.greaterThanOrEqual(2) + expect(queryProviderSpy.callCount).to.equal(1) + }) }) diff --git a/packages/utils/src/abstract-session.ts b/packages/utils/src/abstract-session.ts index 41718411b..4cf9d809c 100644 --- a/packages/utils/src/abstract-session.ts +++ b/packages/utils/src/abstract-session.ts @@ -32,6 +32,8 @@ export abstract class AbstractSession + queryProviderQueue: Queue constructor (components: AbstractSessionComponents, init: AbstractCreateSessionOptions) { super() @@ -45,6 +47,12 @@ export abstract class AbstractSession = {}): Promise { @@ -56,10 +64,18 @@ export abstract class AbstractSession = pDefer() this.requests.set(cidStr, deferred.promise) + const peerAddedToSessionListener = (event: CustomEvent): void => { + this.log('peer added to session...') + this.addQueryProviderJob(cid, event.detail, options) + } + + // add new session peers to query as they are discovered + this.addEventListener('provider', peerAddedToSessionListener) + if (this.providers.length === 0) { let first = false @@ -74,106 +90,127 @@ export abstract class AbstractSession { + this.log('querying existing provider %o', this.toEvictionKey(provider)) + return this.addQueryProviderJob(cid, provider, options) + })) } - let foundBlock = false + let findProvidersErrored = false + this.findProviderQueue.addEventListener('failure', (evt) => { + this.log.error('error finding new providers for %c', cid, evt.detail.error) - // this queue manages outgoing requests - as new peers are added to the - // session they will be added to the queue so we can request the current - // block from multiple peers as they are discovered - const queue = new Queue({ - concurrency: this.maxProviders + findProvidersErrored = true + if (foundBlock) { + // we found the block, so we can ignore this error + return + } + if (['ERR_INSUFFICIENT_PROVIDERS_FOUND'].includes((evt.detail.error as CodeError).code)) { + this.log.error('insufficient providers found for %c', cid) + if (this.queryProviderQueue.running === 0) { + // only reject if we're not currently querying any providers + deferred.reject(evt.detail.error) + } + } }) - queue.addEventListener('error', () => {}) - queue.addEventListener('failure', (evt) => { + + this.findProviderQueue.addEventListener('idle', () => { + this.log.trace('findProviderQueue idle') + if (options.signal?.aborted === true && !foundBlock) { + deferred.reject(new CodeError(options.signal.reason, 'ABORT_ERR')) + return + } + + if (foundBlock || findProvidersErrored || options.signal?.aborted === true) { + return + } + // continuously find new providers while we haven't found the block and signal is not aborted + this.addFindProviderJob(cid, options) + }) + + this.queryProviderQueue.addEventListener('failure', (evt) => { this.log.error('error querying provider %o, evicting from session', evt.detail.job.options.provider, evt.detail.error) this.evict(evt.detail.job.options.provider) }) - queue.addEventListener('success', (evt) => { - // peer has sent block, return it to the caller + + this.queryProviderQueue.addEventListener('success', (evt) => { + this.log.trace('queryProviderQueue success') foundBlock = true deferred.resolve(evt.detail.result) }) - queue.addEventListener('idle', () => { - if (foundBlock || options.signal?.aborted === true) { - // we either found the block or the user gave up + + this.queryProviderQueue.addEventListener('idle', () => { + this.log.trace('queryProviderQueue is idle') + if (foundBlock) { return } - - // find more session peers and retry - Promise.resolve() - .then(async () => { - this.log('no session peers had block for for %c, finding new providers', cid) - - // evict this.minProviders random providers to make room for more - for (let i = 0; i < this.minProviders; i++) { - if (this.providers.length === 0) { - break - } - - const provider = this.providers[Math.floor(Math.random() * this.providers.length)] - this.evict(provider) - } - - // find new providers for the CID - await this.findProviders(cid, this.minProviders, options) - - // keep trying until the abort signal fires - this.log('found new providers re-retrieving %c', cid) - this.requests.delete(cidStr) - deferred.resolve(await this.retrieve(cid, options)) - }) - .catch(err => { - this.log.error('could not find new providers for %c', cid, err) - deferred.reject(err) - }) + if (options.signal?.aborted === true) { + // if the signal was aborted, we should reject the request + deferred.reject(options.signal.reason) + return + } + // we're done querying found providers.. if we can't find new providers, we should reject + if (findProvidersErrored) { + deferred.reject(new CodeError('Done querying all found providers and unable to retrieve the block', 'ERR_NO_PROVIDERS_HAD_BLOCK')) + return + } + // otherwise, we're still waiting for more providers to query + this.log('waiting for more providers to query') + // if this.findProviders is not running, start it + if (this.findProviderQueue.running === 0) { + this.addFindProviderJob(cid, options) + } }) - const peerAddedToSessionListener = (event: CustomEvent): void => { - queue.add(async () => { - return this.queryProvider(cid, event.detail, options) - }, { - provider: event.detail - }) - .catch(err => { - if (options.signal?.aborted === true) { - // skip logging error if signal was aborted because abort can happen - // on success (e.g. another session found the block) - return - } - - this.log.error('error retrieving session block for %c', cid, err) - }) + try { + return await deferred.promise + } finally { + this.log.trace('retrieve finished, cleaning up session') + this.removeEventListener('provider', peerAddedToSessionListener) + this.findProviderQueue.clear() + this.queryProviderQueue.clear() + this.requests.delete(cidStr) } + } - // add new session peers to query as they are discovered - this.addEventListener('provider', peerAddedToSessionListener) - - // query each session peer directly - Promise.all([...this.providers].map(async (provider) => { - return queue.add(async () => { - return this.queryProvider(cid, provider, options) - }, { - provider - }) - })) + addFindProviderJob (cid: CID, options: AbortOptions): any { + return this.findProviderQueue.add(async () => { + await this.findProviders(cid, this.minProviders, options) + }, { signal: options.signal }) .catch(err => { if (options.signal?.aborted === true) { // skip logging error if signal was aborted because abort can happen // on success (e.g. another session found the block) return } - - this.log.error('error retrieving session block for %c', cid, err) + this.log.error('could not find new providers for %c', cid, err) }) + } - try { - return await deferred.promise - } finally { - this.removeEventListener('provider', peerAddedToSessionListener) - queue.clear() - this.requests.delete(cidStr) - } + addQueryProviderJob (cid: CID, provider: Provider, options: AbortOptions): any { + return this.queryProviderQueue.add(async () => { + return this.queryProvider(cid, provider, options) + }, { + provider, + signal: options.signal + }).catch(err => { + if (options.signal?.aborted === true) { + // skip logging error if signal was aborted because abort can happen + // on success (e.g. another session found the block) + return + } + this.log.error('error retrieving session block for %c', cid, err) + }) } evict (provider: Provider): void { @@ -181,6 +218,7 @@ export abstract class AbstractSession this.equals(prov, provider)) if (index === -1) { + this.log.trace('tried to evict provider, but it was not in this.providers') return } @@ -193,7 +231,7 @@ export abstract class AbstractSession this.equals(prov, provider)) != null) { + if (this.providers.some(prov => this.equals(prov, provider))) { return true } @@ -205,6 +243,11 @@ export abstract class AbstractSession { const deferred: DeferredPromise = pDefer() let found = 0 @@ -216,15 +259,18 @@ export abstract class AbstractSession { await expect(session.retrieve(cid)).to.eventually.be.rejected() - expect(session.findNewProviders).to.have.property('callCount', 2) + expect(session.findNewProviders).to.have.property('callCount', 3) expect(session.queryProvider).to.have.property('callCount', 1) }) })