diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 02d684a3c6..a4caf858ea 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -459,6 +459,7 @@ const EnvironmentSchema = z.object({ RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0), RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(), RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000), + RUN_ENGINE_MAX_DEQUEUE_LOOP_ATTEMPTS: z.coerce.number().int().default(10), RUN_ENGINE_WORKER_REDIS_HOST: z .string() diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index ad9e1c9aeb..4788da8463 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -58,6 +58,7 @@ function createRunEngine() { maximumEnvCount: env.RUN_ENGINE_MAXIMUM_ENV_COUNT, tracer, }, + maxDequeueLoopAttempts: env.RUN_ENGINE_MAX_DEQUEUE_LOOP_ATTEMPTS, }, runLock: { redis: { diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 91f3d12dfd..29354fe104 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -109,6 +109,7 @@ export class RunEngine { logger: new Logger("RunQueue", "debug"), redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` }, retryOptions: options.queue?.retryOptions, + maxDequeueLoopAttempts: options.queue?.maxDequeueLoopAttempts ?? 10, }); this.worker = new Worker({ diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index c7837eadd4..ccf06434ae 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -29,6 +29,7 @@ export type RunEngineOptions = { FairQueueSelectionStrategyOptions, "parentQueueLimit" | "tracer" | "biases" | "reuseSnapshotCount" | "maximumEnvCount" >; + maxDequeueLoopAttempts?: number; }; runLock: { redis: RedisOptions; diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 57ba1b75b7..f2677c6490 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -51,6 +51,7 @@ export type RunQueueOptions = { verbose?: boolean; logger?: Logger; retryOptions?: RetryOptions; + maxDequeueLoopAttempts?: number; }; type DequeuedMessage = { @@ -77,6 +78,7 @@ export class RunQueue { private redis: Redis; public keys: RunQueueKeyProducer; private queueSelectionStrategy: RunQueueSelectionStrategy; + private maxDequeueLoopAttempts: number; constructor(private readonly options: RunQueueOptions) { this.retryOptions = options.retryOptions ?? defaultRetrySettings; @@ -92,6 +94,7 @@ export class RunQueue { this.keys = options.keys; this.queueSelectionStrategy = options.queueSelectionStrategy; + this.maxDequeueLoopAttempts = options.maxDequeueLoopAttempts ?? 10; this.subscriber = createRedisClient(options.redis, { onError: (error) => { @@ -393,6 +396,7 @@ export class RunQueue { let attemptedEnvs = 0; let attemptedQueues = 0; + let dequeueLoopAttempts = 0; const messages: DequeuedMessage[] = []; @@ -404,16 +408,13 @@ export class RunQueue { tenantQueues[env.envId] = [...env.queues]; // Create a copy of the queues array } - // Track if we successfully dequeued any message in a complete cycle - let successfulDequeueInCycle = false; - // Continue until we've hit max count or all tenants have empty queue lists while ( messages.length < maxCount && - Object.values(tenantQueues).some((queues) => queues.length > 0) + Object.values(tenantQueues).some((queues) => queues.length > 0) && + dequeueLoopAttempts < this.maxDequeueLoopAttempts ) { - // Reset the success flag at the start of each cycle - successfulDequeueInCycle = false; + dequeueLoopAttempts++; for (const env of envQueues) { attemptedEnvs++; @@ -434,7 +435,6 @@ export class RunQueue { if (message) { messages.push(message); - successfulDequeueInCycle = true; // Re-add this queue at the end, since it might have more messages tenantQueues[env.envId].push(queue); } @@ -445,14 +445,6 @@ export class RunQueue { break; } } - - // If we completed a full cycle through all tenants with no successful dequeues, - // exit early as we're likely hitting concurrency limits or have no ready messages - if (!successfulDequeueInCycle) { - // IMPORTANT: Keep this log message as it's used in tests - this.logger.log("No successful dequeues in a full cycle, exiting..."); - break; - } } span.setAttributes({ diff --git a/internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromMasterQueue.test.ts b/internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromMasterQueue.test.ts index 3d756d7692..846ad5f308 100644 --- a/internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromMasterQueue.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromMasterQueue.test.ts @@ -262,95 +262,4 @@ describe("RunQueue.dequeueMessageFromMasterQueue", () => { } } ); - - redisTest( - "should exit early when no messages can be dequeued in a full cycle", - async ({ redisContainer }) => { - const mockLogger = { - log: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - name: "test-logger", - level: "debug", - filteredKeys: [], - additionalFields: {}, - setLevel: vi.fn(), - setFilteredKeys: vi.fn(), - setAdditionalFields: vi.fn(), - child: vi.fn(), - }; - - const queue = new RunQueue({ - ...testOptions, - queueSelectionStrategy: new FairQueueSelectionStrategy({ - redis: { - keyPrefix: "runqueue:test:", - host: redisContainer.getHost(), - port: redisContainer.getPort(), - }, - keys: testOptions.keys, - }), - redis: { - keyPrefix: "runqueue:test:", - host: redisContainer.getHost(), - port: redisContainer.getPort(), - }, - // @ts-expect-error - logger: mockLogger, - }); - - try { - const envMasterQueue = `env:${authenticatedEnvDev.id}`; - const queueCount = 10; // Reduced for simplicity - - // First, create all queues and enqueue initial messages - for (let i = 0; i < queueCount; i++) { - const queueName = `${messageDev.queue}_${i}`; - // Set each queue's concurrency limit to 0 (this guarantees dequeue will fail) - await queue.updateQueueConcurrencyLimits(authenticatedEnvDev, queueName, 0); - - // Enqueue a message to each queue - await queue.enqueueMessage({ - env: authenticatedEnvDev, - message: { ...messageDev, runId: `r${4321 + i}`, queue: queueName }, - masterQueues: ["main", envMasterQueue], - }); - } - - // Try to dequeue messages - this should exit early due to concurrency limits - const startTime = Date.now(); - const dequeued = await queue.dequeueMessageFromMasterQueue( - "test_12345", - envMasterQueue, - queueCount - ); - const endTime = Date.now(); - - // Verify no messages were dequeued - expect(dequeued.length).toBe(0); - - // Verify the operation completed quickly (under 1000ms) - const duration = endTime - startTime; - expect(duration).toBeLessThan(1000); - - // Verify we only logged one early exit message - expect(mockLogger.log).toHaveBeenCalledWith( - expect.stringContaining("No successful dequeues in a full cycle, exiting") - ); - expect(mockLogger.log.mock.calls.length).toBeLessThanOrEqual(2); - - // Verify all messages are still in queues - let totalRemaining = 0; - for (let i = 0; i < queueCount; i++) { - const queueName = `${messageDev.queue}_${i}`; - const length = await queue.lengthOfQueue(authenticatedEnvDev, queueName); - totalRemaining += length; - } - expect(totalRemaining).toBe(queueCount); - } finally { - await queue.quit(); - } - } - ); });