Skip to content
6 changes: 6 additions & 0 deletions .server-changes/task-metadata-cache.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Cache task defaults in Redis so the trigger API skips per-request database lookups, restoring the fast trigger path when callers pass queue and TTL options.
24 changes: 24 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,30 @@ const EnvironmentSchema = z
CACHE_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
CACHE_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

TASK_META_CACHE_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_HOST),
TASK_META_CACHE_REDIS_PORT: z.coerce
.number()
.optional()
.transform(
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)
),
TASK_META_CACHE_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_USERNAME),
TASK_META_CACHE_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
TASK_META_CACHE_REDIS_TLS_DISABLED: z
.string()
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
TASK_META_CACHE_CURRENT_ENV_TTL_SECONDS: z.coerce.number().default(86400),
TASK_META_CACHE_BY_WORKER_TTL_SECONDS: z.coerce.number().default(2592000),

REALTIME_STREAMS_REDIS_HOST: z
.string()
.optional()
Expand Down
238 changes: 148 additions & 90 deletions apps/webapp/app/runEngine/concerns/queues.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { tryCatch } from "@trigger.dev/core/v3";
import { ServiceValidationError } from "~/v3/services/common.server";
import { createCache, createLRUMemoryStore, DefaultStatefulContext, Namespace } from "@internal/cache";
import { singleton } from "~/utils/singleton";
import type { TaskMetadataCache, TaskMetadataEntry } from "~/services/taskMetadataCache.server";
import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server";

// LRU cache for environment queue sizes to reduce Redis calls
const queueSizeCache = singleton("queueSizeCache", () => {
Expand Down Expand Up @@ -63,13 +65,16 @@ function extractQueueName(queue: { name?: unknown } | undefined): string | undef

export class DefaultQueueManager implements QueueManager {
private readonly replicaPrisma: PrismaClientOrTransaction;
private readonly taskMetaCache: TaskMetadataCache;

constructor(
private readonly prisma: PrismaClientOrTransaction,
private readonly engine: RunEngine,
replicaPrisma?: PrismaClientOrTransaction
replicaPrisma?: PrismaClientOrTransaction,
taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance
) {
this.replicaPrisma = replicaPrisma ?? prisma;
this.taskMetaCache = taskMetaCache;
}

async resolveQueueProperties(
Expand All @@ -87,7 +92,10 @@ export class DefaultQueueManager implements QueueManager {
const specifiedQueueName = extractQueueName(request.body.options?.queue);

if (specifiedQueueName) {
// A specific queue name is provided, validate it exists for the locked worker
// A specific queue name is provided, validate it exists for the locked worker.
// Pre-existing query — not cached because TaskQueue rows can be added or
// removed independently of BackgroundWorkerTask, and a stale "queue exists"
// claim would silently route to the wrong queue.
const specifiedQueue = await this.prisma.taskQueue.findFirst({
where: {
name: specifiedQueueName,
Expand All @@ -107,49 +115,45 @@ export class DefaultQueueManager implements QueueManager {
queueName = specifiedQueue.name;
lockedQueueId = specifiedQueue.id;

// Always fetch the task so we can resolve `triggerSource` (which
// becomes `taskKind` on annotations and replicates to ClickHouse).
// Without this, AGENT/SCHEDULED runs triggered with
// `lockToVersion` + a queue override would be annotated as
// STANDARD and disappear from the run-list "Source" filter.
// `ttl` is read from the same row but only used when the caller
// didn't specify a per-trigger TTL.
const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: {
workerId: lockedBackgroundWorker.id,
runtimeEnvironmentId: request.environment.id,
slug: request.taskId,
},
select: { ttl: true, triggerSource: true },
});
// Pull `triggerSource` (for `taskKind` annotation) and `ttl` from cache.
// On cache hit this is 0 PG queries; on miss the helper falls back to
// a BackgroundWorkerTask lookup and back-fills the cache.
//
// If the task slug isn't on this locked worker version, we tolerate
// the missing row and fall through with `taskKind = undefined`
// (coalesced to "STANDARD" downstream) and `taskTtl = undefined`.
// This matches main's pre-PR behavior — the no-override branch below
// still throws because there's no queue to route to in that case,
// but here the caller already named the queue.
const lockedMeta = await this.resolveLockedTaskMetadata(
lockedBackgroundWorker.id,
request.environment.id,
request.taskId
);

if (request.body.options?.ttl === undefined) {
taskTtl = lockedTask?.ttl;
taskTtl = lockedMeta?.ttl ?? undefined;
}
taskKind = lockedTask?.triggerSource;
taskKind = lockedMeta?.triggerSource;
} else {
// No queue override - fetch task with queue to get both default queue and TTL
const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: {
workerId: lockedBackgroundWorker.id,
runtimeEnvironmentId: request.environment.id,
slug: request.taskId,
},
include: {
queue: true,
},
});
// No queue override - resolve default queue + TTL + triggerSource via cache,
// falling back to a single BackgroundWorkerTask lookup on miss.
const lockedMeta = await this.resolveLockedTaskMetadata(
lockedBackgroundWorker.id,
request.environment.id,
request.taskId
);

if (!lockedTask) {
if (!lockedMeta) {
throw new ServiceValidationError(
`Task '${request.taskId}' not found on locked version '${lockedBackgroundWorker.version ?? "<unknown>"
}'.`
);
}

taskTtl = lockedTask.ttl;
taskTtl = lockedMeta.ttl;

if (!lockedTask.queue) {
if (!lockedMeta.queueName) {
// This case should ideally be prevented by earlier checks or schema constraints,
// but handle it defensively.
logger.error("Task found on locked version, but has no associated queue record", {
Expand All @@ -164,9 +168,9 @@ export class DefaultQueueManager implements QueueManager {
}

// Use the task's default queue name
queueName = lockedTask.queue.name;
lockedQueueId = lockedTask.queue.id;
taskKind = lockedTask.triggerSource;
queueName = lockedMeta.queueName;
lockedQueueId = lockedMeta.queueId ?? undefined;
taskKind = lockedMeta.triggerSource;
}
} else {
// Task is not locked to a specific version, use regular logic
Expand Down Expand Up @@ -213,76 +217,130 @@ export class DefaultQueueManager implements QueueManager {

const defaultQueueName = `task/${taskId}`;

// Even when the caller provides both a queue override and a
// per-trigger TTL, we still need to fetch the task so `triggerSource`
// (which becomes `taskKind` on annotations and replicates to
// ClickHouse) is populated. Without it, AGENT/SCHEDULED runs hitting
// this path get stamped as STANDARD and disappear from the
// dashboard's `Source` filter. Mirrors the locked-worker fix above
// — `taskTtl` is harmless in the returned value because the call
// site coalesces `body.options.ttl ?? taskTtl`.

// Find the current worker for the environment. Replica is fine here —
// the adjacent `backgroundWorkerTask` lookups below already use
// `replicaPrisma` (replica lag for "just deployed" is bounded the same
// way for both queries; reading the worker from the writer and the
// task from the replica would only widen the inconsistency window).
const worker = await findCurrentWorkerFromEnvironment(environment, this.replicaPrisma);
// Resolve the current worker's task metadata via cache (HGET on warm path,
// BackgroundWorkerTask findFirst + cache back-fill on miss). When this hits,
// both the queue-override + TTL caller and the default-queue caller satisfy
// their full result without any database query.
const meta = await this.resolveCurrentTaskMetadata(environment, taskId);

if (overriddenQueueName) {
// Caller already named the queue. We only need triggerSource (for taskKind)
// and ttl (for the call site to coalesce against body.options.ttl).
return {
queueName: overriddenQueueName,
taskTtl: meta?.ttl ?? undefined,
taskKind: meta?.triggerSource,
};
}

if (!worker) {
logger.debug("Failed to get queue name: No worker found", {
if (!meta) {
logger.debug("Failed to get queue name: No worker or task found", {
taskId,
environmentId: environment.id,
});

return { queueName: overriddenQueueName ?? defaultQueueName, taskTtl: undefined };
return { queueName: defaultQueueName, taskTtl: undefined };
}

// When queue is overridden, we only need TTL from the task (no queue join needed)
if (overriddenQueueName) {
const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: {
workerId: worker.id,
runtimeEnvironmentId: environment.id,
slug: taskId,
},
select: { ttl: true, triggerSource: true },
if (!meta.queueName) {
logger.debug("Failed to get queue name: No queue found", {
taskId,
environmentId: environment.id,
});

return { queueName: overriddenQueueName, taskTtl: task?.ttl, taskKind: task?.triggerSource };
return { queueName: defaultQueueName, taskTtl: meta.ttl, taskKind: meta.triggerSource };
}

const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: {
workerId: worker.id,
runtimeEnvironmentId: environment.id,
slug: taskId,
},
include: {
queue: true,
return { queueName: meta.queueName, taskTtl: meta.ttl, taskKind: meta.triggerSource };
}

/**
* Resolve task metadata for a locked-version trigger. Reads from the
* `task-meta:by-worker:{workerId}` Redis hash; falls back to a single
* BackgroundWorkerTask findFirst on miss and back-fills the cache.
*
* Returns null when no BackgroundWorkerTask row exists.
*/
private async resolveLockedTaskMetadata(
workerId: string,
environmentId: string,
slug: string
): Promise<TaskMetadataEntry | null> {
const cached = await this.taskMetaCache.getByWorker(workerId, slug);
if (cached) return cached;

const row = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: { workerId, runtimeEnvironmentId: environmentId, slug },
select: {
ttl: true,
triggerSource: true,
queue: { select: { id: true, name: true } },
},
});

if (!task) {
console.log("Failed to get queue name: No task found", {
taskId,
environmentId: environment.id,
});
if (!row) return null;

return { queueName: defaultQueueName, taskTtl: undefined };
}
const entry: TaskMetadataEntry = {
slug,
ttl: row.ttl,
triggerSource: row.triggerSource,
queueId: row.queue?.id ?? null,
queueName: row.queue?.name ?? "",
};

if (!task.queue) {
console.log("Failed to get queue name: No queue found", {
taskId,
environmentId: environment.id,
queueConfig: task.queueConfig,
});
// Fire-and-forget back-fill — `setByWorker` upserts the single field and
// refreshes the hash TTL. Errors are logged inside the cache and swallowed.
void this.taskMetaCache.setByWorker(workerId, entry);

return { queueName: defaultQueueName, taskTtl: task.ttl, taskKind: task.triggerSource };
}
return entry;
}

/**
* Resolve task metadata for a non-locked trigger. Reads from the
* `task-meta:env:{envId}` Redis hash; falls back to
* findCurrentWorkerFromEnvironment + a single BackgroundWorkerTask findFirst
* on miss and back-fills both keyspaces.
*
* Returns null when no current worker or task can be resolved.
*/
private async resolveCurrentTaskMetadata(
environment: AuthenticatedEnvironment,
slug: string
): Promise<TaskMetadataEntry | null> {
const cached = await this.taskMetaCache.getCurrent(environment.id, slug);
if (cached) return cached;

// Cold cache: discover the current worker for the env. Replica is fine —
// the adjacent BackgroundWorkerTask lookup below uses `replicaPrisma` too
// (replica lag for "just deployed" is bounded the same way for both
// queries; reading from the writer here would only widen the window).
const worker = await findCurrentWorkerFromEnvironment(environment, this.replicaPrisma);
if (!worker) return null;

const row = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: { workerId: worker.id, runtimeEnvironmentId: environment.id, slug },
select: {
ttl: true,
triggerSource: true,
queue: { select: { id: true, name: true } },
},
});

if (!row) return null;

const entry: TaskMetadataEntry = {
slug,
ttl: row.ttl,
triggerSource: row.triggerSource,
queueId: row.queue?.id ?? null,
queueName: row.queue?.name ?? "",
};

// Fire-and-forget back-fill — atomically upserts the slug into both
// keyspaces so a subsequent locked-or-not trigger hits the cache. The
// env-keyspace TTL is preserved (promotion owns it); the by-worker TTL
// is refreshed (sliding window keeps active workers warm).
void this.taskMetaCache.setByCurrentWorker(environment.id, worker.id, entry);

return { queueName: task.queue.name ?? defaultQueueName, taskTtl: task.ttl, taskKind: task.triggerSource };
return entry;
}

async validateQueueLimits(
Expand Down
Loading