Skip to content

Commit

Permalink
feat(useAsyncQueue): add options.signal parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
chirokas committed Apr 27, 2023
1 parent 81bcaa9 commit 986338d
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 40 deletions.
52 changes: 40 additions & 12 deletions packages/core/useAsyncQueue/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,17 @@ describe('useAsyncQueue', () => {
}

it('should return the tasks result', async () => {
const {
activeIndex,
result,
} = useAsyncQueue([p1, p2, p3])
const { activeIndex, result } = useAsyncQueue([p1, p2, p3])
await retry(() => {
expect(activeIndex.value).toBe(2)
expect(JSON.stringify(result)).toBe('[{"state":"fulfilled","data":1000},{"state":"fulfilled","data":2000},{"state":"fulfilled","data":3000}]')
expect(JSON.stringify(result)).toBe(
'[{"state":"fulfilled","data":1000},{"state":"fulfilled","data":2000},{"state":"fulfilled","data":3000}]',
)
})
})

it('should passed the current task result to the next task', async () => {
const {
activeIndex,
result,
} = useAsyncQueue([p1, p2])
const { activeIndex, result } = useAsyncQueue([p1, p2])
await retry(() => {
expect(activeIndex.value).toBe(1)
expect(result[activeIndex.value].data).toBe(2000)
Expand All @@ -68,7 +64,7 @@ describe('useAsyncQueue', () => {
})
})

it ('should trigger onError when the tasks fails', async () => {
it('should trigger onError when the tasks fails', async () => {
const onErrorSpy = vi.fn()
const { activeIndex } = useAsyncQueue([p3, pError], {
onError: onErrorSpy,
Expand All @@ -79,7 +75,7 @@ describe('useAsyncQueue', () => {
})
})

it ('should interrupt the tasks when current task fails', async () => {
it('should interrupt the tasks when current task fails', async () => {
const finalTaskSpy = vi.fn(() => Promise.resolve('data'))
const onFinishedSpy = vi.fn()
useAsyncQueue([p1, pError, finalTaskSpy], {
Expand All @@ -92,7 +88,7 @@ describe('useAsyncQueue', () => {
})
})

it ('should not interrupt the tasks when current task fails', async () => {
it('should not interrupt the tasks when current task fails', async () => {
const finalTaskSpy = vi.fn(() => Promise.resolve('data'))
const onFinishedSpy = vi.fn()
useAsyncQueue([p1, pError, finalTaskSpy], {
Expand All @@ -104,4 +100,36 @@ describe('useAsyncQueue', () => {
expect(finalTaskSpy).toHaveBeenCalledOnce()
})
})

it('should cancel the tasks', async () => {
const controller = new AbortController()
const { activeIndex, result } = useAsyncQueue([p1], {
signal: controller.signal,
})
controller.abort()
await retry(() => {
expect(activeIndex.value).toBe(0)
expect(result).toHaveLength(1)
expect(result[activeIndex.value]).toMatchInlineSnapshot(`
{
"data": [AbortError: aborted],
"state": "canceled",
}
`)
})
})

it('should abort the tasks when AbortSignal.abort is triggered', async () => {
const controller = new AbortController()
const finalTaskSpy = vi.fn(() => Promise.resolve('data'))
const abort = () => controller.abort()
const { activeIndex, result } = useAsyncQueue([p1, abort, finalTaskSpy], {
signal: controller.signal,
})
await retry(() => {
expect(activeIndex.value).toBe(2)
expect(result).toHaveLength(3)
expect(finalTaskSpy).not.toHaveBeenCalled()
})
})
})
165 changes: 137 additions & 28 deletions packages/core/useAsyncQueue/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { noop } from '@vueuse/shared'
import type { Ref } from 'vue-demi'
import { reactive, ref } from 'vue-demi'
import { noop } from '@vueuse/shared'

export type UseAsyncQueueTask<T> = (...args: any[]) => T | Promise<T>
export type UseAsyncQueueTask<T> = (...args: any[]) => Promise<T> | T

export interface UseAsyncQueueResult<T> {
state: 'pending' | 'fulfilled' | 'rejected'
state: 'canceled' | 'fulfilled' | 'pending' | 'rejected'
data: T | null
}

Expand Down Expand Up @@ -33,6 +33,11 @@ export interface UseAsyncQueueOptions {
*
*/
onFinished?: () => void

/**
* A AbortSignal that can be used to abort the task.
*/
signal?: AbortSignal
}

/**
Expand All @@ -42,25 +47,90 @@ export interface UseAsyncQueueOptions {
* @param tasks
* @param options
*/
export function useAsyncQueue<T1>(tasks: [UseAsyncQueueTask<T1>], options?: UseAsyncQueueOptions): UseAsyncQueueReturn<[UseAsyncQueueResult<T1>]>
export function useAsyncQueue<T1, T2>(tasks: [UseAsyncQueueTask<T1>, UseAsyncQueueTask<T2>], options?: UseAsyncQueueOptions): UseAsyncQueueReturn<[UseAsyncQueueResult<T1>, UseAsyncQueueResult<T2>]>
export function useAsyncQueue<T1, T2, T3>(tasks: [UseAsyncQueueTask<T1>, UseAsyncQueueTask<T2>, UseAsyncQueueTask<T3>], options?: UseAsyncQueueOptions): UseAsyncQueueReturn<[UseAsyncQueueResult<T1>, UseAsyncQueueResult<T2>, UseAsyncQueueResult<T3>]>
export function useAsyncQueue<T1, T2, T3, T4>(tasks: [UseAsyncQueueTask<T1>, UseAsyncQueueTask<T2>, UseAsyncQueueTask<T3>, UseAsyncQueueTask<T4>], options?: UseAsyncQueueOptions): UseAsyncQueueReturn<[UseAsyncQueueResult<T1>, UseAsyncQueueResult<T2>, UseAsyncQueueResult<T3>, UseAsyncQueueResult<T4>]>
export function useAsyncQueue<T1, T2, T3, T4, T5>(tasks: [UseAsyncQueueTask<T1>, UseAsyncQueueTask<T2>, UseAsyncQueueTask<T3>, UseAsyncQueueTask<T4>, UseAsyncQueueTask<T5>], options?: UseAsyncQueueOptions): UseAsyncQueueReturn<[UseAsyncQueueResult<T1>, UseAsyncQueueResult<T2>, UseAsyncQueueResult<T3>, UseAsyncQueueResult<T4>, UseAsyncQueueResult<T5>]>
export function useAsyncQueue<T>(tasks: UseAsyncQueueTask<T>[], options?: UseAsyncQueueOptions): UseAsyncQueueReturn<UseAsyncQueueResult<T>[]>
export function useAsyncQueue<T = any>(tasks: UseAsyncQueueTask<any>[], options: UseAsyncQueueOptions = {}): UseAsyncQueueReturn<UseAsyncQueueResult<T>[]> {
export function useAsyncQueue<T1>(
tasks: [UseAsyncQueueTask<T1>],
options?: UseAsyncQueueOptions
): UseAsyncQueueReturn<[UseAsyncQueueResult<T1>]>

export function useAsyncQueue<T1, T2>(
tasks: [UseAsyncQueueTask<T1>, UseAsyncQueueTask<T2>],
options?: UseAsyncQueueOptions
): UseAsyncQueueReturn<[UseAsyncQueueResult<T1>, UseAsyncQueueResult<T2>]>

export function useAsyncQueue<T1, T2, T3>(
tasks: [UseAsyncQueueTask<T1>, UseAsyncQueueTask<T2>, UseAsyncQueueTask<T3>],
options?: UseAsyncQueueOptions
): UseAsyncQueueReturn<
[UseAsyncQueueResult<T1>, UseAsyncQueueResult<T2>, UseAsyncQueueResult<T3>]
>

export function useAsyncQueue<T1, T2, T3, T4>(
tasks: [
UseAsyncQueueTask<T1>,
UseAsyncQueueTask<T2>,
UseAsyncQueueTask<T3>,
UseAsyncQueueTask<T4>,
],
options?: UseAsyncQueueOptions
): UseAsyncQueueReturn<
[
UseAsyncQueueResult<T1>,
UseAsyncQueueResult<T2>,
UseAsyncQueueResult<T3>,
UseAsyncQueueResult<T4>,
]
>

export function useAsyncQueue<T1, T2, T3, T4, T5>(
tasks: [
UseAsyncQueueTask<T1>,
UseAsyncQueueTask<T2>,
UseAsyncQueueTask<T3>,
UseAsyncQueueTask<T4>,
UseAsyncQueueTask<T5>,
],
options?: UseAsyncQueueOptions
): UseAsyncQueueReturn<
[
UseAsyncQueueResult<T1>,
UseAsyncQueueResult<T2>,
UseAsyncQueueResult<T3>,
UseAsyncQueueResult<T4>,
UseAsyncQueueResult<T5>,
]
>

export function useAsyncQueue<T>(
tasks: UseAsyncQueueTask<T>[],
options?: UseAsyncQueueOptions
): UseAsyncQueueReturn<UseAsyncQueueResult<T>[]>

export function useAsyncQueue<T = any>(
tasks: UseAsyncQueueTask<any>[],
options: UseAsyncQueueOptions = {},
): UseAsyncQueueReturn<UseAsyncQueueResult<T>[]> {
const {
interrupt = true,
onError = noop,
onFinished = noop,
signal,
} = options

const promiseState: Record<UseAsyncQueueResult<T>['state'], UseAsyncQueueResult<T>['state']> = {
const promiseState: Record<
UseAsyncQueueResult<T>['state'],
UseAsyncQueueResult<T>['state']
> = {
canceled: 'canceled',
fulfilled: 'fulfilled',
pending: 'pending',
rejected: 'rejected',
fulfilled: 'fulfilled',
}
const initialResult = Array.from(new Array(tasks.length), () => ({ state: promiseState.pending, data: null }))

const initialResult = Array.from(new Array(tasks.length), () => ({
state: promiseState.pending,
data: null,
}))

const result = reactive(initialResult) as UseAsyncQueueResult<T>[]

const activeIndex = ref<number>(-1)
Expand All @@ -80,26 +150,65 @@ export function useAsyncQueue<T = any>(tasks: UseAsyncQueueTask<any>[], options:
}

tasks.reduce((prev, curr) => {
return prev.then((prevRes) => {
if (result[activeIndex.value]?.state === promiseState.rejected && interrupt) {
onFinished()
return
}

return curr(prevRes).then((currentRes: any) => {
updateResult(promiseState.fulfilled, currentRes)
activeIndex.value === tasks.length - 1 && onFinished()
return currentRes
return prev
.then((prevRes) => {
if (signal?.aborted) {
updateResult(promiseState.canceled, new AbortError())
return
}

if (
result[activeIndex.value]?.state === promiseState.rejected
&& interrupt
) {
onFinished()
return
}

const done = curr(prevRes).then((currentRes: any) => {
updateResult(promiseState.fulfilled, currentRes)
activeIndex.value === tasks.length - 1 && onFinished()
return currentRes
})

if (!signal)
return done

return Promise.race([done, whenAborted(signal)])
})
.catch((e) => {
if (signal?.aborted) {
updateResult(promiseState.canceled, e)
return e
}

updateResult(promiseState.rejected, e)
onError()
return e
})
}).catch((e) => {
updateResult(promiseState.rejected, e)
onError()
return e
})
}, Promise.resolve())

return {
activeIndex,
result,
}
}

class AbortError extends Error {
code = 'ABORT_ERR'
name = 'AbortError'
constructor(message = 'aborted', options?: ErrorOptions) {
super(message, options)
}
}

function whenAborted(signal: AbortSignal): Promise<never> {
return new Promise((resolve, reject) => {
const error = new AbortError()

if (signal.aborted)
reject(error)
else
signal.addEventListener('abort', () => reject(error), { once: true })
})
}

0 comments on commit 986338d

Please sign in to comment.