From f243eab9c9537cf2dbb80ba92c3e935df0bbd9a3 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 21 May 2024 08:35:35 +0100 Subject: [PATCH] v3: fix version locking (#1113) * fix prod version locking * fix worker version display --- .../app/v3/marqs/devQueueConsumer.server.ts | 1 + .../v3/marqs/sharedQueueConsumer.server.ts | 14 +++- .../app/v3/models/workerDeployment.server.ts | 84 +++++++++++++++++-- 3 files changed, 89 insertions(+), 10 deletions(-) diff --git a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts index 90ffcd92dc..d26759ec2a 100644 --- a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts @@ -421,6 +421,7 @@ export class DevQueueConsumer { lockedAt: new Date(), lockedById: backgroundTask.id, status: "EXECUTING", + lockedToVersionId: backgroundWorker.id, }, include: { attempts: { diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index d56af33aaa..b71b1176d0 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -25,7 +25,11 @@ import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server"; import { EnvironmentVariablesRepository } from "../environmentVariables/environmentVariablesRepository.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { socketIo } from "../handleSocketIo.server"; -import { findCurrentWorkerDeployment } from "../models/workerDeployment.server"; +import { + findCurrentWorkerDeployment, + getWorkerDeploymentFromWorker, + getWorkerDeploymentFromWorkerTask, +} from "../models/workerDeployment.server"; import { RestoreCheckpointService } from "../services/restoreCheckpoint.server"; import { SEMINTATTRS_FORCE_RECORDING, tracer } from "../tracer.server"; import { CrashTaskRunService } from "../services/crashTaskRun.server"; @@ -299,7 +303,12 @@ export class SharedQueueConsumer { return; } - const deployment = await findCurrentWorkerDeployment(existingTaskRun.runtimeEnvironmentId); + // Check if the task run is locked to a specific worker, if not, use the current worker deployment + const deployment = existingTaskRun.lockedById + ? await getWorkerDeploymentFromWorkerTask(existingTaskRun.lockedById) + : existingTaskRun.lockedToVersionId + ? await getWorkerDeploymentFromWorker(existingTaskRun.lockedToVersionId) + : await findCurrentWorkerDeployment(existingTaskRun.runtimeEnvironmentId); if (!deployment || !deployment.worker) { logger.error("No matching deployment found for task run", { @@ -374,6 +383,7 @@ export class SharedQueueConsumer { data: { lockedAt: new Date(), lockedById: backgroundTask.id, + lockedToVersionId: deployment.worker.id, }, include: { runtimeEnvironment: true, diff --git a/apps/webapp/app/v3/models/workerDeployment.server.ts b/apps/webapp/app/v3/models/workerDeployment.server.ts index e9e19c1b5e..211a20f5cd 100644 --- a/apps/webapp/app/v3/models/workerDeployment.server.ts +++ b/apps/webapp/app/v3/models/workerDeployment.server.ts @@ -1,16 +1,30 @@ import type { Prettify } from "@trigger.dev/core"; import { CURRENT_DEPLOYMENT_LABEL } from "~/consts"; -import { prisma } from "~/db.server"; +import { Prisma, prisma } from "~/db.server"; -export type CurrentWorkerDeployment = Prettify>>>; +export type CurrentWorkerDeployment = Prettify< + NonNullable>> +>; -export async function findCurrentWorkerDeployment(environmentId: string) { +type WorkerDeploymentWithWorkerTasks = Prisma.WorkerDeploymentGetPayload<{ + include: { + worker: { + include: { + tasks: true; + }; + }; + }; +}>; + +export async function findCurrentWorkerDeployment( + environmentId: string +): Promise { const promotion = await prisma.workerDeploymentPromotion.findUnique({ where: { environmentId_label: { environmentId, label: CURRENT_DEPLOYMENT_LABEL, - } + }, }, include: { deployment: { @@ -20,10 +34,64 @@ export async function findCurrentWorkerDeployment(environmentId: string) { tasks: true, }, }, - } - } - } + }, + }, + }, }); return promotion?.deployment; -} \ No newline at end of file +} + +export async function getWorkerDeploymentFromWorker( + workerId: string +): Promise { + const worker = await prisma.backgroundWorker.findUnique({ + where: { + id: workerId, + }, + include: { + deployment: true, + tasks: true, + }, + }); + + if (!worker?.deployment) { + return; + } + + const { deployment, ...workerWithoutDeployment } = worker; + + return { + ...deployment, + worker: workerWithoutDeployment, + }; +} + +export async function getWorkerDeploymentFromWorkerTask( + workerTaskId: string +): Promise { + const workerTask = await prisma.backgroundWorkerTask.findUnique({ + where: { + id: workerTaskId, + }, + include: { + worker: { + include: { + deployment: true, + tasks: true, + }, + }, + }, + }); + + if (!workerTask?.worker.deployment) { + return; + } + + const { deployment, ...workerWithoutDeployment } = workerTask.worker; + + return { + ...deployment, + worker: workerWithoutDeployment, + }; +}