Skip to content

Commit

Permalink
v3: checkpoint failover and misc fixes (#1157)
Browse files Browse the repository at this point in the history
* configurable checkpoint registry namespace

* add missing task create await

* remove unused messages

* changeset

* update self-hosting docs

* capture and display stderr for failed deploys

* add missing lockfile changes

* stderr changeset

* fix cli stderr message

* update error logs label
  • Loading branch information
nicktrn committed Jun 11, 2024
1 parent 36ac79a commit 68d3242
Show file tree
Hide file tree
Showing 19 changed files with 217 additions and 105 deletions.
7 changes: 7 additions & 0 deletions .changeset/rude-toys-compare.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@trigger.dev/core-apps": patch
"trigger.dev": patch
"@trigger.dev/core": patch
---

Capture and display stderr on index failures
7 changes: 7 additions & 0 deletions .changeset/slow-sloths-retire.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@trigger.dev/core-apps": patch
"@trigger.dev/core": patch
---

- Fix uncaught provider exception
- Remove unused provider messages
3 changes: 2 additions & 1 deletion apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const SIMULATE_CHECKPOINT_FAILURE_SECONDS = parseInt(
);

const REGISTRY_HOST = process.env.REGISTRY_HOST || "localhost:5000";
const REGISTRY_NAMESPACE = process.env.REGISTRY_NAMESPACE || "trigger";
const CHECKPOINT_PATH = process.env.CHECKPOINT_PATH || "/checkpoints";
const REGISTRY_TLS_VERIFY = process.env.REGISTRY_TLS_VERIFY === "false" ? "false" : "true";

Expand Down Expand Up @@ -179,7 +180,7 @@ class Checkpointer {
}

#getImageRef(projectRef: string, deploymentVersion: string, shortCode: string) {
return `${REGISTRY_HOST}/trigger/${projectRef}:${deploymentVersion}.prod-${shortCode}`;
return `${REGISTRY_HOST}/${REGISTRY_NAMESPACE}/${projectRef}:${deploymentVersion}.prod-${shortCode}`;
}

#getExportLocation(projectRef: string, deploymentVersion: string, shortCode: string) {
Expand Down
48 changes: 17 additions & 31 deletions apps/docker-provider/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,37 +85,23 @@ class DockerTaskOperations implements TaskOperations {
port: COORDINATOR_PORT,
});

try {
logger.debug(
await execa("docker", [
"run",
"--network=host",
"--rm",
`--env=INDEX_TASKS=true`,
`--env=TRIGGER_SECRET_KEY=${opts.apiKey}`,
`--env=TRIGGER_API_URL=${opts.apiUrl}`,
`--env=TRIGGER_ENV_ID=${opts.envId}`,
`--env=OTEL_EXPORTER_OTLP_ENDPOINT=${OTEL_EXPORTER_OTLP_ENDPOINT}`,
`--env=POD_NAME=${containerName}`,
`--env=COORDINATOR_HOST=${COORDINATOR_HOST}`,
`--env=COORDINATOR_PORT=${COORDINATOR_PORT}`,
`--name=${containerName}`,
`${opts.imageRef}`,
])
);
} catch (error: any) {
if (!isExecaChildProcess(error)) {
throw error;
}

logger.error("Index failed:", {
opts,
exitCode: error.exitCode,
escapedCommand: error.escapedCommand,
stdout: error.stdout,
stderr: error.stderr,
});
}
logger.debug(
await execa("docker", [
"run",
"--network=host",
"--rm",
`--env=INDEX_TASKS=true`,
`--env=TRIGGER_SECRET_KEY=${opts.apiKey}`,
`--env=TRIGGER_API_URL=${opts.apiUrl}`,
`--env=TRIGGER_ENV_ID=${opts.envId}`,
`--env=OTEL_EXPORTER_OTLP_ENDPOINT=${OTEL_EXPORTER_OTLP_ENDPOINT}`,
`--env=POD_NAME=${containerName}`,
`--env=COORDINATOR_HOST=${COORDINATOR_HOST}`,
`--env=COORDINATOR_PORT=${COORDINATOR_PORT}`,
`--name=${containerName}`,
`${opts.imageRef}`,
])
);
}

async create(opts: TaskOperationsCreateOptions) {
Expand Down
11 changes: 11 additions & 0 deletions apps/webapp/app/components/runs/v3/DeploymentError.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ export function DeploymentError({ errorData }: DeploymentErrorProps) {
maxLines={20}
/>
)}
{errorData.stderr && (
<>
<DeploymentErrorHeader title="Error logs:" />
<CodeBlock
showCopyButton={false}
showLineNumbers={false}
code={errorData.stderr}
maxLines={20}
/>
</>
)}
</div>
);
}
Expand Down
5 changes: 5 additions & 0 deletions apps/webapp/app/presenters/v3/DeploymentPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export type ErrorData = {
name: string;
message: string;
stack?: string;
stderr?: string;
};

export class DeploymentPresenter {
Expand Down Expand Up @@ -177,17 +178,20 @@ export class DeploymentPresenter {
name: parsedErrorData.data.name,
message: parsedErrorData.data.message,
stack: createTaskMetadataFailedErrorStack(parsedError.data),
stderr: parsedErrorData.data.stderr,
};
} else {
return {
name: parsedErrorData.data.name,
message: parsedErrorData.data.message,
stderr: parsedErrorData.data.stderr,
};
}
} else {
return {
name: parsedErrorData.data.name,
message: parsedErrorData.data.message,
stderr: parsedErrorData.data.stderr,
};
}
}
Expand All @@ -196,6 +200,7 @@ export class DeploymentPresenter {
name: parsedErrorData.data.name,
message: parsedErrorData.data.message,
stack: parsedErrorData.data.stack,
stderr: parsedErrorData.data.stderr,
};
}
}
Expand Down
41 changes: 37 additions & 4 deletions apps/webapp/app/v3/services/deploymentIndexFailed.server.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,63 @@
import { PerformDeploymentAlertsService } from "./alerts/performDeploymentAlerts.server";
import { BaseService } from "./baseService.server";
import { logger } from "~/services/logger.server";
import { WorkerDeploymentStatus } from "@trigger.dev/database";

const FINAL_DEPLOYMENT_STATUSES: WorkerDeploymentStatus[] = [
"CANCELED",
"DEPLOYED",
"FAILED",
"TIMED_OUT",
];

export class DeploymentIndexFailed extends BaseService {
public async call(
maybeFriendlyId: string,
error: { name: string; message: string; stack?: string }
error: {
name: string;
message: string;
stack?: string;
stderr?: string;
}
) {
const isFriendlyId = maybeFriendlyId.startsWith("deployment_");

const deployment = await this._prisma.workerDeployment.update({
const deployment = await this._prisma.workerDeployment.findUnique({
where: isFriendlyId
? {
friendlyId: maybeFriendlyId,
}
: {
id: maybeFriendlyId,
},
});

if (!deployment) {
logger.error("Worker deployment not found", { maybeFriendlyId });
return;
}

if (FINAL_DEPLOYMENT_STATUSES.includes(deployment.status)) {
logger.error("Worker deployment already in final state", {
id: deployment.id,
status: deployment.status,
});
return;
}

const failedDeployment = await this._prisma.workerDeployment.update({
where: {
id: deployment.id,
},
data: {
status: "FAILED",
failedAt: new Date(),
errorData: error,
},
});

await PerformDeploymentAlertsService.enqueue(deployment.id, this._prisma);
await PerformDeploymentAlertsService.enqueue(failedDeployment.id, this._prisma);

return deployment;
return failedDeployment;
}
}
4 changes: 3 additions & 1 deletion docs/v3/open-source-self-hosting.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ scp -3 root@<webapp_machine>:docker/.env root@<worker_machine>:docker/.env
Checkpointing allows you to save the state of a running container to disk and restore it later. This can be useful for
long-running tasks that need to be paused and resumed without losing state, for example fan-out and fan-in workflows or long waits in email campaigns.

The checkpoints will be pushed to the same registry as the deployed images. Please see the [Registry setup](#registry-setup) section for more information.

### Requirements

- Debian, **NOT** a derivative like Ubuntu
Expand All @@ -225,7 +227,7 @@ sudo apt-get install criu
2. Tweak the config so we can successfully checkpoint our workloads

```bash
mkdir /etc/criu
mkdir -p /etc/criu

cat << EOF >/etc/criu/runc.conf
tcp-close
Expand Down
4 changes: 4 additions & 0 deletions packages/cli-v3/src/commands/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,10 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {

await preExitTasks();

if (finishedDeployment.errorData.stderr) {
log.error(`stderr:\n${finishedDeployment.errorData.stderr}`);
}

throw new SkipLoggingError(
`Deployment encountered an error: ${finishedDeployment.errorData.name}`
);
Expand Down
34 changes: 18 additions & 16 deletions packages/cli-v3/src/workers/prod/backgroundWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export class ProdBackgroundWorker {
private _onClose: Evt<void> = new Evt();

public tasks: Array<TaskMetadataWithFilePath> = [];
public stderr: Array<string> = [];

_taskRunProcess: TaskRunProcess | undefined;
private _taskRunProcessesBeingKilled: Map<number, TaskRunProcess> = new Map();
Expand Down Expand Up @@ -161,6 +162,23 @@ export class ProdBackgroundWorker {
reject(new Error("Worker timed out"));
}, 10_000);

child.stdout?.on("data", (data) => {
console.log(data.toString());
});

child.stderr?.on("data", (data) => {
console.error(data.toString());
this.stderr.push(data.toString());
});

child.on("exit", (code) => {
if (!resolved) {
clearTimeout(timeout);
resolved = true;
reject(new Error(`Worker exited with code ${code}`));
}
});

new ZodIpcConnection({
listenSchema: ProdChildToWorkerMessages,
emitSchema: ProdWorkerToChildMessages,
Expand Down Expand Up @@ -192,22 +210,6 @@ export class ProdBackgroundWorker {
},
},
});

child.stdout?.on("data", (data) => {
console.log(data.toString());
});

child.stderr?.on("data", (data) => {
console.error(data.toString());
});

child.on("exit", (code) => {
if (!resolved) {
clearTimeout(timeout);
resolved = true;
reject(new Error(`Worker exited with code ${code}`));
}
});
});

this._initialized = true;
Expand Down
7 changes: 7 additions & 0 deletions packages/cli-v3/src/workers/prod/entry-point.ts
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,8 @@ class ProdWorker {
process.exit(1);
}
} catch (e) {
const stderr = this.#backgroundWorker.stderr.join("\n");

if (e instanceof TaskMetadataParseError) {
logger.error("tasks metadata parse error", {
zodIssues: e.zodIssues,
Expand All @@ -647,13 +649,15 @@ class ProdWorker {
name: "TaskMetadataParseError",
message: "There was an error parsing the task metadata",
stack: JSON.stringify({ zodIssues: e.zodIssues, tasks: e.tasks }),
stderr,
},
});
} else if (e instanceof UncaughtExceptionError) {
const error = {
name: e.originalError.name,
message: e.originalError.message,
stack: e.originalError.stack,
stderr,
};

logger.error("uncaught exception", { originalError: error });
Expand All @@ -668,6 +672,7 @@ class ProdWorker {
name: e.name,
message: e.message,
stack: e.stack,
stderr,
};

logger.error("error", { error });
Expand All @@ -686,6 +691,7 @@ class ProdWorker {
error: {
name: "Error",
message: e,
stderr,
},
});
} else {
Expand All @@ -697,6 +703,7 @@ class ProdWorker {
error: {
name: "Error",
message: "Unknown error",
stderr,
},
});
}
Expand Down
Loading

0 comments on commit 68d3242

Please sign in to comment.