Conversation
Amp-Thread-ID: https://ampcode.com/threads/T-019daf6c-ac45-7602-abbb-5162b202059c Co-authored-by: Amp <amp@ampcode.com>
Warning: Rate limit exceeded

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 44 minutes and 30 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the … We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID:
📒 Files selected for processing (17)
Walkthrough

This pull request introduces a dependency injection pattern by adding trait abstractions for the service's external collaborators.

Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
grpc-service/src/ksql.rs (1)
217-224: ⚠️ Potential issue | 🟠 Major

Reject negative JSON integers before casting to `u64`.

The new integer tests cover non-negative `i64`, but `parse_u64_field(json!(-1))` currently returns `u64::MAX` via `number as u64`. That can silently corrupt KSQL numeric fields.

🐛 Proposed parser and coverage fix
```diff
 fn parse_u64_field(value: &Value) -> GeykagResult<u64> {
     if let Some(number) = value.as_u64() {
         return Ok(number);
     }
     if let Some(number) = value.as_i64() {
-        return Ok(number as u64);
+        if number >= 0 {
+            return Ok(number as u64);
+        }
     }
     Err(GeykagError::InvalidJsonInteger {
         value: value.clone(),
     })
 }

 #[test]
 fn parse_u64_field_accepts_non_negative_i64_values() {
     assert_eq!(parse_u64_field(&json!(42_i64)).unwrap(), 42);
 }

+#[test]
+fn parse_u64_field_rejects_negative_i64_values() {
+    let error = parse_u64_field(&json!(-1_i64)).unwrap_err();
+
+    assert!(matches!(error, GeykagError::InvalidJsonInteger { .. }));
+}
+
 #[test]
 fn parse_u64_field_rejects_string_values() {
     let error = parse_u64_field(&json!("42")).unwrap_err();
```

Also applies to: 617-627
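A std-only alternative to the manual sign check in the proposed fix is `u64::try_from`, which already fails on negative input. The sketch below is illustrative: `to_u64` and its plain `String` error stand in for the crate's actual `parse_u64_field` and `GeykagError` types.

```rust
// Minimal sketch: u64::try_from fails on negative i64, so the sign check
// comes for free. The String error is a stand-in for the real error enum.
fn to_u64(number: i64) -> Result<u64, String> {
    u64::try_from(number).map_err(|_| format!("negative value for unsigned field: {number}"))
}

fn main() {
    assert_eq!(to_u64(42), Ok(42));
    assert!(to_u64(-1).is_err()); // no silent wrap to u64::MAX
    println!("ok");
}
```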
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@grpc-service/src/ksql.rs` around lines 217 - 224, The parser parse_u64_field currently casts negative i64 values to u64 (producing u64::MAX) which silently corrupts fields; update parse_u64_field to reject negative integers by checking the i64 sign before casting and return a GeykagResult error (with a clear message like "negative value for unsigned field") when number < 0; apply the same fix to the duplicate/related logic at the other occurrence referenced (lines 617-627) so all i64-to-u64 conversions validate non-negativity first and only cast safe, non-negative values.

grpc-service/src/grpc_service/service.rs (2)
311-328: ⚠️ Potential issue | 🟠 Major

Bootstrap the initial subscription filter, not only later patches.

Line 311 parses the first request and Line 320 registers those pubkeys with the dispatcher, but `bootstrap_new_pubkeys_impl` is only invoked for subsequent stream messages at Lines 365-372. A client that only sends the required initial subscribe request will not receive existing snapshots, and ksql-missing pubkeys will never be whitelisted with the validator until the client sends a later filter update.

🐛 Proposed fix to bootstrap the initial filter in the reader task
```diff
     let initial_filter = parse_accounts_filter(&first_req)?;
+    let initial_filter_for_bootstrap = initial_filter.clone();
     info!(
         filter_size = initial_filter.len(),
         "new gRPC subscriber connected"
     );
@@
     let dispatcher = self.dispatcher.clone();
     let snapshot_store = self.snapshot_store.clone();
     let validator_subscriptions = self.validator_subscriptions.clone();

     tokio::spawn(async move {
+        bootstrap_new_pubkeys_impl(
+            &dispatcher,
+            &snapshot_store,
+            &validator_subscriptions,
+            client_id,
+            initial_filter_for_bootstrap,
+        )
+        .await;
+
         while let Some(result) = request_stream.next().await {
             match result {
```

Also applies to: 363-372
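Whatever the exact signature, the shape of the fix is to route the initial filter through the same handler the read loop uses. A synchronous, std-only analogue (all names hypothetical, standing in for the async reader task):

```rust
// apply_filter stands in for bootstrap_new_pubkeys_impl: the initial filter
// takes the same path as every later update, so the bootstrap step cannot be
// skipped for clients that never send a second message.
fn apply_filter(subscribed: &mut Vec<String>, update: Vec<String>) {
    for pubkey in update {
        if !subscribed.contains(&pubkey) {
            subscribed.push(pubkey);
        }
    }
}

fn run_reader(initial: Vec<String>, later: Vec<Vec<String>>) -> Vec<String> {
    let mut subscribed = Vec::new();
    apply_filter(&mut subscribed, initial); // bootstrap BEFORE the read loop
    for update in later {
        apply_filter(&mut subscribed, update);
    }
    subscribed
}

fn main() {
    // A client that only sends the initial request still gets bootstrapped.
    assert_eq!(run_reader(vec!["pk1".into()], vec![]), vec!["pk1"]);
    println!("ok");
}
```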
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@grpc-service/src/grpc_service/service.rs` around lines 311 - 328, The initial parsed filter from parse_accounts_filter(...) is registered via self.dispatcher.add_client(...) but never passed into bootstrap_new_pubkeys_impl for the spawned reader task, so clients that only send the initial subscribe request miss existing snapshots and validator whitelisting; to fix, inside the tokio::spawn block (the reader task created after add_client), invoke bootstrap_new_pubkeys_impl(client_id, initial_filter.clone(), snapshot_store.clone(), validator_subscriptions.clone(), dispatcher.clone()) (or the appropriate signature) before entering the message-read loop so the initial_filter is treated the same as subsequent updates and triggers snapshot delivery and ksql whitelisting; keep use of the same client_id, snapshot_store, validator_subscriptions and dispatcher variables already captured for the task.
141-180: ⚠️ Potential issue | 🟠 Major

Do not whitelist pubkeys after aborting bootstrap for a gone client.

The `break` branches at Lines 149, 157, 165, and 174 still fall through to Lines 179-191. If an earlier pubkey was missing from ksql and queued in `pubkeys_to_whitelist`, a later `ClientNotFound`/policy removal/send error will still whitelist validator pubkeys for a client that can no longer receive them.

🐛 Proposed fix to stop before validator whitelisting on terminal delivery failures
```diff
 Ok(TargetedSendResult::ClientNotFound) => {
     warn!(
         client_id,
         pubkey = %pubkey_b58,
         "targeted snapshot skipped because client is no longer registered"
     );
-    break;
+    return;
 }
 Ok(TargetedSendResult::FailedButRetained) => {
     warn!(
         client_id,
         pubkey = %pubkey_b58,
         "targeted snapshot delivery failed but client was retained"
     );
-    break;
+    return;
 }
 Ok(TargetedSendResult::RemovedByPolicy) => {
     info!(
         client_id,
         pubkey = %pubkey_b58,
         "targeted snapshot delivery removed client by dispatcher policy"
     );
-    break;
+    return;
 }
 Err(error) => {
     warn!(
         client_id,
         pubkey = %pubkey_b58,
         error = %error,
         "failed to queue targeted snapshot update"
     );
-    break;
+    return;
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@grpc-service/src/grpc_service/service.rs` around lines 141 - 180, The match's break branches only exit the inner loop but still fall through to whitelisting; record a terminal-failure flag (e.g., let mut aborted_delivery = false) and set aborted_delivery = true inside the ClientNotFound, FailedButRetained, RemovedByPolicy and Err(error) arms (where you currently break), then break; after the loop check if aborted_delivery { return; } to avoid executing the subsequent pubkeys_to_whitelist logic (references: pubkeys_to_whitelist, dispatcher.send_to_client, TargetedSendResult::ClientNotFound / FailedButRetained / RemovedByPolicy).
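The flag-based variant that prompt describes can be exercised in isolation. This std-only sketch uses a stand-in `SendResult` enum and a hypothetical `deliver` function, not the real dispatcher types:

```rust
// Sketch of the terminal-failure flag: a `break` alone falls through to the
// whitelist step, so the abort must be recorded and checked after the loop.
// SendResult stands in for TargetedSendResult.
#[derive(Debug)]
enum SendResult {
    Sent(u32),
    ClientNotFound,
}

fn deliver(results: Vec<SendResult>) -> Vec<u32> {
    let mut pubkeys_to_whitelist = Vec::new();
    let mut aborted_delivery = false;
    for result in results {
        match result {
            SendResult::Sent(pubkey) => pubkeys_to_whitelist.push(pubkey),
            // Terminal failure: remember it, then break out of the loop.
            SendResult::ClientNotFound => {
                aborted_delivery = true;
                break;
            }
        }
    }
    // Skip whitelisting entirely when delivery was aborted.
    if aborted_delivery {
        return Vec::new();
    }
    pubkeys_to_whitelist
}

fn main() {
    assert_eq!(deliver(vec![SendResult::Sent(1), SendResult::Sent(2)]), vec![1, 2]);
    // An earlier queued pubkey must NOT be whitelisted after a terminal failure.
    assert!(deliver(vec![SendResult::Sent(1), SendResult::ClientNotFound]).is_empty());
    println!("ok");
}
```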
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@grpc-service/src/grpc_service/dispatcher.rs`:
- Around line 769-785: The test currently relies on tokio::task::yield_now() to
wait for the dispatcher to process the enqueued add_client/remove_client
command, which can race; instead make the test perform an acknowledged
dispatcher operation so you know the client registration has been applied before
publishing. Replace the yield_now() usage in tests (e.g.,
add_client_registers_client_and_receives_matching_updates and the similar test
at 836-853) with an awaited acknowledged command such as calling
dispatcher.update_filter(...) or another dispatcher API that returns only after
the command is applied; ensure you await that acknowledgement immediately after
dispatcher.add_client(...) (and before dispatcher.try_publish(...)) so the
client registration is deterministic.
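The acknowledged-command pattern suggested here is not tokio-specific: the command carries a reply channel, and the caller blocks on the acknowledgement before acting on the new state. A std-only sketch with hypothetical names:

```rust
use std::sync::mpsc;
use std::thread;

// Each command carries an ack channel; the dispatcher thread replies only
// after the command has been applied, making the test deterministic.
enum Cmd {
    AddClient { id: u32, ack: mpsc::Sender<()> },
}

fn main() {
    let (tx, rx) = mpsc::channel::<Cmd>();
    let dispatcher = thread::spawn(move || {
        let mut clients = Vec::new();
        for cmd in rx {
            match cmd {
                Cmd::AddClient { id, ack } => {
                    clients.push(id);
                    let _ = ack.send(()); // acknowledge only after the state change
                }
            }
        }
        clients
    });

    let (ack_tx, ack_rx) = mpsc::channel();
    tx.send(Cmd::AddClient { id: 1, ack: ack_tx }).unwrap();
    ack_rx.recv().unwrap(); // registration is guaranteed applied; safe to publish
    drop(tx); // close the channel so the dispatcher thread exits
    assert_eq!(dispatcher.join().unwrap(), vec![1]);
    println!("ok");
}
```

Unlike `yield_now()`, which only hints at the scheduler, the `recv()` on the ack channel cannot return before the dispatcher has actually registered the client.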
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: d110daa7-b640-4281-938c-146f95f2a98c
📒 Files selected for processing (11)
- grpc-service/src/app.rs
- grpc-service/src/domain.rs
- grpc-service/src/grpc_service/convert.rs
- grpc-service/src/grpc_service/dispatcher.rs
- grpc-service/src/grpc_service/init_subs.rs
- grpc-service/src/grpc_service/runtime.rs
- grpc-service/src/grpc_service/service.rs
- grpc-service/src/kafka.rs
- grpc-service/src/ksql.rs
- grpc-service/src/output.rs
- grpc-service/src/traits.rs
Amp-Thread-ID: https://ampcode.com/threads/T-019db34c-78c0-775a-a862-306bf65a70cb Co-authored-by: Amp <amp@ampcode.com>
Summary
Refactors `grpc-service` to inject its external collaborators through trait-based seams and adds a focused unit-test suite around the newly testable paths. The PR covers snapshot loading, Kafka payload decoding, gRPC bootstrap behavior, dispatcher routing and health policy, app orchestration, and output formatting without introducing networked or integration-style tests.
Details
Testability Refactor
`App` and `GrpcSubscriptionService` to accept injected implementations while preserving the existing production constructors and runtime wiring

Unit Test Coverage
`App::run()` orchestration tests with recording sinks and scripted sources

Summary by CodeRabbit
Tests
Refactor
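The trait-seam refactor the summary describes usually looks like the following in miniature. `SnapshotSource`, `UpdateSink`, `ScriptedSource`, and `RecordingSink` are illustrative names, not the PR's actual traits:

```rust
// Production collaborators hide behind traits; tests inject scripted sources
// and recording sinks instead of networked implementations.
trait SnapshotSource {
    fn load(&self) -> Vec<String>;
}

trait UpdateSink {
    fn send(&mut self, update: String);
}

struct App<S: SnapshotSource, K: UpdateSink> {
    source: S,
    sink: K,
}

impl<S: SnapshotSource, K: UpdateSink> App<S, K> {
    fn run(&mut self) {
        for snapshot in self.source.load() {
            self.sink.send(snapshot);
        }
    }
}

// Test doubles: a scripted source and a recording sink.
struct ScriptedSource(Vec<String>);
impl SnapshotSource for ScriptedSource {
    fn load(&self) -> Vec<String> {
        self.0.clone()
    }
}

struct RecordingSink(Vec<String>);
impl UpdateSink for RecordingSink {
    fn send(&mut self, update: String) {
        self.0.push(update);
    }
}

fn main() {
    let mut app = App {
        source: ScriptedSource(vec!["a".into(), "b".into()]),
        sink: RecordingSink(Vec::new()),
    };
    app.run();
    assert_eq!(app.sink.0, vec!["a", "b"]);
    println!("ok");
}
```

The production constructor can keep returning the concrete `App<KafkaSource, GrpcSink>`-style type, so runtime wiring is unchanged while tests pick their own type parameters.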