diff --git a/internal-packages/database/prisma/migrations/20250902112516_add_locked_retry_config_to_task_run/migration.sql b/internal-packages/database/prisma/migrations/20250902112516_add_locked_retry_config_to_task_run/migration.sql new file mode 100644 index 0000000000..22cde96c6d --- /dev/null +++ b/internal-packages/database/prisma/migrations/20250902112516_add_locked_retry_config_to_task_run/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "public"."TaskRun" ADD COLUMN "lockedRetryConfig" JSONB; diff --git a/internal-packages/database/prisma/migrations/migration_lock.toml b/internal-packages/database/prisma/migrations/migration_lock.toml index fbffa92c2b..99e4f20090 100644 --- a/internal-packages/database/prisma/migrations/migration_lock.toml +++ b/internal-packages/database/prisma/migrations/migration_lock.toml @@ -1,3 +1,3 @@ # Please do not edit this file manually # It should be added in your version-control system (i.e. Git) -provider = "postgresql" \ No newline at end of file +provider = "postgresql" diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index ba5a8de266..be10d47f71 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -151,7 +151,7 @@ model OrganizationAccessToken { /// This is used to find the token in the database hashedToken String @unique - organization Organization @relation(fields: [organizationId], references: [id]) + organization Organization @relation(fields: [organizationId], references: [id]) organizationId String /// Optional expiration date for the token @@ -648,11 +648,12 @@ model TaskRun { concurrencyKey String? - delayUntil DateTime? - queuedAt DateTime? - ttl String? - expiredAt DateTime? - maxAttempts Int? + delayUntil DateTime? + queuedAt DateTime? + ttl String? + expiredAt DateTime? + maxAttempts Int? + lockedRetryConfig Json? /// optional token that can be used to authenticate the task run oneTimeUseToken String? diff --git a/internal-packages/run-engine/src/engine/retrying.ts b/internal-packages/run-engine/src/engine/retrying.ts index a621552e92..8001c5d8a2 100644 --- a/internal-packages/run-engine/src/engine/retrying.ts +++ b/internal-packages/run-engine/src/engine/retrying.ts @@ -3,6 +3,7 @@ import { isOOMRunError, RetryOptions, sanitizeError, + shouldLookupRetrySettings, shouldRetryError, TaskRunError, taskRunErrorEnhancer, @@ -72,13 +73,11 @@ export async function retryOutcomeFromCompletion( }; } - // No retry settings - if (!retrySettings) { - return { outcome: "fail_run", sanitizedError }; - } + const enhancedError = taskRunErrorEnhancer(error); // Not a retriable error: fail - const retriableError = shouldRetryError(taskRunErrorEnhancer(error)); + const retriableError = shouldRetryError(enhancedError); + if (!retriableError) { return { outcome: "fail_run", sanitizedError }; } @@ -95,6 +94,7 @@ export async function retryOutcomeFromCompletion( }, select: { maxAttempts: true, + lockedRetryConfig: true, }, }); @@ -112,6 +112,48 @@ export async function retryOutcomeFromCompletion( return { outcome: "fail_run", sanitizedError }; } + // No retry settings + if (!retrySettings) { + const shouldLookup = shouldLookupRetrySettings(enhancedError); + + if (!shouldLookup) { + return { outcome: "fail_run", sanitizedError }; + } + + const retryConfig = run.lockedRetryConfig; + + if (!retryConfig) { + return { outcome: "fail_run", sanitizedError }; + } + + const parsedRetryConfig = RetryOptions.nullish().safeParse(retryConfig); + + if (!parsedRetryConfig.success) { + return { outcome: "fail_run", sanitizedError }; + } + + if (!parsedRetryConfig.data) { + return { outcome: "fail_run", sanitizedError }; + } + + const nextDelay = calculateNextRetryDelay(parsedRetryConfig.data, attemptNumber ?? 1); + + if (!nextDelay) { + return { outcome: "fail_run", sanitizedError }; + } + + const retrySettings = { + timestamp: Date.now() + nextDelay, + delay: nextDelay, + }; + + return { + outcome: "retry", + method: "queue", // we'll always retry on the queue because usually having no settings means something bad happened + settings: retrySettings, + }; + } + return { outcome: "retry", method: retryUsingQueue ? "queue" : "immediate", @@ -130,19 +172,15 @@ async function retryOOMOnMachine( }, select: { machinePreset: true, - lockedBy: { - select: { - retryConfig: true, - }, - }, + lockedRetryConfig: true, }, }); - if (!run || !run.lockedBy || !run.machinePreset) { + if (!run || !run.lockedRetryConfig || !run.machinePreset) { return; } - const retryConfig = run.lockedBy?.retryConfig; + const retryConfig = run.lockedRetryConfig; const parsedRetryConfig = RetryOptions.nullish().safeParse(retryConfig); if (!parsedRetryConfig.success) { diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index 922aef4e45..1fdc485543 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -403,6 +403,9 @@ export class DequeueSystem { result.run.maxDurationInSeconds, result.task.maxDurationInSeconds ); + const lockedRetryConfig = result.run.lockedRetryConfig + ? undefined + : result.task.retryConfig; const lockedTaskRun = await prisma.taskRun.update({ where: { @@ -413,6 +416,7 @@ export class DequeueSystem { lockedById: result.task.id, lockedToVersionId: result.worker.id, lockedQueueId: result.queue.id, + lockedRetryConfig: lockedRetryConfig ?? undefined, status: "DEQUEUED", startedAt, baseCostInCents: this.options.machines.baseCostInCents, diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index b585b938c8..35cbc3ea4d 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -346,6 +346,32 @@ export function shouldRetryError(error: TaskRunError): boolean { } } +export function shouldLookupRetrySettings(error: TaskRunError): boolean { + switch (error.type) { + case "INTERNAL_ERROR": { + switch (error.code) { + case "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE": + return true; + + default: + return false; + } + } + case "STRING_ERROR": { + return false; + } + case "BUILT_IN_ERROR": { + return false; + } + case "CUSTOM_ERROR": { + return false; + } + default: { + assertExhaustive(error); + } + } +} + export function correctErrorStackTrace( stackTrace: string, projectDir?: string,