Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "public"."TaskRun" ADD COLUMN "lockedRetryConfig" JSONB;
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# Please do not edit this file manually
# It should be added in your version-control system (i.e. Git)
provider = "postgresql"
provider = "postgresql"
13 changes: 7 additions & 6 deletions internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ model OrganizationAccessToken {
/// This is used to find the token in the database
hashedToken String @unique

organization Organization @relation(fields: [organizationId], references: [id])
organization Organization @relation(fields: [organizationId], references: [id])
organizationId String

/// Optional expiration date for the token
Expand Down Expand Up @@ -648,11 +648,12 @@ model TaskRun {

concurrencyKey String?

delayUntil DateTime?
queuedAt DateTime?
ttl String?
expiredAt DateTime?
maxAttempts Int?
delayUntil DateTime?
queuedAt DateTime?
ttl String?
expiredAt DateTime?
maxAttempts Int?
lockedRetryConfig Json?

/// optional token that can be used to authenticate the task run
oneTimeUseToken String?
Expand Down
62 changes: 50 additions & 12 deletions internal-packages/run-engine/src/engine/retrying.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
isOOMRunError,
RetryOptions,
sanitizeError,
shouldLookupRetrySettings,
shouldRetryError,
TaskRunError,
taskRunErrorEnhancer,
Expand Down Expand Up @@ -72,13 +73,11 @@ export async function retryOutcomeFromCompletion(
};
}

// No retry settings
if (!retrySettings) {
return { outcome: "fail_run", sanitizedError };
}
const enhancedError = taskRunErrorEnhancer(error);

// Not a retriable error: fail
const retriableError = shouldRetryError(taskRunErrorEnhancer(error));
const retriableError = shouldRetryError(enhancedError);

if (!retriableError) {
return { outcome: "fail_run", sanitizedError };
}
Expand All @@ -95,6 +94,7 @@ export async function retryOutcomeFromCompletion(
},
select: {
maxAttempts: true,
lockedRetryConfig: true,
},
});

Expand All @@ -112,6 +112,48 @@ export async function retryOutcomeFromCompletion(
return { outcome: "fail_run", sanitizedError };
}

// No retry settings
if (!retrySettings) {
const shouldLookup = shouldLookupRetrySettings(enhancedError);

if (!shouldLookup) {
return { outcome: "fail_run", sanitizedError };
}

const retryConfig = run.lockedRetryConfig;

if (!retryConfig) {
return { outcome: "fail_run", sanitizedError };
}

const parsedRetryConfig = RetryOptions.nullish().safeParse(retryConfig);

if (!parsedRetryConfig.success) {
return { outcome: "fail_run", sanitizedError };
}

if (!parsedRetryConfig.data) {
return { outcome: "fail_run", sanitizedError };
}

const nextDelay = calculateNextRetryDelay(parsedRetryConfig.data, attemptNumber ?? 1);

if (!nextDelay) {
return { outcome: "fail_run", sanitizedError };
}

const retrySettings = {
timestamp: Date.now() + nextDelay,
delay: nextDelay,
};

return {
outcome: "retry",
method: "queue", // we'll always retry on the queue because usually having no settings means something bad happened
settings: retrySettings,
};
}

return {
outcome: "retry",
method: retryUsingQueue ? "queue" : "immediate",
Expand All @@ -130,19 +172,15 @@ async function retryOOMOnMachine(
},
select: {
machinePreset: true,
lockedBy: {
select: {
retryConfig: true,
},
},
lockedRetryConfig: true,
},
});

if (!run || !run.lockedBy || !run.machinePreset) {
if (!run || !run.lockedRetryConfig || !run.machinePreset) {
return;
}

const retryConfig = run.lockedBy?.retryConfig;
const retryConfig = run.lockedRetryConfig;
const parsedRetryConfig = RetryOptions.nullish().safeParse(retryConfig);

if (!parsedRetryConfig.success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,9 @@ export class DequeueSystem {
result.run.maxDurationInSeconds,
result.task.maxDurationInSeconds
);
const lockedRetryConfig = result.run.lockedRetryConfig
? undefined
: result.task.retryConfig;

const lockedTaskRun = await prisma.taskRun.update({
where: {
Expand All @@ -413,6 +416,7 @@ export class DequeueSystem {
lockedById: result.task.id,
lockedToVersionId: result.worker.id,
lockedQueueId: result.queue.id,
lockedRetryConfig: lockedRetryConfig ?? undefined,
status: "DEQUEUED",
startedAt,
baseCostInCents: this.options.machines.baseCostInCents,
Expand Down
26 changes: 26 additions & 0 deletions packages/core/src/v3/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,32 @@ export function shouldRetryError(error: TaskRunError): boolean {
}
}

export function shouldLookupRetrySettings(error: TaskRunError): boolean {
switch (error.type) {
case "INTERNAL_ERROR": {
switch (error.code) {
case "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE":
return true;

default:
return false;
}
}
case "STRING_ERROR": {
return false;
}
case "BUILT_IN_ERROR": {
return false;
}
case "CUSTOM_ERROR": {
return false;
}
default: {
assertExhaustive(error);
}
}
}

export function correctErrorStackTrace(
stackTrace: string,
projectDir?: string,
Expand Down
Loading