Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions packages/store/src/core/feature.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { UnionToIntersection } from '@videojs/utils/types';
import type { EnsureTaskRecord } from './queue';
import type {
Request,
RequestConfig,
Expand Down Expand Up @@ -78,10 +77,6 @@ export type UnionFeatureRequests<Features extends Feature<any, any, any>[]> = Un
ResolveFeatureRequestHandlers<Features[number]>
>;

export type UnionFeatureTasks<Features extends Feature<any, any, any>[]> = EnsureTaskRecord<
UnionToIntersection<InferFeatureRequests<Features[number]>>
>;

// ----------------------------------------
// createFeature
// ----------------------------------------
Expand Down
67 changes: 0 additions & 67 deletions packages/store/src/core/guard.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
import { isBoolean } from '@videojs/utils/predicate';

import { StoreError } from './errors';

/**
* Result of a guard check.
*
Expand All @@ -25,66 +21,3 @@ export interface GuardContext<Target> {
* A guard gates request execution.
*/
export type Guard<Target> = (ctx: GuardContext<Target>) => GuardResult;

/**
* Combine guards: All must pass (truthy).
*/
export function all<Target>(...guards: Guard<Target>[]): Guard<Target> {
return async (ctx) => {
for (const guard of guards) {
const result = await guard(ctx);
if (!result) return false;
}

return true;
};
}

/**
* Combine guards: Any must pass (first truthy wins).
*/
export function any<Target>(...guards: Guard<Target>[]): Guard<Target> {
return (ctx) => {
const results = guards.map((g) => g(ctx));

// Check sync results first
if (results.includes(true)) return true;

// Filter to promises only
const promises = results.filter((r): r is Promise<unknown> => !isBoolean(r));

if (promises.length === 0) return false;

// Race: first truthy wins, all falsy = false
return new Promise((resolve, reject) => {
let pending = promises.length;
for (const p of promises) {
p.then((value) => {
if (value) resolve(value);
else if (--pending === 0) resolve(false);
}, reject);
}
});
};
}

/**
* Add timeout to a guard.
*/
export function timeout<Target>(guard: Guard<Target>, ms: number, name = 'guard'): Guard<Target> {
return async (ctx) => {
const result = guard(ctx);

if (isBoolean(result)) {
return result;
}

return Promise.race([
result,
new Promise<never>((_, reject) => {
const timer = setTimeout(() => reject(new StoreError('TIMEOUT', { message: `Timeout: ${name}` })), ms);
ctx.signal.addEventListener('abort', () => clearTimeout(timer));
}),
]);
};
}
5 changes: 2 additions & 3 deletions packages/store/src/core/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
export * from './computed';
export * from './errors';
export * from './feature';
export * from './guard';
export * from './queue';
export type { Guard, GuardContext, GuardResult } from './guard';
export type { TaskKey } from './queue';
export * from './request';
export * from './state';
export * from './store';
export * from './task';
229 changes: 29 additions & 200 deletions packages/store/src/core/queue.ts
Original file line number Diff line number Diff line change
@@ -1,250 +1,79 @@
import { abortable } from '@videojs/utils/events';
import { isUndefined } from '@videojs/utils/predicate';
import { StoreError } from './errors';
import type { Request, RequestMeta, RequestMode } from './request';
import type { StateChange, WritableState } from './state';
import { createState } from './state';
import type { ErrorTask, PendingTask, SuccessTask, Task, TaskContext, TaskKey } from './task';
import type { RequestMode } from './request';

// ----------------------------------------
// Types
// ----------------------------------------

export type TaskRecord = {
[K in TaskKey]: Request<any, any>;
};
export type TaskKey = string | symbol;

export type DefaultTaskRecord = Record<TaskKey, Request<unknown, unknown>>;

export type EnsureTaskRecord<T> = T extends TaskRecord ? T : never;

export interface QueueTask<Key extends TaskKey = TaskKey, Input = unknown, Output = unknown> {
name: string;
key: Key;
export interface QueueTask<Output = unknown> {
key: TaskKey;
mode?: RequestMode;
input?: Input;
meta?: RequestMeta | null;
handler: (ctx: TaskContext<Input>) => Promise<Output>;
handler: (ctx: { signal: AbortSignal }) => Promise<Output>;
}

export type TasksRecord<Tasks extends TaskRecord> = {
[K in keyof Tasks]?: Task<TaskKey<K>, Tasks[K]['input'], Tasks[K]['output']>;
};

// ----------------------------------------
// Implementation
// ----------------------------------------

export class Queue<Tasks extends TaskRecord = DefaultTaskRecord> {
readonly #tasks: WritableState<TasksRecord<Tasks>>;
readonly #sharedPromises = new Map<TaskKey, Promise<unknown>>();

export class Queue {
#pending = new Map<TaskKey, AbortController>();
#shared = new Map<TaskKey, Promise<unknown>>();
#destroyed = false;

/** Current task records. */
get tasks(): Readonly<TasksRecord<Tasks>> {
return this.#tasks.current;
}

/** Subscribe to task changes. */
subscribe(callback: StateChange): () => void {
return this.#tasks.subscribe(callback);
}

constructor() {
this.#tasks = createState({});
}

get destroyed(): boolean {
return this.#destroyed;
}

/** Clear settled task(s). If name provided, clears that task. If no name, clears all settled. */
reset(name?: keyof Tasks): void {
if (!isUndefined(name)) {
const task = this.#tasks.current[name];
if (!task || task.status === 'pending') return;

this.#tasks.delete(name);

return;
}

for (const key of Reflect.ownKeys(this.#tasks.current) as (keyof Tasks)[]) {
const task = this.#tasks.current[key];

if (task && task.status !== 'pending') {
this.#tasks.delete(key);
}
}
}

enqueue<K extends keyof Tasks>(
task: QueueTask<TaskKey<K>, Tasks[K]['input'], Tasks[K]['output']>
): Promise<Tasks[K]['output']> {
const { name, key, mode = 'exclusive', input, meta = null, handler } = task;

enqueue<Output>({ key, mode = 'exclusive', handler }: QueueTask<Output>): Promise<Output> {
if (this.#destroyed) {
return Promise.reject(new StoreError('DESTROYED'));
}

// Shared mode: join existing pending task with same key
// Shared mode: join existing
if (mode === 'shared') {
const existingPromise = this.#sharedPromises.get(key);
if (existingPromise) {
return existingPromise;
}
const existing = this.#shared.get(key);
if (existing) return existing as Promise<Output>;
}

// Supersede any pending task with the same key (may have different name)
for (const existingTask of Object.values(this.#tasks.current)) {
if (existingTask?.key === key && existingTask.status === 'pending') {
existingTask.abort.abort(new StoreError('SUPERSEDED'));
}
}
// Supersede pending with same key
this.#pending.get(key)?.abort(new StoreError('SUPERSEDED'));

const abort = new AbortController();
this.#pending.set(key, abort);

const promise = new Promise<Tasks[K]['output']>((resolve, reject) => {
this.#executeNow({
id: Symbol('@videojs/task'),
name,
key,
input,
meta,
handler,
resolve,
reject,
});
// Wrap with abortable so promise rejects on abort even if handler doesn't handle signal
const promise = abortable(handler({ signal: abort.signal }), abort.signal).finally(() => {
this.#pending.delete(key);
this.#shared.delete(key);
});

// Track promise for shared mode
if (mode === 'shared') {
this.#sharedPromises.set(key, promise);
// Use .then() to avoid unhandled rejection from .finally() propagating errors
promise.then(
() => this.#sharedPromises.delete(key),
() => this.#sharedPromises.delete(key)
);
this.#shared.set(key, promise);
}

return promise;
}

/** Abort task(s). If name provided, aborts that task. If no name, aborts all. */
abort(name?: keyof Tasks): void {
if (!isUndefined(name)) {
const task = this.#tasks.current[name];
if (task?.status === 'pending') {
task.abort.abort(new StoreError('ABORTED'));
}

abort(key?: TaskKey): void {
if (key !== undefined) {
this.#pending.get(key)?.abort(new StoreError('ABORTED'));
return;
}

const error = new StoreError('ABORTED');

for (const task of Object.values(this.#tasks.current)) {
if (task?.status === 'pending') {
task.abort.abort(error);
}
for (const controller of this.#pending.values()) {
controller.abort(error);
}
}

destroy(): void {
if (this.#destroyed) return;

this.#destroyed = true;
this.abort();

// Clear all tasks
for (const key of Reflect.ownKeys(this.#tasks.current)) {
this.#tasks.delete(key);
}

this.#sharedPromises.clear();
this.#pending.clear();
this.#shared.clear();
}

async #executeNow<K extends keyof Tasks>(params: {
id: symbol;
name: string;
key: TaskKey<K>;
input: Tasks[K]['input'];
meta: RequestMeta | null;
handler: (ctx: TaskContext<Tasks[K]['input']>) => Promise<Tasks[K]['output']>;
resolve: (value: Tasks[K]['output']) => void;
reject: (error: unknown) => void;
}): Promise<void> {
const { id, name, key, input, meta, handler, resolve, reject } = params;

const abort = new AbortController();
const startedAt = Date.now();

const pendingTask: PendingTask = {
status: 'pending',
id,
name,
key,
input,
startedAt,
abort,
meta,
};

// Store tasks by name for controller access
this.#tasks.set(name as keyof Tasks, pendingTask);

try {
const result = await abortable(handler({ input, signal: abort.signal }), abort.signal);

resolve(result);

// Only update if we're still the current task for this name
const currentTask = this.#tasks.current[name];

if (currentTask?.id === id) {
this.#tasks.set(name as keyof Tasks, {
...currentTask,
status: 'success',
settledAt: Date.now(),
output: result,
} satisfies SuccessTask);
}
} catch (error) {
reject(error);

// Only update if we're still the current task for this name
const currentTask = this.#tasks.current[name as keyof Tasks];

if (currentTask?.id === id) {
this.#tasks.set(name as keyof Tasks, {
...currentTask,
status: 'error',
settledAt: Date.now(),
error,
cancelled: abort.signal.aborted,
} satisfies ErrorTask);
}
}
}
}

// ----------------------------------------
// Factory
// ----------------------------------------

/**
* Create a queue for managing task execution.
*
* @example
* // Loose typing (default)
* const queue = createQueue();
*
* @example
* // Strongly typed keys
* const queue = createQueue<{
* 'playback': Request;
* 'volume': Request<number>;
* }>();
*/
export function createQueue<Tasks extends TaskRecord = DefaultTaskRecord>(): Queue<Tasks> {
return new Queue<Tasks>();
}
2 changes: 1 addition & 1 deletion packages/store/src/core/request.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { EventLike } from '@videojs/utils/events';
import { isFunction, isObject } from '@videojs/utils/predicate';
import type { Guard } from './guard';
import type { TaskKey } from './task';
import type { TaskKey } from './queue';

// ----------------------------------------
// Symbols
Expand Down
Loading