Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(useAsyncQueue): add options.signal parameter #3033

Merged
merged 1 commit into from May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 32 additions & 0 deletions packages/core/useAsyncQueue/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,36 @@ describe('useAsyncQueue', () => {
expect(finalTaskSpy).toHaveBeenCalledOnce()
})
})

it('should cancel the tasks', async () => {
const controller = new AbortController()
const { activeIndex, result } = useAsyncQueue([p1], {
signal: controller.signal,
})
controller.abort()
await retry(() => {
expect(activeIndex.value).toBe(0)
expect(result).toHaveLength(1)
expect(result[activeIndex.value]).toMatchInlineSnapshot(`
{
"data": [Error: aborted],
"state": "aborted",
}
`)
})
})

it('should abort the tasks when AbortSignal.abort is triggered', async () => {
const controller = new AbortController()
const abort = () => controller.abort()
const finalTaskSpy = vi.fn(() => Promise.resolve('data'))
const { activeIndex, result } = useAsyncQueue([p1, abort, finalTaskSpy], {
signal: controller.signal,
})
await retry(() => {
expect(activeIndex.value).toBe(2)
expect(result).toHaveLength(3)
expect(finalTaskSpy).not.toHaveBeenCalled()
})
})
})
81 changes: 62 additions & 19 deletions packages/core/useAsyncQueue/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { noop } from '@vueuse/shared'
import type { Ref } from 'vue-demi'
import { reactive, ref } from 'vue-demi'
import { noop } from '@vueuse/shared'

export type UseAsyncQueueTask<T> = (...args: any[]) => T | Promise<T>

export interface UseAsyncQueueResult<T> {
state: 'pending' | 'fulfilled' | 'rejected'
state: 'aborted' | 'fulfilled' | 'pending' | 'rejected'
data: T | null
}

Expand Down Expand Up @@ -33,6 +33,11 @@ export interface UseAsyncQueueOptions {
*
*/
onFinished?: () => void

/**
* A AbortSignal that can be used to abort the task.
*/
signal?: AbortSignal
}

/**
Expand All @@ -53,14 +58,21 @@ export function useAsyncQueue<T = any>(tasks: UseAsyncQueueTask<any>[], options:
interrupt = true,
onError = noop,
onFinished = noop,
signal,
} = options

const promiseState: Record<UseAsyncQueueResult<T>['state'], UseAsyncQueueResult<T>['state']> = {
const promiseState: Record<
UseAsyncQueueResult<T>['state'],
UseAsyncQueueResult<T>['state']
> = {
aborted: 'aborted',
fulfilled: 'fulfilled',
pending: 'pending',
rejected: 'rejected',
fulfilled: 'fulfilled',
}

const initialResult = Array.from(new Array(tasks.length), () => ({ state: promiseState.pending, data: null }))

const result = reactive(initialResult) as UseAsyncQueueResult<T>[]

const activeIndex = ref<number>(-1)
Expand All @@ -80,26 +92,57 @@ export function useAsyncQueue<T = any>(tasks: UseAsyncQueueTask<any>[], options:
}

tasks.reduce((prev, curr) => {
return prev.then((prevRes) => {
if (result[activeIndex.value]?.state === promiseState.rejected && interrupt) {
onFinished()
return
}

return curr(prevRes).then((currentRes: any) => {
updateResult(promiseState.fulfilled, currentRes)
activeIndex.value === tasks.length - 1 && onFinished()
return currentRes
return prev
.then((prevRes) => {
if (signal?.aborted) {
updateResult(promiseState.aborted, new Error('aborted'))
return
}

if (
result[activeIndex.value]?.state === promiseState.rejected
&& interrupt
) {
onFinished()
return
}

const done = curr(prevRes).then((currentRes: any) => {
updateResult(promiseState.fulfilled, currentRes)
activeIndex.value === tasks.length - 1 && onFinished()
return currentRes
})

if (!signal)
return done

return Promise.race([done, whenAborted(signal)])
})
.catch((e) => {
if (signal?.aborted) {
updateResult(promiseState.aborted, e)
return e
}

updateResult(promiseState.rejected, e)
onError()
return e
})
}).catch((e) => {
updateResult(promiseState.rejected, e)
onError()
return e
})
}, Promise.resolve())

return {
activeIndex,
result,
}
}

function whenAborted(signal: AbortSignal): Promise<never> {
return new Promise((resolve, reject) => {
const error = new Error('aborted')

if (signal.aborted)
reject(error)
else
signal.addEventListener('abort', () => reject(error), { once: true })
})
}