Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions packages/shared/sdk-client/__tests__/async/AsyncTaskQueue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
import { AsyncTaskQueue } from '../../src/async/AsyncTaskQueue';

it.each([true, false])(
'executes the initial task it is given: sheddable: %s',
async (sheddable) => {
const queue = new AsyncTaskQueue<string>();
const task = jest.fn().mockResolvedValue('test');
const result = await queue.execute(task, sheddable);
expect(queue.pendingCount()).toBe(0);
expect(result).toEqual({
status: 'complete',
result: 'test',
});
expect(task).toHaveBeenCalled();
},
);

it.each([true, false])(
'executes the next task in the queue when the previous task completes: sheddable: %s',
async (sheddable) => {
const queue = new AsyncTaskQueue<string>();
const task1 = jest.fn().mockResolvedValue('test1');
const task2 = jest.fn().mockResolvedValue('test2');
const promise1 = queue.execute(task1, sheddable);
const promise2 = queue.execute(task2, sheddable);
// We have not awaited, so there has not been an opportunity to execute any tasks.
expect(queue.pendingCount()).toBe(1);

const [result1, result2] = await Promise.all([promise1, promise2]);
expect(result1).toEqual({
status: 'complete',
result: 'test1',
});
expect(result2).toEqual({
status: 'complete',
result: 'test2',
});
expect(task1).toHaveBeenCalled();
expect(task2).toHaveBeenCalled();
},
);

it('can shed pending sheddable tasks', async () => {
const queue = new AsyncTaskQueue<string>();
const task1 = jest.fn().mockResolvedValue('test1');
const task2 = jest.fn().mockResolvedValue('test2');
const task3 = jest.fn().mockResolvedValue('test3');
const promise1 = queue.execute(task1, true);
const promise2 = queue.execute(task2, true);
const promise3 = queue.execute(task3, true);

const [result1, result2, result3] = await Promise.all([promise1, promise2, promise3]);
expect(result1).toEqual({
status: 'complete',
result: 'test1',
});
expect(result2).toEqual({
status: 'shed',
});
expect(result3).toEqual({
status: 'complete',
result: 'test3',
});
expect(task1).toHaveBeenCalled();
expect(task2).not.toHaveBeenCalled();
expect(task3).toHaveBeenCalled();
});

it('does not shed pending non-sheddable tasks', async () => {
const queue = new AsyncTaskQueue<string>();
const task1 = jest.fn().mockResolvedValue('test1');
const task2 = jest.fn().mockResolvedValue('test2');
const task3 = jest.fn().mockResolvedValue('test3');
const promise1 = queue.execute(task1, false);
const promise2 = queue.execute(task2, false);
const promise3 = queue.execute(task3, false);

const [result1, result2, result3] = await Promise.all([promise1, promise2, promise3]);
expect(result1).toEqual({
status: 'complete',
result: 'test1',
});
expect(result2).toEqual({
status: 'complete',
result: 'test2',
});
expect(result3).toEqual({
status: 'complete',
result: 'test3',
});
expect(task1).toHaveBeenCalled();
expect(task2).toHaveBeenCalled();
expect(task3).toHaveBeenCalled();
});

it('can handle errors from tasks', async () => {
const queue = new AsyncTaskQueue<string>();
const task1 = jest.fn().mockRejectedValue(new Error('test'));
const task2 = jest.fn().mockResolvedValue('test2');
const promise1 = queue.execute(task1, true);
const promise2 = queue.execute(task2, true);
const [result1, result2] = await Promise.all([promise1, promise2]);
expect(result1).toEqual({
status: 'error',
error: new Error('test'),
});
expect(result2).toEqual({
status: 'complete',
result: 'test2',
});
expect(task1).toHaveBeenCalled();
expect(task2).toHaveBeenCalled();
});

it('handles mix of sheddable and non-sheddable tasks correctly', async () => {
const queue = new AsyncTaskQueue<string>();
const task1 = jest.fn().mockResolvedValue('test1');
const task2 = jest.fn().mockResolvedValue('test2');
const task3 = jest.fn().mockResolvedValue('test3');
const task4 = jest.fn().mockResolvedValue('test4');

// Add tasks in order: sheddable, non-sheddable, sheddable, non-sheddable
const promise1 = queue.execute(task1, true);
const promise2 = queue.execute(task2, false);
const promise3 = queue.execute(task3, true);
const promise4 = queue.execute(task4, false);

const [result1, result2, result3, result4] = await Promise.all([
promise1,
promise2,
promise3,
promise4,
]);

// First task should complete
expect(result1).toEqual({
status: 'complete',
result: 'test1',
});

// Second task should complete (not sheddable)
expect(result2).toEqual({
status: 'complete',
result: 'test2',
});

// Third task should be shed
expect(result3).toEqual({
status: 'shed',
});

// Fourth task should complete
expect(result4).toEqual({
status: 'complete',
result: 'test4',
});

expect(task1).toHaveBeenCalled();
expect(task2).toHaveBeenCalled();
expect(task3).not.toHaveBeenCalled();
expect(task4).toHaveBeenCalled();
});

it('executes tasks in order regardless of time to complete', async () => {
const queue = new AsyncTaskQueue<string>();
const timedPromise = (ms: number) =>
new Promise((resolve) => {
setTimeout(resolve, ms);
});
const callOrder: string[] = [];
const task1 = jest.fn().mockImplementation(() => {
callOrder.push('task1Start');
return timedPromise(10).then(() => {
callOrder.push('task1End');
return 'test1';
});
});
const task2 = jest.fn().mockImplementation(() => {
callOrder.push('task2Start');
return timedPromise(5).then(() => {
callOrder.push('task2End');
return 'test2';
});
});
const task3 = jest.fn().mockImplementation(() => {
callOrder.push('task3Start');
return timedPromise(20).then(() => {
callOrder.push('task3End');
return 'test3';
});
});
const promise1 = queue.execute(task1, false);
const promise2 = queue.execute(task2, false);
const promise3 = queue.execute(task3, false);

await Promise.all([promise1, promise2, promise3]);
expect(callOrder).toEqual([
'task1Start',
'task1End',
'task2Start',
'task2End',
'task3Start',
'task3End',
]);
});
179 changes: 179 additions & 0 deletions packages/shared/sdk-client/src/async/AsyncTaskQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import { LDLogger } from '@launchdarkly/js-sdk-common';

/**
* Represents a task that has been shed from the queue.
* This task will never be executed.
*/
export interface ShedTask {
status: 'shed';
}

/**
* Represents a task that has been ran to completion.
*/
export interface CompletedTask<TTaskResult> {
status: 'complete';
result: TTaskResult;
}

/**
* Represents a task that has errored.
*/
export interface ErroredTask {
status: 'error';
error: Error;
}

/**
* Represents the result of a task.
*/
export type TaskResult<TTaskResult> = CompletedTask<TTaskResult> | ErroredTask | ShedTask;

/**
* Represents a pending task. This encapsulates the async function that needs to be executed as well as a promise that represents its state.
* The promise is not directly the promise associated with the async function, because we will not execute the async function until some point in the future, if at all.
* */
interface PendingTask<TTaskResult> {
sheddable: boolean;
execute: () => void;
shed: () => void;
promise: Promise<TaskResult<TTaskResult>>;
}

const duplicateExecutionError = new Error(
'Task has already been executed or shed. This is likely an implementation error. The task will not be executed again.',
);

/**
* Creates a pending task.
* @param task The async function to execute.
* @param sheddable Whether the task can be shed from the queue.
* @returns A pending task.
*/
function makePending<TTaskResult>(
task: () => Promise<TTaskResult>,
_logger?: LDLogger,
sheddable: boolean = false,
): PendingTask<TTaskResult> {
let res: (value: TaskResult<TTaskResult>) => void;

const promise = new Promise<TaskResult<TTaskResult>>((resolve) => {
res = resolve;
});

let executedOrShed = false;
return {
execute: () => {
if (executedOrShed) {
// This should never happen. If it does, then it represents an implementation error in the SDK.
_logger?.error(duplicateExecutionError);
}
executedOrShed = true;
task()
.then((result) => res({ status: 'complete', result }))
.catch((error) => res({ status: 'error', error }));
},
shed: () => {
if (executedOrShed) {
// This should never happen. If it does, then it represents an implementation error in the SDK.
_logger?.error(duplicateExecutionError);
}
executedOrShed = true;
res({ status: 'shed' });
},
promise,
sheddable,
};
}

/**
* An asynchronous task queue with the ability to replace pending tasks.
*
* This is useful when you have asynchronous operations which much execute in order, and for cases where intermediate
* operations can be discarded.
*
* For instance, the SDK can only have one active context at a time, if you request identification of many contexts,
* then the ultimate state will be based on the last request. The intermediate identifies can be discarded.
*
* This class will always begin execution of the first item added to the queue, at that point the item itself is not
* queued, but active. If another request is made while that item is still active, then it is added to the queue.
* A third request would then replace the second request if the second request had not yet become active, and it was
* sheddable.
*
* Once a task is active the queue will complete it. It doesn't cancel tasks that it has started, but it can shed tasks
* that have not started.
*
* TTaskResult Is the return type of the task to be executed. Tasks accept no parameters. So if you need parameters
* you should use a lambda to capture them.
*
* Exceptions from tasks are always handled and the execute method will never reject a promise.
*
* Queue management should be done synchronously. There should not be asynchronous operations between checking the queue
* and acting on the results of said check.
*/
export class AsyncTaskQueue<TTaskResult> {
private _activeTask?: Promise<TaskResult<TTaskResult>>;
private _queue: PendingTask<TTaskResult>[] = [];

constructor(private readonly _logger?: LDLogger) {}

/**
* Execute a task using the queue.
*
* @param task The async function to execute.
* @param sheddable Whether the task can be shed from the queue.
* @returns A promise that resolves to the result of the task.
*/
execute(
task: () => Promise<TTaskResult>,
sheddable: boolean = false,
): Promise<TaskResult<TTaskResult>> {
const pending = makePending(task, this._logger, sheddable);

if (!this._activeTask) {
this._activeTask = pending.promise.finally(() => {
this._activeTask = undefined;
this._checkPending();
});
pending.execute();
} else {
// If the last pending task is sheddable, we need to shed it before adding the new task.
if (this._queue[this._queue.length - 1]?.sheddable) {
this._queue.pop()?.shed();
}
this._queue.push(pending);
}

return pending.promise;
}

private _checkPending() {
// There is an existing active task, so we don't need to do anything.
if (this._activeTask) {
return;
}

// There are pending tasks, so we need to execute the next one.
if (this._queue.length > 0) {
const nextTask = this._queue.shift()!;

this._activeTask = nextTask.promise.finally(() => {
this._activeTask = undefined;
this._checkPending();
});

nextTask.execute();
}
}

/**
* Returns the number of pending tasks in the queue.
* Intended for testing purposes only.
*
* @internal
* @returns The number of pending tasks in the queue.
*/
public pendingCount(): number {
return this._queue.length;
}
}