diff --git a/src/queue/client.ts b/src/queue/client.ts
index ebac700b..74cb99f0 100644
--- a/src/queue/client.ts
+++ b/src/queue/client.ts
@@ -5,7 +5,8 @@
  * Only loaded when REDIS_URL is set (production dashboard container).
  */
 
-import { type ConnectionOptions, Queue } from 'bullmq';
+import { Queue } from 'bullmq';
+import { parseRedisUrl } from '../utils/redis.js';
 
 // ── Job types ────────────────────────────────────────────────────────────────
 
@@ -41,15 +42,6 @@ export type DashboardJob = ManualRunJob | RetryRunJob | DebugAnalysisJob;
 
 const QUEUE_NAME = 'cascade-dashboard-jobs';
 
-function parseRedisUrl(url: string): ConnectionOptions {
-  const parsed = new URL(url);
-  return {
-    host: parsed.hostname,
-    port: Number(parsed.port) || 6379,
-    password: parsed.password || undefined,
-  };
-}
-
 let queue: Queue | null = null;
 
 function getQueue(): Queue {
diff --git a/src/router/bullmq-workers.ts b/src/router/bullmq-workers.ts
new file mode 100644
index 00000000..4a7aa659
--- /dev/null
+++ b/src/router/bullmq-workers.ts
@@ -0,0 +1,65 @@
+/**
+ * BullMQ worker factory for CASCADE queue consumers.
+ *
+ * Provides a `createQueueWorker` factory that de-duplicates the event handler
+ * boilerplate shared across all queue workers (completed/failed/error logging
+ * and Sentry capture).
+ */
+
+import { type ConnectionOptions, type Job, Worker } from 'bullmq';
+import { captureException } from '../sentry.js';
+import { parseRedisUrl } from '../utils/redis.js';
+
+// Re-export so existing callers (worker-manager.ts) don't need to change imports.
+export { parseRedisUrl };
+
+export interface QueueWorkerConfig {
+  queueName: string;
+  /** Human-readable label used in log messages */
+  label: string;
+  connection: ConnectionOptions;
+  concurrency: number;
+  lockDuration: number;
+  processFn: (job: Job) => Promise<void>;
+}
+
+/**
+ * Factory that creates a BullMQ Worker with standard event handlers.
+ *
+ * All cascade queue workers share the same completed/failed/error handling
+ * pattern — this factory de-duplicates that boilerplate while keeping
+ * per-queue differences (name, label, processFn) configurable.
+ */
+export function createQueueWorker(config: QueueWorkerConfig): Worker {
+  const { queueName, label, connection, concurrency, lockDuration, processFn } = config;
+
+  const worker = new Worker(queueName, processFn, {
+    connection,
+    concurrency,
+    lockDuration,
+  });
+
+  worker.on('completed', (job) => {
+    console.log(`[WorkerManager] ${label} dispatched:`, { jobId: job.id });
+  });
+
+  worker.on('failed', (job, err) => {
+    console.error(`[WorkerManager] ${label} failed to dispatch:`, {
+      jobId: job?.id,
+      error: String(err),
+    });
+    captureException(err, {
+      tags: { source: 'bullmq_dispatch', queue: queueName },
+      extra: { jobId: job?.id },
+    });
+  });
+
+  worker.on('error', (err) => {
+    console.error(`[WorkerManager] ${label} worker error:`, err);
+    captureException(err, {
+      tags: { source: 'bullmq_error', queue: queueName },
+    });
+  });
+
+  return worker;
+}
diff --git a/src/router/container-manager.ts b/src/router/container-manager.ts
new file mode 100644
index 00000000..a1fbf469
--- /dev/null
+++ b/src/router/container-manager.ts
@@ -0,0 +1,323 @@
+/**
+ * Docker container lifecycle management for CASCADE worker processes.
+ *
+ * Handles spawning, monitoring, killing, and tracking of worker containers.
+ * Each BullMQ job gets its own isolated Docker container.
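+ * Containers are labeled `cascade.managed=true`, auto-removed by Docker on
+ * exit, and tracked in an in-memory map keyed by job id while they run.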
+ */
+
+import type { Job } from 'bullmq';
+import Docker from 'dockerode';
+import { findProjectByRepo, getAllProjectCredentials } from '../config/provider.js';
+import { captureException } from '../sentry.js';
+import { routerConfig } from './config.js';
+import { notifyTimeout } from './notifications.js';
+import type { CascadeJob } from './queue.js';
+
+const docker = new Docker();
+
+export interface ActiveWorker {
+  containerId: string;
+  jobId: string;
+  startedAt: Date;
+  timeoutHandle: NodeJS.Timeout;
+  job: CascadeJob;
+}
+
+const activeWorkers = new Map<string, ActiveWorker>();
+
+/**
+ * Extract projectId from job data for credential resolution.
+ * Different job types have the projectId in different locations.
+ *
+ * Note: Dashboard jobs (manual-run, retry-run, debug-analysis) come through
+ * the cascade-dashboard-jobs queue and are cast to CascadeJob for spawning.
+ */
+export async function extractProjectIdFromJob(data: CascadeJob): Promise<string | null> {
+  // Use a type assertion since dashboard jobs are cast to CascadeJob
+  const jobData = data as unknown as { type: string; projectId?: string; repoFullName?: string };
+
+  if (jobData.type === 'trello' || jobData.type === 'jira') {
+    return jobData.projectId ?? null;
+  }
+  if (jobData.type === 'github') {
+    if (!jobData.repoFullName) return null;
+    const project = await findProjectByRepo(jobData.repoFullName);
+    return project?.id ?? null;
+  }
+  if (jobData.type === 'manual-run' || jobData.type === 'debug-analysis') {
+    return jobData.projectId ?? null;
+  }
+  if (jobData.type === 'retry-run') {
+    // Retry jobs now include projectId from the API
+    return jobData.projectId ?? null;
+  }
+  return null;
+}
+
+/**
+ * Build environment variables for a worker container.
+ * Resolves project credentials and forwards required infrastructure env vars.
+ */
+export async function buildWorkerEnv(job: Job<CascadeJob>): Promise<string[]> {
+  const env: string[] = [
+    `JOB_ID=${job.id}`,
+    `JOB_TYPE=${job.data.type}`,
+    `JOB_DATA=${JSON.stringify(job.data)}`,
+    // Redis for job completion reporting
+    `REDIS_URL=${routerConfig.redisUrl}`,
+    // Database connection
+    `CASCADE_POSTGRES_HOST=${process.env.CASCADE_POSTGRES_HOST || 'postgres'}`,
+    `CASCADE_POSTGRES_PORT=${process.env.CASCADE_POSTGRES_PORT || '5432'}`,
+    // Database connection for config
+    `DATABASE_URL=${process.env.DATABASE_URL || ''}`,
+    // Logging
+    `LOG_LEVEL=${process.env.LOG_LEVEL || 'info'}`,
+  ];
+
+  // Resolve project credentials in the router and set as individual env vars.
+  // NOTE: CREDENTIAL_MASTER_KEY is intentionally NOT passed to workers.
+  const projectId = await extractProjectIdFromJob(job.data);
+  if (projectId) {
+    try {
+      const secrets = await getAllProjectCredentials(projectId);
+      for (const [key, value] of Object.entries(secrets)) {
+        env.push(`${key}=${value}`);
+      }
+      env.push(`CASCADE_CREDENTIAL_KEYS=${Object.keys(secrets).join(',')}`);
+    } catch (err) {
+      console.warn('[WorkerManager] Failed to resolve credentials for project:', {
+        projectId,
+        error: String(err),
+      });
+      captureException(err, {
+        tags: { source: 'credential_resolution' },
+        extra: { projectId },
+        level: 'warning',
+      });
+    }
+  }
+
+  // CLAUDE_CODE_OAUTH_TOKEN is for the Claude Code backend (subscription auth).
+  if (process.env.CLAUDE_CODE_OAUTH_TOKEN)
+    env.push(`CLAUDE_CODE_OAUTH_TOKEN=${process.env.CLAUDE_CODE_OAUTH_TOKEN}`);
+
+  // Forward Sentry env vars so worker containers report to the same project.
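+  // Only SENTRY_DSN, SENTRY_ENVIRONMENT, and SENTRY_RELEASE are forwarded;
+  // router-only secrets such as CREDENTIAL_MASTER_KEY never reach the container.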
+  if (process.env.SENTRY_DSN) env.push(`SENTRY_DSN=${process.env.SENTRY_DSN}`);
+  if (process.env.SENTRY_ENVIRONMENT)
+    env.push(`SENTRY_ENVIRONMENT=${process.env.SENTRY_ENVIRONMENT}`);
+  if (process.env.SENTRY_RELEASE) env.push(`SENTRY_RELEASE=${process.env.SENTRY_RELEASE}`);
+
+  return env;
+}
+
+/**
+ * Spawn a worker container for a job.
+ * Sets up timeout tracking and monitors container exit asynchronously.
+ */
+export async function spawnWorker(job: Job<CascadeJob>): Promise<void> {
+  const jobId = job.id ?? `unknown-${Date.now()}`;
+  const containerName = `cascade-worker-${jobId}`;
+
+  const workerEnv = await buildWorkerEnv(job);
+  const hasCredentials = workerEnv.some((e) => e.startsWith('CASCADE_CREDENTIAL_KEYS='));
+
+  console.log('[WorkerManager] Spawning worker:', {
+    jobId,
+    type: job.data.type,
+    containerName,
+    hasCredentials,
+  });
+
+  try {
+    const container = await docker.createContainer({
+      Image: routerConfig.workerImage,
+      name: containerName,
+      Env: workerEnv,
+      HostConfig: {
+        Memory: routerConfig.workerMemoryMb * 1024 * 1024,
+        MemorySwap: routerConfig.workerMemoryMb * 1024 * 1024, // No swap
+        NetworkMode: routerConfig.dockerNetwork,
+        AutoRemove: true, // Clean up container on exit
+      },
+      Labels: {
+        'cascade.job.id': jobId,
+        'cascade.job.type': job.data.type,
+        'cascade.managed': 'true',
+      },
+    });
+
+    await container.start();
+
+    // Set up timeout
+    const startedAt = new Date();
+    const timeoutHandle = setTimeout(() => {
+      const durationMs = Date.now() - startedAt.getTime();
+      console.warn('[WorkerManager] Worker timeout, killing:', {
+        jobId,
+        durationMs,
+      });
+      captureException(new Error(`Worker timeout after ${durationMs}ms`), {
+        tags: { source: 'worker_timeout', jobType: job.data.type },
+        extra: { jobId, durationMs },
+        level: 'warning',
+      });
+      killWorker(jobId).catch((err) => {
+        console.error('[WorkerManager] Failed to kill timed-out worker:', err);
+      });
+    }, routerConfig.workerTimeoutMs);
+
+    // Track the worker
+    activeWorkers.set(jobId, {
+      containerId: container.id,
+      jobId,
+      startedAt,
+      timeoutHandle,
+      job: job.data,
+    });
+
+    console.log('[WorkerManager] Worker started:', {
+      jobId,
+      containerId: container.id.slice(0, 12),
+    });
+
+    // Monitor container exit
+    container
+      .wait()
+      .then(async (result) => {
+        // Collect worker logs before auto-removal
+        try {
+          const logs = await container.logs({
+            stdout: true,
+            stderr: true,
+            follow: false,
+          });
+          const logText = logs.toString('utf-8');
+          if (logText.trim()) {
+            const lines = logText.trim().split('\n');
+            const tail = lines.slice(-50).join('\n');
+            console.log(
+              `[WorkerManager] Worker logs (last ${Math.min(lines.length, 50)} of ${lines.length} lines):\n${tail}`,
+            );
+          }
+        } catch {
+          // Container may already be removed — expected with AutoRemove
+        }
+
+        if (result.StatusCode !== 0) {
+          captureException(new Error(`Worker exited with status ${result.StatusCode}`), {
+            tags: { source: 'worker_exit', jobType: job.data.type },
+            extra: { jobId, statusCode: result.StatusCode },
+          });
+        }
+        console.log('[WorkerManager] Worker exited:', {
+          jobId,
+          statusCode: result.StatusCode,
+        });
+        cleanupWorker(jobId);
+      })
+      .catch((err) => {
+        console.error('[WorkerManager] Error waiting for container:', err);
+        captureException(err, {
+          tags: { source: 'worker_wait', jobType: job.data.type },
+          extra: { jobId },
+        });
+        cleanupWorker(jobId);
+      });
+  } catch (err) {
+    console.error('[WorkerManager] Failed to spawn worker:', {
+      jobId,
+      error: String(err),
+    });
+    captureException(err, {
+      tags: { source: 'worker_spawn', jobType: job.data.type },
+      extra: { jobId },
+    });
+    throw err;
+  }
+}
+
+/**
+ * Kill a worker container with two-phase shutdown:
+ * 1. SIGTERM via container.stop(t=15) — gives agent watchdog 15s to clean up
+ * 2. Docker auto-escalates to SIGKILL after 15s
+ * 3. Router posts its own timeout notification
+ */
+export async function killWorker(jobId: string): Promise<void> {
+  const worker = activeWorkers.get(jobId);
+  if (!worker) return;
+
+  try {
+    const container = docker.getContainer(worker.containerId);
+    await container.stop({ t: 15 });
+    console.log('[WorkerManager] Worker stopped:', { jobId });
+  } catch (err) {
+    // Container might already be stopped
+    console.warn('[WorkerManager] Error stopping worker (may already be stopped):', {
+      jobId,
+      error: String(err),
+    });
+  }
+
+  // Send timeout notification (fire-and-forget)
+  const durationMs = Date.now() - worker.startedAt.getTime();
+  notifyTimeout(worker.job, {
+    jobId: worker.jobId,
+    startedAt: worker.startedAt,
+    durationMs,
+  }).catch((err) => {
+    console.error('[WorkerManager] Timeout notification error:', String(err));
+  });
+
+  cleanupWorker(jobId);
+}
+
+/**
+ * Clean up worker tracking state (timeout handle + map entry).
+ */
+export function cleanupWorker(jobId: string): void {
+  const worker = activeWorkers.get(jobId);
+  if (worker) {
+    clearTimeout(worker.timeoutHandle);
+    activeWorkers.delete(jobId);
+    console.log('[WorkerManager] Worker cleaned up:', {
+      jobId,
+      activeWorkers: activeWorkers.size,
+    });
+  }
+}
+
+/**
+ * Get number of currently active worker containers.
+ */
+export function getActiveWorkerCount(): number {
+  return activeWorkers.size;
+}
+
+/**
+ * Get summary info for currently active workers.
+ */
+export function getActiveWorkers(): Array<{ jobId: string; startedAt: Date }> {
+  return Array.from(activeWorkers.values()).map((w) => ({
+    jobId: w.jobId,
+    startedAt: w.startedAt,
+  }));
+}
+
+/**
+ * Detach from all active workers on shutdown.
+ * Workers continue running as independent containers.
+ * Clears timeout handles so the router process can exit cleanly.
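+ *
+ * Called from stopWorkerProcessor() in worker-manager.ts alongside closing
+ * the BullMQ queue consumers.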
+ */
+export function detachAll(): void {
+  if (activeWorkers.size > 0) {
+    console.log('[WorkerManager] Detaching from active workers (will continue running):', {
+      count: activeWorkers.size,
+      workers: Array.from(activeWorkers.keys()),
+    });
+  }
+
+  for (const [, worker] of activeWorkers) {
+    clearTimeout(worker.timeoutHandle);
+  }
+  activeWorkers.clear();
+}
diff --git a/src/router/queue.ts b/src/router/queue.ts
index 92dbc92a..9437bde0 100644
--- a/src/router/queue.ts
+++ b/src/router/queue.ts
@@ -1,19 +1,10 @@
-import { type ConnectionOptions, Queue } from 'bullmq';
+import { Queue } from 'bullmq';
 import { captureException } from '../sentry.js';
 import type { TriggerResult } from '../types/index.js';
 import { logger } from '../utils/logging.js';
+import { parseRedisUrl } from '../utils/redis.js';
 import { routerConfig } from './config.js';
 
-// Parse Redis URL to connection options
-function parseRedisUrl(url: string): ConnectionOptions {
-  const parsed = new URL(url);
-  return {
-    host: parsed.hostname,
-    port: Number(parsed.port) || 6379,
-    password: parsed.password || undefined,
-  };
-}
-
 const connection = parseRedisUrl(routerConfig.redisUrl);
 
 // Job types
diff --git a/src/router/worker-manager.ts b/src/router/worker-manager.ts
index 558e3638..080282d0 100644
--- a/src/router/worker-manager.ts
+++ b/src/router/worker-manager.ts
@@ -1,387 +1,70 @@
-import { type Job, Worker } from 'bullmq';
-import Docker from 'dockerode';
-import { findProjectByRepo, getAllProjectCredentials } from '../config/provider.js';
-import { captureException } from '../sentry.js';
-import { routerConfig } from './config.js';
-import { notifyTimeout } from './notifications.js';
-import type { CascadeJob } from './queue.js';
-
-const docker = new Docker();
-
-interface ActiveWorker {
-  containerId: string;
-  jobId: string;
-  startedAt: Date;
-  timeoutHandle: NodeJS.Timeout;
-  job: CascadeJob;
-}
-
-const activeWorkers = new Map<string, ActiveWorker>();
-
 /**
- * Extract projectId from job data for credential resolution.
- * Different job types have the projectId in different locations.
+ * Orchestrator for CASCADE worker processing.
+ *
+ * Wires together BullMQ queue consumers (bullmq-workers.ts) and Docker
+ * container lifecycle management (container-manager.ts).
  *
- * Note: Dashboard jobs (manual-run, retry-run, debug-analysis) come through
- * cascade-dashboard-jobs queue and are cast to CascadeJob for spawning.
+ * Public API is unchanged — all consumers continue importing from this module.
  */
-async function extractProjectIdFromJob(data: CascadeJob): Promise<string | null> {
-  // Use type assertion since dashboard jobs are cast to CascadeJob
-  const jobData = data as unknown as { type: string; projectId?: string; repoFullName?: string };
-
-  if (jobData.type === 'trello' || jobData.type === 'jira') {
-    return jobData.projectId ?? null;
-  }
-  if (jobData.type === 'github') {
-    if (!jobData.repoFullName) return null;
-    const project = await findProjectByRepo(jobData.repoFullName);
-    return project?.id ?? null;
-  }
-  if (jobData.type === 'manual-run' || jobData.type === 'debug-analysis') {
-    return jobData.projectId ?? null;
-  }
-  if (jobData.type === 'retry-run') {
-    // Retry jobs now include projectId from the API
-    return jobData.projectId ?? null;
-  }
-  return null;
-}
-
-// Build environment variables for worker container
-async function buildWorkerEnv(job: Job<CascadeJob>): Promise<string[]> {
-  const env: string[] = [
-    `JOB_ID=${job.id}`,
-    `JOB_TYPE=${job.data.type}`,
-    `JOB_DATA=${JSON.stringify(job.data)}`,
-    // Redis for job completion reporting
-    `REDIS_URL=${routerConfig.redisUrl}`,
-    // Database connection
-    `CASCADE_POSTGRES_HOST=${process.env.CASCADE_POSTGRES_HOST || 'postgres'}`,
-    `CASCADE_POSTGRES_PORT=${process.env.CASCADE_POSTGRES_PORT || '5432'}`,
-    // Database connection for config
-    `DATABASE_URL=${process.env.DATABASE_URL || ''}`,
-    // Logging
-    `LOG_LEVEL=${process.env.LOG_LEVEL || 'info'}`,
-  ];
-
-  // Resolve project credentials in the router and set as individual env vars.
-  // NOTE: CREDENTIAL_MASTER_KEY is intentionally NOT passed to workers.
-  const projectId = await extractProjectIdFromJob(job.data);
-  if (projectId) {
-    try {
-      const secrets = await getAllProjectCredentials(projectId);
-      for (const [key, value] of Object.entries(secrets)) {
-        env.push(`${key}=${value}`);
-      }
-      env.push(`CASCADE_CREDENTIAL_KEYS=${Object.keys(secrets).join(',')}`);
-    } catch (err) {
-      console.warn('[WorkerManager] Failed to resolve credentials for project:', {
-        projectId,
-        error: String(err),
-      });
-      captureException(err, {
-        tags: { source: 'credential_resolution' },
-        extra: { projectId },
-        level: 'warning',
-      });
-    }
-  }
-
-  // CLAUDE_CODE_OAUTH_TOKEN is for the Claude Code backend (subscription auth).
-  if (process.env.CLAUDE_CODE_OAUTH_TOKEN)
-    env.push(`CLAUDE_CODE_OAUTH_TOKEN=${process.env.CLAUDE_CODE_OAUTH_TOKEN}`);
-
-  // Forward Sentry env vars so worker containers report to the same project.
-  if (process.env.SENTRY_DSN) env.push(`SENTRY_DSN=${process.env.SENTRY_DSN}`);
-  if (process.env.SENTRY_ENVIRONMENT)
-    env.push(`SENTRY_ENVIRONMENT=${process.env.SENTRY_ENVIRONMENT}`);
-  if (process.env.SENTRY_RELEASE) env.push(`SENTRY_RELEASE=${process.env.SENTRY_RELEASE}`);
-
-  return env;
-}
-
-// Spawn a worker container for a job
-async function spawnWorker(job: Job<CascadeJob>): Promise<void> {
-  const jobId = job.id ?? `unknown-${Date.now()}`;
-  const containerName = `cascade-worker-${jobId}`;
-
-  const workerEnv = await buildWorkerEnv(job);
-  const hasCredentials = workerEnv.some((e) => e.startsWith('CASCADE_CREDENTIAL_KEYS='));
-
-  console.log('[WorkerManager] Spawning worker:', {
-    jobId,
-    type: job.data.type,
-    containerName,
-    hasCredentials,
-  });
-
-  try {
-    const container = await docker.createContainer({
-      Image: routerConfig.workerImage,
-      name: containerName,
-      Env: workerEnv,
-      HostConfig: {
-        Memory: routerConfig.workerMemoryMb * 1024 * 1024,
-        MemorySwap: routerConfig.workerMemoryMb * 1024 * 1024, // No swap
-        NetworkMode: routerConfig.dockerNetwork,
-        AutoRemove: true, // Clean up container on exit
-      },
-      Labels: {
-        'cascade.job.id': jobId,
-        'cascade.job.type': job.data.type,
-        'cascade.managed': 'true',
-      },
-    });
-
-    await container.start();
-
-    // Set up timeout
-    const startedAt = new Date();
-    const timeoutHandle = setTimeout(() => {
-      const durationMs = Date.now() - startedAt.getTime();
-      console.warn('[WorkerManager] Worker timeout, killing:', {
-        jobId,
-        durationMs,
-      });
-      captureException(new Error(`Worker timeout after ${durationMs}ms`), {
-        tags: { source: 'worker_timeout', jobType: job.data.type },
-        extra: { jobId, durationMs },
-        level: 'warning',
-      });
-      killWorker(jobId).catch((err) => {
-        console.error('[WorkerManager] Failed to kill timed-out worker:', err);
-      });
-    }, routerConfig.workerTimeoutMs);
-
-    // Track the worker
-    activeWorkers.set(jobId, {
-      containerId: container.id,
-      jobId,
-      startedAt,
-      timeoutHandle,
-      job: job.data,
-    });
-
-    console.log('[WorkerManager] Worker started:', {
-      jobId,
-      containerId: container.id.slice(0, 12),
-    });
-
-    // Monitor container exit
-    container
-      .wait()
-      .then(async (result) => {
-        // Collect worker logs before auto-removal
-        try {
-          const logs = await container.logs({
-            stdout: true,
-            stderr: true,
-            follow: false,
-          });
-          const logText = logs.toString('utf-8');
-          if (logText.trim()) {
-            const lines = logText.trim().split('\n');
-            const tail = lines.slice(-50).join('\n');
-            console.log(
-              `[WorkerManager] Worker logs (last ${Math.min(lines.length, 50)} of ${lines.length} lines):\n${tail}`,
-            );
-          }
-        } catch {
-          // Container may already be removed — expected with AutoRemove
-        }
-
-        if (result.StatusCode !== 0) {
-          captureException(new Error(`Worker exited with status ${result.StatusCode}`), {
-            tags: { source: 'worker_exit', jobType: job.data.type },
-            extra: { jobId, statusCode: result.StatusCode },
-          });
-        }
-        console.log('[WorkerManager] Worker exited:', {
-          jobId,
-          statusCode: result.StatusCode,
-        });
-        cleanupWorker(jobId);
-      })
-      .catch((err) => {
-        console.error('[WorkerManager] Error waiting for container:', err);
-        captureException(err, {
-          tags: { source: 'worker_wait', jobType: job.data.type },
-          extra: { jobId },
-        });
-        cleanupWorker(jobId);
-      });
-  } catch (err) {
-    console.error('[WorkerManager] Failed to spawn worker:', {
-      jobId,
-      error: String(err),
-    });
-    captureException(err, {
-      tags: { source: 'worker_spawn', jobType: job.data.type },
-      extra: { jobId },
-    });
-    throw err;
-  }
-}
-// Kill a worker container with two-phase shutdown:
-// 1. SIGTERM via container.stop(t=15) — gives agent watchdog 15s to clean up
-// 2. Docker auto-escalates to SIGKILL after 15s
-// 3. Router posts its own timeout notification
-async function killWorker(jobId: string): Promise<void> {
-  const worker = activeWorkers.get(jobId);
-  if (!worker) return;
-
-  try {
-    const container = docker.getContainer(worker.containerId);
-    await container.stop({ t: 15 });
-    console.log('[WorkerManager] Worker stopped:', { jobId });
-  } catch (err) {
-    // Container might already be stopped
-    console.warn('[WorkerManager] Error stopping worker (may already be stopped):', {
-      jobId,
-      error: String(err),
-    });
-  }
+import type { Job, Worker } from 'bullmq';
+import { createQueueWorker, parseRedisUrl } from './bullmq-workers.js';
+import { routerConfig } from './config.js';
+import {
+  detachAll,
+  getActiveWorkerCount,
+  getActiveWorkers,
+  spawnWorker,
+} from './container-manager.js';
+import type { CascadeJob } from './queue.js';
 
-  // Send timeout notification (fire-and-forget)
-  const durationMs = Date.now() - worker.startedAt.getTime();
-  notifyTimeout(worker.job, {
-    jobId: worker.jobId,
-    startedAt: worker.startedAt,
-    durationMs,
-  }).catch((err) => {
-    console.error('[WorkerManager] Timeout notification error:', String(err));
-  });
+// Re-export container-manager public API so existing callers are unaffected.
+export { getActiveWorkerCount, getActiveWorkers };
 
-  cleanupWorker(jobId);
-}
+// BullMQ Workers that process jobs by spawning containers
+let bullWorker: Worker | null = null;
+let dashboardWorker: Worker | null = null;
 
-// Clean up worker tracking
-function cleanupWorker(jobId: string): void {
-  const worker = activeWorkers.get(jobId);
-  if (worker) {
-    clearTimeout(worker.timeoutHandle);
-    activeWorkers.delete(jobId);
-    console.log('[WorkerManager] Worker cleaned up:', {
-      jobId,
-      activeWorkers: activeWorkers.size,
-    });
+/** Guard that enforces the per-router concurrency cap before spawning. */
+async function guardedSpawn(job: Job<CascadeJob>): Promise<void> {
+  // Check if we have capacity.
+  // This shouldn't happen with proper concurrency settings,
+  // but just in case, throw to retry later.
+  if (getActiveWorkerCount() >= routerConfig.maxWorkers) {
+    throw new Error('No worker slots available');
   }
+  await spawnWorker(job);
+  // Note: We don't wait for the container to complete here.
+  // The job is considered "processed" once the container starts.
+  // Container exit is handled asynchronously.
 }
 
-// Get active worker count
-export function getActiveWorkerCount(): number {
-  return activeWorkers.size;
-}
-
-// Get active worker info
-export function getActiveWorkers(): Array<{ jobId: string; startedAt: Date }> {
-  return Array.from(activeWorkers.values()).map((w) => ({
-    jobId: w.jobId,
-    startedAt: w.startedAt,
-  }));
-}
-
-// BullMQ Worker that processes jobs by spawning containers
-let bullWorker: Worker | null = null;
-let dashboardWorker: Worker | null = null;
-
 export function startWorkerProcessor(): void {
   if (bullWorker) {
     console.warn('[WorkerManager] Worker processor already started');
     return;
   }
 
-  const redisConnection = {
-    host: new URL(routerConfig.redisUrl).hostname,
-    port: Number(new URL(routerConfig.redisUrl).port) || 6379,
-  };
-
-  bullWorker = new Worker(
-    'cascade-jobs',
-    async (job) => {
-      // Check if we have capacity
-      if (activeWorkers.size >= routerConfig.maxWorkers) {
-        // This shouldn't happen with proper concurrency settings,
-        // but just in case, throw to retry later
-        throw new Error('No worker slots available');
-      }
-
-      await spawnWorker(job);
-
-      // Note: We don't wait for the container to complete here.
-      // The job is considered "processed" once the container starts.
-      // Container exit is handled asynchronously.
-    },
-    {
-      connection: redisConnection,
-      concurrency: routerConfig.maxWorkers,
-      // Lock jobs for the timeout duration plus buffer
-      lockDuration: routerConfig.workerTimeoutMs + 60000,
-    },
-  );
-
-  bullWorker.on('completed', (job) => {
-    console.log('[WorkerManager] Job dispatched:', { jobId: job.id });
-  });
-
-  bullWorker.on('failed', (job, err) => {
-    console.error('[WorkerManager] Job failed to dispatch:', {
-      jobId: job?.id,
-      error: String(err),
-    });
-    captureException(err, {
-      tags: { source: 'bullmq_dispatch', queue: 'cascade-jobs' },
-      extra: { jobId: job?.id },
-    });
-  });
+  const connection = parseRedisUrl(routerConfig.redisUrl);
 
-  bullWorker.on('error', (err) => {
-    console.error('[WorkerManager] Worker error:', err);
-    captureException(err, {
-      tags: { source: 'bullmq_error', queue: 'cascade-jobs' },
-    });
+  bullWorker = createQueueWorker({
+    queueName: 'cascade-jobs',
+    label: 'Job',
+    connection,
+    concurrency: routerConfig.maxWorkers,
+    lockDuration: routerConfig.workerTimeoutMs + 60000,
+    processFn: guardedSpawn,
   });
 
   // Dashboard jobs queue — manual runs, retries, debug analyses submitted
-  // from the dashboard API container
-  dashboardWorker = new Worker(
-    'cascade-dashboard-jobs',
-    async (job) => {
-      if (activeWorkers.size >= routerConfig.maxWorkers) {
-        throw new Error('No worker slots available');
-      }
-      // Dashboard jobs are forwarded as worker containers with the same
-      // JOB_TYPE / JOB_DATA protocol that worker-entry.ts understands.
-      await spawnWorker(job as Job<CascadeJob>);
-    },
-    {
-      connection: redisConnection,
-      concurrency: routerConfig.maxWorkers,
-      lockDuration: routerConfig.workerTimeoutMs + 60000,
-    },
-  );
-
-  dashboardWorker.on('completed', (job) => {
-    console.log('[WorkerManager] Dashboard job dispatched:', { jobId: job.id });
-  });
-
-  dashboardWorker.on('failed', (job, err) => {
-    console.error('[WorkerManager] Dashboard job failed to dispatch:', {
-      jobId: job?.id,
-      error: String(err),
-    });
-    captureException(err, {
-      tags: { source: 'bullmq_dispatch', queue: 'cascade-dashboard-jobs' },
-      extra: { jobId: job?.id },
-    });
-  });
-
-  dashboardWorker.on('error', (err) => {
-    console.error('[WorkerManager] Dashboard worker error:', err);
-    captureException(err, {
-      tags: { source: 'bullmq_error', queue: 'cascade-dashboard-jobs' },
-    });
+  // from the dashboard API container.
+  dashboardWorker = createQueueWorker({
+    queueName: 'cascade-dashboard-jobs',
+    label: 'Dashboard job',
+    connection,
+    concurrency: routerConfig.maxWorkers,
+    lockDuration: routerConfig.workerTimeoutMs + 60000,
+    processFn: (job) => guardedSpawn(job as Job<CascadeJob>),
   });
 
   console.log('[WorkerManager] Started with max', routerConfig.maxWorkers, 'concurrent workers');
@@ -401,18 +84,7 @@ export async function stopWorkerProcessor(): Promise<void> {
   // Don't kill active workers — they're independent containers that will
   // finish their jobs and auto-remove. Workers have their own internal
   // watchdog (src/utils/lifecycle.ts) for timeout enforcement.
- if (activeWorkers.size > 0) { - console.log('[WorkerManager] Detaching from active workers (will continue running):', { - count: activeWorkers.size, - workers: Array.from(activeWorkers.keys()), - }); - } - - // Clear timeout handles so the router process can exit cleanly - for (const [, worker] of activeWorkers) { - clearTimeout(worker.timeoutHandle); - } - activeWorkers.clear(); + detachAll(); console.log('[WorkerManager] Stopped'); } diff --git a/src/utils/redis.ts b/src/utils/redis.ts new file mode 100644 index 00000000..50d46c93 --- /dev/null +++ b/src/utils/redis.ts @@ -0,0 +1,20 @@ +/** + * Shared Redis utility functions. + * + * Provides a single implementation of Redis URL parsing used by BullMQ + * consumers across the codebase (router queues, dashboard queue, worker manager). + */ + +import type { ConnectionOptions } from 'bullmq'; + +/** + * Parse a Redis URL string into BullMQ ConnectionOptions. + */ +export function parseRedisUrl(url: string): ConnectionOptions { + const parsed = new URL(url); + return { + host: parsed.hostname, + port: Number(parsed.port) || 6379, + password: parsed.password || undefined, + }; +} diff --git a/tests/unit/router/bullmq-workers.test.ts b/tests/unit/router/bullmq-workers.test.ts new file mode 100644 index 00000000..26d07f82 --- /dev/null +++ b/tests/unit/router/bullmq-workers.test.ts @@ -0,0 +1,186 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +// --------------------------------------------------------------------------- +// Module mocks — factories use vi.fn() directly (no external variable refs) +// --------------------------------------------------------------------------- + +vi.mock('bullmq', () => ({ + Worker: vi.fn().mockImplementation((_queueName, _processFn, _opts) => ({ + on: vi.fn(), + })), +})); + +vi.mock('../../../src/sentry.js', () => ({ + captureException: vi.fn(), +})); + +// --------------------------------------------------------------------------- +// Imports (after mocks) +// --------------------------------------------------------------------------- + +import { Worker } from 'bullmq'; +import { createQueueWorker, parseRedisUrl } from '../../../src/router/bullmq-workers.js'; +import { captureException } from '../../../src/sentry.js'; + +const MockWorker = vi.mocked(Worker); +const mockCaptureException = vi.mocked(captureException); + +beforeEach(() => { + MockWorker.mockClear(); + mockCaptureException.mockClear(); + // Re-establish default mock so each test gets a fresh mock worker + MockWorker.mockImplementation( + (_queueName, _processFn, _opts) => + ({ + on: vi.fn(), + }) as never, + ); +}); + +// --------------------------------------------------------------------------- +// parseRedisUrl (re-exported from utils/redis.ts) +// --------------------------------------------------------------------------- + +describe('parseRedisUrl', () => { + it('parses a simple redis URL', () => { + const conn = parseRedisUrl('redis://localhost:6379'); + expect(conn).toEqual({ host: 'localhost', port: 6379, password: undefined }); + }); + + it('defaults to port 6379 when no port specified', () => { + const conn = parseRedisUrl('redis://localhost'); + expect(conn).toEqual({ host: 'localhost', port: 6379, password: undefined }); + }); + + it('extracts password from URL', () => { + const conn = parseRedisUrl('redis://:secret@localhost:6379'); + expect(conn.password).toBe('secret'); + expect(conn.host).toBe('localhost'); + expect(conn.port).toBe(6379); + }); +}); + +// 
--------------------------------------------------------------------------- +// createQueueWorker +// --------------------------------------------------------------------------- + +describe('createQueueWorker', () => { + const processFn = vi.fn().mockResolvedValue(undefined); + const baseConfig = { + queueName: 'test-queue', + label: 'Test job', + connection: { host: 'localhost', port: 6379 }, + concurrency: 3, + lockDuration: 60000, + processFn, + }; + + it('creates a Worker with the supplied config', () => { + createQueueWorker(baseConfig); + + expect(MockWorker).toHaveBeenCalledWith( + 'test-queue', + processFn, + expect.objectContaining({ + connection: { host: 'localhost', port: 6379 }, + concurrency: 3, + lockDuration: 60000, + }), + ); + }); + + it('registers completed, failed, and error event handlers', () => { + const worker = createQueueWorker(baseConfig); + const mockOn = vi.mocked(worker.on); + + const registeredEvents = mockOn.mock.calls.map((call) => call[0]); + expect(registeredEvents).toContain('completed'); + expect(registeredEvents).toContain('failed'); + expect(registeredEvents).toContain('error'); + }); + + it('returns the created Worker instance', () => { + const worker = createQueueWorker(baseConfig); + expect(worker).toBeDefined(); + expect(typeof worker.on).toBe('function'); + }); + + it('completed handler logs with label', () => { + const logSpy = vi.spyOn(console, 'log').mockImplementation(() => {}); + const worker = createQueueWorker(baseConfig); + const mockOn = vi.mocked(worker.on); + + // Find and invoke the completed handler + const completedCall = mockOn.mock.calls.find((call) => call[0] === 'completed'); + expect(completedCall).toBeDefined(); + const completedHandler = completedCall?.[1] as (job: { id: string }) => void; + completedHandler({ id: 'job-42' }); + + expect(logSpy).toHaveBeenCalledWith( + expect.stringContaining('Test job'), + expect.objectContaining({ jobId: 'job-42' }), + ); + logSpy.mockRestore(); + }); + + it('failed handler logs error and calls captureException', () => { + const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + const worker = createQueueWorker(baseConfig); + const mockOn = vi.mocked(worker.on); + + const failedCall = mockOn.mock.calls.find((call) => call[0] === 'failed'); + expect(failedCall).toBeDefined(); + const failedHandler = failedCall?.[1] as (job: { id: string } | undefined, err: Error) => void; + const err = new Error('dispatch failed'); + failedHandler({ id: 'job-7' }, err); + + expect(errorSpy).toHaveBeenCalledWith( + expect.stringContaining('Test job'), + expect.objectContaining({ jobId: 'job-7' }), + ); + expect(mockCaptureException).toHaveBeenCalledWith( + err, + expect.objectContaining({ + tags: expect.objectContaining({ queue: 'test-queue' }), + }), + ); + errorSpy.mockRestore(); + }); + + it('error handler logs and calls captureException', () => { + const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + const worker = createQueueWorker(baseConfig); + const mockOn = vi.mocked(worker.on); + + const errorCall = mockOn.mock.calls.find((call) => call[0] === 'error'); + expect(errorCall).toBeDefined(); + const errorHandler = errorCall?.[1] as (err: Error) => void; + const err = new Error('worker crashed'); + errorHandler(err); + + expect(errorSpy).toHaveBeenCalledWith(expect.stringContaining('Test job'), err); + expect(mockCaptureException).toHaveBeenCalledWith( + err, + expect.objectContaining({ + tags: expect.objectContaining({ source: 'bullmq_error', queue: 'test-queue' 
}), + }), + ); + errorSpy.mockRestore(); + }); + + it('uses queue name in Sentry tags for failed handler', () => { + const worker = createQueueWorker({ ...baseConfig, queueName: 'my-special-queue' }); + const mockOn = vi.mocked(worker.on); + + const failedCall = mockOn.mock.calls.find((call) => call[0] === 'failed'); + const handler = failedCall?.[1] as (job: { id: string }, err: Error) => void; + handler({ id: 'x' }, new Error('oops')); + + expect(mockCaptureException).toHaveBeenCalledWith( + expect.any(Error), + expect.objectContaining({ + tags: expect.objectContaining({ queue: 'my-special-queue' }), + }), + ); + }); +}); diff --git a/tests/unit/router/container-manager.test.ts b/tests/unit/router/container-manager.test.ts new file mode 100644 index 00000000..4a526b81 --- /dev/null +++ b/tests/unit/router/container-manager.test.ts @@ -0,0 +1,370 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +// --------------------------------------------------------------------------- +// Hoisted mock state — vi.hoisted creates variables before vi.mock factories run +// --------------------------------------------------------------------------- + +const { mockDockerCreateContainer, mockDockerGetContainer } = vi.hoisted(() => ({ + mockDockerCreateContainer: vi.fn(), + mockDockerGetContainer: vi.fn(), +})); + +// --------------------------------------------------------------------------- +// Module-level mocks +// --------------------------------------------------------------------------- + +vi.mock('dockerode', () => ({ + default: vi.fn().mockImplementation(() => ({ + createContainer: mockDockerCreateContainer, + getContainer: mockDockerGetContainer, + })), +})); + +vi.mock('../../../src/sentry.js', () => ({ + captureException: vi.fn(), +})); + +vi.mock('../../../src/config/provider.js', () => ({ + findProjectByRepo: vi.fn(), + getAllProjectCredentials: vi.fn(), +})); + +vi.mock('../../../src/config/configCache.js', () => ({ + configCache: { + getConfig: vi.fn().mockReturnValue(null), + getProjectByBoardId: vi.fn().mockReturnValue(null), + getProjectByRepo: vi.fn().mockReturnValue(null), + setConfig: vi.fn(), + setProjectByBoardId: vi.fn(), + setProjectByRepo: vi.fn(), + invalidate: vi.fn(), + }, +})); + +vi.mock('../../../src/router/notifications.js', () => ({ + notifyTimeout: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock('../../../src/router/config.js', () => ({ + routerConfig: { + redisUrl: 'redis://localhost:6379', + maxWorkers: 3, + workerImage: 'test-worker:latest', + workerMemoryMb: 512, + workerTimeoutMs: 5000, + dockerNetwork: 'test-network', + }, +})); + +// --------------------------------------------------------------------------- +// Imports (after mocks) +// --------------------------------------------------------------------------- + +import { findProjectByRepo, getAllProjectCredentials } from '../../../src/config/provider.js'; +import { + buildWorkerEnv, + cleanupWorker, + detachAll, + extractProjectIdFromJob, + getActiveWorkerCount, + getActiveWorkers, + killWorker, + spawnWorker, +} from '../../../src/router/container-manager.js'; +import { notifyTimeout } from '../../../src/router/notifications.js'; +import type { CascadeJob } from '../../../src/router/queue.js'; + +const mockFindProjectByRepo = vi.mocked(findProjectByRepo); +const mockGetAllProjectCredentials = vi.mocked(getAllProjectCredentials); +const mockNotifyTimeout = vi.mocked(notifyTimeout); + +// --------------------------------------------------------------------------- +// Helpers 
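+// (mock Docker container and job fixtures shared by the suites below)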
+// --------------------------------------------------------------------------- + +function makeJob(overrides: Partial<{ id: string; data: CascadeJob }> = {}) { + return { + id: overrides.id ?? 'job-1', + data: overrides.data ?? ({ type: 'trello', projectId: 'proj-1' } as CascadeJob), + }; +} + +function setupMockContainer(exitCode = 0) { + let resolveWait!: (v: { StatusCode: number }) => void; + const waitPromise = new Promise<{ StatusCode: number }>((res) => { + resolveWait = res; + }); + + const container = { + id: 'container-abc123def456', + start: vi.fn().mockResolvedValue(undefined), + wait: vi.fn().mockReturnValue(waitPromise), + logs: vi.fn().mockResolvedValue(Buffer.from('')), + stop: vi.fn().mockResolvedValue(undefined), + }; + + mockDockerCreateContainer.mockResolvedValue(container); + mockDockerGetContainer.mockReturnValue(container); + + return { + container, + resolveWait: (code = exitCode) => resolveWait({ StatusCode: code }), + }; +} + +// --------------------------------------------------------------------------- +// extractProjectIdFromJob +// --------------------------------------------------------------------------- + +describe('extractProjectIdFromJob', () => { + it('returns projectId for trello jobs', async () => { + const job = { type: 'trello', projectId: 'proj-trello' } as CascadeJob; + expect(await extractProjectIdFromJob(job)).toBe('proj-trello'); + }); + + it('returns projectId for jira jobs', async () => { + const job = { type: 'jira', projectId: 'proj-jira' } as CascadeJob; + expect(await extractProjectIdFromJob(job)).toBe('proj-jira'); + }); + + it('returns projectId resolved from repo for github jobs', async () => { + const job = { type: 'github', repoFullName: 'owner/repo' } as CascadeJob; + mockFindProjectByRepo.mockResolvedValue({ id: 'proj-gh' } as never); + expect(await extractProjectIdFromJob(job)).toBe('proj-gh'); + }); + + it('returns null for github jobs with no repoFullName', async () => { + const job = { type: 'github' } as CascadeJob; + expect(await extractProjectIdFromJob(job)).toBeNull(); + }); + + it('returns projectId for manual-run jobs', async () => { + const job = { type: 'manual-run', projectId: 'proj-m' } as unknown as CascadeJob; + expect(await extractProjectIdFromJob(job)).toBe('proj-m'); + }); + + it('returns projectId for retry-run jobs', async () => { + const job = { type: 'retry-run', projectId: 'proj-r' } as unknown as CascadeJob; + expect(await extractProjectIdFromJob(job)).toBe('proj-r'); + }); + + it('returns null for unknown job types', async () => { + const job = { type: 'unknown' } as unknown as CascadeJob; + expect(await extractProjectIdFromJob(job)).toBeNull(); + }); +}); + +// --------------------------------------------------------------------------- +// buildWorkerEnv +// --------------------------------------------------------------------------- + +describe('buildWorkerEnv', () => { + beforeEach(() => { + mockGetAllProjectCredentials.mockResolvedValue({ GITHUB_TOKEN: 'ghp_test' }); + }); + + it('includes JOB_ID, JOB_TYPE, and JOB_DATA', async () => { + const job = makeJob(); + const env = await buildWorkerEnv(job as never); + expect(env).toContain('JOB_ID=job-1'); + expect(env).toContain('JOB_TYPE=trello'); + expect(env.some((e) => e.startsWith('JOB_DATA='))).toBe(true); + }); + + it('includes project credentials and CASCADE_CREDENTIAL_KEYS', async () => { + const env = await buildWorkerEnv(makeJob() as never); + expect(env).toContain('GITHUB_TOKEN=ghp_test'); + 
expect(env).toContain('CASCADE_CREDENTIAL_KEYS=GITHUB_TOKEN'); + }); + + it('skips credential env vars if credential resolution fails', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + mockGetAllProjectCredentials.mockRejectedValue(new Error('DB error')); + const env = await buildWorkerEnv(makeJob() as never); + expect(env.some((e) => e.startsWith('CASCADE_CREDENTIAL_KEYS='))).toBe(false); + warnSpy.mockRestore(); + }); + + it('forwards SENTRY_DSN when set', async () => { + process.env.SENTRY_DSN = 'https://sentry.example.com/1'; + const env = await buildWorkerEnv(makeJob() as never); + expect(env).toContain('SENTRY_DSN=https://sentry.example.com/1'); + process.env.SENTRY_DSN = undefined; + }); +}); + +// --------------------------------------------------------------------------- +// spawnWorker / getActiveWorkerCount / getActiveWorkers +// --------------------------------------------------------------------------- + +describe('spawnWorker', () => { + beforeEach(() => { + vi.spyOn(console, 'log').mockImplementation(() => {}); + vi.spyOn(console, 'warn').mockImplementation(() => {}); + vi.spyOn(console, 'error').mockImplementation(() => {}); + mockGetAllProjectCredentials.mockResolvedValue({}); + detachAll(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + detachAll(); + }); + + it('creates and starts a container', async () => { + const { container, resolveWait } = setupMockContainer(); + + await spawnWorker(makeJob() as never); + + expect(mockDockerCreateContainer).toHaveBeenCalledWith( + expect.objectContaining({ + Image: 'test-worker:latest', + name: 'cascade-worker-job-1', + }), + ); + expect(container.start).toHaveBeenCalled(); + + resolveWait(); + }); + + it('increments active worker count after spawn', async () => { + const { resolveWait } = setupMockContainer(); + + await spawnWorker(makeJob({ id: 'job-cnt' }) as never); + + expect(getActiveWorkerCount()).toBeGreaterThan(0); + + resolveWait(); + }); + + it('cleans up worker after container exits', async () => { + const { resolveWait } = setupMockContainer(); + + await spawnWorker(makeJob({ id: 'job-exit' }) as never); + expect(getActiveWorkerCount()).toBeGreaterThanOrEqual(1); + + resolveWait(0); + // Let microtasks flush + await new Promise((r) => setTimeout(r, 10)); + + const workers = getActiveWorkers(); + expect(workers.find((w) => w.jobId === 'job-exit')).toBeUndefined(); + }); + + it('throws and does not track worker if container creation fails', async () => { + mockDockerCreateContainer.mockRejectedValue(new Error('Docker unavailable')); + const countBefore = getActiveWorkerCount(); + + await expect(spawnWorker(makeJob({ id: 'job-fail' }) as never)).rejects.toThrow( + 'Docker unavailable', + ); + + expect(getActiveWorkerCount()).toBe(countBefore); + }); +}); + +// --------------------------------------------------------------------------- +// killWorker +// --------------------------------------------------------------------------- + +describe('killWorker', () => { + beforeEach(() => { + vi.spyOn(console, 'log').mockImplementation(() => {}); + vi.spyOn(console, 'warn').mockImplementation(() => {}); + mockGetAllProjectCredentials.mockResolvedValue({}); + mockNotifyTimeout.mockResolvedValue(undefined); + detachAll(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + detachAll(); + }); + + it('is a no-op for an unknown jobId', async () => { + await expect(killWorker('nonexistent')).resolves.toBeUndefined(); + expect(mockDockerGetContainer).not.toHaveBeenCalled(); + }); + + 
it('stops the container and sends timeout notification', async () => { + const { container, resolveWait } = setupMockContainer(); + + await spawnWorker(makeJob({ id: 'job-kill' }) as never); + await killWorker('job-kill'); + + expect(container.stop).toHaveBeenCalledWith({ t: 15 }); + expect(mockNotifyTimeout).toHaveBeenCalled(); + + resolveWait(); + }); + + it('still sends notification even if container stop fails', async () => { + const { container, resolveWait } = setupMockContainer(); + container.stop.mockRejectedValue(new Error('already stopped')); + + await spawnWorker(makeJob({ id: 'job-already-stopped' }) as never); + await killWorker('job-already-stopped'); + + expect(mockNotifyTimeout).toHaveBeenCalled(); + + resolveWait(); + }); + + it('removes worker from tracking after kill', async () => { + const { resolveWait } = setupMockContainer(); + + await spawnWorker(makeJob({ id: 'job-rm' }) as never); + expect(getActiveWorkers().find((w) => w.jobId === 'job-rm')).toBeDefined(); + + await killWorker('job-rm'); + expect(getActiveWorkers().find((w) => w.jobId === 'job-rm')).toBeUndefined(); + + resolveWait(); + }); +}); + +// --------------------------------------------------------------------------- +// cleanupWorker +// --------------------------------------------------------------------------- + +describe('cleanupWorker', () => { + beforeEach(() => { + vi.spyOn(console, 'log').mockImplementation(() => {}); + detachAll(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + detachAll(); + }); + + it('is a no-op for an unknown jobId', () => { + expect(() => cleanupWorker('nonexistent')).not.toThrow(); + }); +}); + +// --------------------------------------------------------------------------- +// detachAll +// --------------------------------------------------------------------------- + +describe('detachAll', () => { + beforeEach(() => { + vi.spyOn(console, 'log').mockImplementation(() => {}); + mockGetAllProjectCredentials.mockResolvedValue({}); + detachAll(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + detachAll(); + }); + + it('clears all tracked workers', async () => { + setupMockContainer(); + await spawnWorker(makeJob({ id: 'job-d1' }) as never); + expect(getActiveWorkerCount()).toBeGreaterThan(0); + + detachAll(); + expect(getActiveWorkerCount()).toBe(0); + }); +}); diff --git a/tests/unit/router/worker-manager.test.ts b/tests/unit/router/worker-manager.test.ts new file mode 100644 index 00000000..7c585d4c --- /dev/null +++ b/tests/unit/router/worker-manager.test.ts @@ -0,0 +1,223 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +// --------------------------------------------------------------------------- +// Module mocks — all factories use vi.fn() directly (no external variable refs) +// --------------------------------------------------------------------------- + +vi.mock('../../../src/router/bullmq-workers.js', () => ({ + createQueueWorker: vi.fn(), + parseRedisUrl: vi.fn().mockReturnValue({ host: 'localhost', port: 6379 }), +})); + +vi.mock('../../../src/router/container-manager.js', () => ({ + spawnWorker: vi.fn().mockResolvedValue(undefined), + getActiveWorkerCount: vi.fn().mockReturnValue(0), + getActiveWorkers: vi.fn().mockReturnValue([]), + detachAll: vi.fn(), +})); + +vi.mock('../../../src/router/config.js', () => ({ + routerConfig: { + redisUrl: 'redis://localhost:6379', + maxWorkers: 3, + workerImage: 'test-worker:latest', + workerMemoryMb: 512, + workerTimeoutMs: 5000, + dockerNetwork: 'test-network', + }, +})); + +// 
--------------------------------------------------------------------------- +// Imports (after mocks) +// --------------------------------------------------------------------------- + +import { createQueueWorker, parseRedisUrl } from '../../../src/router/bullmq-workers.js'; +import { + detachAll, + getActiveWorkerCount, + getActiveWorkers, + spawnWorker, +} from '../../../src/router/container-manager.js'; +import { + startWorkerProcessor, + stopWorkerProcessor, + getActiveWorkerCount as wmGetActiveWorkerCount, + getActiveWorkers as wmGetActiveWorkers, +} from '../../../src/router/worker-manager.js'; + +const mockCreateQueueWorker = vi.mocked(createQueueWorker); +const mockParseRedisUrl = vi.mocked(parseRedisUrl); +const mockSpawnWorker = vi.mocked(spawnWorker); +const mockGetActiveWorkerCount = vi.mocked(getActiveWorkerCount); +const mockGetActiveWorkers = vi.mocked(getActiveWorkers); +const mockDetachAll = vi.mocked(detachAll); + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeMockWorker() { + return { close: vi.fn().mockResolvedValue(undefined) }; +} + +// --------------------------------------------------------------------------- +// Re-exports +// --------------------------------------------------------------------------- + +describe('re-exports', () => { + it('getActiveWorkerCount delegates to container-manager', () => { + mockGetActiveWorkerCount.mockReturnValue(5); + expect(wmGetActiveWorkerCount()).toBe(5); + }); + + it('getActiveWorkers delegates to container-manager', () => { + const workers = [{ jobId: 'j1', startedAt: new Date() }]; + mockGetActiveWorkers.mockReturnValue(workers); + expect(wmGetActiveWorkers()).toBe(workers); + }); +}); + +// --------------------------------------------------------------------------- +// startWorkerProcessor +// --------------------------------------------------------------------------- + +describe('startWorkerProcessor', () => { + beforeEach(async () => { + vi.spyOn(console, 'log').mockImplementation(() => {}); + vi.spyOn(console, 'warn').mockImplementation(() => {}); + mockCreateQueueWorker.mockReturnValue(makeMockWorker() as never); + // Ensure clean state + await stopWorkerProcessor(); + mockCreateQueueWorker.mockClear(); + mockParseRedisUrl.mockClear(); + }); + + afterEach(async () => { + vi.restoreAllMocks(); + await stopWorkerProcessor(); + }); + + it('creates two queue workers (cascade-jobs and cascade-dashboard-jobs)', () => { + startWorkerProcessor(); + + expect(mockCreateQueueWorker).toHaveBeenCalledTimes(2); + const queueNames = mockCreateQueueWorker.mock.calls.map((call) => call[0].queueName); + expect(queueNames).toContain('cascade-jobs'); + expect(queueNames).toContain('cascade-dashboard-jobs'); + }); + + it('passes parsed Redis connection to both workers', () => { + const connection = { host: 'redis-host', port: 6380 }; + mockParseRedisUrl.mockReturnValue(connection); + + startWorkerProcessor(); + + for (const call of mockCreateQueueWorker.mock.calls) { + expect(call[0].connection).toBe(connection); + } + }); + + it('configures maxWorkers as concurrency for both workers', () => { + startWorkerProcessor(); + + for (const call of mockCreateQueueWorker.mock.calls) { + expect(call[0].concurrency).toBe(3); // routerConfig.maxWorkers + } + }); + + it('does not create duplicate workers when called twice', () => { + startWorkerProcessor(); + startWorkerProcessor(); // second call should warn and 
return early + + expect(mockCreateQueueWorker).toHaveBeenCalledTimes(2); // still only 2 workers total + expect(console.warn).toHaveBeenCalledWith(expect.stringContaining('already started')); + }); + + it('passes a processFn that checks capacity before spawning', async () => { + startWorkerProcessor(); + + // Get the processFn from the cascade-jobs worker call + const cascadeJobsCall = mockCreateQueueWorker.mock.calls.find( + (call) => call[0].queueName === 'cascade-jobs', + ); + expect(cascadeJobsCall).toBeDefined(); + const processFn = cascadeJobsCall?.[0].processFn; + + // When under capacity, spawnWorker should be called + mockGetActiveWorkerCount.mockReturnValue(0); + const fakeJob = { id: 'j1', data: { type: 'trello', projectId: 'p1' } }; + await processFn(fakeJob); + expect(mockSpawnWorker).toHaveBeenCalledWith(fakeJob); + }); + + it('processFn throws when at capacity', async () => { + startWorkerProcessor(); + + const cascadeJobsCall = mockCreateQueueWorker.mock.calls.find( + (call) => call[0].queueName === 'cascade-jobs', + ); + const processFn = cascadeJobsCall?.[0].processFn; + + // At capacity + mockGetActiveWorkerCount.mockReturnValue(3); // equals maxWorkers + const fakeJob = { id: 'j2', data: { type: 'trello', projectId: 'p1' } }; + await expect(processFn(fakeJob)).rejects.toThrow('No worker slots available'); + expect(mockSpawnWorker).not.toHaveBeenCalled(); + }); +}); + +// --------------------------------------------------------------------------- +// stopWorkerProcessor +// --------------------------------------------------------------------------- + +describe('stopWorkerProcessor', () => { + beforeEach(async () => { + vi.spyOn(console, 'log').mockImplementation(() => {}); + vi.spyOn(console, 'warn').mockImplementation(() => {}); + mockCreateQueueWorker.mockReturnValue(makeMockWorker() as never); + await stopWorkerProcessor(); // ensure clean state + mockCreateQueueWorker.mockClear(); + }); + + afterEach(async () => { + vi.restoreAllMocks(); + await stopWorkerProcessor(); + }); + + it('closes both workers', async () => { + const worker1 = makeMockWorker(); + const worker2 = makeMockWorker(); + mockCreateQueueWorker + .mockReturnValueOnce(worker1 as never) + .mockReturnValueOnce(worker2 as never); + + startWorkerProcessor(); + await stopWorkerProcessor(); + + expect(worker1.close).toHaveBeenCalled(); + expect(worker2.close).toHaveBeenCalled(); + }); + + it('calls detachAll to release container references', async () => { + startWorkerProcessor(); + await stopWorkerProcessor(); + + expect(mockDetachAll).toHaveBeenCalled(); + }); + + it('is idempotent — safe to call multiple times', async () => { + startWorkerProcessor(); + await stopWorkerProcessor(); + mockDetachAll.mockClear(); + await stopWorkerProcessor(); // second call should not throw + + expect(mockDetachAll).toHaveBeenCalledTimes(1); + }); + + it('logs Stopped message', async () => { + startWorkerProcessor(); + await stopWorkerProcessor(); + + expect(console.log).toHaveBeenCalledWith(expect.stringContaining('Stopped')); + }); +});