Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/add-is-replay-context.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Add `isReplay` boolean to the run context (`ctx.run.isReplay`), derived from the existing `replayedFromTaskRunFriendlyId` database field. Defaults to `false` for backwards compatibility.
15 changes: 4 additions & 11 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ export type PromptSpanData = {
config?: string;
};

function extractPromptSpanData(
properties: Record<string, unknown>
): PromptSpanData | undefined {
function extractPromptSpanData(properties: Record<string, unknown>): PromptSpanData | undefined {
// Properties come as an unflattened nested object from ClickHouse,
// e.g. { prompt: { slug: "...", version: 3, ... } }
const prompt = properties.prompt;
Expand Down Expand Up @@ -592,10 +590,7 @@ export class SpanPresenter extends BasePresenter {
triggeredRuns,
aiData:
span.properties && typeof span.properties === "object"
? extractAISpanData(
span.properties as Record<string, unknown>,
span.duration / 1_000_000
)
? extractAISpanData(span.properties as Record<string, unknown>, span.duration / 1_000_000)
: undefined,
};

Expand Down Expand Up @@ -739,10 +734,7 @@ export class SpanPresenter extends BasePresenter {
"ai.streamObject",
];

if (
typeof span.message === "string" &&
AI_SUMMARY_MESSAGES.includes(span.message)
) {
if (typeof span.message === "string" && AI_SUMMARY_MESSAGES.includes(span.message)) {
const aiSummaryData = extractAISummarySpanData(
span.properties as Record<string, unknown>,
span.duration / 1_000_000
Expand Down Expand Up @@ -899,6 +891,7 @@ export class SpanPresenter extends BasePresenter {
createdAt: run.createdAt,
tags: run.runTags,
isTest: run.isTest,
isReplay: !!run.replayedFromTaskRunFriendlyId,
idempotencyKey: getUserProvidedIdempotencyKey(run) ?? undefined,
startedAt: run.startedAt ?? run.createdAt,
durationMs: run.usageDurationMs,
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ export class DevQueueConsumer {
runId: lockedTaskRun.friendlyId,
messageId: lockedTaskRun.id,
isTest: lockedTaskRun.isTest,
isReplay: !!lockedTaskRun.replayedFromTaskRunFriendlyId,
metrics: [
{
name: "start",
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1640,6 +1640,7 @@ export const AttemptForExecutionGetPayload = {
createdAt: true,
startedAt: true,
isTest: true,
replayedFromTaskRunFriendlyId: true,
metadata: true,
metadataType: true,
idempotencyKey: true,
Expand Down Expand Up @@ -1726,6 +1727,7 @@ class SharedQueueTasks {
startedAt: taskRun.startedAt ?? taskRun.createdAt,
tags: taskRun.runTags ?? [],
isTest: taskRun.isTest,
isReplay: !!taskRun.replayedFromTaskRunFriendlyId,
idempotencyKey: taskRun.idempotencyKey ?? undefined,
durationMs: taskRun.usageDurationMs,
costInCents: taskRun.costInCents,
Expand Down Expand Up @@ -2045,6 +2047,7 @@ class SharedQueueTasks {
traceContext: true,
friendlyId: true,
isTest: true,
replayedFromTaskRunFriendlyId: true,
lockedBy: {
select: {
machineConfig: true,
Expand Down Expand Up @@ -2090,6 +2093,7 @@ class SharedQueueTasks {
runId: run.friendlyId,
messageId: run.id,
isTest: run.isTest,
isReplay: !!run.replayedFromTaskRunFriendlyId,
attemptCount,
metrics: [],
} satisfies TaskRunExecutionLazyAttemptPayload;
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ export class CreateTaskRunAttemptService extends BaseService {
createdAt: taskRun.createdAt,
tags: taskRun.runTags ?? [],
isTest: taskRun.isTest,
isReplay: !!taskRun.replayedFromTaskRunFriendlyId,
idempotencyKey: taskRun.idempotencyKey ?? undefined,
startedAt: taskRun.startedAt ?? taskRun.createdAt,
durationMs: taskRun.usageDurationMs,
Expand Down
3 changes: 3 additions & 0 deletions docs/context.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ export const parentTask = task({
<ResponseField name="isTest" type="boolean">
Whether this is a [test run](/run-tests).
</ResponseField>
<ResponseField name="isReplay" type="boolean">
Whether this run is a [replay](/replaying) of a previous run.
</ResponseField>
<ResponseField name="createdAt" type="date">
The creation time of the task run.
</ResponseField>
Expand Down
15 changes: 15 additions & 0 deletions docs/replaying.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ description: "A replay is a copy of a run with the same payload but against the
</Tab>
</Tabs>

### Detecting replays in your task

You can check if a run is a replay using the [context](/context) object:

```ts
export const myTask = task({
id: "my-task",
run: async (payload, { ctx }) => {
if (ctx.run.isReplay) {
// This run is a replay of a previous run
}
},
});
```

### Replaying using the SDK

You can replay a run using the SDK:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ export class DequeueSystem {
id: lockedTaskRun.id,
friendlyId: lockedTaskRun.friendlyId,
isTest: lockedTaskRun.isTest,
isReplay: !!lockedTaskRun.replayedFromTaskRunFriendlyId,
machine: machinePreset,
attemptNumber: nextAttemptNumber,
// Keeping this for backwards compatibility, but really this should be called workerQueue
Expand Down
88 changes: 46 additions & 42 deletions internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ export class RunAttemptSystem {
machinePreset: true,
runTags: true,
isTest: true,
replayedFromTaskRunFriendlyId: true,
idempotencyKey: true,
idempotencyKeyOptions: true,
startedAt: true,
Expand Down Expand Up @@ -232,9 +233,9 @@ export class RunAttemptSystem {
run.lockedById
? this.#resolveTaskRunExecutionTask(run.lockedById)
: Promise.resolve({
id: run.taskIdentifier,
filePath: "unknown",
}),
id: run.taskIdentifier,
filePath: "unknown",
}),
this.#resolveTaskRunExecutionQueue({
lockedQueueId: run.lockedQueueId ?? undefined,
queueName: run.queue,
Expand All @@ -245,13 +246,13 @@ export class RunAttemptSystem {
run.lockedById
? this.#resolveTaskRunExecutionMachinePreset(run.lockedById, run.machinePreset)
: Promise.resolve(
getMachinePreset({
defaultMachine: this.options.machines.defaultMachine,
machines: this.options.machines.machines,
config: undefined,
run,
})
),
getMachinePreset({
defaultMachine: this.options.machines.defaultMachine,
machines: this.options.machines.machines,
config: undefined,
run,
})
),
run.lockedById
? this.#resolveTaskRunExecutionDeployment(run.lockedById)
: Promise.resolve(undefined),
Expand All @@ -262,6 +263,7 @@ export class RunAttemptSystem {
id: run.friendlyId,
tags: run.runTags,
isTest: run.isTest,
isReplay: !!run.replayedFromTaskRunFriendlyId,
createdAt: run.createdAt,
startedAt: run.startedAt ?? run.createdAt,
idempotencyKey: getUserProvidedIdempotencyKey(run) ?? undefined,
Expand Down Expand Up @@ -426,6 +428,7 @@ export class RunAttemptSystem {
payloadType: true,
runTags: true,
isTest: true,
replayedFromTaskRunFriendlyId: true,
idempotencyKey: true,
idempotencyKeyOptions: true,
startedAt: true,
Expand Down Expand Up @@ -459,8 +462,9 @@ export class RunAttemptSystem {
run,
snapshot: {
executionStatus: "EXECUTING",
description: `Attempt created, starting execution${isWarmStart ? " (warm start)" : ""
}`,
description: `Attempt created, starting execution${
isWarmStart ? " (warm start)" : ""
}`,
},
previousSnapshotId: latestSnapshot.id,
environmentId: latestSnapshot.environmentId,
Expand Down Expand Up @@ -574,6 +578,7 @@ export class RunAttemptSystem {
createdAt: updatedRun.createdAt,
tags: updatedRun.runTags,
isTest: updatedRun.isTest,
isReplay: !!updatedRun.replayedFromTaskRunFriendlyId,
idempotencyKey: getUserProvidedIdempotencyKey(updatedRun) ?? undefined,
idempotencyKeyScope: extractIdempotencyKeyScope(updatedRun),
startedAt: updatedRun.startedAt ?? updatedRun.createdAt,
Expand Down Expand Up @@ -618,8 +623,8 @@ export class RunAttemptSystem {
deployment,
batch: updatedRun.batchId
? {
id: BatchId.toFriendlyId(updatedRun.batchId),
}
id: BatchId.toFriendlyId(updatedRun.batchId),
}
: undefined,
};

Expand Down Expand Up @@ -1387,8 +1392,8 @@ export class RunAttemptSystem {
error,
bulkActionGroupIds: bulkActionId
? {
push: bulkActionId,
}
push: bulkActionId,
}
: undefined,
...(usageUpdate && {
usageDurationMs: usageUpdate.usageDurationMs,
Expand Down Expand Up @@ -1876,26 +1881,26 @@ export class RunAttemptSystem {
const result = await this.cache.queues.swr(cacheKey, async () => {
const queue = params.lockedQueueId
? await this.$.readOnlyPrisma.taskQueue.findFirst({
where: {
id: params.lockedQueueId,
},
select: {
id: true,
friendlyId: true,
name: true,
},
})
where: {
id: params.lockedQueueId,
},
select: {
id: true,
friendlyId: true,
name: true,
},
})
: await this.$.readOnlyPrisma.taskQueue.findFirst({
where: {
runtimeEnvironmentId: params.runtimeEnvironmentId,
name: params.queueName,
},
select: {
id: true,
friendlyId: true,
name: true,
},
});
where: {
runtimeEnvironmentId: params.runtimeEnvironmentId,
name: params.queueName,
},
select: {
id: true,
friendlyId: true,
name: true,
},
});

if (!queue) {
// Return synthetic queue so run/span view still loads (e.g. createFailedTaskRun with fallback queue)
Expand Down Expand Up @@ -2068,13 +2073,13 @@ export class RunAttemptSystem {
if (environmentType !== "DEVELOPMENT") {
const machinePreset = machinePresetName
? machinePresetFromName(
this.options.machines.machines,
machinePresetName as MachinePresetName
)
this.options.machines.machines,
machinePresetName as MachinePresetName
)
: machinePresetFromName(
this.options.machines.machines,
this.options.machines.defaultMachine
);
this.options.machines.machines,
this.options.machines.defaultMachine
);

costInCents = currentCostInCents + attemptDurationMs * machinePreset.centsPerMs;
}
Expand All @@ -2084,7 +2089,6 @@ export class RunAttemptSystem {
costInCents,
};
}

}

export function safeParseGitMeta(git: unknown): GitMeta | undefined {
Expand Down
14 changes: 8 additions & 6 deletions packages/core/src/v3/schemas/common.ts
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ export const TaskRun = z.object({
payloadType: z.string(),
tags: z.array(z.string()),
isTest: z.boolean().default(false),
isReplay: z.boolean().default(false),
createdAt: z.coerce.date(),
startedAt: z.coerce.date().default(() => new Date()),
/** The user-provided idempotency key (not the hash) */
Expand Down Expand Up @@ -378,6 +379,7 @@ export const V3TaskRun = z.object({
payloadType: z.string(),
tags: z.array(z.string()),
isTest: z.boolean().default(false),
isReplay: z.boolean().default(false),
createdAt: z.coerce.date(),
startedAt: z.coerce.date().default(() => new Date()),
/** The user-provided idempotency key (not the hash) */
Expand Down Expand Up @@ -538,13 +540,13 @@ export type WaitpointTokenResult = z.infer<typeof WaitpointTokenResult>;

export type WaitpointTokenTypedResult<T> =
| {
ok: true;
output: T;
}
ok: true;
output: T;
}
| {
ok: false;
error: Error;
};
ok: false;
error: Error;
};

export const SerializedError = z.object({
message: z.string(),
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/schemas/runEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ export const DequeuedMessage = z.object({
id: z.string(),
friendlyId: z.string(),
isTest: z.boolean(),
isReplay: z.boolean().default(false),
Comment thread
nicktrn marked this conversation as resolved.
machine: MachinePreset,
attemptNumber: z.number(),
masterQueue: z.string(),
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/schemas/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ export const TaskRunExecutionLazyAttemptPayload = z.object({
attemptCount: z.number().optional(),
messageId: z.string(),
isTest: z.boolean(),
isReplay: z.boolean().default(false),
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
traceContext: z.record(z.unknown()),
environment: z.record(z.string()).optional(),
metrics: TaskRunExecutionMetrics.optional(),
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/semanticInternalAttributes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const SemanticInternalAttributes = {
ATTEMPT_NUMBER: "ctx.attempt.number",
RUN_ID: "ctx.run.id",
RUN_IS_TEST: "ctx.run.isTest",
RUN_IS_REPLAY: "ctx.run.isReplay",
ORIGINAL_RUN_ID: "$original_run_id",
BATCH_ID: "ctx.batch.id",
TASK_SLUG: "ctx.task.id",
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/taskContext/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export class TaskContextAPI {
[SemanticInternalAttributes.QUEUE_ID]: this.ctx.queue.id,
[SemanticInternalAttributes.RUN_ID]: this.ctx.run.id,
[SemanticInternalAttributes.RUN_IS_TEST]: this.ctx.run.isTest,
[SemanticInternalAttributes.RUN_IS_REPLAY]: this.ctx.run.isReplay,
[SemanticInternalAttributes.BATCH_ID]: this.ctx.batch?.id,
[SemanticInternalAttributes.IDEMPOTENCY_KEY]: this.ctx.run.idempotencyKey,
};
Expand Down
Loading