From b158668383284a7521633cf4105527b0d3e120b6 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 26 Sep 2025 11:50:00 +0100 Subject: [PATCH] chore(run-engine): add additional logging around dequeueing and worker queues --- .../run-engine/src/engine/index.ts | 5 +++++ .../src/engine/systems/dequeueSystem.ts | 9 +++++++++ .../run-engine/src/run-queue/index.ts | 17 +++++++++-------- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index f9b3061e91..0ce4db8859 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -1205,6 +1205,11 @@ export class RunEngine { throw new NotImplementedError("There shouldn't be a heartbeat for QUEUED_EXECUTING"); } case "PENDING_EXECUTING": { + this.logger.log("RunEngine stalled snapshot PENDING_EXECUTING", { + runId, + snapshotId: latestSnapshot.id, + }); + //the run didn't start executing, we need to requeue it const run = await prisma.taskRun.findFirst({ where: { id: runId }, diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index a19c80dd78..a8316a9880 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -143,6 +143,15 @@ export class DequeueSystem { const orgId = message.message.orgId; const runId = message.messageId; + this.$.logger.info("DequeueSystem.dequeueFromWorkerQueue dequeued message", { + runId, + orgId, + environmentId: message.message.environmentId, + environmentType: message.message.environmentType, + workerQueueLength: message.workerQueueLength ?? 0, + workerQueue, + }); + span.setAttribute("run_id", runId); span.setAttribute("org_id", orgId); span.setAttribute("environment_id", message.message.environmentId); diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 81cbb0379c..1d6295cb00 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -1369,27 +1369,28 @@ export class RunQueue { const pipeline = this.redis.pipeline(); - const workerQueueKeys = new Set(); + const operations = []; for (const message of messages) { const workerQueueKey = this.keys.workerQueueKey( this.#getWorkerQueueFromMessage(message.message) ); - workerQueueKeys.add(workerQueueKey); - const messageKeyValue = this.keys.messageKey(message.message.orgId, message.messageId); + operations.push({ + workerQueueKey: workerQueueKey, + messageId: message.messageId, + }); + pipeline.rpush(workerQueueKey, messageKeyValue); } - span.setAttribute("worker_queue_count", workerQueueKeys.size); - span.setAttribute("worker_queue_keys", Array.from(workerQueueKeys)); + span.setAttribute("operations_count", operations.length); - this.logger.debug("enqueueMessagesToWorkerQueues pipeline", { + this.logger.info("enqueueMessagesToWorkerQueues", { service: this.name, - messages, - workerQueueKeys: Array.from(workerQueueKeys), + operations, }); await pipeline.exec();