Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement auto fork joining #3407

Merged
merged 1 commit into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions packages/toolkit/src/listenerMiddleware/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import type {
TaskResult,
AbortSignalWithReason,
UnsubscribeListenerOptions,
ForkOptions,
} from './types'
import {
abortControllerWithReason,
Expand Down Expand Up @@ -78,13 +79,19 @@ const INTERNAL_NIL_TOKEN = {} as const

const alm = 'listenerMiddleware' as const

const createFork = (parentAbortSignal: AbortSignalWithReason<unknown>) => {
const createFork = (
parentAbortSignal: AbortSignalWithReason<unknown>,
parentBlockingPromises: Promise<any>[]
) => {
const linkControllers = (controller: AbortController) =>
addAbortSignalListener(parentAbortSignal, () =>
abortControllerWithReason(controller, parentAbortSignal.reason)
)

return <T>(taskExecutor: ForkedTaskExecutor<T>): ForkedTask<T> => {
return <T>(
taskExecutor: ForkedTaskExecutor<T>,
opts?: ForkOptions
): ForkedTask<T> => {
assertFunction(taskExecutor, 'taskExecutor')
const childAbortController = new AbortController()

Expand All @@ -105,6 +112,10 @@ const createFork = (parentAbortSignal: AbortSignalWithReason<unknown>) => {
() => abortControllerWithReason(childAbortController, taskCompleted)
)

if (opts?.autoJoin) {
parentBlockingPromises.push(result)
}

return {
result: createPause<TaskResult<T>>(parentAbortSignal)(result),
cancel() {
Expand Down Expand Up @@ -376,6 +387,7 @@ export function createListenerMiddleware<
startListening,
internalTaskController.signal
)
const autoJoinPromises: Promise<any>[] = []

try {
entry.pending.add(internalTaskController)
Expand All @@ -394,7 +406,7 @@ export function createListenerMiddleware<
pause: createPause<any>(internalTaskController.signal),
extra,
signal: internalTaskController.signal,
fork: createFork(internalTaskController.signal),
fork: createFork(internalTaskController.signal, autoJoinPromises),
unsubscribe: entry.unsubscribe,
subscribe: () => {
listenerMap.set(entry.id, entry)
Expand All @@ -417,6 +429,8 @@ export function createListenerMiddleware<
})
}
} finally {
await Promise.allSettled(autoJoinPromises)

abortControllerWithReason(internalTaskController, listenerCompleted) // Notify that the task has completed
entry.pending.delete(internalTaskController)
}
Expand Down
71 changes: 52 additions & 19 deletions packages/toolkit/src/listenerMiddleware/tests/fork.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
listenerCancelled,
listenerCompleted,
taskCancelled,
taskCompleted,
} from '../exceptions'

function delay(ms: number) {
Expand Down Expand Up @@ -349,28 +350,60 @@ describe('fork', () => {
)
})

test('forkApi.signal listener is invoked as soon as the parent listener is cancelled or completed', async () => {
let deferredResult = deferred()
it.each([
{
autoJoin: true,
expectedAbortReason: taskCompleted,
cancelListener: false,
},
{
autoJoin: false,
expectedAbortReason: listenerCompleted,
cancelListener: false,
},
{
autoJoin: true,
expectedAbortReason: listenerCancelled,
cancelListener: true,
},
{
autoJoin: false,
expectedAbortReason: listenerCancelled,
cancelListener: true,
},
])(
'signal is $expectedAbortReason when autoJoin: $autoJoin, cancelListener: $cancelListener',
async ({ autoJoin, cancelListener, expectedAbortReason }) => {
let deferredResult = deferred()

const unsubscribe = startListening({
actionCreator: increment,
async effect(_, listenerApi) {
listenerApi.fork(
async (forkApi) => {
forkApi.signal.addEventListener('abort', () => {
deferredResult.resolve(
(forkApi.signal as AbortSignalWithReason<unknown>).reason
)
})

await forkApi.delay(10)
},
{ autoJoin }
)
},
})

startListening({
actionCreator: increment,
async effect(_, listenerApi) {
const wronglyDoNotAwaitResultOfTask = listenerApi.fork(
async (forkApi) => {
forkApi.signal.addEventListener('abort', () => {
deferredResult.resolve(
(forkApi.signal as AbortSignalWithReason<unknown>).reason
)
})
}
)
},
})
store.dispatch(increment())

store.dispatch(increment())
// let task start
await Promise.resolve()

expect(await deferredResult).toBe(listenerCompleted)
})
if (cancelListener) unsubscribe({ cancelActive: true })

expect(await deferredResult).toBe(expectedAbortReason)
}
)

test('fork.delay does not trigger unhandledRejections for completed or cancelled tasks', async () => {
let deferredCompletedEvt = deferred()
Expand Down
12 changes: 11 additions & 1 deletion packages/toolkit/src/listenerMiddleware/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ export interface ForkedTask<T> {
cancel(): void
}

/** @public */
export interface ForkOptions {
/**
* If true, causes the parent task to not be marked as complete until
* all autoJoined forks have completed or failed.
*/
autoJoin: boolean;
}

/** @public */
export interface ListenerEffectAPI<
State,
Expand Down Expand Up @@ -238,8 +247,9 @@ export interface ListenerEffectAPI<
/**
* Queues in the next microtask the execution of a task.
* @param executor
* @param options
*/
fork<T>(executor: ForkedTaskExecutor<T>): ForkedTask<T>
fork<T>(executor: ForkedTaskExecutor<T>, options?: ForkOptions): ForkedTask<T>
/**
* Returns a promise that resolves when `waitFor` resolves or
* rejects if the listener has been cancelled or is completed.
Expand Down