Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: two explicit queues for session work #538

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
18 changes: 16 additions & 2 deletions packages/utils/src/abstract-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ export abstract class AbstractSession<Provider, RetrieveBlockProgressEvents exte
public readonly providers: Provider[]
private readonly evictionFilter: BloomFilter

/**
* Flag that is set when no new providers are found for a CID. This is used
* when foundBlock is still false, and signal is not aborted, because
* there's no new work we can do.
*/
private noNewProviders: boolean

constructor (components: AbstractSessionComponents, init: AbstractCreateSessionOptions) {
super()

Expand All @@ -45,6 +52,7 @@ export abstract class AbstractSession<Provider, RetrieveBlockProgressEvents exte
this.maxProviders = init.maxProviders ?? DEFAULT_SESSION_MAX_PROVIDERS
this.providers = []
this.evictionFilter = BloomFilter.create(this.maxProviders)
this.noNewProviders = false
}

async retrieve (cid: CID, options: BlockRetrievalOptions<RetrieveBlockProgressEvents> = {}): Promise<Uint8Array> {
Expand Down Expand Up @@ -95,8 +103,9 @@ export abstract class AbstractSession<Provider, RetrieveBlockProgressEvents exte
deferred.resolve(evt.detail.result)
})
queue.addEventListener('idle', () => {
if (foundBlock || options.signal?.aborted === true) {
// we either found the block or the user gave up
if (foundBlock || options.signal?.aborted === true || this.noNewProviders) {
// we either found the block, the user gave up, or cannot find any more providers
this.log('session aborted')
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
return
}

Expand Down Expand Up @@ -246,6 +255,11 @@ export abstract class AbstractSession<Provider, RetrieveBlockProgressEvents exte
}
}

if (found === 0) {
this.noNewProviders = true
throw new CodeError(`No new ${this.name} providers found for ${cid}`, 'ERR_NO_NEW_PROVIDERS_FOUND')
}

this.log('found %d/%d new session peers', found, this.maxProviders)

if (found < count) {
Expand Down
40 changes: 40 additions & 0 deletions packages/utils/test/abstract-session.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,44 @@ describe('abstract-session', () => {
})).to.eventually.be.rejected()
.with.property('code', 'ABORT_ERR')
})

it('should not make multiple requests to the only found provider', async function () {
this.timeout(1000)
const session: Session | null = new Session()

const cid = CID.parse('bafybeifaymukvfkyw6xgh4th7tsctiifr4ea2btoznf46y6b2fnvikdczi')
const id = await createEd25519PeerId() // same provider
const providers: SessionPeer[] = [
{
id
},
{
id
}
]

session.findNewProviders.onFirstCall().callsFake(async function * () {
yield providers[0]
})
session.findNewProviders.onSecondCall().callsFake(async function * () {
yield providers[1]
})
// eslint-disable-next-line require-yield
session.findNewProviders.callsFake(async function * () {
yield providers[1]
})

session.queryProvider.callsFake(async () => {
// always fails
throw new Error('Urk!')
})

await expect(session.retrieve(cid)).to.eventually.be.rejected()

expect(session.findNewProviders.callCount).to.equal(4)
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved

expect(session.providers).to.have.lengthOf(0)

expect(session.providers.includes(providers[0])).to.be.false()
})
})
Loading