diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index a225af5ea1..074c18ad74 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -35,8 +35,16 @@ const Env = z.object({ TRIGGER_DEQUEUE_ENABLED: BoolEnv.default(true), TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250), TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000), - TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10), - TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(1), + TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(1), + TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT: z.coerce.number().int().default(1), + TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(10), + TRIGGER_DEQUEUE_SCALING_STRATEGY: z.enum(["none", "smooth", "aggressive"]).default("none"), + TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS: z.coerce.number().int().default(5000), // 5 seconds + TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS: z.coerce.number().int().default(30000), // 30 seconds + TRIGGER_DEQUEUE_SCALING_TARGET_RATIO: z.coerce.number().default(1.0), // Target ratio of queue items to consumers (1.0 = 1 item per consumer) + TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA: z.coerce.number().min(0).max(1).default(0.3), // Smooths queue length measurements (0=historical, 1=current) + TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS: z.coerce.number().int().positive().default(1000), // Batch window for metrics processing (ms) + TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR: z.coerce.number().min(0).max(1).default(0.7), // Smooths consumer count changes after EWMA (0=no scaling, 1=immediate) // Optional services TRIGGER_WARM_START_URL: z.string().optional(), diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 1ed00edad6..b958897ad4 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -128,7 +128,18 @@ class ManagedSupervisor { dequeueIdleIntervalMs: env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS, 
queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED, maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT, - maxConsumerCount: env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT, + metricsRegistry: register, + scaling: { + strategy: env.TRIGGER_DEQUEUE_SCALING_STRATEGY, + minConsumerCount: env.TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT, + maxConsumerCount: env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT, + scaleUpCooldownMs: env.TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS, + scaleDownCooldownMs: env.TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS, + targetRatio: env.TRIGGER_DEQUEUE_SCALING_TARGET_RATIO, + ewmaAlpha: env.TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA, + batchWindowMs: env.TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS, + dampingFactor: env.TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR, + }, runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED, heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS, sendRunDebugLogs: env.SEND_RUN_DEBUG_LOGS, diff --git a/packages/core/src/v3/runEngineWorker/index.ts b/packages/core/src/v3/runEngineWorker/index.ts index 98566ae225..50617df11a 100644 --- a/packages/core/src/v3/runEngineWorker/index.ts +++ b/packages/core/src/v3/runEngineWorker/index.ts @@ -2,6 +2,7 @@ export * from "./consts.js"; export * from "./supervisor/http.js"; export * from "./supervisor/schemas.js"; export * from "./supervisor/session.js"; +export * from "./supervisor/consumerPool.js"; export * from "./workload/http.js"; export * from "./workload/schemas.js"; export * from "./types.js"; diff --git a/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts new file mode 100644 index 0000000000..5f515b95b7 --- /dev/null +++ b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts @@ -0,0 +1,721 @@ +import { describe, it, expect, beforeEach, afterEach, vi, Mock } from "vitest"; +import { + RunQueueConsumerPool, + type ConsumerPoolOptions, + type QueueConsumerFactory, +} from "./consumerPool.js"; +import 
{ SupervisorHttpClient } from "./http.js"; +import type { WorkerApiDequeueResponseBody } from "./schemas.js"; +import type { QueueConsumer } from "./queueConsumer.js"; + +// Mock only the logger +vi.mock("../../utils/structuredLogger.js"); + +// Test implementation of QueueConsumer +class TestQueueConsumer implements QueueConsumer { + public started = false; + public stopped = false; + public onDequeue?: (messages: WorkerApiDequeueResponseBody) => Promise; + + constructor(opts: any) { + this.onDequeue = opts.onDequeue; + } + + start(): void { + this.started = true; + this.stopped = false; + } + + stop(): void { + this.stopped = true; + this.started = false; + } +} + +describe("RunQueueConsumerPool", () => { + let mockClient: SupervisorHttpClient; + let mockOnDequeue: Mock; + let pool: RunQueueConsumerPool; + let defaultOptions: Omit; + let testConsumers: TestQueueConsumer[]; + let testConsumerFactory: QueueConsumerFactory; + + beforeEach(() => { + vi.clearAllMocks(); + vi.useFakeTimers(); + + mockClient = {} as SupervisorHttpClient; + mockOnDequeue = vi.fn(); + testConsumers = []; + + testConsumerFactory = (opts) => { + const consumer = new TestQueueConsumer(opts); + testConsumers.push(consumer); + return consumer; + }; + + defaultOptions = { + consumer: { + client: mockClient, + intervalMs: 0, + idleIntervalMs: 1000, + onDequeue: mockOnDequeue, + }, + consumerFactory: testConsumerFactory, + }; + }); + + afterEach(() => { + vi.useRealTimers(); + if (pool) { + pool.stop(); + } + }); + + function advanceTimeAndProcessMetrics(ms: number) { + vi.advanceTimersByTime(ms); + + // Trigger batch processing if ready (without adding a sample) + if (pool["metricsProcessor"].shouldProcessBatch()) { + pool["processMetricsBatch"](); + } + } + + describe("Static mode (strategy='none')", () => { + it("should start with maxConsumerCount in static mode", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { strategy: "none", maxConsumerCount: 5 }, + }); 
+ + await pool.start(); + + expect(pool.size).toBe(5); + expect(testConsumers.length).toBe(5); + }); + + it("should not scale in static mode even with queue length updates", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { strategy: "none", maxConsumerCount: 3 }, + }); + + await pool.start(); + const initialCount = pool.size; + + pool.updateQueueLength(100); + vi.advanceTimersByTime(2000); + + expect(pool.size).toBe(initialCount); + expect(pool.size).toBe(3); + }); + }); + + describe("Smooth scaling strategy", () => { + it("should scale smoothly with damping", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "smooth", + minConsumerCount: 1, + maxConsumerCount: 10, + scaleUpCooldownMs: 0, + disableJitter: true, + }, + }); + + await pool.start(); + expect(pool.size).toBe(1); + + pool.updateQueueLength(5); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBe(4); // Damped scaling + + pool.updateQueueLength(5); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBe(5); // Gradually approaches target + }); + + it("should respect max consumer count", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "smooth", + minConsumerCount: 1, + maxConsumerCount: 5, + scaleUpCooldownMs: 0, + disableJitter: true, + }, + }); + + await pool.start(); + expect(pool.size).toBe(1); + + pool.updateQueueLength(100); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBe(5); + + pool.updateQueueLength(100); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBe(5); + }); + }); + + describe("Aggressive scaling strategy", () => { + it("should scale up quickly based on queue pressure", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "aggressive", + minConsumerCount: 2, + maxConsumerCount: 10, + scaleUpCooldownMs: 0, + disableJitter: true, + }, + }); + + await pool.start(); + 
expect(pool.size).toBe(2); + + pool.updateQueueLength(10); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBe(3); + + pool.updateQueueLength(20); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBe(4); + }); + + it("should scale down cautiously when queue is small", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "aggressive", + minConsumerCount: 1, + maxConsumerCount: 10, + scaleUpCooldownMs: 0, + scaleDownCooldownMs: 0, + disableJitter: true, + }, + }); + + await pool.start(); + expect(pool.size).toBe(1); + + pool.updateQueueLength(10); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBe(2); + + pool.updateQueueLength(0.5); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBe(3); // EWMA smoothing delays scale down + + pool.updateQueueLength(0.5); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBeGreaterThanOrEqual(3); // Stays in optimal zone + }); + + it("should maintain current level in optimal zone", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "aggressive", + minConsumerCount: 3, + maxConsumerCount: 10, + scaleUpCooldownMs: 0, + disableJitter: true, + }, + }); + + await pool.start(); + expect(pool.size).toBe(3); + + pool.updateQueueLength(3); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBe(3); + + pool.updateQueueLength(4); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBe(3); + }); + }); + + describe("Smooth scaling with EWMA", () => { + it("should use exponential smoothing for stable scaling", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "smooth", + minConsumerCount: 1, + maxConsumerCount: 10, + disableJitter: true, + }, + }); + + await pool.start(); + + const queueLengths = [10, 2, 8, 3, 9, 1, 7]; + for (const length of queueLengths) { + pool.updateQueueLength(length); + vi.advanceTimersByTime(200); + } + 
vi.advanceTimersByTime(900); + + const metrics = pool.getMetrics(); + expect(metrics.smoothedQueueLength).toBeGreaterThan(0); + expect(metrics.smoothedQueueLength).toBeLessThan(10); + }); + + it("should apply damping factor to avoid rapid changes", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "smooth", + minConsumerCount: 1, + maxConsumerCount: 10, + disableJitter: true, + }, + }); + + await pool.start(); + + pool.updateQueueLength(2); + advanceTimeAndProcessMetrics(1100); + const metrics1 = pool.getMetrics(); + expect(metrics1.smoothedQueueLength).toBe(2); + + pool.updateQueueLength(20); + advanceTimeAndProcessMetrics(1100); + const metrics2 = pool.getMetrics(); + + expect(metrics2.smoothedQueueLength).toBeGreaterThan(2); + expect(metrics2.smoothedQueueLength).toBeLessThan(20); + }); + }); + + describe("High throughput parallel dequeuing", () => { + it("should handle rapid parallel queue updates", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "aggressive", + minConsumerCount: 1, + maxConsumerCount: 20, + disableJitter: true, + }, + }); + + await pool.start(); + + const updates: number[] = []; + for (let i = 0; i < 100; i++) { + updates.push(Math.floor(Math.random() * 50) + 10); + } + + updates.forEach((length, index) => { + setTimeout(() => pool.updateQueueLength(length), index * 10); + }); + + advanceTimeAndProcessMetrics(1100); + + const metrics = pool.getMetrics(); + expect(metrics.queueLength).toBeDefined(); + }); + + it("should batch metrics updates to avoid excessive scaling", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "smooth", + minConsumerCount: 1, + maxConsumerCount: 10, + disableJitter: true, + }, + }); + + await pool.start(); + const evaluateScalingSpy = vi.spyOn(pool as any, "evaluateScaling"); + + pool.updateQueueLength(10); + for (let i = 1; i < 50; i++) { + 
pool.updateQueueLength(Math.floor(Math.random() * 20) + 5); + } + + expect(evaluateScalingSpy).not.toHaveBeenCalled(); + advanceTimeAndProcessMetrics(1000); + expect(evaluateScalingSpy).toHaveBeenCalledTimes(1); + }); + + it("should use median to filter outliers in high-frequency updates", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "aggressive", + minConsumerCount: 1, + maxConsumerCount: 10, + disableJitter: true, + }, + }); + + await pool.start(); + + const updates = [10, 11, 9, 12, 10, 100, 11, 10, 9, 11, 1]; + updates.forEach((length) => pool.updateQueueLength(length)); + advanceTimeAndProcessMetrics(1100); + + const metrics = pool.getMetrics(); + expect(metrics.queueLength).toBeGreaterThanOrEqual(9); + expect(metrics.queueLength).toBeLessThanOrEqual(12); + }); + }); + + describe("Scaling cooldowns and jitter", () => { + it("should respect scale-up cooldown", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "smooth", + scaleUpCooldownMs: 5000, + minConsumerCount: 1, + maxConsumerCount: 10, + disableJitter: true, + }, + }); + + await pool.start(); + pool["scaleToTarget"](5); + const scaleToTargetSpy = vi.spyOn(pool as any, "scaleToTarget"); + + pool.updateQueueLength(10); + advanceTimeAndProcessMetrics(1100); + expect(scaleToTargetSpy).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(10000); + pool.updateQueueLength(20); + advanceTimeAndProcessMetrics(1100); + }); + + it("should respect scale-down cooldown (longer than scale-up)", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "aggressive", + minConsumerCount: 1, + maxConsumerCount: 10, + disableJitter: true, + }, + }); + + await pool.start(); + + for (let i = 0; i < 4; i++) { + pool["addConsumers"](1); + } + pool["scaleToTarget"](5); + pool["metrics"].lastScaleTime = new Date(Date.now() - 70000); + + pool.updateQueueLength(1); + advanceTimeAndProcessMetrics(1100); 
+ + const metrics = pool.getMetrics(); + expect(metrics.queueLength).toBe(1); + }); + + it("should add random jitter to prevent thundering herd", async () => { + const pools: RunQueueConsumerPool[] = []; + const scaleTimes: number[] = []; + + for (let i = 0; i < 3; i++) { + const p = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "smooth", + minConsumerCount: 1, + maxConsumerCount: 10, + disableJitter: true, + }, + }); + + const originalScale = p["scaleToTarget"]; + p["scaleToTarget"] = vi.fn(async (target: number) => { + scaleTimes.push(Date.now()); + return originalScale.call(p, target); + }); + + pools.push(p); + await p.start(); + } + + pools.forEach((p) => p.updateQueueLength(20)); + advanceTimeAndProcessMetrics(1100); + vi.advanceTimersByTime(15000); + + await Promise.all(pools.map((p) => p.stop())); + }); + }); + + describe("Consumer lifecycle management", () => { + it("should properly start and stop consumers", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "none", + maxConsumerCount: 3, + disableJitter: true, + }, + }); + + await pool.start(); + + expect(pool.size).toBe(3); + expect(testConsumers.length).toBe(3); + testConsumers.forEach((consumer) => { + expect(consumer.started).toBe(true); + }); + + await pool.stop(); + + testConsumers.forEach((consumer) => { + expect(consumer.stopped).toBe(true); + }); + }); + + it("should forward dequeue messages with queue length updates", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "aggressive", + maxConsumerCount: 2, + disableJitter: true, + }, + }); + + await pool.start(); + + const messages: WorkerApiDequeueResponseBody = [{ workerQueueLength: 15 } as any]; + + if (testConsumers[0]?.onDequeue) { + await testConsumers[0].onDequeue(messages); + } + + expect(mockOnDequeue).toHaveBeenCalledWith(messages); + + advanceTimeAndProcessMetrics(1100); + const metrics = pool.getMetrics(); + 
expect(metrics.queueLength).toBe(15); + }); + }); + + describe("Memory leak prevention", () => { + it("should collect all samples within batch window without limit", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "aggressive", + minConsumerCount: 1, + maxConsumerCount: 10, + disableJitter: true, + }, + }); + + await pool.start(); + + for (let i = 0; i < 100; i++) { + pool.updateQueueLength(i); + } + + const metrics = pool.getMetrics(); + expect(metrics.queueLength).toBeUndefined(); + }); + + it("should clear consumer map on stop", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "none", + maxConsumerCount: 5, + disableJitter: true, + }, + }); + + await pool.start(); + expect(pool.size).toBe(5); + + await pool.stop(); + expect(pool.size).toBe(0); + }); + + it("should clear recentQueueLengths after processing batch", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { strategy: "smooth" }, + }); + + await pool.start(); + + for (let i = 0; i < 5; i++) { + pool.updateQueueLength(10 + i); + } + + advanceTimeAndProcessMetrics(1100); + const metrics = pool.getMetrics(); + expect(metrics.queueLength).toBeDefined(); + }); + + it("should not accumulate scaling operations in memory", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "aggressive", + scaleUpCooldownMs: 100, + scaleDownCooldownMs: 100, + minConsumerCount: 1, + maxConsumerCount: 10, + disableJitter: true, + }, + }); + + await pool.start(); + + for (let i = 0; i < 5; i++) { + pool["metrics"].lastScaleTime = new Date(0); + pool.updateQueueLength(i % 2 === 0 ? 
50 : 1); + vi.advanceTimersByTime(1100); + } + + expect(pool.size).toBeGreaterThanOrEqual(1); + expect(pool.size).toBeLessThanOrEqual(10); + }); + }); + + describe("Edge cases", () => { + it("should handle empty recent queue lengths", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { strategy: "aggressive" }, + }); + + await pool.start(); + + const metrics = pool.getMetrics(); + expect(metrics.queueLength).toBeUndefined(); + }); + + it("should clamp consumer count to min/max bounds", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "smooth", + minConsumerCount: 2, + maxConsumerCount: 5, + disableJitter: true, + }, + }); + + await pool.start(); + expect(pool.size).toBe(2); + + pool.updateQueueLength(100); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBeLessThanOrEqual(5); + }); + + it("should respect custom targetRatio with smooth strategy", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "smooth", + targetRatio: 5, + scaleUpCooldownMs: 0, + minConsumerCount: 1, + maxConsumerCount: 10, + disableJitter: true, + }, + }); + + await pool.start(); + expect(pool.size).toBe(1); + + pool.updateQueueLength(10); + advanceTimeAndProcessMetrics(1100); + + const firstSize = pool.size; + expect(firstSize).toBeGreaterThanOrEqual(1); + expect(firstSize).toBeLessThanOrEqual(2); + + pool.updateQueueLength(10); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBeLessThanOrEqual(2); + }); + + it("should respect custom targetRatio with aggressive strategy", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "aggressive", + targetRatio: 5, + scaleUpCooldownMs: 0, + minConsumerCount: 1, + maxConsumerCount: 10, + disableJitter: true, + }, + }); + + await pool.start(); + expect(pool.size).toBe(1); + + pool.updateQueueLength(20); + advanceTimeAndProcessMetrics(1100); + + const sizeAfterFirstScale 
= pool.size; + expect(sizeAfterFirstScale).toBeGreaterThanOrEqual(1); + + pool.updateQueueLength(20); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBeLessThanOrEqual(6); + }); + + it("should scale down when no items are dequeued (zero queue length)", async () => { + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "smooth", + minConsumerCount: 1, + maxConsumerCount: 10, + scaleUpCooldownMs: 0, + scaleDownCooldownMs: 0, + disableJitter: true, + }, + }); + + await pool.start(); + expect(pool.size).toBe(1); + + // Scale up first + pool.updateQueueLength(20); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBeGreaterThan(1); + const sizeAfterScaleUp = pool.size; + + // Now send multiple zero queue lengths to converge EWMA to 0 + // The EWMA needs time to converge due to exponential smoothing + for (let i = 0; i < 5; i++) { + pool.updateQueueLength(0); + advanceTimeAndProcessMetrics(1100); + } + + // After multiple iterations with zero queue, should scale down but not to minimum yet + expect(pool.size).toBeLessThan(sizeAfterScaleUp); + expect(pool.size).toBeGreaterThan(1); + + // Continue until we reach minimum + for (let i = 0; i < 5; i++) { + pool.updateQueueLength(0); + advanceTimeAndProcessMetrics(1100); + } + + // Should eventually reach minimum + expect(pool.size).toBe(1); + }); + }); +}); diff --git a/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts new file mode 100644 index 0000000000..2dd3d1b898 --- /dev/null +++ b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts @@ -0,0 +1,411 @@ +import { SimpleStructuredLogger } from "../../utils/structuredLogger.js"; +import { QueueConsumer, RunQueueConsumer, RunQueueConsumerOptions } from "./queueConsumer.js"; +import { QueueMetricsProcessor } from "./queueMetricsProcessor.js"; +import { + ScalingStrategy, + ScalingStrategyKind, + ScalingStrategyOptions, +} from 
"./scalingStrategies.js"; +import { ConsumerPoolMetrics } from "./consumerPoolMetrics.js"; +import type { Registry } from "prom-client"; + +export type QueueConsumerFactory = (opts: RunQueueConsumerOptions) => QueueConsumer; + +export type ScalingOptions = { + strategy?: ScalingStrategyKind; + minConsumerCount?: number; + maxConsumerCount?: number; + scaleUpCooldownMs?: number; + scaleDownCooldownMs?: number; + targetRatio?: number; + ewmaAlpha?: number; + batchWindowMs?: number; + disableJitter?: boolean; + dampingFactor?: number; +}; + +export type ConsumerPoolOptions = { + consumer: RunQueueConsumerOptions; + scaling: ScalingOptions; + consumerFactory?: QueueConsumerFactory; + metricsRegistry?: Registry; +}; + +type ScalingMetrics = { + targetConsumerCount: number; + queueLength?: number; + smoothedQueueLength: number; + lastScaleTime: Date; + lastQueueLengthUpdate: Date; +}; + +export class RunQueueConsumerPool { + private readonly consumerOptions: RunQueueConsumerOptions; + + private readonly logger = new SimpleStructuredLogger("consumer-pool"); + private readonly promMetrics?: ConsumerPoolMetrics; + + private readonly minConsumerCount: number; + private readonly maxConsumerCount: number; + private readonly scalingStrategy: ScalingStrategy; + private readonly disableJitter: boolean; + + private consumers: Map = new Map(); + private readonly consumerFactory: QueueConsumerFactory; + private isEnabled: boolean = false; + private isScaling: boolean = false; + + private metrics: ScalingMetrics; + private readonly metricsProcessor: QueueMetricsProcessor; + + // Scaling parameters + private readonly ewmaAlpha: number; + private readonly scaleUpCooldownMs: number; + private readonly scaleDownCooldownMs: number; + private readonly batchWindowMs: number; + + constructor(opts: ConsumerPoolOptions) { + this.consumerOptions = opts.consumer; + + // Initialize Prometheus metrics if registry provided + if (opts.metricsRegistry) { + this.promMetrics = new ConsumerPoolMetrics({ 
+ register: opts.metricsRegistry, + }); + } + + this.minConsumerCount = Math.max(1, opts.scaling.minConsumerCount ?? 1); + this.maxConsumerCount = Math.max(this.minConsumerCount, opts.scaling.maxConsumerCount ?? 10); + this.scaleUpCooldownMs = opts.scaling.scaleUpCooldownMs ?? 10000; // 10 seconds default + this.scaleDownCooldownMs = opts.scaling.scaleDownCooldownMs ?? 60000; // 60 seconds default + this.disableJitter = opts.scaling.disableJitter ?? false; + + // Configure EWMA parameters from options + this.ewmaAlpha = opts.scaling.ewmaAlpha ?? 0.3; + this.batchWindowMs = opts.scaling.batchWindowMs ?? 1000; + + // Validate EWMA parameters + if (this.ewmaAlpha < 0 || this.ewmaAlpha > 1) { + throw new Error(`ewmaAlpha must be between 0 and 1, got: ${this.ewmaAlpha}`); + } + if (this.batchWindowMs <= 0) { + throw new Error(`batchWindowMs must be positive, got: ${this.batchWindowMs}`); + } + + // Initialize metrics processor + this.metricsProcessor = new QueueMetricsProcessor({ + ewmaAlpha: this.ewmaAlpha, + batchWindowMs: this.batchWindowMs, + }); + + const targetRatio = opts.scaling.targetRatio ?? 1.0; + const dampingFactor = opts.scaling.dampingFactor; + + // Create scaling strategy with metrics processor injected + this.scalingStrategy = ScalingStrategy.create(opts.scaling.strategy ?? 
"none", { + metricsProcessor: this.metricsProcessor, + dampingFactor, + targetRatio, + minConsumerCount: this.minConsumerCount, + maxConsumerCount: this.maxConsumerCount, + }); + + // Use provided factory or default to RunQueueConsumer + this.consumerFactory = + opts.consumerFactory || ((consumerOpts) => new RunQueueConsumer(consumerOpts)); + + this.metrics = { + targetConsumerCount: this.minConsumerCount, + queueLength: undefined, + smoothedQueueLength: 0, + lastScaleTime: new Date(0), + lastQueueLengthUpdate: new Date(0), + }; + + this.logger.log("Initialized consumer pool", { + minConsumerCount: this.minConsumerCount, + maxConsumerCount: this.maxConsumerCount, + scalingStrategy: this.scalingStrategy.name, + mode: this.scalingStrategy.name === "none" ? "static" : "dynamic", + ewmaAlpha: this.ewmaAlpha, + batchWindowMs: this.batchWindowMs, + }); + } + + async start() { + if (this.isEnabled) { + return; + } + + this.isEnabled = true; + + // For 'none' strategy, start with max consumers (static mode) + // For dynamic strategies, start with minimum + const initialCount = + this.scalingStrategy.name === "none" ? 
this.maxConsumerCount : this.minConsumerCount; + + // Set initial metrics + this.metrics.targetConsumerCount = initialCount; + + this.addConsumers(initialCount); + + this.logger.log("Started dynamic consumer pool", { + initialConsumerCount: this.consumers.size, + }); + + // Initialize Prometheus metrics with initial state + this.promMetrics?.updateState({ + consumerCount: this.consumers.size, + queueLength: this.metrics.queueLength, + smoothedQueueLength: this.metrics.smoothedQueueLength, + targetConsumerCount: initialCount, + strategy: this.scalingStrategy.name, + }); + } + + async stop() { + if (!this.isEnabled) { + return; + } + + this.isEnabled = false; + + // Stop all consumers + Array.from(this.consumers.values()).forEach((consumer) => consumer.stop()); + + this.consumers.clear(); + + this.logger.log("Stopped dynamic consumer pool"); + } + + /** + * Updates the queue length metric and triggers scaling decisions + * Uses QueueMetricsProcessor for batching and EWMA smoothing + */ + updateQueueLength(queueLength: number) { + // Track queue length update in metrics + this.promMetrics?.recordQueueLengthUpdate(); + + // Skip metrics tracking for static mode + if (this.scalingStrategy.name === "none") { + return; + } + + // Add sample to metrics processor + this.metricsProcessor.addSample(queueLength); + + // Check if we should process the current batch + if (this.metricsProcessor.shouldProcessBatch()) { + this.processMetricsBatch(); + } + } + + private processMetricsBatch() { + // Process batch using the metrics processor + const result = this.metricsProcessor.processBatch(); + + if (!result) { + this.logger.debug("No queue length samples in batch window - skipping scaling evaluation"); + return; + } + + // Update metrics + this.metrics.queueLength = result.median; + this.metrics.smoothedQueueLength = result.smoothedValue; + this.metrics.lastQueueLengthUpdate = new Date(); + + this.logger.verbose("Queue metrics batch processed", { + samples: result.sampleCount, + 
median: result.median, + smoothed: result.smoothedValue, + currentConsumerCount: this.consumers.size, + }); + + // Make scaling decision + this.evaluateScaling(); + } + + private evaluateScaling() { + if (!this.isEnabled) { + return; + } + + // No scaling in static mode + if (this.scalingStrategy.name === "none") { + return; + } + + // Skip if already scaling + if (this.isScaling) { + this.logger.debug("Scaling blocked - operation already in progress", { + currentCount: this.consumers.size, + targetCount: this.metrics.targetConsumerCount, + actualCount: this.consumers.size, + }); + return; + } + + const targetCount = this.calculateTargetConsumerCount(); + + if (targetCount === this.consumers.size) { + return; + } + + const timeSinceLastScale = Date.now() - this.metrics.lastScaleTime.getTime(); + + // Add random jitter to avoid thundering herd when multiple replicas exist + // Works without needing to know replica index or count + const jitterMs = this.disableJitter ? 0 : Math.random() * 3000; // 0-3 seconds random jitter + + // Check cooldown periods with jitter + if (targetCount > this.consumers.size) { + // Scale up + const effectiveCooldown = this.scaleUpCooldownMs + jitterMs; + if (timeSinceLastScale < effectiveCooldown) { + this.logger.debug("Scale up blocked by cooldown", { + timeSinceLastScale, + cooldownMs: effectiveCooldown, + jitterMs, + remainingMs: effectiveCooldown - timeSinceLastScale, + }); + this.promMetrics?.recordCooldownApplied("up"); + return; + } + } else if (targetCount < this.consumers.size) { + // Scale down + const effectiveCooldown = this.scaleDownCooldownMs + jitterMs; + if (timeSinceLastScale < effectiveCooldown) { + this.logger.debug("Scale down blocked by cooldown", { + timeSinceLastScale, + cooldownMs: effectiveCooldown, + jitterMs, + remainingMs: effectiveCooldown - timeSinceLastScale, + }); + this.promMetrics?.recordCooldownApplied("down"); + return; + } + } + + this.logger.info("Scaling consumer pool", { + from: 
this.consumers.size, + to: targetCount, + queueLength: this.metrics.queueLength, + smoothedQueueLength: this.metrics.smoothedQueueLength, + strategy: this.scalingStrategy, + }); + + // Set flag before scaling + this.isScaling = true; + + // Update target metric for visibility + const previousTarget = this.metrics.targetConsumerCount; + this.metrics.targetConsumerCount = targetCount; + + try { + this.scaleToTarget(targetCount); + } catch (error) { + this.logger.error("Failed to scale consumer pool", { error }); + // Revert target on failure + this.metrics.targetConsumerCount = previousTarget; + } finally { + this.isScaling = false; + } + } + + private calculateTargetConsumerCount(): number { + return this.scalingStrategy.calculateTargetCount(this.consumers.size); + } + + private scaleToTarget(targetCount: number) { + const actualCurrentCount = this.consumers.size; + + if (targetCount > actualCurrentCount) { + // Scale up + const count = targetCount - actualCurrentCount; + this.addConsumers(count); + this.promMetrics?.recordScalingOperation("up", this.scalingStrategy.name, count); + } else if (targetCount < actualCurrentCount) { + // Scale down + const count = actualCurrentCount - targetCount; + this.removeConsumers(count); + this.promMetrics?.recordScalingOperation("down", this.scalingStrategy.name, count); + } + + this.metrics.lastScaleTime = new Date(); + + // Update Prometheus state metrics + this.promMetrics?.updateState({ + consumerCount: this.consumers.size, + queueLength: this.metrics.queueLength, + smoothedQueueLength: this.metrics.smoothedQueueLength, + targetConsumerCount: targetCount, + strategy: this.scalingStrategy.name, + }); + } + + private addConsumers(count: number) { + const newConsumers: QueueConsumer[] = []; + + for (let i = 0; i < count; i++) { + const consumerId = `consumer-${Date.now()}-${Math.random().toString(36).substring(2, 11)}`; + + const consumer = this.consumerFactory({ + ...this.consumerOptions, + onDequeue: async (messages) => { + // 
Always update queue length, default to 0 for empty dequeues or missing value + this.updateQueueLength(messages[0]?.workerQueueLength ?? 0); + + // Forward to the original handler + await this.consumerOptions.onDequeue(messages); + }, + }); + + this.consumers.set(consumerId, consumer); + newConsumers.push(consumer); + } + + // Start all new consumers + newConsumers.forEach((c) => c.start()); + + this.logger.info("Added consumers", { + count, + totalConsumers: this.consumers.size, + }); + } + + private removeConsumers(count: number) { + const allIds = Array.from(this.consumers.keys()); + const consumerIds = allIds.slice(-count); // Take from the end + const consumersToStop: QueueConsumer[] = []; + + for (const id of consumerIds) { + const consumer = this.consumers.get(id); + if (consumer) { + consumersToStop.push(consumer); + this.consumers.delete(id); + } + } + + // Stop removed consumers + consumersToStop.forEach((c) => c.stop()); + + this.logger.info("Removed consumers", { + count: consumersToStop.length, + totalConsumers: this.consumers.size, + }); + } + + /** + * Get current pool metrics for monitoring + */ + getMetrics(): Readonly { + return { ...this.metrics }; + } + + /** + * Get current number of consumers in the pool + */ + get size(): number { + return this.consumers.size; + } +} diff --git a/packages/core/src/v3/runEngineWorker/supervisor/consumerPoolMetrics.ts b/packages/core/src/v3/runEngineWorker/supervisor/consumerPoolMetrics.ts new file mode 100644 index 0000000000..8f65fa0775 --- /dev/null +++ b/packages/core/src/v3/runEngineWorker/supervisor/consumerPoolMetrics.ts @@ -0,0 +1,160 @@ +import { Counter, Gauge, Histogram, Registry } from "prom-client"; + +export interface ConsumerPoolMetricsOptions { + register?: Registry; + prefix?: string; +} + +export class ConsumerPoolMetrics { + private readonly register: Registry; + private readonly prefix: string; + + // Current state metrics + public readonly consumerCount: Gauge; + public readonly queueLength: 
Gauge; + public readonly smoothedQueueLength: Gauge; + public readonly targetConsumerCount: Gauge; + public readonly scalingStrategy: Gauge; + + // Scaling operation metrics + public readonly scalingOperationsTotal: Counter; + public readonly consumersAddedTotal: Counter; + public readonly consumersRemovedTotal: Counter; + public readonly scalingCooldownsApplied: Counter; + + // Performance metrics + public readonly queueLengthUpdatesTotal: Counter; + public readonly batchesProcessedTotal: Counter; + + constructor(opts: ConsumerPoolMetricsOptions = {}) { + this.register = opts.register ?? new Registry(); + this.prefix = opts.prefix ?? "queue_consumer_pool"; + + // Current state metrics + this.consumerCount = new Gauge({ + name: `${this.prefix}_consumer_count`, + help: "Current number of active queue consumers", + labelNames: ["strategy"], + registers: [this.register], + }); + + this.queueLength = new Gauge({ + name: `${this.prefix}_queue_length`, + help: "Current queue length (median of recent samples)", + registers: [this.register], + }); + + this.smoothedQueueLength = new Gauge({ + name: `${this.prefix}_smoothed_queue_length`, + help: "EWMA smoothed queue length", + registers: [this.register], + }); + + this.targetConsumerCount = new Gauge({ + name: `${this.prefix}_target_consumer_count`, + help: "Target number of consumers calculated by scaling strategy", + labelNames: ["strategy"], + registers: [this.register], + }); + + this.scalingStrategy = new Gauge({ + name: `${this.prefix}_scaling_strategy_info`, + help: "Information about the active scaling strategy (1 = active, 0 = inactive)", + labelNames: ["strategy"], + registers: [this.register], + }); + + // Scaling operation metrics + this.scalingOperationsTotal = new Counter({ + name: `${this.prefix}_scaling_operations_total`, + help: "Total number of scaling operations performed", + labelNames: ["direction", "strategy"], + registers: [this.register], + }); + + this.consumersAddedTotal = new Counter({ + name: 
`${this.prefix}_consumers_added_total`, + help: "Total number of consumers added", + registers: [this.register], + }); + + this.consumersRemovedTotal = new Counter({ + name: `${this.prefix}_consumers_removed_total`, + help: "Total number of consumers removed", + registers: [this.register], + }); + + this.scalingCooldownsApplied = new Counter({ + name: `${this.prefix}_scaling_cooldowns_applied_total`, + help: "Number of times scaling was prevented due to cooldown", + labelNames: ["direction"], + registers: [this.register], + }); + + this.queueLengthUpdatesTotal = new Counter({ + name: `${this.prefix}_queue_length_updates_total`, + help: "Total number of queue length updates received", + registers: [this.register], + }); + + this.batchesProcessedTotal = new Counter({ + name: `${this.prefix}_batches_processed_total`, + help: "Total number of metric batches processed", + registers: [this.register], + }); + } + + /** + * Update all gauge metrics with current state + */ + updateState(state: { + consumerCount: number; + queueLength?: number; + smoothedQueueLength: number; + targetConsumerCount: number; + strategy: string; + }) { + this.consumerCount.set({ strategy: state.strategy }, state.consumerCount); + + if (state.queueLength !== undefined) { + this.queueLength.set(state.queueLength); + } + + this.smoothedQueueLength.set(state.smoothedQueueLength); + this.targetConsumerCount.set({ strategy: state.strategy }, state.targetConsumerCount); + + // Set strategy info (1 for active strategy, 0 for others) + ["none", "smooth", "aggressive"].forEach((s) => { + this.scalingStrategy.set({ strategy: s }, s === state.strategy ? 
1 : 0); + }); + } + + /** + * Record a scaling operation + */ + recordScalingOperation(direction: "up" | "down" | "none", strategy: string, count: number) { + if (direction !== "none") { + this.scalingOperationsTotal.inc({ direction, strategy }); + + if (direction === "up") { + this.consumersAddedTotal.inc(count); + } else { + this.consumersRemovedTotal.inc(count); + } + } + } + + /** + * Record that scaling was prevented by cooldown + */ + recordCooldownApplied(direction: "up" | "down") { + this.scalingCooldownsApplied.inc({ direction }); + } + + /** + * Record a queue length update + */ + recordQueueLengthUpdate() { + this.queueLengthUpdatesTotal.inc(); + } +} diff --git a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts index 6eb5572bf3..4379eb54f3 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts @@ -3,7 +3,12 @@ import { SupervisorHttpClient } from "./http.js"; import { WorkerApiDequeueResponseBody } from "./schemas.js"; import { PreDequeueFn, PreSkipFn } from "./types.js"; -type RunQueueConsumerOptions = { +export interface QueueConsumer { + start(): void; + stop(): void; +} + +export type RunQueueConsumerOptions = { client: SupervisorHttpClient; intervalMs: number; idleIntervalMs: number; @@ -13,7 +18,7 @@ type RunQueueConsumerOptions = { onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise; }; -export class RunQueueConsumer { +export class RunQueueConsumer implements QueueConsumer { private readonly client: SupervisorHttpClient; private readonly preDequeue?: PreDequeueFn; private readonly preSkip?: PreSkipFn; @@ -131,7 +136,7 @@ export class RunQueueConsumer { this.scheduleNextDequeue(nextIntervalMs); } - scheduleNextDequeue(delayMs: number) { + private scheduleNextDequeue(delayMs: number) { if (delayMs === this.idleIntervalMs && this.idleIntervalMs !== 
this.intervalMs) { this.logger.verbose("scheduled dequeue with idle interval", { delayMs }); } diff --git a/packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.test.ts b/packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.test.ts new file mode 100644 index 0000000000..ac6a19a048 --- /dev/null +++ b/packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.test.ts @@ -0,0 +1,371 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { QueueMetricsProcessor } from "./queueMetricsProcessor.js"; + +describe("QueueMetricsProcessor", () => { + let processor: QueueMetricsProcessor; + + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + describe("Constructor validation", () => { + it("should throw error for invalid ewmaAlpha", () => { + expect(() => new QueueMetricsProcessor({ ewmaAlpha: -0.1, batchWindowMs: 1000 })).toThrow( + "ewmaAlpha must be between 0 and 1" + ); + + expect(() => new QueueMetricsProcessor({ ewmaAlpha: 1.1, batchWindowMs: 1000 })).toThrow( + "ewmaAlpha must be between 0 and 1" + ); + }); + + it("should throw error for invalid batchWindowMs", () => { + expect(() => new QueueMetricsProcessor({ ewmaAlpha: 0.3, batchWindowMs: 0 })).toThrow( + "batchWindowMs must be positive" + ); + + expect(() => new QueueMetricsProcessor({ ewmaAlpha: 0.3, batchWindowMs: -100 })).toThrow( + "batchWindowMs must be positive" + ); + }); + + it("should accept valid parameters", () => { + expect(() => new QueueMetricsProcessor({ ewmaAlpha: 0, batchWindowMs: 1 })).not.toThrow(); + expect(() => new QueueMetricsProcessor({ ewmaAlpha: 1, batchWindowMs: 5000 })).not.toThrow(); + }); + }); + + describe("Sample collection", () => { + beforeEach(() => { + processor = new QueueMetricsProcessor({ ewmaAlpha: 0.3, batchWindowMs: 1000 }); + }); + + it("should collect samples without limit", () => { + for (let i = 0; i < 100; i++) { + processor.addSample(i); + } + 
+ expect(processor.getCurrentSampleCount()).toBe(100); + expect(processor.getCurrentSamples()).toHaveLength(100); + }); + + it("should throw error for negative queue lengths", () => { + expect(() => processor.addSample(-1)).toThrow("Queue length cannot be negative"); + }); + + it("should accept zero queue length", () => { + expect(() => processor.addSample(0)).not.toThrow(); + expect(processor.getCurrentSampleCount()).toBe(1); + }); + + it("should handle empty queue with all zero samples", () => { + processor.addSample(0); + processor.addSample(0); + processor.addSample(0); + + const result = processor.processBatch(); + expect(result).not.toBeNull(); + expect(result!.median).toBe(0); + expect(result!.smoothedValue).toBe(0); + expect(processor.getSmoothedValue()).toBe(0); + }); + + it("should properly transition from zero to non-zero queue", () => { + // Start with empty queue + processor.addSample(0); + processor.addSample(0); + let result = processor.processBatch(); + expect(result!.median).toBe(0); + expect(result!.smoothedValue).toBe(0); + + // Queue starts filling + processor.addSample(10); + processor.addSample(15); + result = processor.processBatch(); + expect(result!.median).toBeGreaterThan(0); + // EWMA: 0.3 * median + 0.7 * 0 + expect(result!.smoothedValue).toBeGreaterThan(0); + }); + + it("should properly transition from non-zero to zero queue", () => { + // Start with non-empty queue + processor.addSample(10); + processor.addSample(15); + let result = processor.processBatch(); + const initialSmoothed = result!.smoothedValue; + expect(initialSmoothed).toBeGreaterThan(0); + + // Queue becomes empty + processor.addSample(0); + processor.addSample(0); + processor.addSample(0); + result = processor.processBatch(); + expect(result!.median).toBe(0); + // EWMA should gradually decrease: 0.3 * 0 + 0.7 * initialSmoothed + expect(result!.smoothedValue).toBe(0.7 * initialSmoothed); + }); + }); + + describe("Batch processing timing", () => { + beforeEach(() => { + 
processor = new QueueMetricsProcessor({ ewmaAlpha: 0.3, batchWindowMs: 1000 }); + }); + + it("should not process batch before window expires", () => { + processor.addSample(10, 1000); + + expect(processor.shouldProcessBatch(1500)).toBe(false); // 500ms later + expect(processor.shouldProcessBatch(1999)).toBe(false); // 999ms later + }); + + it("should process batch when window expires", () => { + processor.addSample(10, 1000); + + expect(processor.shouldProcessBatch(2000)).toBe(true); // 1000ms later + expect(processor.shouldProcessBatch(2500)).toBe(true); // 1500ms later + }); + + it("should not process empty batch", () => { + expect(processor.shouldProcessBatch(5000)).toBe(false); + }); + }); + + describe("EWMA calculation", () => { + it("should initialize with first value", () => { + processor = new QueueMetricsProcessor({ ewmaAlpha: 0.3, batchWindowMs: 1000 }); + + processor.addSample(10); + const result = processor.processBatch(); + + expect(result).not.toBeNull(); + expect(result!.median).toBe(10); + expect(result!.smoothedValue).toBe(10); + expect(processor.getSmoothedValue()).toBe(10); + }); + + it("should apply EWMA formula correctly", () => { + processor = new QueueMetricsProcessor({ ewmaAlpha: 0.3, batchWindowMs: 1000 }); + + // First batch: smoothed = 10 + processor.addSample(10); + processor.processBatch(); + expect(processor.getSmoothedValue()).toBe(10); + + // Second batch: smoothed = 0.3 * 20 + 0.7 * 10 = 6 + 7 = 13 + processor.addSample(20); + processor.processBatch(); + expect(processor.getSmoothedValue()).toBe(13); + + // Third batch: smoothed = 0.3 * 5 + 0.7 * 13 = 1.5 + 9.1 = 10.6 + processor.addSample(5); + processor.processBatch(); + expect(processor.getSmoothedValue()).toBe(10.6); + }); + + it("should test different alpha values", () => { + // High alpha (0.8) - more responsive + const highAlphaProcessor = new QueueMetricsProcessor({ ewmaAlpha: 0.8, batchWindowMs: 1000 }); + highAlphaProcessor.addSample(10); + 
highAlphaProcessor.processBatch(); + highAlphaProcessor.addSample(20); + highAlphaProcessor.processBatch(); + + // Low alpha (0.1) - more smoothing + const lowAlphaProcessor = new QueueMetricsProcessor({ ewmaAlpha: 0.1, batchWindowMs: 1000 }); + lowAlphaProcessor.addSample(10); + lowAlphaProcessor.processBatch(); + lowAlphaProcessor.addSample(20); + lowAlphaProcessor.processBatch(); + + // High alpha should be closer to recent value (20) + expect(highAlphaProcessor.getSmoothedValue()).toBeCloseTo(18); // 0.8 * 20 + 0.2 * 10 = 18 + // Low alpha should be closer to previous value (10) + expect(lowAlphaProcessor.getSmoothedValue()).toBeCloseTo(11); // 0.1 * 20 + 0.9 * 10 = 11 + }); + }); + + describe("Median filtering", () => { + beforeEach(() => { + processor = new QueueMetricsProcessor({ ewmaAlpha: 0.3, batchWindowMs: 1000 }); + }); + + it("should calculate median of single sample", () => { + processor.addSample(42); + + const result = processor.processBatch(); + expect(result!.median).toBe(42); + expect(result!.sampleCount).toBe(1); + expect(result!.smoothedValue).toBe(42); // First batch initializes to median + }); + + it("should calculate median of odd number of samples", () => { + processor.addSample(1); + processor.addSample(10); + processor.addSample(5); + + const result = processor.processBatch(); + expect(result!.median).toBe(5); + }); + + it("should calculate median of even number of samples", () => { + processor.addSample(1); + processor.addSample(10); + processor.addSample(5); + processor.addSample(8); + + const result = processor.processBatch(); + // With even count, we average the two middle values + // Sorted: [1, 5, 8, 10], median = (5 + 8) / 2 = 6.5 + expect(result!.median).toBe(6.5); + }); + + it("should filter outliers using median", () => { + // Add mostly low values with one outlier + processor.addSample(5); + processor.addSample(5); + processor.addSample(5); + processor.addSample(100); // outlier + processor.addSample(5); + + const result = 
processor.processBatch(); + // Sorted: [5, 5, 5, 5, 100], median = 5 (filters out outlier) + expect(result!.median).toBe(5); + }); + }); + + describe("Batch result", () => { + beforeEach(() => { + processor = new QueueMetricsProcessor({ ewmaAlpha: 0.3, batchWindowMs: 1000 }); + }); + + it("should return comprehensive batch result", () => { + processor.addSample(10); + processor.addSample(20); + processor.addSample(15); + + const result = processor.processBatch(); + + expect(result).not.toBeNull(); + expect(result!.median).toBe(15); + expect(result!.smoothedValue).toBe(15); // First batch + expect(result!.sampleCount).toBe(3); + expect(result!.samples).toEqual([10, 20, 15]); + }); + + it("should return null for empty batch", () => { + const result = processor.processBatch(); + expect(result).toBeNull(); + }); + + it("should clear samples after processing", () => { + processor.addSample(10); + processor.addSample(20); + + expect(processor.getCurrentSampleCount()).toBe(2); + + processor.processBatch(); + + expect(processor.getCurrentSampleCount()).toBe(0); + expect(processor.getCurrentSamples()).toHaveLength(0); + }); + }); + + describe("Reset functionality", () => { + beforeEach(() => { + processor = new QueueMetricsProcessor({ ewmaAlpha: 0.3, batchWindowMs: 1000 }); + }); + + it("should reset all state", () => { + processor.addSample(10); + processor.processBatch(); + processor.addSample(20); + + expect(processor.getSmoothedValue()).toBe(10); + expect(processor.getCurrentSampleCount()).toBe(1); + + processor.reset(); + + expect(processor.getSmoothedValue()).toBe(0); + expect(processor.getCurrentSampleCount()).toBe(0); + expect(processor.getCurrentSamples()).toHaveLength(0); + }); + + it("should reinitialize correctly after reset", () => { + // Process some data + processor.addSample(10); + processor.processBatch(); + processor.addSample(20); + processor.processBatch(); + + processor.reset(); + + // Should initialize with first value again + processor.addSample(30); 
+ const result = processor.processBatch(); + + expect(result!.smoothedValue).toBe(30); + expect(processor.getSmoothedValue()).toBe(30); + }); + }); + + describe("Configuration", () => { + it("should return configuration", () => { + processor = new QueueMetricsProcessor({ ewmaAlpha: 0.5, batchWindowMs: 2000 }); + + const config = processor.getConfig(); + expect(config.ewmaAlpha).toBe(0.5); + expect(config.batchWindowMs).toBe(2000); + }); + }); + + describe("Real-world simulation", () => { + it("should handle high-frequency samples from multiple consumers", () => { + processor = new QueueMetricsProcessor({ ewmaAlpha: 0.3, batchWindowMs: 1000 }); + + // Simulate 40 consumers reporting queue lengths within 1 second + const baseTime = 1000; + for (let i = 0; i < 40; i++) { + const queueLength = 100 - i * 2; // Queue decreasing as consumers work + processor.addSample(queueLength, baseTime + i * 25); // Spread over 1 second + } + + const result = processor.processBatch(baseTime + 1000); + + expect(result).not.toBeNull(); + expect(result!.sampleCount).toBe(40); + // Median should be around middle values (queue lengths 60-80) + expect(result!.median).toBeGreaterThanOrEqual(60); + expect(result!.median).toBeLessThanOrEqual(80); + }); + + it("should demonstrate EWMA smoothing over time", () => { + processor = new QueueMetricsProcessor({ ewmaAlpha: 0.3, batchWindowMs: 1000 }); + + const results = []; + + // Simulate queue spike and recovery + const scenarios = [ + { samples: [5, 5, 5], expected: 5 }, // Baseline + { samples: [50, 50, 50], expected: 18.5 }, // Spike: 0.3 * 50 + 0.7 * 5 = 18.5 + { samples: [5, 5, 5], expected: 14.45 }, // Recovery: 0.3 * 5 + 0.7 * 18.5 = 14.45 + ]; + + for (const scenario of scenarios) { + for (const sample of scenario.samples) { + processor.addSample(sample); + } + const result = processor.processBatch(); + results.push(result!.smoothedValue); + } + + // Should show gradual change due to EWMA smoothing + expect(results[0]).toBe(5); // Initial + 
expect(results[1]).toBeCloseTo(18.5); // Spike response + expect(results[2]).toBeCloseTo(14.45); // Gradual recovery + }); + }); +}); diff --git a/packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts b/packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts new file mode 100644 index 0000000000..5f628f432c --- /dev/null +++ b/packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts @@ -0,0 +1,209 @@ +import { SimpleStructuredLogger } from "../../utils/structuredLogger.js"; + +export interface QueueMetricsProcessorOptions { + /** + * EWMA smoothing factor (0-1) + * Lower values = more smoothing, less reactive + * Higher values = more responsive to recent changes + */ + ewmaAlpha: number; + + /** + * Batch window duration in milliseconds + * Samples within this window are collected and processed together + */ + batchWindowMs: number; +} + +export interface BatchProcessingResult { + /** Median of samples in the batch */ + median: number; + + /** EWMA-smoothed value after processing this batch */ + smoothedValue: number; + + /** Number of samples processed in this batch */ + sampleCount: number; + + /** Raw samples that were processed */ + samples: readonly number[]; +} + +/** + * Processes queue length samples using exponential weighted moving average (EWMA) + * for smoothing and median filtering for outlier resistance. + * + * Collects samples within a batch window, calculates median to filter outliers, + * then applies EWMA smoothing for stable trend tracking. 
+ */ +export class QueueMetricsProcessor { + private readonly ewmaAlpha: number; + private readonly batchWindowMs: number; + private readonly logger = new SimpleStructuredLogger("queue-metrics-processor"); + + private samples: number[] = []; + private smoothedValue: number = 0; + private lastBatchTime: number = 0; + private isInitialized: boolean = false; + + constructor(options: QueueMetricsProcessorOptions) { + if (options.ewmaAlpha < 0 || options.ewmaAlpha > 1) { + throw new Error("ewmaAlpha must be between 0 and 1"); + } + if (options.batchWindowMs <= 0) { + throw new Error("batchWindowMs must be positive"); + } + + this.ewmaAlpha = options.ewmaAlpha; + this.batchWindowMs = options.batchWindowMs; + } + + /** + * Adds a sample to the current batch + */ + addSample(value: number, timestamp: number = Date.now()): void { + if (value < 0) { + throw new Error("Queue length cannot be negative"); + } + + this.samples.push(value); + + // Update last batch time on first sample + if (this.samples.length === 1) { + this.lastBatchTime = timestamp; + } + } + + /** + * Checks if enough time has passed to process the current batch + */ + shouldProcessBatch(currentTime: number = Date.now()): boolean { + if (this.samples.length === 0) { + return false; + } + + return currentTime - this.lastBatchTime >= this.batchWindowMs; + } + + private calculateMedian(samples: number[]): number | null { + const sortedSamples = [...samples].sort((a, b) => a - b); + const mid = Math.floor(sortedSamples.length / 2); + + if (sortedSamples.length % 2 === 1) { + // Odd length: use middle value + const median = sortedSamples[mid]; + + if (median === undefined) { + this.logger.error("Invalid median calculated from odd samples", { + sortedSamples, + mid, + median, + }); + return null; + } + + return median; + } else { + // Even length: average two middle values + const lowMid = sortedSamples[mid - 1]; + const highMid = sortedSamples[mid]; + + if (lowMid === undefined || highMid === undefined) { + 
this.logger.error("Invalid median calculated from even samples", { + sortedSamples, + mid, + lowMid, + highMid, + }); + return null; + } + + const median = (lowMid + highMid) / 2; + return median; + } + } + + /** + * Processes the current batch of samples and returns the result. + * Clears the samples array and updates the smoothed value. + * + * Returns null if there are no samples to process. + */ + processBatch(currentTime: number = Date.now()): BatchProcessingResult | null { + if (this.samples.length === 0) { + // No samples to process + return null; + } + + // Calculate median of samples to filter outliers + const median = this.calculateMedian(this.samples); + if (median === null) { + // We already logged a more specific error message + return null; + } + + // Update EWMA smoothed value + if (!this.isInitialized) { + // First value - initialize with median + this.smoothedValue = median; + this.isInitialized = true; + } else { + // Apply EWMA: s_t = α * x_t + (1 - α) * s_(t-1) + this.smoothedValue = this.ewmaAlpha * median + (1 - this.ewmaAlpha) * this.smoothedValue; + } + + const result: BatchProcessingResult = { + median, + smoothedValue: this.smoothedValue, + sampleCount: this.samples.length, + samples: Object.freeze([...this.samples]), + }; + + // Clear samples for next batch + this.samples = []; + this.lastBatchTime = currentTime; + + return result; + } + + /** + * Gets the current smoothed value without processing a batch + */ + getSmoothedValue(): number { + return this.smoothedValue; + } + + /** + * Gets the number of samples in the current batch + */ + getCurrentSampleCount(): number { + return this.samples.length; + } + + /** + * Gets the current samples (for testing/debugging) + */ + getCurrentSamples(): readonly number[] { + return Object.freeze([...this.samples]); + } + + /** + * Resets the processor state + */ + reset(): void { + this.samples = []; + this.smoothedValue = 0; + this.lastBatchTime = 0; + this.isInitialized = false; + } + + /** + * 
Gets processor configuration + */ + getConfig(): Readonly { + return { + ewmaAlpha: this.ewmaAlpha, + batchWindowMs: this.batchWindowMs, + }; + } +} diff --git a/packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.test.ts b/packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.test.ts new file mode 100644 index 0000000000..3d6f4062e5 --- /dev/null +++ b/packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.test.ts @@ -0,0 +1,293 @@ +import { describe, it, expect } from "vitest"; +import { + NoneScalingStrategy, + SmoothScalingStrategy, + AggressiveScalingStrategy, + ScalingStrategyOptions, +} from "./scalingStrategies.js"; +import { QueueMetricsProcessor } from "./queueMetricsProcessor.js"; + +describe("Scaling Strategies", () => { + const baseOptions: ScalingStrategyOptions = { + minConsumerCount: 1, + maxConsumerCount: 20, + targetRatio: 1.0, + }; + + function createMetricsProcessor(smoothedValue: number): QueueMetricsProcessor { + const processor = new QueueMetricsProcessor({ ewmaAlpha: 0.3, batchWindowMs: 1000 }); + // Initialize processor with the target smoothed value + processor.addSample(smoothedValue); + processor.processBatch(); + return processor; + } + + describe("NoneScalingStrategy", () => { + const strategy = new NoneScalingStrategy(baseOptions); + + it("should always return current count (static mode)", () => { + expect(strategy.calculateTargetCount(5)).toBe(5); + expect(strategy.calculateTargetCount(1)).toBe(1); + expect(strategy.calculateTargetCount(10)).toBe(10); + // Clamping still applies + expect(strategy.calculateTargetCount(25)).toBe(20); // Clamped to max + expect(strategy.calculateTargetCount(0)).toBe(1); // Clamped to min + }); + + it("should have correct name", () => { + expect(strategy.name).toBe("none"); + }); + + it("should handle zero current count", () => { + // Should clamp to minConsumerCount + const result = strategy.calculateTargetCount(0); + expect(result).toBe(1); + }); + }); + + 
describe("SmoothScalingStrategy", () => { + it("should calculate target based on smoothed queue length", () => { + const metricsProcessor = createMetricsProcessor(10); // smoothed value = 10 + const strategy = new SmoothScalingStrategy({ ...baseOptions, metricsProcessor }); + + // With targetRatio=1.0, target consumers = ceil(10/1.0) = 10 + // With dampingFactor=0.7 and currentCount=5: + // dampedTarget = 5 + (10 - 5) * 0.7 = 5 + 3.5 = 8.5 → 9 + const result = strategy.calculateTargetCount(5); + expect(result).toBe(9); + }); + + it("should apply damping factor correctly", () => { + const metricsProcessor = createMetricsProcessor(20); // smoothed value = 20 + const strategy = new SmoothScalingStrategy({ + ...baseOptions, + metricsProcessor, + dampingFactor: 0.5, + }); // 50% damping + + // With targetRatio=1.0, target consumers = ceil(20/1.0) = 20 + // With dampingFactor=0.5 and currentCount=5: + // dampedTarget = 5 + (20 - 5) * 0.5 = 5 + 7.5 = 12.5 → 13 + const result = strategy.calculateTargetCount(5); + expect(result).toBe(13); + }); + + it("should handle zero current count", () => { + const metricsProcessor = createMetricsProcessor(5); + const strategy = new SmoothScalingStrategy({ ...baseOptions, metricsProcessor }); + + // With smoothedQueueLength=5, targetRatio=1.0: + // targetConsumers = ceil(5/1.0) = 5 + // dampedTarget = 0 + (5 - 0) * 0.7 = 3.5 → 4 + const result = strategy.calculateTargetCount(0); + expect(result).toBe(4); + }); + + it("should validate damping factor", () => { + const metricsProcessor = createMetricsProcessor(10); + expect( + () => + new SmoothScalingStrategy({ + ...baseOptions, + metricsProcessor, + dampingFactor: -0.1, + }) + ).toThrow("dampingFactor must be between 0 and 1"); + + expect( + () => + new SmoothScalingStrategy({ + ...baseOptions, + metricsProcessor, + dampingFactor: 1.1, + }) + ).toThrow("dampingFactor must be between 0 and 1"); + + expect( + () => + new SmoothScalingStrategy({ + ...baseOptions, + metricsProcessor, + 
dampingFactor: 0, + }) + ).not.toThrow(); + + expect( + () => + new SmoothScalingStrategy({ + ...baseOptions, + metricsProcessor, + dampingFactor: 1, + }) + ).not.toThrow(); + }); + + it("should handle zero current count", () => { + const metricsProcessor = createMetricsProcessor(10); + const strategy = new SmoothScalingStrategy({ ...baseOptions, metricsProcessor }); + + // With smoothedQueueLength=10, targetRatio=1.0: + // targetConsumers = ceil(10/1.0) = 10 + // dampedTarget = 0 + (10 - 0) * 0.7 = 7 + const result = strategy.calculateTargetCount(0); + expect(result).toBe(7); + }); + }); + + describe("AggressiveScalingStrategy", () => { + it("should scale down when under-utilized", () => { + // queuePerConsumer = 2/5 = 0.4, scaleDownThreshold = 1.0 * 0.5 = 0.5 + // Under-utilized since 0.4 < 0.5 + const metricsProcessor = createMetricsProcessor(2); + const strategy = new AggressiveScalingStrategy({ ...baseOptions, metricsProcessor }); + + const result = strategy.calculateTargetCount(5); + expect(result).toBeLessThan(5); + expect(result).toBeGreaterThanOrEqual(baseOptions.minConsumerCount); + }); + + it("should maintain count when in optimal zone", () => { + // queuePerConsumer = 5/5 = 1.0 + // Optimal zone: 0.5 < 1.0 < 2.0 + const metricsProcessor = createMetricsProcessor(5); + const strategy = new AggressiveScalingStrategy({ ...baseOptions, metricsProcessor }); + + const result = strategy.calculateTargetCount(5); + expect(result).toBe(5); + }); + + it("should scale up when over-utilized", () => { + // queuePerConsumer = 15/5 = 3.0, scaleUpThreshold = 1.0 * 2.0 = 2.0 + // Over-utilized since 3.0 > 2.0 + const metricsProcessor = createMetricsProcessor(15); + const strategy = new AggressiveScalingStrategy({ ...baseOptions, metricsProcessor }); + + const result = strategy.calculateTargetCount(5); + expect(result).toBeGreaterThan(5); + expect(result).toBeLessThanOrEqual(baseOptions.maxConsumerCount); + }); + + it("should scale aggressively for critical load", () => { 
+ // queuePerConsumer = 25/5 = 5.0 (critical: 5x target ratio) + const metricsProcessor = createMetricsProcessor(25); + const strategy = new AggressiveScalingStrategy({ ...baseOptions, metricsProcessor }); + + const result = strategy.calculateTargetCount(5); + // Should apply 50% scale factor: ceil(5 * 1.5) = 8 + // But capped by 50% max increment: 5 + ceil(5 * 0.5) = 5 + 3 = 8 + expect(result).toBe(8); + }); + + it("should respect max consumer count", () => { + const metricsProcessor = createMetricsProcessor(50); // Very high load + const strategy = new AggressiveScalingStrategy({ + ...baseOptions, + maxConsumerCount: 6, + metricsProcessor, + }); + + const result = strategy.calculateTargetCount(5); + expect(result).toBeLessThanOrEqual(6); + }); + + it("should respect min consumer count", () => { + const metricsProcessor = createMetricsProcessor(0.1); // Very low load + const strategy = new AggressiveScalingStrategy({ + ...baseOptions, + minConsumerCount: 3, + metricsProcessor, + }); + + const result = strategy.calculateTargetCount(5); + expect(result).toBeGreaterThanOrEqual(3); + }); + + it("should return thresholds", () => { + const metricsProcessor = createMetricsProcessor(10); + const strategy = new AggressiveScalingStrategy({ ...baseOptions, metricsProcessor }); + const thresholds = strategy.getThresholds(1.0); + expect(thresholds).toEqual({ + scaleDownThreshold: 0.5, + scaleUpThreshold: 2.0, + criticalThreshold: 5.0, + highThreshold: 3.0, + }); + }); + + it("should handle zero current count without division by zero", () => { + const metricsProcessor = createMetricsProcessor(10); + const strategy = new AggressiveScalingStrategy({ ...baseOptions, metricsProcessor }); + + // Should use (currentCount || 1) to prevent division by zero + // queuePerConsumer = 10 / 1 = 10 (not 10 / 0) + // This is over-utilized (10 > 2.0), should scale up + const result = strategy.calculateTargetCount(0); + expect(result).toBeGreaterThan(0); + 
expect(result).toBeLessThanOrEqual(baseOptions.maxConsumerCount); + }); + + it("should handle zero queue with zero consumers", () => { + const metricsProcessor = createMetricsProcessor(0); + const strategy = new AggressiveScalingStrategy({ ...baseOptions, metricsProcessor }); + + // queuePerConsumer = 0 / 1 = 0 + // This is under-utilized (0 < 0.5), should scale down + // But already at 0, so should return minConsumerCount + const result = strategy.calculateTargetCount(0); + expect(result).toBe(baseOptions.minConsumerCount); + }); + }); + + describe("Integration scenarios", () => { + it("should handle gradual load increase with smooth strategy", () => { + const metricsProcessor = createMetricsProcessor(2); + const strategy = new SmoothScalingStrategy({ ...baseOptions, metricsProcessor }); + let currentCount = 2; + + // Gradual increase: 2 → 6 → 10 → 15 + const loads = [2, 6, 10, 15]; + const results = []; + + for (const load of loads) { + // Update the processor with the new load + metricsProcessor.addSample(load); + metricsProcessor.processBatch(); + const target = strategy.calculateTargetCount(currentCount); + results.push(target); + currentCount = target; + } + + // Should show gradual increase due to damping + expect(results[0]).toBeLessThan(results[1]!); + expect(results[1]).toBeLessThan(results[2]!); + expect(results[2]).toBeLessThan(results[3]!); + + // But not immediate jumps due to damping + expect(results[1]! - results[0]!).toBeLessThan(loads[1]! 
- loads[0]!); + }); + + it("should handle load spike with aggressive strategy", () => { + let currentCount = 3; + + // Sudden spike from normal to critical + const normalLoad = 3; // queuePerConsumer = 1.0 (optimal) + const spikeLoad = 15; // queuePerConsumer = 5.0 (critical) + + const normalProcessor = createMetricsProcessor(normalLoad); + const normalStrategy = new AggressiveScalingStrategy({ + ...baseOptions, + metricsProcessor: normalProcessor, + }); + const normalTarget = normalStrategy.calculateTargetCount(currentCount); + expect(normalTarget).toBe(3); // Should maintain + + const spikeProcessor = createMetricsProcessor(spikeLoad); + const spikeStrategy = new AggressiveScalingStrategy({ + ...baseOptions, + metricsProcessor: spikeProcessor, + }); + const spikeTarget = spikeStrategy.calculateTargetCount(currentCount); + expect(spikeTarget).toBeGreaterThan(3); // Should scale up aggressively + }); + }); +}); diff --git a/packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts b/packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts new file mode 100644 index 0000000000..665c689d64 --- /dev/null +++ b/packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.ts @@ -0,0 +1,186 @@ +import { QueueMetricsProcessor } from "./queueMetricsProcessor.js"; + +export type ScalingStrategyKind = "none" | "smooth" | "aggressive"; + +export interface ScalingStrategyOptions { + metricsProcessor?: QueueMetricsProcessor; + dampingFactor?: number; + minConsumerCount: number; + maxConsumerCount: number; + targetRatio: number; +} + +export abstract class ScalingStrategy { + abstract readonly name: string; + + private readonly minConsumerCount: number; + private readonly maxConsumerCount: number; + + protected readonly targetRatio: number; + + constructor(options?: ScalingStrategyOptions) { + this.minConsumerCount = options?.minConsumerCount ?? 1; + this.maxConsumerCount = options?.maxConsumerCount ?? 10; + this.targetRatio = options?.targetRatio ?? 
1; + } + + /** + * Calculates the target consumer count with clamping to min/max bounds + * Uses template method pattern to ensure consistent clamping across all strategies + */ + calculateTargetCount(currentCount: number): number { + const targetCount = this.calculateTargetCountInternal(currentCount); + + // Apply consistent clamping to all strategies + return Math.min(Math.max(targetCount, this.minConsumerCount), this.maxConsumerCount); + } + + /** + * Internal method for subclasses to implement their specific scaling logic + * Should return the unclamped target count + */ + protected abstract calculateTargetCountInternal(currentCount: number): number; + + /** + * Creates a scaling strategy by name + */ + static create(strategy: ScalingStrategyKind, options?: ScalingStrategyOptions): ScalingStrategy { + switch (strategy) { + case "none": + return new NoneScalingStrategy(options); + + case "smooth": + return new SmoothScalingStrategy(options); + + case "aggressive": + return new AggressiveScalingStrategy(options); + + default: + throw new Error(`Unknown scaling strategy: ${strategy}`); + } + } +} + +/** + * Static scaling strategy - maintains a fixed number of consumers + */ +export class NoneScalingStrategy extends ScalingStrategy { + readonly name = "none"; + + constructor(options?: ScalingStrategyOptions) { + super(options); + } + + protected calculateTargetCountInternal(currentCount: number): number { + return currentCount; + } +} + +/** + * Smooth scaling strategy with EWMA smoothing and damping + * Uses exponentially weighted moving average for queue length smoothing + * and applies damping to prevent rapid oscillations. + */ +export class SmoothScalingStrategy extends ScalingStrategy { + readonly name = "smooth"; + private readonly dampingFactor: number; + private readonly metricsProcessor: QueueMetricsProcessor; + + constructor(options?: ScalingStrategyOptions) { + super(options); + const dampingFactor = options?.dampingFactor ?? 
0.7; + if (dampingFactor < 0 || dampingFactor > 1) { + throw new Error("dampingFactor must be between 0 and 1"); + } + if (!options?.metricsProcessor) { + throw new Error("metricsProcessor is required for smooth scaling strategy"); + } + this.dampingFactor = dampingFactor; + this.metricsProcessor = options.metricsProcessor; + } + + protected calculateTargetCountInternal(currentCount: number): number { + const smoothedQueueLength = this.metricsProcessor.getSmoothedValue(); + + // Calculate target consumers based on the configured ratio + const targetConsumers = Math.ceil(smoothedQueueLength / this.targetRatio); + + // Apply damping factor to smooth out changes + // This prevents oscillation by only moving toward the target gradually + const dampedTarget = currentCount + (targetConsumers - currentCount) * this.dampingFactor; + + // Return rounded value without clamping (handled by base class) + return Math.round(dampedTarget); + } +} + +/** + * Aggressive scaling strategy with threshold-based zones + * Uses threshold-based zones for different scaling behaviors. + * Scales up quickly when load increases but scales down cautiously. 
+ */ +export class AggressiveScalingStrategy extends ScalingStrategy { + readonly name = "aggressive"; + private readonly metricsProcessor: QueueMetricsProcessor; + + constructor(options?: ScalingStrategyOptions) { + super(options); + if (!options?.metricsProcessor) { + throw new Error("metricsProcessor is required for aggressive scaling strategy"); + } + this.metricsProcessor = options.metricsProcessor; + } + + protected calculateTargetCountInternal(currentCount: number): number { + const smoothedQueueLength = this.metricsProcessor.getSmoothedValue(); + + // Calculate the average number of queue items per consumer + const queuePerConsumer = smoothedQueueLength / (currentCount || 1); + + // Define zones based on targetRatio + // Optimal zone: 0.5x to 2x the target ratio + const scaleDownThreshold = this.targetRatio * 0.5; + const scaleUpThreshold = this.targetRatio * 2.0; + + if (queuePerConsumer < scaleDownThreshold) { + // Zone 1: Under-utilized (< 0.5x target ratio) + // Scale down gradually: the factor is floored at 0.9, so at most ~10% of consumers are removed per adjustment + const reductionFactor = Math.max(0.9, 1 - (scaleDownThreshold - queuePerConsumer) * 0.1); + // Return without min clamping (handled by base class) + return Math.floor(currentCount * reductionFactor); + } else if (queuePerConsumer > scaleUpThreshold) { + // Zone 3: Over-utilized (> 2x target ratio) + // Scale up aggressively based on queue pressure + let scaleFactor: number; + if (queuePerConsumer >= this.targetRatio * 5) { + // Critical: Queue is 5x target ratio or higher + scaleFactor = 1.5; // 50% increase + } else if (queuePerConsumer >= this.targetRatio * 3) { + // High: Queue is 3x target ratio + scaleFactor = 1.3; // 30% increase + } else { + // Moderate: Queue is 2x target ratio + scaleFactor = 1.1; // 10% increase + } + + const targetCount = Math.ceil(currentCount * scaleFactor); + // Cap increase at 50% to prevent overshooting + const maxIncrement = Math.ceil(currentCount * 0.5); + // Return without max clamping (handled by base class) + return 
Math.min(currentCount + maxIncrement, targetCount); + } else { + // Zone 2: Optimal (0.5x - 2x target ratio) + // Maintain current consumer count + return currentCount; + } + } + + getThresholds(targetRatio: number) { + return { + scaleDownThreshold: targetRatio * 0.5, + scaleUpThreshold: targetRatio * 2.0, + criticalThreshold: targetRatio * 5.0, + highThreshold: targetRatio * 3.0, + }; + } +} diff --git a/packages/core/src/v3/runEngineWorker/supervisor/session.ts b/packages/core/src/v3/runEngineWorker/supervisor/session.ts index a458a9cd4c..e5a783b8d4 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/session.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/session.ts @@ -1,7 +1,7 @@ import { SupervisorHttpClient } from "./http.js"; import { PreDequeueFn, PreSkipFn, SupervisorClientCommonOptions } from "./types.js"; import { WorkerApiDequeueResponseBody, WorkerApiHeartbeatRequestBody } from "./schemas.js"; -import { RunQueueConsumer } from "./queueConsumer.js"; +import { RunQueueConsumerPool, ScalingOptions } from "./consumerPool.js"; import { WorkerEvents } from "./events.js"; import EventEmitter from "events"; import { VERSION } from "../../../version.js"; @@ -10,6 +10,7 @@ import { WorkerClientToServerEvents, WorkerServerToClientEvents } from "../types import { getDefaultWorkerHeaders } from "./util.js"; import { IntervalService } from "../../utils/interval.js"; import { SimpleStructuredLogger } from "../../utils/structuredLogger.js"; +import type { Registry } from "prom-client"; type SupervisorSessionOptions = SupervisorClientCommonOptions & { queueConsumerEnabled?: boolean; @@ -20,8 +21,9 @@ type SupervisorSessionOptions = SupervisorClientCommonOptions & { preDequeue?: PreDequeueFn; preSkip?: PreSkipFn; maxRunCount?: number; - maxConsumerCount?: number; sendRunDebugLogs?: boolean; + scaling: ScalingOptions; + metricsRegistry?: Registry; }; export class SupervisorSession extends EventEmitter { @@ -33,7 +35,7 @@ export class SupervisorSession 
extends EventEmitter { private runNotificationsSocket?: Socket; private readonly queueConsumerEnabled: boolean; - private readonly queueConsumers: RunQueueConsumer[]; + private readonly consumerPool: RunQueueConsumerPool; private readonly heartbeat: IntervalService; @@ -44,8 +46,9 @@ export class SupervisorSession extends EventEmitter { this.queueConsumerEnabled = opts.queueConsumerEnabled ?? true; this.httpClient = new SupervisorHttpClient(opts); - this.queueConsumers = Array.from({ length: opts.maxConsumerCount ?? 1 }, () => { - return new RunQueueConsumer({ + + this.consumerPool = new RunQueueConsumerPool({ + consumer: { client: this.httpClient, preDequeue: opts.preDequeue, preSkip: opts.preSkip, @@ -53,7 +56,9 @@ export class SupervisorSession extends EventEmitter { intervalMs: opts.dequeueIntervalMs, idleIntervalMs: opts.dequeueIdleIntervalMs, maxRunCount: opts.maxRunCount, - }); + }, + scaling: opts.scaling, + metricsRegistry: opts.metricsRegistry, }); this.heartbeat = new IntervalService({ @@ -179,8 +184,13 @@ export class SupervisorSession extends EventEmitter { }); if (this.queueConsumerEnabled) { - this.logger.log("Queue consumer enabled"); - await Promise.allSettled(this.queueConsumers.map(async (q) => q.start())); + this.logger.log("Queue consumer enabled", { + scalingStrategy: this.consumerPool["scalingStrategy"], + minConsumers: this.consumerPool["minConsumerCount"], + maxConsumers: this.consumerPool["maxConsumerCount"], + }); + + await this.consumerPool.start(); this.heartbeat.start(); } else { this.logger.warn("Queue consumer disabled"); @@ -195,7 +205,7 @@ export class SupervisorSession extends EventEmitter { } async stop() { - await Promise.allSettled(this.queueConsumers.map(async (q) => q.stop())); + await this.consumerPool.stop(); this.heartbeat.stop(); this.runNotificationsSocket?.disconnect(); }