Skip to content

Commit

Permalink
handle more cases of unrecoverable runs
Browse files Browse the repository at this point in the history
  • Loading branch information
nicktrn committed May 29, 2024
1 parent 0e7e0df commit 66c9186
Showing 1 changed file with 27 additions and 17 deletions.
44 changes: 27 additions & 17 deletions apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,20 @@ class TaskCoordinator {
onConnection: async (socket, handler, sender) => {
const logger = new SimpleLogger(`[prod-worker][${socket.id}]`);

const crashRun = async (error: { name: string; message: string; stack?: string }) => {
try {
this.#platformSocket?.send("RUN_CRASHED", {
version: "v1",
runId: socket.data.runId,
error,
});
} finally {
socket.emit("REQUEST_EXIT", {
version: "v1",
});
}
};

const checkpointInProgress = () => {
return this.#checkpointableTasks.has(socket.data.runId);
};
Expand Down Expand Up @@ -741,8 +755,9 @@ class TaskCoordinator {
if (!executionAck) {
logger.error("no execution ack", { runId: socket.data.runId });

socket.emit("REQUEST_EXIT", {
version: "v1",
await crashRun({
name: "ReadyForExecutionError",
message: "No execution ack",
});

return;
Expand All @@ -751,8 +766,9 @@ class TaskCoordinator {
if (!executionAck.success) {
logger.error("failed to get execution payload", { runId: socket.data.runId });

socket.emit("REQUEST_EXIT", {
version: "v1",
await crashRun({
name: "ReadyForExecutionError",
message: "Failed to get execution payload",
});

return;
Expand Down Expand Up @@ -781,8 +797,9 @@ class TaskCoordinator {
if (!lazyAttempt) {
logger.error("no lazy attempt ack", { runId: socket.data.runId });

socket.emit("REQUEST_EXIT", {
version: "v1",
await crashRun({
name: "ReadyForLazyAttemptError",
message: "No lazy attempt ack",
});

return;
Expand All @@ -791,8 +808,9 @@ class TaskCoordinator {
if (!lazyAttempt.success) {
logger.error("failed to get lazy attempt payload", { runId: socket.data.runId });

socket.emit("REQUEST_EXIT", {
version: "v1",
await crashRun({
name: "ReadyForLazyAttemptError",
message: "Failed to get lazy attempt payload",
});

return;
Expand Down Expand Up @@ -1140,15 +1158,7 @@ class TaskCoordinator {
socket.on("UNRECOVERABLE_ERROR", async (message) => {
logger.log("[UNRECOVERABLE_ERROR]", message);

this.#platformSocket?.send("RUN_CRASHED", {
version: "v1",
runId: socket.data.runId,
error: message.error,
});

socket.emit("REQUEST_EXIT", {
version: "v1",
});
await crashRun(message.error);
});
},
onDisconnect: async (socket, handler, sender, logger) => {
Expand Down

0 comments on commit 66c9186

Please sign in to comment.