-
-
Notifications
You must be signed in to change notification settings - Fork 842
feat(webapp): add support for running web services (api, engine, webapp) in cluster mode for better perf #2472
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…pp) in cluster mode for better perf
|
WalkthroughAdds a typed signalsEmitter singleton for SIGTERM/SIGINT and migrates multiple modules to subscribe to it instead of using process listeners. Wires graceful shutdown handlers into event loop monitoring, relay realtime streams, runs replication, MarQS, and DynamicFlushScheduler (which gains shutdown semantics and new config options). Adds an idempotence guard to RunsReplicationService.shutdown. Adjusts tracing to delegate error handling to startSpan. Implements optional Node.js cluster support in server.ts with primary/worker separation, worker forking, signal forwarding, graceful worker shutdown, and WebSocket/upgrade handling; preserves existing HTTP routes and middleware inside workers. Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
apps/webapp/app/v3/marqs/index.server.ts (1)
160-163
: Stop the Redis worker on shutdown to avoid leaks
shutdown()
clears timers and async workers but doesn’t stopthis.worker
.async shutdown(signal: NodeJS.Signals) { console.log("👇 Shutting down marqs", this.name, signal); clearInterval(this.clearCooloffPeriodInterval); this.#rebalanceWorkers.forEach((worker) => worker.stop()); + try { + await this.worker.stop(); + } catch (e) { + // log and continue shutdown + logger.warn("Error stopping MarQS worker during shutdown", { error: e, service: this.name }); + } }apps/webapp/app/v3/dynamicFlushScheduler.server.ts (1)
389-391
: Consider adding a timeout to the shutdown loop.The while loop waiting for pending flushes could potentially hang indefinitely if a flush operation gets stuck. Consider adding a timeout mechanism.
// Wait for all pending flushes to complete + const shutdownTimeout = 30000; // 30 seconds timeout + const startTime = Date.now(); while (this.batchQueue.length > 0 || this.limiter.activeCount > 0) { + if (Date.now() - startTime > shutdownTimeout) { + this.logger.error("Shutdown timeout reached, forcefully exiting", { + remainingBatches: this.batchQueue.length, + activeFlushes: this.limiter.activeCount, + }); + break; + } await new Promise((resolve) => setTimeout(resolve, 100)); }
🧹 Nitpick comments (3)
apps/webapp/server.ts (1)
19-21
: Provide fallback foros.availableParallelism()
On older Node versions it may not exist. Use
os.cpus().length
as a fallback.-const cpuCount = os.availableParallelism(); +const cpuCount = + typeof (os as any).availableParallelism === "function" + ? (os as any).availableParallelism() + : os.cpus().length;apps/webapp/app/eventLoopMonitor.server.ts (1)
114-119
: Consider reducing code duplication in signal handlers.Both signal handlers perform the identical action of clearing the interval. Consider extracting this to a single handler function.
+ const cleanup = () => clearInterval(interval); + signalsEmitter.on("SIGTERM", cleanup); + signalsEmitter.on("SIGINT", cleanup); - signalsEmitter.on("SIGTERM", () => { - clearInterval(interval); - }); - signalsEmitter.on("SIGINT", () => { - clearInterval(interval); - });apps/webapp/app/services/signals.server.ts (1)
1-2
: Consider using Node.js built-in typed EventEmitter.Since Node.js 14.17.0+, you can use the built-in typed EventEmitter from
node:events
which provides better type safety.-import { EventEmitter } from "events"; +import { EventEmitter } from "node:events";
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (8)
apps/webapp/app/eventLoopMonitor.server.ts
(2 hunks)apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
(2 hunks)apps/webapp/app/services/runsReplicationInstance.server.ts
(2 hunks)apps/webapp/app/services/signals.server.ts
(1 hunks)apps/webapp/app/v3/dynamicFlushScheduler.server.ts
(10 hunks)apps/webapp/app/v3/marqs/index.server.ts
(2 hunks)apps/webapp/app/v3/tracing.server.ts
(1 hunks)apps/webapp/server.ts
(1 hunks)
🧰 Additional context used
📓 Path-based instructions (5)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}
: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
apps/webapp/app/v3/marqs/index.server.ts
apps/webapp/app/eventLoopMonitor.server.ts
apps/webapp/app/services/runsReplicationInstance.server.ts
apps/webapp/app/v3/tracing.server.ts
apps/webapp/server.ts
apps/webapp/app/v3/dynamicFlushScheduler.server.ts
apps/webapp/app/services/signals.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
apps/webapp/app/v3/marqs/index.server.ts
apps/webapp/app/eventLoopMonitor.server.ts
apps/webapp/app/services/runsReplicationInstance.server.ts
apps/webapp/app/v3/tracing.server.ts
apps/webapp/server.ts
apps/webapp/app/v3/dynamicFlushScheduler.server.ts
apps/webapp/app/services/signals.server.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
When importing from @trigger.dev/core in the webapp, never import the root package path; always use one of the documented subpath exports from @trigger.dev/core’s package.json
Files:
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
apps/webapp/app/v3/marqs/index.server.ts
apps/webapp/app/eventLoopMonitor.server.ts
apps/webapp/app/services/runsReplicationInstance.server.ts
apps/webapp/app/v3/tracing.server.ts
apps/webapp/server.ts
apps/webapp/app/v3/dynamicFlushScheduler.server.ts
apps/webapp/app/services/signals.server.ts
{apps/webapp/app/**/*.server.{ts,tsx},apps/webapp/app/routes/**/*.ts}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Access environment variables only via the env export from app/env.server.ts; do not reference process.env directly
Files:
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
apps/webapp/app/v3/marqs/index.server.ts
apps/webapp/app/eventLoopMonitor.server.ts
apps/webapp/app/services/runsReplicationInstance.server.ts
apps/webapp/app/v3/tracing.server.ts
apps/webapp/app/v3/dynamicFlushScheduler.server.ts
apps/webapp/app/services/signals.server.ts
apps/webapp/app/**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Modules intended for test consumption under apps/webapp/app/**/*.ts must not read environment variables; accept configuration via options instead
Files:
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
apps/webapp/app/v3/marqs/index.server.ts
apps/webapp/app/eventLoopMonitor.server.ts
apps/webapp/app/services/runsReplicationInstance.server.ts
apps/webapp/app/v3/tracing.server.ts
apps/webapp/app/v3/dynamicFlushScheduler.server.ts
apps/webapp/app/services/signals.server.ts
🧠 Learnings (1)
📚 Learning: 2025-08-29T10:06:49.283Z
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-08-29T10:06:49.283Z
Learning: Applies to {apps/webapp/app/**/*.server.{ts,tsx},apps/webapp/app/routes/**/*.ts} : Access environment variables only via the env export from app/env.server.ts; do not reference process.env directly
Applied to files:
apps/webapp/app/v3/marqs/index.server.ts
🧬 Code graph analysis (7)
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (3)
apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1)
v1RealtimeStreams
(19-19)apps/webapp/app/services/signals.server.ts (1)
signalsEmitter
(32-32)apps/webapp/app/v3/marqs/index.server.ts (1)
signalsEmitter
(154-157)
apps/webapp/app/v3/marqs/index.server.ts (1)
apps/webapp/app/services/signals.server.ts (1)
signalsEmitter
(32-32)
apps/webapp/app/eventLoopMonitor.server.ts (2)
apps/webapp/app/services/signals.server.ts (1)
signalsEmitter
(32-32)apps/webapp/app/v3/marqs/index.server.ts (1)
signalsEmitter
(154-157)
apps/webapp/app/services/runsReplicationInstance.server.ts (2)
apps/webapp/app/services/signals.server.ts (1)
signalsEmitter
(32-32)apps/webapp/app/v3/marqs/index.server.ts (1)
signalsEmitter
(154-157)
apps/webapp/app/v3/tracing.server.ts (2)
apps/webapp/app/v3/tracer.server.ts (1)
attributesFromAuthenticatedEnv
(763-775)internal-packages/tracing/src/index.ts (1)
SpanKind
(34-34)
apps/webapp/app/v3/dynamicFlushScheduler.server.ts (2)
apps/webapp/app/services/signals.server.ts (1)
signalsEmitter
(32-32)apps/webapp/app/v3/marqs/index.server.ts (1)
signalsEmitter
(154-157)
apps/webapp/app/services/signals.server.ts (1)
apps/webapp/app/v3/marqs/index.server.ts (2)
T
(1205-1239)signalsEmitter
(154-157)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (5)
apps/webapp/app/v3/marqs/index.server.ts (1)
155-157
: LGTM: centralized signal wiringSwitch to
signalsEmitter
is consistent with the new shutdown model.apps/webapp/app/services/runsReplicationInstance.server.ts (1)
83-84
: LGTM! Signal handling properly migrated to signalsEmitter.The change from process signal listeners to the centralized signalsEmitter is consistent with the PR-wide pattern for graceful shutdown handling.
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1)
247-257
: LGTM! Proper graceful shutdown wiring for relay service.The implementation correctly:
- Creates the service instance before wiring signal handlers
- Binds the close method to maintain correct
this
context- Registers both SIGTERM and SIGINT handlers for comprehensive coverage
apps/webapp/app/v3/dynamicFlushScheduler.server.ts (2)
370-372
: Good implementation of idempotent shutdown.The early return check prevents race conditions when shutdown is triggered multiple times, which is important for graceful shutdown in cluster mode.
126-128
: LGTM! Proper handling of shutdown state in batch creation.The immediate batch creation during shutdown ensures no data is lost when the flush timer is stopped.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
apps/webapp/app/services/runsReplicationService.server.ts (3)
934-941
: Make ConcurrentFlushScheduler.shutdown await a full drainRight now shutdown() is fire-and-forget: it clears the timer and triggers a flush but does not await completion or in-flight work. That can drop data on process exit. Suggest making it async and draining until both the current batch is empty and the limiter has no active/pending work.
Apply this diff within the method:
- shutdown(): void { - this.logger.info("Shutting down ConcurrentFlushScheduler"); - - this._isShutDown = true; - - this.#clearTimer(); - this.#flushNextBatchIfNeeded(); - } + async shutdown(): Promise<void> { + this.logger.info("Shutting down ConcurrentFlushScheduler"); + this._isShutDown = true; + this.#clearTimer(); + // Drain all remaining work, including in-flight callbacks + await this.#drainAll(); + }And add this private helper (outside the selected range; place near other private helpers):
// Drain until there is no buffered work and no in-flight/pending tasks async #drainAll(): Promise<void> { // Keep flushing while items remain while (this.currentBatch.length > 0) { await this.#flushNextBatch(); } // Then wait for in-flight/pending tasks to complete while (this.concurrencyLimiter.activeCount > 0 || this.concurrencyLimiter.pendingCount > 0) { await new Promise((r) => setTimeout(r, 25)); } }This change aligns with the shutdown modification in RunsReplicationService so callers can await durability before exiting.
206-221
: MakeConcurrentFlushScheduler.shutdown
async and drain pending batches before stopping
- Right now
ConcurrentFlushScheduler.shutdown()
is synchronous (returnsvoid
) and only clears the timer—pending in-memory batches aren’t flushed.- First, change its signature to
async shutdown(): Promise<void>
that (a) sets_isShutDown
, (b) clears the timer, and (c) awaits any in-flight flush tasks.- Then in
RunsReplicationService.shutdown()
, alwaysawait this._concurrentFlushScheduler.shutdown()
before stopping the replication client or marking shutdown complete.
172-180
: Prevent premature ACKs during shutdown
In
#acknowledgeLatestTransaction
, bail out when_isShuttingDown && !this._isShutDownComplete
to avoid advancing the resume LSN before all inserts have been flushed.async #acknowledgeLatestTransaction() { + if (this._isShuttingDown && !this._isShutDownComplete) return; ...
Ensure the heartbeat handler’s
lsn
(fromevents.on("heartbeat")
) reflects the last fully processed commit, not merely the server’s WAL end, before callingthis._replicationClient.acknowledge(lsn)
.
🧹 Nitpick comments (1)
apps/webapp/app/services/runsReplicationService.server.ts (1)
898-901
: Type nit: initialize failedBatchCount at declarationAvoid implicit any on a class field; initialize at declaration and drop the constructor assignment.
- private failedBatchCount; + private failedBatchCount = 0; @@ - this.failedBatchCount = 0; + // initialized at declarationAlso applies to: 913-914
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
apps/webapp/app/services/runsReplicationService.server.ts
(1 hunks)apps/webapp/app/v3/dynamicFlushScheduler.server.ts
(10 hunks)apps/webapp/app/v3/marqs/index.server.ts
(3 hunks)apps/webapp/app/v3/tracing.server.ts
(1 hunks)apps/webapp/server.ts
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- apps/webapp/server.ts
- apps/webapp/app/v3/marqs/index.server.ts
- apps/webapp/app/v3/tracing.server.ts
- apps/webapp/app/v3/dynamicFlushScheduler.server.ts
🧰 Additional context used
📓 Path-based instructions (5)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}
: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
apps/webapp/app/services/runsReplicationService.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
apps/webapp/app/services/runsReplicationService.server.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
When importing from @trigger.dev/core in the webapp, never import the root package path; always use one of the documented subpath exports from @trigger.dev/core’s package.json
Files:
apps/webapp/app/services/runsReplicationService.server.ts
{apps/webapp/app/**/*.server.{ts,tsx},apps/webapp/app/routes/**/*.ts}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Access environment variables only via the env export from app/env.server.ts; do not reference process.env directly
Files:
apps/webapp/app/services/runsReplicationService.server.ts
apps/webapp/app/**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Modules intended for test consumption under apps/webapp/app/**/*.ts must not read environment variables; accept configuration via options instead
Files:
apps/webapp/app/services/runsReplicationService.server.ts
🧠 Learnings (1)
📓 Common learnings
Learnt from: ericallam
PR: triggerdotdev/trigger.dev#2472
File: apps/webapp/app/services/signals.server.ts:23-30
Timestamp: 2025-09-03T15:45:24.897Z
Learning: In Node.js cluster mode, each worker is a separate process with its own memory space and process object. When using singletons that register process signal handlers, each worker process correctly gets its own instance and handlers - there's no risk of duplicate handlers within a process due to the singleton pattern, and each process needs its own handlers anyway.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
No description provided.