Skip to content

Conversation

nicktrn
Copy link
Collaborator

@nicktrn nicktrn commented Sep 1, 2025

Replaces static queue consumers with a dynamic pool that intelligently scales based on queue load using exponentially weighted moving averages and configurable scaling strategies. This improves resource utilization and response times by automatically adjusting consumer count based on actual demand.

Environment Variables

Supervisor

  • TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT - Minimum number of queue consumers, defaults to 1.
  • TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT - Maximum number of queue consumers, defaults to 10.
  • TRIGGER_DEQUEUE_SCALING_STRATEGY - Scaling strategy to use (none, smooth, aggressive), defaults to none.
  • TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS - Cooldown period after scaling up in milliseconds, defaults to 10000 (10 seconds).
  • TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS - Cooldown period after scaling down in milliseconds, defaults to 60000 (60 seconds).
  • TRIGGER_DEQUEUE_SCALING_TARGET_RATIO - Target ratio of queue items to consumers, defaults to 1.0 (1 item per consumer).
  • TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA - EWMA smoothing factor (0-1), defaults to 0.3. Lower values provide more smoothing, higher values are more responsive to recent changes.
  • TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS - Batch processing window in milliseconds, defaults to 1000. Controls how frequently scaling decisions are evaluated.

Backwards Compatibility

All new scaling configuration is optional and maintains existing behavior when not specified.

Copy link

changeset-bot bot commented Sep 1, 2025

⚠️ No Changeset found

Latest commit: 6fef333

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Copy link
Contributor

coderabbitai bot commented Sep 1, 2025

Walkthrough

Introduces a dynamic consumer pool and scaling subsystem for runEngineWorker. Adds a RunQueueConsumerPool with pluggable ScalingStrategies (“none”, “smooth”, “aggressive”), a QueueMetricsProcessor (EWMA + batch window), and Prometheus-backed ConsumerPoolMetrics. Updates SupervisorSession to use the pool and a new scaling options object, replacing the previous maxConsumerCount. Exposes new public types/exports for pool, strategies, metrics processor, and queue consumer interface. Expands env configuration to include scaling controls and consumer count bounds. Wires metrics registry and scaling config in apps/supervisor. Adds comprehensive unit tests for the pool, metrics processor, and strategies.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/dynamic-queue-consumer-pool

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

🧹 Nitpick comments (28)
.changeset/angry-frogs-fix.md (1)

5-5: Tighten the release note to reflect scope and defaults

Make the note clearer about the pool and opt-in nature.

-Add dynamic queue consumer to supervisor
+Supervisor: add dynamic queue consumer pool with EWMA-based scaling (opt‑in via env vars; defaults preserve current behavior)
packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts (4)

6-10: Prefer a type alias over an interface for small shapes (team convention)

Aligns with “types over interfaces” guideline.

-export interface QueueConsumer {
-  start(): void;
-  stop(): void;
-}
+export type QueueConsumer = {
+  start(): void;
+  stop(): void;
+};

30-33: Store timeout handle to allow cancellation on stop()

Avoids a stray wake-up after stop; negligible but cleaner and test-friendly.

   private intervalMs: number;
   private idleIntervalMs: number;
   private isEnabled: boolean;
+  private timeoutHandle?: ReturnType<typeof setTimeout>;

54-61: Clear any pending timer in stop()

Prevents an extra dequeue tick after stop.

   stop() {
     if (!this.isEnabled) {
       return;
     }
 
     this.isEnabled = false;
+    if (this.timeoutHandle) {
+      clearTimeout(this.timeoutHandle);
+      this.timeoutHandle = undefined;
+    }
   }

139-145: Capture the timeout handle when scheduling next dequeue

Complements the stop() change.

-    setTimeout(this.dequeue.bind(this), delayMs);
+    this.timeoutHandle = setTimeout(this.dequeue.bind(this), delayMs);
packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts (5)

7-7: Use type-only import for SupervisorHttpClient

Avoids pulling a runtime value that isn’t used.

-import { SupervisorHttpClient } from "./http.js";
+import type { SupervisorHttpClient } from "./http.js";

68-73: Await pool.stop() in afterEach to avoid cross-test leakage

Ensures deterministic cleanup with fake timers.

-afterEach(() => {
+afterEach(async () => {
   vi.useRealTimers();
   if (pool) {
-    pool.stop();
+    await pool.stop();
   }
 });

75-78: Avoid injecting a sentinel “0” sample to trigger batch processing

Adding 0 skews the median/EWMA and can mask regressions. Trigger batch processing without altering the sample set (e.g., re-send last value or expose a test-only tick).

Example minimal change (re-send last known value tracked in a test var):

-function advanceTimeAndTriggerBatch(ms: number) {
-  vi.advanceTimersByTime(ms);
-  pool.updateQueueLength(0);
-}
+let lastQueueLength: number | undefined;
+function recordUpdate(n: number) {
+  lastQueueLength = n;
+  pool.updateQueueLength(n);
+}
+function advanceTimeAndTriggerBatch(ms: number) {
+  vi.advanceTimersByTime(ms);
+  if (lastQueueLength !== undefined) {
+    pool.updateQueueLength(lastQueueLength);
+  }
+}

Then replace calls to pool.updateQueueLength(x) with recordUpdate(x) in this file.


420-450: Jitter test is disabled and has no assertions

disableJitter: true disables what the test claims to validate; also, no expectations are asserted.

-            disableJitter: true,
+            disableJitter: false,

Add a basic expectation to ensure scaling isn’t synchronized (illustrative):

expect(scaleTimes.length).toBeGreaterThan(0);
// Check at least some variability
const deltas = scaleTimes.map((t, i, a) => (i ? t - a[i - 1] : 0)).slice(1);
expect(new Set(deltas).size).toBeGreaterThan(1);

1-1: Type for Mock may differ across Vitest versions

If the named Mock type isn’t available in your version, switch to vi.Mock or a typed function signature (e.g., vi.fn<[], Promise<void>>()).

packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (1)

61-66: Reject NaN/Infinity in samples

Prevents corrupting the EWMA with non-finite values.

   addSample(value: number, timestamp: number = Date.now()): void {
-    if (value < 0) {
+    if (!Number.isFinite(value) || value < 0) {
       throw new Error("Queue length cannot be negative");
     }
packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.test.ts (4)

103-120: Use toBeCloseTo for floating-point assertions.

Floating math can be non-deterministic; prefer tolerant comparisons.

-      expect(processor.getSmoothedValue()).toBe(13);
+      expect(processor.getSmoothedValue()).toBeCloseTo(13, 6);
@@
-      expect(processor.getSmoothedValue()).toBe(10.6);
+      expect(processor.getSmoothedValue()).toBeCloseTo(10.6, 6);

165-167: Fix contradictory median comment (even sample count).

Comment says “lower middle (index 1)” but code uses index 2 (upper middle). Align the comment.

-      // With even count, we take the lower middle value (index 1)
-      // Sorted: [1, 5, 8, 10], median index = floor(4/2) = 2, so median = 8
+      // With even count, we take the upper middle value (index 2)
+      // Sorted: [1, 5, 8, 10], median index = floor(4/2) = 2, so median = 8

7-13: Fake timers aren’t used—remove for simplicity.

You’re passing explicit timestamps and not advancing timers; the fake timers setup can be dropped.

-  beforeEach(() => {
-    vi.useFakeTimers();
-  });
@@
-  afterEach(() => {
-    vi.useRealTimers();
-  });

294-299: Remove unused ‘expected’ fields in scenarios.

They’re misleading and unused.

-      const scenarios = [
-        { samples: [5, 5, 5], expected: 5 }, // Baseline
-        { samples: [50, 50, 50], expected: 18.5 }, // Spike
-        { samples: [5, 5, 5], expected: 10.05 }, // Recovery
-      ];
+      const scenarios = [
+        { samples: [5, 5, 5] }, // Baseline
+        { samples: [50, 50, 50] }, // Spike
+        { samples: [5, 5, 5] }, // Recovery
+      ];
apps/supervisor/src/index.ts (1)

131-140: Validate and log scaling config to prevent misconfiguration.

Clamp/validate bounds (alpha 0..1, targetRatio > 0, min <= max) and log the resolved config for observability.

-      scaling: {
-        strategy: env.TRIGGER_DEQUEUE_SCALING_STRATEGY,
-        minConsumerCount: env.TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT,
-        maxConsumerCount: env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT,
-        scaleUpCooldownMs: env.TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS,
-        scaleDownCooldownMs: env.TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS,
-        targetRatio: env.TRIGGER_DEQUEUE_SCALING_TARGET_RATIO,
-        ewmaAlpha: env.TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA,
-        batchWindowMs: env.TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS,
-      },
+      scaling: {
+        strategy: env.TRIGGER_DEQUEUE_SCALING_STRATEGY,
+        minConsumerCount: Math.max(1, env.TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT),
+        maxConsumerCount: Math.max(
+          Math.max(1, env.TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT),
+          env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT
+        ),
+        scaleUpCooldownMs: Math.max(0, env.TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS),
+        scaleDownCooldownMs: Math.max(0, env.TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS),
+        targetRatio: Math.max(0.0001, env.TRIGGER_DEQUEUE_SCALING_TARGET_RATIO),
+        ewmaAlpha: Math.min(1, Math.max(0, env.TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA)),
+        batchWindowMs: Math.max(1, env.TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS),
+      },

Optional: after constructing SupervisorSession, log the resolved scaling:

this.logger.log("Dequeue scaling config", this.workerSession.getScalingConfig?.());

Please confirm env.ts already enforces these via zod; if so, clamp isn’t needed here.

packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.test.ts (2)

18-24: Edge case: median 0 sample.

QueueMetricsProcessor.processBatch() treats median === 0 as falsy; a zero sample would return null. Consider adding a test to surface this and/or guarding in the processor.

Would you like a PR to add a zero-median test and fix?


161-206: Nice integration checks; consider table-driven for broader ratios.

Optional: add parameterized cases for different targetRatio values to validate thresholds’ scaling.

apps/supervisor/src/env.ts (1)

38-47: Expose jitter control to ops (optional).

Pool supports disableJitter; wire an env to allow deterministic scale cooldowns during single-replica debugging.

   TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA: z.coerce.number().min(0).max(1).default(0.3), // EWMA smoothing factor (0-1)
   TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS: z.coerce.number().int().positive().default(1000), // Batch window for metrics processing (ms)
+  TRIGGER_DEQUEUE_SCALING_DISABLE_JITTER: BoolEnv.default(false),

And ensure it’s passed through to ScalingOptions.disableJitter.

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (6)

83-89: Duplicate param validation; keep single source of truth.

EWMA/batch window validation is already enforced by QueueMetricsProcessor. Consider removing one layer to avoid drift.

-    // Validate EWMA parameters
-    if (this.ewmaAlpha < 0 || this.ewmaAlpha > 1) {
-      throw new Error(`ewmaAlpha must be between 0 and 1, got: ${this.ewmaAlpha}`);
-    }
-    if (this.batchWindowMs <= 0) {
-      throw new Error(`batchWindowMs must be positive, got: ${this.batchWindowMs}`);
-    }

125-145: Method needn’t be async.

start() does not await anything; same for stop(). Make them synchronous to simplify the API.

-  async start() {
+  start() {
     if (this.isEnabled) {
       return;
     }
-  async stop() {
+  stop() {
     if (!this.isEnabled) {
       return;
     }

166-183: Guard against non-finite queue length.

Defensively ignore NaN/Infinity to keep the metrics stable.

-  updateQueueLength(queueLength: number | undefined) {
-    if (queueLength === undefined) {
+  updateQueueLength(queueLength: number | undefined) {
+    if (queueLength === undefined || !Number.isFinite(queueLength)) {
       return;
     }

220-229: Minor log noise.

“actualCount” duplicates “currentCount” here. Drop one.

-      this.logger.debug("Scaling blocked - operation already in progress", {
-        currentCount: this.consumers.size,
-        targetCount: this.metrics.targetConsumerCount,
-        actualCount: this.consumers.size,
-      });
+      this.logger.debug("Scaling blocked - operation already in progress", {
+        currentCount: this.consumers.size,
+        targetCount: this.metrics.targetConsumerCount,
+      });

269-276: Log strategy name, not the object.

This prints a large object; use .name.

-      strategy: this.scalingStrategy,
+      strategy: this.scalingStrategy.name,

354-374: Deterministic removal policy.

Currently removes newest (insertion order). If LRU/oldest-first is desired, slice(0, count). If “newest-first” is intentional, add a short comment.

-    const consumerIds = allIds.slice(-count); // Take from the end
+    // Removing newest first; switch to slice(0, count) to drop oldest
+    const consumerIds = allIds.slice(-count);
packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts (3)

12-15: Prefer type aliases over interfaces (repo style).

Project guideline favors types over interfaces. Optional swap for consistency.

-export interface ScalingStrategyOptions {
+export type ScalingStrategyOptions = {
   metricsProcessor?: QueueMetricsProcessor;
   dampingFactor?: number;
-}
+};

80-96: Rounding choice may induce oscillation around boundaries.

Round -> toggles near integers. Ceil/floor depending on movement direction can reduce chatter; or keep round but add small hysteresis.

-    const dampedTarget = currentCount + (targetConsumers - currentCount) * this.dampingFactor;
-    return Math.min(
-      Math.max(Math.round(dampedTarget), context.minConsumerCount),
-      context.maxConsumerCount
-    );
+    const dampedTarget = currentCount + (targetConsumers - currentCount) * this.dampingFactor;
+    const proposed = targetConsumers >= currentCount ? Math.ceil(dampedTarget) : Math.floor(dampedTarget);
+    return Math.min(Math.max(proposed, context.minConsumerCount), context.maxConsumerCount);

159-167: Public helper is fine; document intended use.

Add brief JSDoc on who consumes getThresholds (tests/telemetry).

-  getThresholds(targetRatio: number) {
+  /** Exposed for tests/telemetry visualizations. */
+  getThresholds(targetRatio: number) {
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 2b095b1 and 7728f94.

📒 Files selected for processing (12)
  • .changeset/angry-frogs-fix.md (1 hunks)
  • apps/supervisor/src/env.ts (1 hunks)
  • apps/supervisor/src/index.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/index.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts (3 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.test.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.test.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/session.ts (6 hunks)
🧰 Additional context used
📓 Path-based instructions (4)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts
  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.test.ts
  • packages/core/src/v3/runEngineWorker/index.ts
  • packages/core/src/v3/runEngineWorker/supervisor/session.ts
  • packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.test.ts
  • packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts
  • apps/supervisor/src/env.ts
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts
  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts
  • apps/supervisor/src/index.ts
  • packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

We use zod a lot in packages/core and in the webapp

Files:

  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts
  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.test.ts
  • packages/core/src/v3/runEngineWorker/index.ts
  • packages/core/src/v3/runEngineWorker/supervisor/session.ts
  • packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.test.ts
  • packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts
  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts
  • packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts
**/*.test.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Our tests are all vitest

Files:

  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.test.ts
  • packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.test.ts
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts
**/*.{test,spec}.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (AGENTS.md)

**/*.{test,spec}.{ts,tsx,js,jsx}: Unit tests must use Vitest
Tests should avoid mocks or stubs and use helpers from @internal/testcontainers when Redis or Postgres are needed
Test files live beside the files under test and should use descriptive describe and it blocks

Files:

  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.test.ts
  • packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.test.ts
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts
🧬 Code graph analysis (7)
packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (4)
packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts (3)
  • RunQueueConsumerOptions (11-19)
  • QueueConsumer (6-9)
  • RunQueueConsumer (21-146)
packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts (3)
  • ScalingStrategyKind (3-3)
  • ScalingStrategyOptions (12-15)
  • ScalingContext (5-10)
packages/core/src/v3/utils/structuredLogger.ts (1)
  • SimpleStructuredLogger (21-102)
packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (1)
  • QueueMetricsProcessor (37-170)
packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.test.ts (1)
packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (1)
  • QueueMetricsProcessor (37-170)
packages/core/src/v3/runEngineWorker/supervisor/session.ts (1)
packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (2)
  • ScalingOptions (13-24)
  • RunQueueConsumerPool (40-389)
packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.test.ts (2)
packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts (4)
  • ScalingContext (5-10)
  • NoneScalingStrategy (44-55)
  • SmoothScalingStrategy (62-97)
  • AggressiveScalingStrategy (104-167)
packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (1)
  • QueueMetricsProcessor (37-170)
packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts (2)
packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts (1)
  • QueueConsumer (6-9)
packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (3)
  • RunQueueConsumerPool (40-389)
  • ConsumerPoolOptions (26-30)
  • QueueConsumerFactory (11-11)
apps/supervisor/src/index.ts (1)
apps/supervisor/src/env.ts (1)
  • env (112-112)
packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts (1)
packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (1)
  • QueueMetricsProcessor (37-170)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (24)
  • GitHub Check: build (supervisor)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (17)
packages/core/src/v3/runEngineWorker/index.ts (1)

5-5: Re-export looks good. No duplicate exports of consumerPool detected.

packages/core/src/v3/runEngineWorker/supervisor/session.ts (4)

36-36: LGTM: pool field replaces per-consumer array cleanly.

No issues with visibility or lifecycle concerns.


190-191: LGTM: pool start + heartbeat sequencing.

Starting the pool before heartbeat is sensible; logs mirror state.


205-205: LGTM: idempotent stop path.

Pool stop() guards on isEnabled; safe even if never started.


24-25: Confirmed single SupervisorSession instantiation in apps/supervisor/src/index.ts:122; omitting scaling is safe.

packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.test.ts (4)

1-9: LGTM: Vitest setup and imports.

Conforms to test guidelines.


26-38: LGTM: none strategy expectations.

Covers constant target and name.


40-84: LGTM: smooth strategy damping and validation.

Good coverage including zero-current case and bounds checks.


86-159: LGTM: aggressive strategy zones and caps.

Validates under/optimal/over zones, 50% cap, and thresholds.

apps/supervisor/src/env.ts (2)

38-47: Defaults sanity-check.

New defaults (max run count = 1, min=1, max=10, strategy=none, cooldowns 10s/60s, ratio=1.0, alpha=0.3, batch=1000ms) align with pool defaults.


38-47: All new env vars are mapped into SupervisorSession. Verified TRIGGER_DEQUEUE_MAX_RUN_COUNT, MIN_CONSUMER_COUNT, MAX_CONSUMER_COUNT and all SCALING_* fields are passed into the scaling object in apps/supervisor/src/index.ts. There is no DISABLE_JITTER env var defined, so no mapping is needed.

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (3)

376-381: Return read-only object literal is fine.

Good defensive copy for metrics.


386-388: Getter reads live size from Map. LGTM.


280-293: Potential partial-add leak on consumerFactory error.

If the factory throws mid-loop, earlier-created consumers remain inserted (not started) and never cleaned up. Stage creations and commit atomically.

-  private addConsumers(count: number) {
-    const newConsumers: QueueConsumer[] = [];
-    for (let i = 0; i < count; i++) {
-      const consumerId = `consumer-${Date.now()}-${Math.random().toString(36).substring(2, 11)}`;
-      const consumer = this.consumerFactory({
-        ...this.consumerOptions,
-        onDequeue: async (messages) => {
-          if (messages.length > 0 && messages[0]?.workerQueueLength !== undefined) {
-            this.updateQueueLength(messages[0]?.workerQueueLength);
-          }
-          await this.consumerOptions.onDequeue(messages);
-        },
-      });
-      this.consumers.set(consumerId, consumer);
-      newConsumers.push(consumer);
-    }
-    newConsumers.forEach((c) => c.start());
+  private addConsumers(count: number) {
+    const staged: Array<{ id: string; consumer: QueueConsumer }> = [];
+    try {
+      for (let i = 0; i < count; i++) {
+        const id = `consumer-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
+        const consumer = this.consumerFactory({
+          ...this.consumerOptions,
+          onDequeue: async (messages) => {
+            if (messages.length > 0 && messages[0]?.workerQueueLength !== undefined) {
+              this.updateQueueLength(messages[0].workerQueueLength);
+            }
+            await this.consumerOptions.onDequeue(messages);
+          },
+        });
+        staged.push({ id, consumer });
+      }
+      // Commit and start
+      for (const { id, consumer } of staged) {
+        this.consumers.set(id, consumer);
+        consumer.start();
+      }
+    } catch (e) {
+      // Best-effort cleanup of any created-but-not-started consumers
+      for (const { consumer } of staged) {
+        try { consumer.stop(); } catch {}
+      }
+      throw e;
+    }

Likely an incorrect or invalid review comment.

packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts (3)

44-55: Static strategy returns max; consistent with pool init. LGTM.


67-78: Validate options once; explicit error helps.

Good validation for dampingFactor and required metricsProcessor.


116-157: Division-by-zero guarded; zone logic readable. LGTM.

currentConsumerCount falls back to minConsumerCount before divide; thresholds are clear.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (4)
packages/core/src/v3/runEngineWorker/supervisor/session.ts (3)

25-27: Make scaling optional to match PR’s backward-compatibility promise.
Required here = breaking change for existing callers.

-  scaling: ScalingOptions;
+  scaling?: ScalingOptions;
   metricsRegistry?: Registry;

50-62: Pass a safe default when opts.scaling is absent.
Prevents runtime errors in RunQueueConsumerPool constructor.

-      scaling: opts.scaling,
+      scaling: opts.scaling ?? {},
       metricsRegistry: opts.metricsRegistry,

186-193: Don’t access private fields on consumerPool (TS compile error).
Log from config instead; or add public getters on the pool.

-      this.logger.log("Queue consumer enabled", {
-        scalingStrategy: this.consumerPool["scalingStrategy"],
-        minConsumers: this.consumerPool["minConsumerCount"],
-        maxConsumers: this.consumerPool["maxConsumerCount"],
-      });
+      this.logger.log("Queue consumer enabled", {
+        scalingStrategy: this.opts.scaling?.strategy ?? "none",
+        minConsumers: Math.max(1, this.opts.scaling?.minConsumerCount ?? 1),
+        maxConsumers: Math.max(
+          Math.max(1, this.opts.scaling?.minConsumerCount ?? 1),
+          this.opts.scaling?.maxConsumerCount ?? 10
+        ),
+      });
packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (1)

212-215: Use explicit null-check to avoid future regressions.

result is an object today, but make the intent clear.

-    if (!result) {
+    if (result === null) {
       this.logger.debug("No queue length samples in batch window - skipping scaling evaluation");
       return;
     }
🧹 Nitpick comments (10)
packages/core/src/v3/runEngineWorker/supervisor/session.ts (1)

4-4: Use type-only import for types to avoid bundling overhead.
Change ScalingOptions to a type-only import.

-import { RunQueueConsumerPool, ScalingOptions } from "./consumerPool.js";
+import { RunQueueConsumerPool } from "./consumerPool.js";
+import type { ScalingOptions } from "./consumerPool.js";
apps/supervisor/src/index.ts (1)

131-141: Wire scaling config, plus two small hardening tweaks.

  • Pass undefined when dequeueing is disabled to reflect “optional” scaling.
  • Optional guard: warn if min > max (even though pool clamps).
-      metricsRegistry: register,
-      scaling: {
+      metricsRegistry: register,
+      scaling: env.TRIGGER_DEQUEUE_ENABLED
+        ? {
           strategy: env.TRIGGER_DEQUEUE_SCALING_STRATEGY,
           minConsumerCount: env.TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT,
           maxConsumerCount: env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT,
           scaleUpCooldownMs: env.TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS,
           scaleDownCooldownMs: env.TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS,
           targetRatio: env.TRIGGER_DEQUEUE_SCALING_TARGET_RATIO,
           ewmaAlpha: env.TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA,
           batchWindowMs: env.TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS,
-      },
+        }
+        : undefined,

If you want the min/max sanity warning here, I can add it right above the SupervisorSession construction.

packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (2)

88-123: Use the samples parameter in calculateMedian (avoid hidden coupling).
Currently ignores the parameter and reads this.samples.

-  private calculateMedian(samples: number[]): number | null {
-    const sortedSamples = [...this.samples].sort((a, b) => a - b);
+  private calculateMedian(samples: number[]): number | null {
+    const sortedSamples = [...samples].sort((a, b) => a - b);

64-75: Validate finite numbers in addSample.
Reject NaN/Infinity early to keep metrics sane.

-  addSample(value: number, timestamp: number = Date.now()): void {
-    if (value < 0) {
+  addSample(value: number, timestamp: number = Date.now()): void {
+    if (!Number.isFinite(value) || value < 0) {
       throw new Error("Queue length cannot be negative");
     }
packages/core/src/v3/runEngineWorker/supervisor/consumerPoolMetrics.ts (3)

1-1: Remove unused import.

Histogram isn’t used.

-import { Counter, Gauge, Histogram, Registry } from "prom-client";
+import { Counter, Gauge, Registry } from "prom-client";

3-6: Prefer type over interface per repo guidelines.

-export interface ConsumerPoolMetricsOptions {
+export type ConsumerPoolMetricsOptions = {
   register?: Registry;
   prefix?: string;
-}
+};

126-130: Hardcoded strategy labels can drift from ScalingStrategyKind.

If a new strategy is added, this gauge won’t reflect it until updated. Consider centralizing the list or exporting it from scalingStrategies.ts, or document the need to keep these in sync.

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (3)

217-231: Emit metrics for processed batches and update gauges even without scaling.

Currently batchesProcessedTotal is never incremented, and gauges aren’t refreshed unless scaling occurs.

     this.metrics.queueLength = result.median;
     this.metrics.smoothedQueueLength = result.smoothedValue;
     this.metrics.lastQueueLengthUpdate = new Date();
+
+    // Metrics: record processed batch and refresh gauges
+    this.promMetrics?.batchesProcessedTotal.inc();
+    this.promMetrics?.updateState({
+      consumerCount: this.consumers.size,
+      queueLength: this.metrics.queueLength,
+      smoothedQueueLength: this.metrics.smoothedQueueLength,
+      targetConsumerCount: this.metrics.targetConsumerCount,
+      strategy: this.scalingStrategy.name,
+    });

294-300: Log strategy name instead of the object.

-      strategy: this.scalingStrategy,
+      strategy: this.scalingStrategy.name,

172-180: Update Prometheus gauges on stop().

Reflect zero/cleared state after pool shutdown.

     // Stop all consumers
     Array.from(this.consumers.values()).forEach((consumer) => consumer.stop());
 
     this.consumers.clear();
 
     this.logger.log("Stopped dynamic consumer pool");
+
+    // Metrics: reflect stopped state
+    this.promMetrics?.updateState({
+      consumerCount: this.consumers.size,
+      queueLength: this.metrics.queueLength,
+      smoothedQueueLength: this.metrics.smoothedQueueLength,
+      targetConsumerCount: this.metrics.targetConsumerCount,
+      strategy: this.scalingStrategy.name,
+    });
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 7728f94 and f11ea5a.

📒 Files selected for processing (6)
  • apps/supervisor/src/index.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPoolMetrics.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.test.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/session.ts (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.test.ts
🧰 Additional context used
📓 Path-based instructions (2)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPoolMetrics.ts
  • apps/supervisor/src/index.ts
  • packages/core/src/v3/runEngineWorker/supervisor/session.ts
  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

We use zod a lot in packages/core and in the webapp

Files:

  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPoolMetrics.ts
  • packages/core/src/v3/runEngineWorker/supervisor/session.ts
  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts
🧬 Code graph analysis (4)
packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (5)
packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts (3)
  • RunQueueConsumerOptions (11-19)
  • QueueConsumer (6-9)
  • RunQueueConsumer (21-146)
packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts (3)
  • ScalingStrategyKind (3-3)
  • ScalingStrategyOptions (12-15)
  • ScalingContext (5-10)
packages/core/src/v3/utils/structuredLogger.ts (1)
  • SimpleStructuredLogger (21-102)
packages/core/src/v3/runEngineWorker/supervisor/consumerPoolMetrics.ts (1)
  • ConsumerPoolMetrics (8-160)
packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (1)
  • QueueMetricsProcessor (39-209)
apps/supervisor/src/index.ts (2)
apps/supervisor/src/metrics.ts (1)
  • register (3-3)
apps/supervisor/src/env.ts (1)
  • env (112-112)
packages/core/src/v3/runEngineWorker/supervisor/session.ts (1)
packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (2)
  • ScalingOptions (15-26)
  • RunQueueConsumerPool (43-427)
packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (1)
packages/core/src/v3/utils/structuredLogger.ts (1)
  • SimpleStructuredLogger (21-102)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
  • GitHub Check: units / packages / 📊 Merge Reports
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (3)
packages/core/src/v3/runEngineWorker/supervisor/session.ts (1)

208-208: LGTM: stop() safely handles not-started pools.
RunQueueConsumerPool.stop() no-ops if not enabled.

packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (1)

132-167: Median + EWMA batch processing looks solid.
Correct even-length median and explicit null check are in place.

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (1)

369-372: No change required. Confirmed WorkerApiDequeueResponseBody is an array of DequeuedMessage, which defines an optional workerQueueLength field, so the check and updateQueueLength call are valid.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (1)
packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (1)

211-214: Explicit null check for clarity.

Avoid conflating a valid result with median 0 with “no result”.

-    if (!result) {
+    if (result === null) {
       this.logger.debug("No queue length samples in batch window - skipping scaling evaluation");
       return;
     }
🧹 Nitpick comments (5)
packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (2)

138-144: Defensive: treat NaN median as invalid.

If a bad sample slips through, avoid updating state with NaN.

-    const median = this.calculateMedian(this.samples);
-    if (median === null) {
+    const median = this.calculateMedian(this.samples);
+    if (median === null || Number.isNaN(median)) {
       // We already logged a more specific error message
       return null;
     }

18-30: Type vs interface (house style).

Our TS style favors types over interfaces. Optional to align here.

packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts (1)

5-11: Nit: prefer type over interface per guidelines.

No behavior change; aligns with repo conventions.

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (2)

293-300: Log the strategy name, not the object.

Current log serializes the strategy instance.

-    this.logger.info("Scaling consumer pool", {
+    this.logger.info("Scaling consumer pool", {
       from: this.consumers.size,
       to: targetCount,
       queueLength: this.metrics.queueLength,
       smoothedQueueLength: this.metrics.smoothedQueueLength,
-      strategy: this.scalingStrategy,
+      strategy: this.scalingStrategy.name,
     });

117-133: Minor: include targetRatio in init log for visibility.

Helps correlate behavior with config at runtime.

     this.logger.log("Initialized consumer pool", {
       minConsumerCount: this.minConsumerCount,
       maxConsumerCount: this.maxConsumerCount,
       scalingStrategy: this.scalingStrategy.name,
       mode: this.scalingStrategy.name === "none" ? "static" : "dynamic",
       ewmaAlpha: this.ewmaAlpha,
       batchWindowMs: this.batchWindowMs,
+      targetRatio,
     });
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between f11ea5a and 93fc0cd.

📒 Files selected for processing (5)
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.test.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts
  • packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.test.ts
🧰 Additional context used
📓 Path-based instructions (2)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts
  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts
  • packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

We use zod a lot in packages/core and in the webapp

Files:

  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts
  • packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts
  • packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts
🧬 Code graph analysis (3)
packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (5)
packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts (3)
  • RunQueueConsumerOptions (11-19)
  • QueueConsumer (6-9)
  • RunQueueConsumer (21-146)
packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts (2)
  • ScalingStrategyKind (3-3)
  • ScalingStrategyOptions (5-11)
packages/core/src/v3/utils/structuredLogger.ts (1)
  • SimpleStructuredLogger (21-102)
packages/core/src/v3/runEngineWorker/supervisor/consumerPoolMetrics.ts (1)
  • ConsumerPoolMetrics (8-160)
packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (1)
  • QueueMetricsProcessor (39-209)
packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (1)
packages/core/src/v3/utils/structuredLogger.ts (1)
  • SimpleStructuredLogger (21-102)
packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts (1)
packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (1)
  • QueueMetricsProcessor (39-209)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (1)
packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (1)

356-366: No action needed: workerQueueLength is defined as number? on DequeuedMessage, so the existing check is correct.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
apps/supervisor/src/env.ts (1)

39-40: Enforce valid consumer bounds and add cross-field validation (min ≤ max).

  • Add .min(1) to prevent zero/negative counts.
  • Add a schema-level check to reject min > max. This prevents misconfiguration at startup.

Apply within these lines:

-  TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT: z.coerce.number().int().default(1),
-  TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(10),
+  TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT: z.coerce.number().int().min(1).default(1),
+  TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().min(1).default(10),

And add cross-field validation (outside the hunk):

// after defining `const Env = z.object({ ... })`, change to:
const Env = z
  .object({ /* ...existing fields... */ })
  .superRefine((v, ctx) => {
    if (v.TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT > v.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT) {
      ctx.addIssue({
        code: z.ZodIssueCode.custom,
        message: "TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT cannot exceed TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT",
        path: ["TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT"],
      });
    }
  });
🧹 Nitpick comments (1)
apps/supervisor/src/env.ts (1)

41-41: Make strategy value case-insensitive (matches protocol handling pattern).

Prevents parse failures on values like "Smooth" or "AGGRESSIVE".

-  TRIGGER_DEQUEUE_SCALING_STRATEGY: z.enum(["none", "smooth", "aggressive"]).default("none"),
+  TRIGGER_DEQUEUE_SCALING_STRATEGY: z
+    .string()
+    .transform((s) => z.enum(["none", "smooth", "aggressive"]).parse(s.toLowerCase()))
+    .default("none"),
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 93fc0cd and 16ab40a.

📒 Files selected for processing (3)
  • apps/supervisor/src/env.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • apps/supervisor/src/env.ts
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (2)
apps/supervisor/src/env.ts (2)

42-43: Defaults mismatch with PR description (cooldowns).

PR text says 10s/60s; code sets 5s/30s. Confirm intended values or align code.

If aligning to PR text:

-  TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS: z.coerce.number().int().default(5000), // 5 seconds
-  TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS: z.coerce.number().int().default(30000), // 30 seconds
+  TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS: z.coerce.number().int().default(10000), // 10 seconds
+  TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS: z.coerce.number().int().default(60000), // 60 seconds

38-38: Verify TRIGGER_DEQUEUE_MAX_RUN_COUNT default change (10 → 1)
This reduces per-consumer concurrency and may impact throughput; confirm that lowering the default from 10 to 1 is intentional and acceptable.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (2)

203-211: Explicit null-check and increment processed-batches metric.

Make the “no result” check explicit and record batch processing.

-    const result = this.metricsProcessor.processBatch();
-
-    if (!result) {
+    const result = this.metricsProcessor.processBatch();
+    if (result == null) {
       this.logger.debug("No queue length samples in batch window - skipping scaling evaluation");
       return;
     }
@@
     this.logger.verbose("Queue metrics batch processed", {
       samples: result.sampleCount,
       median: result.median,
       smoothed: result.smoothedValue,
       currentConsumerCount: this.consumers.size,
     });
+    this.promMetrics?.batchesProcessedTotal.inc();

Also applies to: 217-223


194-201: Don’t let invalid samples crash the pool.

Wrap addSample; log and skip on error.

-    // Add sample to metrics processor
-    this.metricsProcessor.addSample(queueLength);
+    // Add sample to metrics processor
+    try {
+      if (!Number.isFinite(queueLength)) throw new Error("Queue length must be a finite number");
+      this.metricsProcessor.addSample(queueLength);
+    } catch (error) {
+      this.logger.error("Invalid queue length sample", { queueLength, error });
+      return;
+    }
🧹 Nitpick comments (5)
packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (5)

289-295: Fix log value: log strategy name, not the object.

Current log prints the object reference.

-    this.logger.info("Scaling consumer pool", {
+    this.logger.info("Scaling consumer pool", {
       from: this.consumers.size,
       to: targetCount,
       queueLength: this.metrics.queueLength,
       smoothedQueueLength: this.metrics.smoothedQueueLength,
-      strategy: this.scalingStrategy,
+      strategy: this.scalingStrategy.name,
     });

101-103: Validate targetRatio > 0 (defensive).

Prevents undefined behavior in strategies dividing by or scaling with targetRatio.

     const targetRatio = opts.scaling.targetRatio ?? 1.0;
     const dampingFactor = opts.scaling.dampingFactor;
+    if (!(targetRatio > 0)) {
+      throw new Error(`targetRatio must be > 0, got: ${targetRatio}`);
+    }

Also applies to: 87-94


152-154: Accurate startup log message.

Pool can be static (“none”). Use a neutral message.

-    this.logger.log("Started dynamic consumer pool", {
+    this.logger.log("Started consumer pool", {
       initialConsumerCount: this.consumers.size,
     });

346-365: Use a monotonic ID to avoid rare collisions and aid debugging.

Random-based IDs can collide; a counter is simpler and stable.

   private consumers: Map<string, QueueConsumer> = new Map();
   private readonly consumerFactory: QueueConsumerFactory;
   private isEnabled: boolean = false;
   private isScaling: boolean = false;
+  private idSeq: number = 0;
@@
     for (let i = 0; i < count; i++) {
-      const consumerId = `consumer-${Date.now()}-${Math.random().toString(36).substring(2, 11)}`;
+      const consumerId = `consumer-${Date.now()}-${++this.idSeq}`;

Also applies to: 53-57


248-252: Keep Prometheus gauges fresh even when no scaling occurs.

When target equals current, gauges may lag. Update state in the “no-op” path.

     if (targetCount === this.consumers.size) {
-      return;
+      this.promMetrics?.updateState({
+        consumerCount: this.consumers.size,
+        queueLength: this.metrics.queueLength,
+        smoothedQueueLength: this.metrics.smoothedQueueLength,
+        targetConsumerCount: this.metrics.targetConsumerCount,
+        strategy: this.scalingStrategy.name,
+      });
+      return;
     }

Also applies to: 336-344

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 16ab40a and 6fef333.

📒 Files selected for processing (3)
  • apps/supervisor/src/env.ts (1 hunks)
  • apps/supervisor/src/index.ts (1 hunks)
  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • apps/supervisor/src/index.ts
  • apps/supervisor/src/env.ts
🧰 Additional context used
📓 Path-based instructions (2)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

We use zod a lot in packages/core and in the webapp

Files:

  • packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts
🧬 Code graph analysis (1)
packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts (5)
packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts (3)
  • RunQueueConsumerOptions (11-19)
  • QueueConsumer (6-9)
  • RunQueueConsumer (21-146)
packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts (1)
  • ScalingStrategyKind (3-3)
packages/core/src/v3/utils/structuredLogger.ts (1)
  • SimpleStructuredLogger (21-102)
packages/core/src/v3/runEngineWorker/supervisor/consumerPoolMetrics.ts (1)
  • ConsumerPoolMetrics (8-160)
packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts (1)
  • QueueMetricsProcessor (39-209)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (24)
  • GitHub Check: build (supervisor)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)

@nicktrn nicktrn merged commit 0d1eac9 into main Sep 2, 2025
32 checks passed
@nicktrn nicktrn deleted the feat/dynamic-queue-consumer-pool branch September 2, 2025 12:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants