Skip to content

Commit

Permalink
v3: fix version locking (#1113)
Browse files Browse the repository at this point in the history
* fix prod version locking

* fix worker version display
  • Loading branch information
nicktrn committed May 21, 2024
1 parent a5cba37 commit f243eab
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 10 deletions.
1 change: 1 addition & 0 deletions apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ export class DevQueueConsumer {
lockedAt: new Date(),
lockedById: backgroundTask.id,
status: "EXECUTING",
lockedToVersionId: backgroundWorker.id,
},
include: {
attempts: {
Expand Down
14 changes: 12 additions & 2 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server";
import { EnvironmentVariablesRepository } from "../environmentVariables/environmentVariablesRepository.server";
import { generateFriendlyId } from "../friendlyIdentifiers";
import { socketIo } from "../handleSocketIo.server";
import { findCurrentWorkerDeployment } from "../models/workerDeployment.server";
import {
findCurrentWorkerDeployment,
getWorkerDeploymentFromWorker,
getWorkerDeploymentFromWorkerTask,
} from "../models/workerDeployment.server";
import { RestoreCheckpointService } from "../services/restoreCheckpoint.server";
import { SEMINTATTRS_FORCE_RECORDING, tracer } from "../tracer.server";
import { CrashTaskRunService } from "../services/crashTaskRun.server";
Expand Down Expand Up @@ -299,7 +303,12 @@ export class SharedQueueConsumer {
return;
}

const deployment = await findCurrentWorkerDeployment(existingTaskRun.runtimeEnvironmentId);
// Check if the task run is locked to a specific worker, if not, use the current worker deployment
const deployment = existingTaskRun.lockedById
? await getWorkerDeploymentFromWorkerTask(existingTaskRun.lockedById)
: existingTaskRun.lockedToVersionId
? await getWorkerDeploymentFromWorker(existingTaskRun.lockedToVersionId)
: await findCurrentWorkerDeployment(existingTaskRun.runtimeEnvironmentId);

if (!deployment || !deployment.worker) {
logger.error("No matching deployment found for task run", {
Expand Down Expand Up @@ -374,6 +383,7 @@ export class SharedQueueConsumer {
data: {
lockedAt: new Date(),
lockedById: backgroundTask.id,
lockedToVersionId: deployment.worker.id,
},
include: {
runtimeEnvironment: true,
Expand Down
84 changes: 76 additions & 8 deletions apps/webapp/app/v3/models/workerDeployment.server.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
import type { Prettify } from "@trigger.dev/core";
import { CURRENT_DEPLOYMENT_LABEL } from "~/consts";
import { prisma } from "~/db.server";
import { Prisma, prisma } from "~/db.server";

export type CurrentWorkerDeployment = Prettify<NonNullable<Awaited<ReturnType<typeof findCurrentWorkerDeployment>>>>;
export type CurrentWorkerDeployment = Prettify<
NonNullable<Awaited<ReturnType<typeof findCurrentWorkerDeployment>>>
>;

export async function findCurrentWorkerDeployment(environmentId: string) {
type WorkerDeploymentWithWorkerTasks = Prisma.WorkerDeploymentGetPayload<{
include: {
worker: {
include: {
tasks: true;
};
};
};
}>;

export async function findCurrentWorkerDeployment(
environmentId: string
): Promise<WorkerDeploymentWithWorkerTasks | undefined> {
const promotion = await prisma.workerDeploymentPromotion.findUnique({
where: {
environmentId_label: {
environmentId,
label: CURRENT_DEPLOYMENT_LABEL,
}
},
},
include: {
deployment: {
Expand All @@ -20,10 +34,64 @@ export async function findCurrentWorkerDeployment(environmentId: string) {
tasks: true,
},
},
}
}
}
},
},
},
});

return promotion?.deployment;
}
}

export async function getWorkerDeploymentFromWorker(
workerId: string
): Promise<WorkerDeploymentWithWorkerTasks | undefined> {
const worker = await prisma.backgroundWorker.findUnique({
where: {
id: workerId,
},
include: {
deployment: true,
tasks: true,
},
});

if (!worker?.deployment) {
return;
}

const { deployment, ...workerWithoutDeployment } = worker;

return {
...deployment,
worker: workerWithoutDeployment,
};
}

export async function getWorkerDeploymentFromWorkerTask(
workerTaskId: string
): Promise<WorkerDeploymentWithWorkerTasks | undefined> {
const workerTask = await prisma.backgroundWorkerTask.findUnique({
where: {
id: workerTaskId,
},
include: {
worker: {
include: {
deployment: true,
tasks: true,
},
},
},
});

if (!workerTask?.worker.deployment) {
return;
}

const { deployment, ...workerWithoutDeployment } = workerTask.worker;

return {
...deployment,
worker: workerWithoutDeployment,
};
}

0 comments on commit f243eab

Please sign in to comment.