-
-
Notifications
You must be signed in to change notification settings - Fork 848
feat(run-engine): worker queue resolver #2476
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Walkthrough
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (11)
internal-packages/run-engine/src/run-queue/index.ts (3)
173-173
: Make resolver reference immutableDeclare the resolver as readonly to prevent accidental reassignment.
- private workerQueueResolver: WorkerQueueResolver; + private readonly workerQueueResolver: WorkerQueueResolver;
191-193
: Optional: allow injectable overrides for tests/opsConsider accepting an optional overrides JSON via RunQueueOptions and pass it here, so production can avoid relying solely on process.env and tests can inject without mutating env.
If you want, I can draft a minimal RunQueueOptions addition (e.g. workerQueueOverridesJson?: string) and plumb it through.
1852-1854
: Wrapper method name clarity (optional)Method is now a thin pass-through. Consider renaming to resolveWorkerQueue for clarity, or inline calls at use-sites. Totally optional.
internal-packages/run-engine/src/run-queue/workerQueueResolver.ts (5)
5-13
: Disambiguate schema vs type and validate non-empty valuesRename the Zod schema to avoid name shadowing with the exported type, and ensure override values aren’t empty strings (which could route to a blank queue).
-const WorkerQueueOverrides = z.object({ - environmentId: z.record(z.string(), z.string()).optional(), - projectId: z.record(z.string(), z.string()).optional(), - orgId: z.record(z.string(), z.string()).optional(), - workerQueue: z.record(z.string(), z.string()).optional(), -}); +const WorkerQueueOverridesSchema = z.object({ + environmentId: z.record(z.string(), z.string().min(1)).optional(), + projectId: z.record(z.string(), z.string().min(1)).optional(), + orgId: z.record(z.string(), z.string().min(1)).optional(), + workerQueue: z.record(z.string(), z.string().min(1)).optional(), +}); -export type WorkerQueueOverrides = z.infer<typeof WorkerQueueOverrides>; +export type WorkerQueueOverrides = z.infer<typeof WorkerQueueOverridesSchema>;- const result = WorkerQueueOverrides.safeParse(parsed); + const result = WorkerQueueOverridesSchema.safeParse(parsed);Also applies to: 35-41
66-74
: Fix comment: version reference typoThis branch handles v1 messages, not v2. Update the comment to avoid confusion.
- // In v2, if the environment is development, the worker queue is the environment id. + // In v1, if the environment is development, the worker queue is the environment id.
71-74
: Guard against empty masterQueues in v1If masterQueues is empty/malformed, we’ll return undefined. Add a safe fallback and log once for visibility.
- return message.masterQueues[0]; + const first = message.masterQueues[0]; + if (!first) { + this.logger.error("v1 message missing masterQueues[0], falling back to environmentId", { + orgId: message.orgId, + projectId: message.projectId, + environmentId: message.environmentId, + }); + return message.environmentId; + } + return first;
23-26
: Hot-reload overrides (optional)If you anticipate changing overrides at runtime, expose a small API to reload without recreating RunQueue.
constructor(opts: WorkerQueueResolverOptions) { this.logger = opts.logger; this.overrides = this.parseOverrides(opts.overrideConfig); } + + public reloadOverrides(overrideConfig?: string) { + this.overrides = this.parseOverrides(overrideConfig); + }
81-99
: Optional observability: log which override matchedAdding a debug log when an override is applied helps explain unexpected routing during incidents.
- if (this.overrides.environmentId?.[message.environmentId]) { - return this.overrides.environmentId[message.environmentId]; - } + if (this.overrides.environmentId?.[message.environmentId]) { + const q = this.overrides.environmentId[message.environmentId]; + this.logger.debug("Applying workerQueue override", { + scope: "environmentId", + key: message.environmentId, + workerQueue: q, + }); + return q; + } - if (this.overrides.projectId?.[message.projectId]) { - return this.overrides.projectId[message.projectId]; - } + if (this.overrides.projectId?.[message.projectId]) { + const q = this.overrides.projectId[message.projectId]; + this.logger.debug("Applying workerQueue override", { + scope: "projectId", + key: message.projectId, + workerQueue: q, + }); + return q; + } - if (this.overrides.orgId?.[message.orgId]) { - return this.overrides.orgId[message.orgId]; - } + if (this.overrides.orgId?.[message.orgId]) { + const q = this.overrides.orgId[message.orgId]; + this.logger.debug("Applying workerQueue override", { + scope: "orgId", + key: message.orgId, + workerQueue: q, + }); + return q; + } - if (this.overrides.workerQueue?.[message.workerQueue]) { - return this.overrides.workerQueue[message.workerQueue]; - } + if (this.overrides.workerQueue?.[message.workerQueue]) { + const q = this.overrides.workerQueue[message.workerQueue]; + this.logger.debug("Applying workerQueue override", { + scope: "workerQueue", + key: message.workerQueue, + workerQueue: q, + }); + return q; + }internal-packages/run-engine/src/run-queue/tests/workerQueueResolver.test.ts (3)
9-9
: Align suite name with classMinor: rename describe title to match the class name.
-describe("WorkerQueueOverrideResolver", () => { +describe("WorkerQueueResolver", () => {
431-437
: Restore all mocks after each testPrevents cross-test leakage from spies.
afterEach(() => { if (originalEnv === undefined) { delete process.env.RUN_ENGINE_WORKER_QUEUE_OVERRIDES; } else { process.env.RUN_ENGINE_WORKER_QUEUE_OVERRIDES = originalEnv; } }); + + afterEach(() => { + vi.restoreAllMocks(); + });
400-421
: Add edge-case: v1 with empty masterQueuesEnsure fallback path is exercised and stable.
describe("V1 message handling", () => { + it("should fall back when v1 masterQueues is empty", () => { + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger }); + + const v1Msg: OutputPayloadV1 = { + version: "1", + runId: "run_123", + taskIdentifier: "task_123", + orgId: "org_123", + projectId: "proj_123", + environmentId: "env_prod", + environmentType: RuntimeEnvironmentType.PRODUCTION, + queue: "test-queue", + timestamp: Date.now(), + attempt: 0, + masterQueues: [], + }; + + const result = resolver.getWorkerQueueFromMessage(v1Msg); + expect(result).toBe("env_prod"); + });
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
internal-packages/run-engine/src/run-queue/index.ts
(4 hunks)internal-packages/run-engine/src/run-queue/tests/workerQueueResolver.test.ts
(1 hunks)internal-packages/run-engine/src/run-queue/workerQueueResolver.ts
(1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}
: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
internal-packages/run-engine/src/run-queue/index.ts
internal-packages/run-engine/src/run-queue/workerQueueResolver.ts
internal-packages/run-engine/src/run-queue/tests/workerQueueResolver.test.ts
**/*.test.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Our tests are all vitest
Files:
internal-packages/run-engine/src/run-queue/tests/workerQueueResolver.test.ts
**/*.{test,spec}.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (AGENTS.md)
**/*.{test,spec}.{ts,tsx,js,jsx}
: Unit tests must use Vitest
Tests should avoid mocks or stubs and use helpers from @internal/testcontainers when Redis or Postgres are needed
Test files live beside the files under test and should use descriptive describe and it blocks
Files:
internal-packages/run-engine/src/run-queue/tests/workerQueueResolver.test.ts
🧬 Code graph analysis (3)
internal-packages/run-engine/src/run-queue/index.ts (2)
internal-packages/run-engine/src/run-queue/workerQueueResolver.ts (2)
WorkerQueueResolver
(19-100)message
(76-99)internal-packages/run-engine/src/run-queue/types.ts (2)
OutputPayload
(31-31)OutputPayload
(33-33)
internal-packages/run-engine/src/run-queue/workerQueueResolver.ts (1)
internal-packages/run-engine/src/run-queue/index.ts (5)
message
(1399-1447)message
(1696-1751)message
(1753-1801)message
(1803-1831)message
(1852-1854)
internal-packages/run-engine/src/run-queue/tests/workerQueueResolver.test.ts (1)
internal-packages/run-engine/src/run-queue/workerQueueResolver.ts (3)
WorkerQueueResolver
(19-100)message
(76-99)WorkerQueueOverrides
(12-12)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (2)
internal-packages/run-engine/src/run-queue/index.ts (1)
45-45
: Good delegation: use the centralized resolverImporting and delegating to WorkerQueueResolver reduces duplication and keeps resolution logic isolated. LGTM.
internal-packages/run-engine/src/run-queue/tests/workerQueueResolver.test.ts (1)
1-8
: Vitest usage and structure look goodUses vitest, colocated with implementation, and covers V1/V2 paths and precedence thoroughly. Nice work.
No description provided.