Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ sec-db/
models/
.worktrees/
scratch/
.claude/worktrees/
3 changes: 0 additions & 3 deletions examples/web/src/status/QueueStatus.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ export function QueueStatus({ queueType }: { queueType: string }) {
const [pending, setPending] = useState<number>(0);
const [processing, setProcessing] = useState<number>(0);
const [completed, setCompleted] = useState<number>(0);
const [aborting, setAborting] = useState<number>(0);
const [errors, setErrors] = useState<number>(0);
const [disabled, setDisabled] = useState<number>(0);

Expand All @@ -26,7 +25,6 @@ export function QueueStatus({ queueType }: { queueType: string }) {
setPending(await client.size(JobStatus.PENDING));
setProcessing(await client.size(JobStatus.PROCESSING));
setCompleted(await client.size(JobStatus.COMPLETED));
setAborting(await client.size(JobStatus.ABORTING));
setErrors(await client.size(JobStatus.FAILED));
setDisabled(await client.size(JobStatus.DISABLED));
}
Expand All @@ -53,7 +51,6 @@ export function QueueStatus({ queueType }: { queueType: string }) {
setPending(0);
setProcessing(0);
setCompleted(0);
setAborting(0);
setErrors(0);
setDisabled(0);
}, [registeredQueue]);
Expand Down
8 changes: 4 additions & 4 deletions packages/ai/src/execution/QueuedExecutionStrategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class QueuedExecutionStrategy implements IAiExecutionStrategy {
this.limiter = new ConcurrencyLimiter(this.concurrency);
}
const limiter = this.limiter;
await this.acquireLimiterSlot(limiter, context.signal);
const token = await this.acquireLimiterSlot(limiter, context.signal);

try {
const job = new AiJob({
Expand All @@ -62,7 +62,7 @@ export class QueuedExecutionStrategy implements IAiExecutionStrategy {
emit
);
} finally {
await limiter.recordJobCompletion();
await limiter.complete(token);
}
}

Expand All @@ -75,7 +75,7 @@ export class QueuedExecutionStrategy implements IAiExecutionStrategy {
* abort. Uses {@link ILimiter.tryAcquire} so concurrent callers cannot both
* pass a check-then-record sequence and overshoot the configured limit.
*/
private async acquireLimiterSlot(limiter: ILimiter, signal: AbortSignal): Promise<void> {
private async acquireLimiterSlot(limiter: ILimiter, signal: AbortSignal): Promise<unknown> {
let token = await limiter.tryAcquire();
while (token === null || token === undefined) {
if (signal.aborted) {
Expand All @@ -86,6 +86,6 @@ export class QueuedExecutionStrategy implements IAiExecutionStrategy {
await new Promise<void>((resolve) => setTimeout(resolve, Math.max(20, Math.min(delay, 200))));
token = await limiter.tryAcquire();
}
void token;
return token;
}
}
3 changes: 1 addition & 2 deletions packages/ai/src/job/AiJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
AbortSignalJobError,
IJobExecuteContext,
Job,
JobStatus,
PermanentJobError,
RetryableJobError,
withJobErrorDiagnostics,
Expand Down Expand Up @@ -241,7 +240,7 @@ export class AiJob<
context: IJobExecuteContext,
emit: AiEmit<Output>
): Promise<void> {
if (context.signal.aborted || this.status === JobStatus.ABORTING) {
if (context.signal.aborted) {
throw new AbortSignalJobError("Abort signal aborted before execution of job");
}

Expand Down
95 changes: 95 additions & 0 deletions packages/indexeddb/src/job-queue/IndexedDbJobStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* @license
* Copyright 2026 Steven Roussey <sroussey@gmail.com>
* SPDX-License-Identifier: Apache-2.0
*/

import type { IJobStore, JobRecord, JobStatus, MessageId } from "@workglow/job-queue";
import type { PendingIndexedDbWrite } from "./IndexedDbMessageQueue";
import type { IndexedDbQueueStorage } from "./IndexedDbQueueStorage";

export class IndexedDbJobStore<Input, Output> implements IJobStore<Input, Output> {
/** @internal — shared with the paired message queue */
public readonly core: IndexedDbQueueStorage<Input, Output>;

/** @internal — shared transient buffer for saveResult/saveError. */
private readonly pending: Map<unknown, PendingIndexedDbWrite<Output>>;

constructor(
core: IndexedDbQueueStorage<Input, Output>,
pending: Map<unknown, PendingIndexedDbWrite<Output>>
) {
this.core = core;
this.pending = pending;
}

get(id: MessageId): Promise<JobRecord<Input, Output> | undefined> {
return this.core.get(id);
}

async peek(status?: JobStatus, num?: number): Promise<readonly JobRecord<Input, Output>[]> {
return this.core.peek(status as any, num);
}

size(status?: JobStatus): Promise<number> {
return this.core.size(status as any);
}

async getByRunId(runId: string): Promise<readonly JobRecord<Input, Output>[]> {
return this.core.getByRunId(runId);
}

outputForInput(input: Input): Promise<Output | null> {
return this.core.outputForInput(input);
}

async saveProgress(
id: MessageId,
progress: number,
message: string,
details: Record<string, any> | null
): Promise<void> {
await this.core.saveProgress(id, progress, message, details);
}

async saveResult(id: MessageId, output: Output): Promise<void> {
const buf = this.pending.get(id) ?? {};
buf.output = output ?? null;
this.pending.set(id, buf);
}

async saveError(
id: MessageId,
error: string,
errorCode: string | null,
abortRequested: boolean
): Promise<void> {
const buf = this.pending.get(id) ?? {};
buf.error = error;
buf.errorCode = errorCode;
buf.abortRequested = abortRequested;
this.pending.set(id, buf);
}

async deleteByStatusAndAge(status: JobStatus, olderThanMs: number): Promise<void> {
await this.core.deleteJobsByStatusAndAge(status, olderThanMs);
}

async delete(id: MessageId): Promise<void> {
this.pending.delete(id);
await this.core.delete(id);
}

async deleteAll(): Promise<void> {
this.pending.clear();
await this.core.deleteAll();
}

async abort(id: MessageId): Promise<void> {
await this.core.abort(id);
}

async saveStatus(id: MessageId, status: JobStatus): Promise<void> {
await this.core.saveStatus(id, status);
}
}
Loading
Loading