Introduce max concurrency to DynamicFlushScheduler #1792
Walkthrough
This update enhances the
Changes
Sequence Diagram(s)
sequenceDiagram
participant User
participant Scheduler
participant Limiter as pLimit
User->>Scheduler: addToBatch(items)
Scheduler->>Scheduler: Check batch threshold
alt Batch threshold reached
Scheduler->>Scheduler: flushNextBatch()
Scheduler->>Limiter: Execute flush for sub-batch
Limiter-->>Scheduler: Return flush result/error
end
Scheduler-->>User: Flush complete
sequenceDiagram
participant OS
participant Scheduler
OS->>Scheduler: SIGTERM Received
Scheduler->>Scheduler: setupShutdownHandlers()
Scheduler->>Scheduler: shutdown()
Scheduler->>Limiter: Process remaining batches
Scheduler-->>OS: Graceful shutdown complete
Actionable comments posted: 0
🧹 Nitpick comments (8)
apps/webapp/package.json (1)
148-148: Introduction of “p-limit” for concurrency control.
This aligns with the concurrency enhancements in DynamicFlushScheduler. Make sure to lock down the version if consistent behavior is critical across environments.
- "p-limit": "^6.2.0",
+ "p-limit": "6.2.0",
apps/webapp/test/dynamicFlushScheduler.test.ts (2)
37-51: Flush interval-driven tests look solid.
By advancing the fake timers, you're verifying time-based flushes. This effectively ensures that the scheduler initiates a flush once the interval elapses.
Consider adding a test verifying that multiple flush intervals still cause repeated flushes if items keep coming in.
68-89: Signal handling test is comprehensive.
Simulating SIGTERM to confirm a final flush is crucial. This test ensures the scheduler properly returns the pending items.
Consider adding a concurrency test to confirm the maxConcurrency setting’s effect under multiple batches being flushed concurrently.
apps/webapp/app/v3/dynamicFlushScheduler.server.ts (5)
29-30: Initializing pLimit with user-specified concurrency.
Fallback to 1 avoids unbounded concurrency. This approach is safe for minimal parallelism but might be low for certain workloads.
Would you like to auto-scale concurrency based on system metrics (CPU load, memory usage)? I can help open an enhancement issue if desired.
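For reference, the fallback described above could look roughly like the sketch below. The field names batchSize, flushInterval, and maxConcurrency come from this PR; the SchedulerConfig type name, the resolveMaxConcurrency helper, the `??` default, and the validation are assumptions for illustration, not the PR's actual code.

```typescript
// Hypothetical config shape; only the field names are taken from the PR text.
type SchedulerConfig = {
  batchSize: number;
  flushInterval: number;
  maxConcurrency?: number;
};

// Hypothetical helper: fall back to 1 when maxConcurrency is not provided,
// and reject values a concurrency limiter could not use.
function resolveMaxConcurrency(config: SchedulerConfig): number {
  const value = config.maxConcurrency ?? 1;
  if (!Number.isInteger(value) || value < 1) {
    throw new RangeError(`maxConcurrency must be a positive integer, got ${value}`);
  }
  return value;
}
```

Defaulting to 1 keeps flushes strictly sequential unless the caller opts in to more parallelism.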
43-56: Prometheus Gauges for batch size & failed batches.
This instrumentation broadens observability. Consider adding concurrency metrics (like active/queued flush tasks) for deeper insights.
67-77: Comment and logic in addToBatch.
The TODO about using .concat() for improved performance is valid for large arrays. This function is otherwise well-documented.
Use .concat(items) to avoid overhead from spread, especially for large arrays.
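A minimal sketch of the suggestion, assuming the batch is a plain array. Both helper names are hypothetical, and the spread variant is only a guess at what the current code does. Note that spreading a very large array into function arguments, as in push(...items), can also exceed engine argument limits, which concat avoids.

```typescript
// Hypothetical helper: append incoming items to the current batch with concat.
// concat copies both arrays once and never passes items as call arguments.
function appendWithConcat<T>(currentBatch: T[], items: T[]): T[] {
  return currentBatch.concat(items);
}

// Spread-based variant, shown for comparison only (assumed current behavior).
function appendWithSpread<T>(currentBatch: T[], items: T[]): T[] {
  return [...currentBatch, ...items];
}

// Both variants produce the same contents; only the copying strategy differs.
const viaConcat = appendWithConcat([1, 2], [3, 4]);
const viaSpread = appendWithSpread([1, 2], [3, 4]);
console.log(viaConcat.length === viaSpread.length);
```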
100-113: Graceful shutdown method.
The function checks if shutdown is already in progress, flushes pending items, and stops timers. This ensures no events are lost on exit.
Consider adding a timeout fallback to handle stuck callbacks during shutdown.
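One way to sketch the timeout fallback is to race the final flush against a timer. The flushWithTimeout helper and its return values are hypothetical and not part of the PR; how the real shutdown() would use the result is left open.

```typescript
// Hypothetical helper: wait for a final flush, but give up after timeoutMs.
async function flushWithTimeout(
  flush: () => Promise<void>,
  timeoutMs: number
): Promise<"flushed" | "timed_out"> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<"timed_out">((resolve) => {
    timer = setTimeout(() => resolve("timed_out"), timeoutMs);
  });
  try {
    // Whichever settles first wins; a rejected flush still propagates.
    return await Promise.race([flush().then(() => "flushed" as const), timeout]);
  } finally {
    // Always clear the timer so it cannot keep the process alive.
    if (timer !== undefined) clearTimeout(timer);
  }
}
```

A caller could log and exit when the result is "timed_out", so a stuck callback cannot block process termination indefinitely.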
138-179: flushNextBatch concurrency logic.
Splitting items into smaller batches and applying pLimit ensures concurrency is well-managed. Error handling logs the batch-ID and increments failedBatchCount. This is robust.
A circuit-breaker approach or retry queue might handle repeated failures gracefully, especially for flakier downstream systems.
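The splitting and limiting described above can be sketched as follows. To keep the example self-contained, createLimiter is a small hand-written stand-in for the p-limit package used in the PR; chunk and flushAll are hypothetical names as well, and the real flushNextBatch may be structured differently.

```typescript
// Minimal stand-in for p-limit (assumption: the PR uses the real package).
function createLimiter(maxConcurrency: number) {
  let active = 0;
  const queue: Array<() => void> = [];
  const next = () => {
    active--;
    const run = queue.shift();
    if (run) run();
  };
  return function limit<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const run = () => {
        active++;
        // Start the task asynchronously so a synchronous throw is also caught.
        Promise.resolve().then(task).then(resolve, reject).finally(next);
      };
      if (active < maxConcurrency) run();
      else queue.push(run);
    });
  };
}

// Split items into sub-batches of at most `size` elements.
function chunk<T>(items: T[], size: number): T[][] {
  if (size < 1) throw new RangeError("size must be at least 1");
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Flush every sub-batch with bounded concurrency; returns the failure count.
async function flushAll<T>(
  items: T[],
  batchSize: number,
  maxConcurrency: number,
  callback: (batch: T[]) => Promise<void>
): Promise<number> {
  const limit = createLimiter(maxConcurrency);
  let failedBatchCount = 0;
  await Promise.all(
    chunk(items, batchSize).map((batch) =>
      limit(() => callback(batch)).catch(() => {
        // A failed sub-batch is counted but does not abort the others.
        failedBatchCount++;
      })
    )
  );
  return failedBatchCount;
}
```

Recording the peak number of in-flight callbacks, as a test might do, is one way to confirm that maxConcurrency is respected.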
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (3)
apps/webapp/app/v3/dynamicFlushScheduler.server.ts (1 hunks)
apps/webapp/package.json (2 hunks)
apps/webapp/test/dynamicFlushScheduler.test.ts (1 hunks)
🔇 Additional comments (17)
apps/webapp/package.json (3)
49-49: Good restoration of @depot/sdk-node dependency.
This reintroduces the @depot/sdk-node package at version ^1.0.0. If this is intentional, ensure there's no conflict with any previous or existing use in the code.
55-56: Restoring internal dependencies appears consistent.
Re-adding @internal/run-engine and @internal/zod-worker at workspace:* seems proper for your monorepo setup. Verify that the reintroduction doesn't conflict with prior removal rationale.
Would you like to confirm stable version references using a script snippet? I can provide one if needed.
60-60: Re-added @opentelemetry/api-logs dependency.
Confirm that usage of @opentelemetry/api-logs aligns with the new instrumentation approach. If the module was previously removed for compatibility reasons, validate any issues are resolved now.
apps/webapp/test/dynamicFlushScheduler.test.ts (4)
4-9: Test suite setup is clear.
Using vi.useFakeTimers() and resetting mocks in beforeEach is good practice to ensure each test runs independently.
11-21: Verify no-op behavior for empty batch.
This test confirms that the scheduler doesn't invoke the callback when no items are enqueued. The logic is straightforward and well structured.
23-35: Validating single flush scenario.
This test ensures the scheduler triggers the callback once the batch size is exactly met. This is a core scenario, and the expectations appear correct.
53-66: Batching multiple times is well tested.
You accurately check that two flushes happen for six items with a batch size of three. This confirms the correct partitioning of items.
apps/webapp/app/v3/dynamicFlushScheduler.server.ts (10)
2-3: Importing p-limit and prom-client.
You’ve introduced p-limit for concurrency limiting and prom-client metrics. Ensure any bundling or environment constraints (e.g., serverless) don’t conflict with these packages.
10-11: Optional maxConcurrency prop in the config.
Great addition to control concurrency. Ensure default maxConcurrency does not conflict with user expectations.
18-19: Tracking concurrency limiting fields.
Defining MAX_CONCURRENCY and concurrencyLimiter clarifies concurrency logic. The typed return from pLimit is correct.
22-24: New fields for shutdown logic and metrics.
isShuttingDown and failedBatchCount neatly track terminating conditions and error states.
36-40: Comprehensive initialization logging.
Logging initial config helps debug misconfigurations in production. The structure is concise.
79-84: Threshold-based flush trigger.
Flushing when currentBatch.length >= BATCH_SIZE ensures partial batch flushes. This logic is solid for real-time scenarios.
94-98: Setting up graceful shutdown hooks.
Listening for SIGTERM and SIGINT is essential for containerized environments. Logging the handler configuration is beneficial for troubleshooting.
115-120: Timer clearing process.
Clearing the interval is crucial to prevent unwanted flush attempts after we initiate shutdown. Logging helps confirm the timer removal.
122-126: Timer reset ensures fresh intervals.
This pattern restarts the timer after a successful flush, which supports continuous ingestion.
129-134: Periodic flush triggered.
If the batch length is non-zero, we flush. This prevents stale items from accumulating if batch thresholds are never reached.
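As an illustration of the interval-driven path, here is a deliberately simplified sketch. IntervalFlusher and its method names are hypothetical; the real scheduler also handles batch thresholds, concurrency limiting, and shutdown, all of which are omitted here.

```typescript
// Hypothetical, simplified scheduler showing only the periodic flush.
class IntervalFlusher<T> {
  private currentBatch: T[] = [];
  private timer: ReturnType<typeof setInterval> | undefined;
  private readonly flushIntervalMs: number;
  private readonly callback: (batch: T[]) => void;

  constructor(flushIntervalMs: number, callback: (batch: T[]) => void) {
    this.flushIntervalMs = flushIntervalMs;
    this.callback = callback;
  }

  add(item: T): void {
    this.currentBatch.push(item);
  }

  // Runs on every interval tick; an empty batch is a no-op.
  tick(): void {
    if (this.currentBatch.length === 0) return;
    const batch = this.currentBatch;
    this.currentBatch = [];
    this.callback(batch);
  }

  // (Re)start the timer so the next flush is a full interval away.
  start(): void {
    this.stop();
    this.timer = setInterval(() => this.tick(), this.flushIntervalMs);
  }

  // Clear the timer, for example during shutdown.
  stop(): void {
    if (this.timer !== undefined) clearInterval(this.timer);
    this.timer = undefined;
  }
}
```

Keeping tick() separate from the timer makes the flush condition easy to exercise in a test without waiting on real or fake timers.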
Closes #1789
✅ Checklist
Testing
DynamicFlushScheduler
Changelog
maxConcurrency setting to the DynamicFlushScheduler, limiting the number of concurrent requests made to the callback.
batchSize items, invoke the callback with only batchSize events.
DynamicFlushScheduler
TODOs:
Array.concat when adding items to currentBatch in addToBatch, as the spread operator is not performant when adding a large number of items.
pLimit.activeCount, pLimit.pendingCount, and pLimit.concurrency to /metrics.
batchSize and flushInterval in a single test.