diff --git a/packages/shared/sdk-client/__tests__/async/AsyncTaskQueue.test.ts b/packages/shared/sdk-client/__tests__/async/AsyncTaskQueue.test.ts new file mode 100644 index 0000000000..029c44cd21 --- /dev/null +++ b/packages/shared/sdk-client/__tests__/async/AsyncTaskQueue.test.ts @@ -0,0 +1,205 @@ +import { AsyncTaskQueue } from '../../src/async/AsyncTaskQueue'; + +it.each([true, false])( + 'executes the initial task it is given: sheddable: %s', + async (sheddable) => { + const queue = new AsyncTaskQueue(); + const task = jest.fn().mockResolvedValue('test'); + const result = await queue.execute(task, sheddable); + expect(queue.pendingCount()).toBe(0); + expect(result).toEqual({ + status: 'complete', + result: 'test', + }); + expect(task).toHaveBeenCalled(); + }, +); + +it.each([true, false])( + 'executes the next task in the queue when the previous task completes: sheddable: %s', + async (sheddable) => { + const queue = new AsyncTaskQueue(); + const task1 = jest.fn().mockResolvedValue('test1'); + const task2 = jest.fn().mockResolvedValue('test2'); + const promise1 = queue.execute(task1, sheddable); + const promise2 = queue.execute(task2, sheddable); + // We have not awaited, so there has not been an opportunity to execute any tasks. + expect(queue.pendingCount()).toBe(1); + + const [result1, result2] = await Promise.all([promise1, promise2]); + expect(result1).toEqual({ + status: 'complete', + result: 'test1', + }); + expect(result2).toEqual({ + status: 'complete', + result: 'test2', + }); + expect(task1).toHaveBeenCalled(); + expect(task2).toHaveBeenCalled(); + }, +); + +it('can shed pending sheddable tasks', async () => { + const queue = new AsyncTaskQueue(); + const task1 = jest.fn().mockResolvedValue('test1'); + const task2 = jest.fn().mockResolvedValue('test2'); + const task3 = jest.fn().mockResolvedValue('test3'); + const promise1 = queue.execute(task1, true); + const promise2 = queue.execute(task2, true); + const promise3 = queue.execute(task3, true); + + const [result1, result2, result3] = await Promise.all([promise1, promise2, promise3]); + expect(result1).toEqual({ + status: 'complete', + result: 'test1', + }); + expect(result2).toEqual({ + status: 'shed', + }); + expect(result3).toEqual({ + status: 'complete', + result: 'test3', + }); + expect(task1).toHaveBeenCalled(); + expect(task2).not.toHaveBeenCalled(); + expect(task3).toHaveBeenCalled(); +}); + +it('does not shed pending non-sheddable tasks', async () => { + const queue = new AsyncTaskQueue(); + const task1 = jest.fn().mockResolvedValue('test1'); + const task2 = jest.fn().mockResolvedValue('test2'); + const task3 = jest.fn().mockResolvedValue('test3'); + const promise1 = queue.execute(task1, false); + const promise2 = queue.execute(task2, false); + const promise3 = queue.execute(task3, false); + + const [result1, result2, result3] = await Promise.all([promise1, promise2, promise3]); + expect(result1).toEqual({ + status: 'complete', + result: 'test1', + }); + expect(result2).toEqual({ + status: 'complete', + result: 'test2', + }); + expect(result3).toEqual({ + status: 'complete', + result: 'test3', + }); + expect(task1).toHaveBeenCalled(); + expect(task2).toHaveBeenCalled(); + expect(task3).toHaveBeenCalled(); +}); + +it('can handle errors from tasks', async () => { + const queue = new AsyncTaskQueue(); + const task1 = jest.fn().mockRejectedValue(new Error('test')); + const task2 = jest.fn().mockResolvedValue('test2'); + const promise1 = queue.execute(task1, true); + const promise2 = queue.execute(task2, true); + const [result1, result2] = await Promise.all([promise1, promise2]); + expect(result1).toEqual({ + status: 'error', + error: new Error('test'), + }); + expect(result2).toEqual({ + status: 'complete', + result: 'test2', + }); + expect(task1).toHaveBeenCalled(); + expect(task2).toHaveBeenCalled(); +}); + +it('handles mix of sheddable and non-sheddable tasks correctly', async () => { + const queue = new AsyncTaskQueue(); + const task1 = jest.fn().mockResolvedValue('test1'); + const task2 = jest.fn().mockResolvedValue('test2'); + const task3 = jest.fn().mockResolvedValue('test3'); + const task4 = jest.fn().mockResolvedValue('test4'); + + // Add tasks in order: sheddable, non-sheddable, sheddable, non-sheddable + const promise1 = queue.execute(task1, true); + const promise2 = queue.execute(task2, false); + const promise3 = queue.execute(task3, true); + const promise4 = queue.execute(task4, false); + + const [result1, result2, result3, result4] = await Promise.all([ + promise1, + promise2, + promise3, + promise4, + ]); + + // First task should complete + expect(result1).toEqual({ + status: 'complete', + result: 'test1', + }); + + // Second task should complete (not sheddable) + expect(result2).toEqual({ + status: 'complete', + result: 'test2', + }); + + // Third task should be shed + expect(result3).toEqual({ + status: 'shed', + }); + + // Fourth task should complete + expect(result4).toEqual({ + status: 'complete', + result: 'test4', + }); + + expect(task1).toHaveBeenCalled(); + expect(task2).toHaveBeenCalled(); + expect(task3).not.toHaveBeenCalled(); + expect(task4).toHaveBeenCalled(); +}); + +it('executes tasks in order regardless of time to complete', async () => { + const queue = new AsyncTaskQueue(); + const timedPromise = (ms: number) => + new Promise((resolve) => { + setTimeout(resolve, ms); + }); + const callOrder: string[] = []; + const task1 = jest.fn().mockImplementation(() => { + callOrder.push('task1Start'); + return timedPromise(10).then(() => { + callOrder.push('task1End'); + return 'test1'; + }); + }); + const task2 = jest.fn().mockImplementation(() => { + callOrder.push('task2Start'); + return timedPromise(5).then(() => { + callOrder.push('task2End'); + return 'test2'; + }); + }); + const task3 = jest.fn().mockImplementation(() => { + callOrder.push('task3Start'); + return timedPromise(20).then(() => { + callOrder.push('task3End'); + return 'test3'; + }); + }); + const promise1 = queue.execute(task1, false); + const promise2 = queue.execute(task2, false); + const promise3 = queue.execute(task3, false); + + await Promise.all([promise1, promise2, promise3]); + expect(callOrder).toEqual([ + 'task1Start', + 'task1End', + 'task2Start', + 'task2End', + 'task3Start', + 'task3End', + ]); +}); diff --git a/packages/shared/sdk-client/src/async/AsyncTaskQueue.ts b/packages/shared/sdk-client/src/async/AsyncTaskQueue.ts new file mode 100644 index 0000000000..d6fcd9777f --- /dev/null +++ b/packages/shared/sdk-client/src/async/AsyncTaskQueue.ts @@ -0,0 +1,179 @@ +import { LDLogger } from '@launchdarkly/js-sdk-common'; + +/** + * Represents a task that has been shed from the queue. + * This task will never be executed. + */ +export interface ShedTask { + status: 'shed'; +} + +/** + * Represents a task that has been ran to completion. + */ +export interface CompletedTask { + status: 'complete'; + result: TTaskResult; +} + +/** + * Represents a task that has errored. + */ +export interface ErroredTask { + status: 'error'; + error: Error; +} + +/** + * Represents the result of a task. + */ +export type TaskResult = CompletedTask | ErroredTask | ShedTask; + +/** + * Represents a pending task. This encapsulates the async function that needs to be executed as well as a promise that represents its state. + * The promise is not directly the promise associated with the async function, because we will not execute the async function until some point in the future, if at all. + * */ +interface PendingTask { + sheddable: boolean; + execute: () => void; + shed: () => void; + promise: Promise>; +} + +const duplicateExecutionError = new Error( + 'Task has already been executed or shed. This is likely an implementation error. The task will not be executed again.', +); + +/** + * Creates a pending task. + * @param task The async function to execute. + * @param sheddable Whether the task can be shed from the queue. + * @returns A pending task. + */ +function makePending( + task: () => Promise, + _logger?: LDLogger, + sheddable: boolean = false, +): PendingTask { + let res: (value: TaskResult) => void; + + const promise = new Promise>((resolve) => { + res = resolve; + }); + + let executedOrShed = false; + return { + execute: () => { + if (executedOrShed) { + // This should never happen. If it does, then it represents an implementation error in the SDK. + _logger?.error(duplicateExecutionError); + } + executedOrShed = true; + task() + .then((result) => res({ status: 'complete', result })) + .catch((error) => res({ status: 'error', error })); + }, + shed: () => { + if (executedOrShed) { + // This should never happen. If it does, then it represents an implementation error in the SDK. + _logger?.error(duplicateExecutionError); + } + executedOrShed = true; + res({ status: 'shed' }); + }, + promise, + sheddable, + }; +} + +/** + * An asynchronous task queue with the ability to replace pending tasks. + * + * This is useful when you have asynchronous operations which much execute in order, and for cases where intermediate + * operations can be discarded. + * + * For instance, the SDK can only have one active context at a time, if you request identification of many contexts, + * then the ultimate state will be based on the last request. The intermediate identifies can be discarded. + * + * This class will always begin execution of the first item added to the queue, at that point the item itself is not + * queued, but active. If another request is made while that item is still active, then it is added to the queue. + * A third request would then replace the second request if the second request had not yet become active, and it was + * sheddable. + * + * Once a task is active the queue will complete it. It doesn't cancel tasks that it has started, but it can shed tasks + * that have not started. + * + * TTaskResult Is the return type of the task to be executed. Tasks accept no parameters. So if you need parameters + * you should use a lambda to capture them. + * + * Exceptions from tasks are always handled and the execute method will never reject a promise. + * + * Queue management should be done synchronously. There should not be asynchronous operations between checking the queue + * and acting on the results of said check. + */ +export class AsyncTaskQueue { + private _activeTask?: Promise>; + private _queue: PendingTask[] = []; + + constructor(private readonly _logger?: LDLogger) {} + + /** + * Execute a task using the queue. + * + * @param task The async function to execute. + * @param sheddable Whether the task can be shed from the queue. + * @returns A promise that resolves to the result of the task. + */ + execute( + task: () => Promise, + sheddable: boolean = false, + ): Promise> { + const pending = makePending(task, this._logger, sheddable); + + if (!this._activeTask) { + this._activeTask = pending.promise.finally(() => { + this._activeTask = undefined; + this._checkPending(); + }); + pending.execute(); + } else { + // If the last pending task is sheddable, we need to shed it before adding the new task. + if (this._queue[this._queue.length - 1]?.sheddable) { + this._queue.pop()?.shed(); + } + this._queue.push(pending); + } + + return pending.promise; + } + + private _checkPending() { + // There is an existing active task, so we don't need to do anything. + if (this._activeTask) { + return; + } + + // There are pending tasks, so we need to execute the next one. + if (this._queue.length > 0) { + const nextTask = this._queue.shift()!; + + this._activeTask = nextTask.promise.finally(() => { + this._activeTask = undefined; + this._checkPending(); + }); + + nextTask.execute(); + } + } + + /** + * Returns the number of pending tasks in the queue. + * Intended for testing purposes only. + * + * @internal + * @returns The number of pending tasks in the queue. + */ + public pendingCount(): number { + return this._queue.length; + } +}