Conversation

Note: Reviews paused — it looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the CodeRabbit settings.
📝 Walkthrough

Adds gRPC and Sled endpoints, replaces file subscribe/delete booleans with a FileConsumerMode enum, introduces a Reader publisher, updates models and JSON schema, and conditionally compiles protobufs in build.rs when the grpc feature is enabled.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor Client
    participant GrpcPub as GrpcPublisher
    participant BridgeSrv as Bridge (gRPC)
    participant GrpcCons as GrpcConsumer
    participant SledStore as Sled (optional)
    Client->>GrpcPub: send_batch(stream CanonicalMessage)
    GrpcPub->>BridgeSrv: PublishBatch(stream BridgeMessage)
    BridgeSrv->>SledStore: persist / route (if configured)
    BridgeSrv->>GrpcCons: Subscribe (server-stream BridgeMessage)
    GrpcCons->>GrpcCons: convert -> CanonicalMessage, infer id, attach metadata
    GrpcCons->>SledStore: commit/offset tracking (if configured)
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (1 warning, 1 inconclusive)
Actionable comments posted: 8
🧹 Nitpick comments (8)
src/endpoints/file.rs (1)
389-404: Dead code: `initialized` variable is set but never read.

The `initialized` flag at line 389 is only ever written (lines 402-404) and never used for any branching logic. This appears to be leftover from a previous refactor.

Remove dead code:

```diff
-    let mut initialized = false;
     loop {
         if reader.is_none() {
             let mut file = match std::fs::OpenOptions::new().read(true).open(&path) {
                 Ok(f) => f,
                 Err(e) => {
                     tracing::error!("Failed to open {}: {}", path, e);
                     std::thread::sleep(std::time::Duration::from_secs(1));
                     continue;
                 }
             };
-            if !initialized {
-                initialized = true;
-            }
             if let Ok(metadata) = file.metadata() {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/file.rs` around lines 389-404: remove the unused variable by deleting the declaration `let mut initialized = false;` and the `if !initialized { initialized = true; }` block; keep the surrounding file-opening logic unchanged and ensure no other code references `initialized`.

Cargo.toml (1)
104: Including `grpc` as a default feature pulls in `protobuf-src`, which compiles protobuf from source.

This can add significant build time for all downstream users, even those who don't need gRPC. Consider whether `grpc` should be opt-in rather than default, or at minimum document the build-time cost.

Proposed change:

```diff
-default = ["grpc"]
+default = []
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@Cargo.toml` at line 104: the crate currently enables grpc by default via `default = ["grpc"]`, which pulls in `protobuf-src` and forces protobuf compilation for all downstream users. Either remove `"grpc"` from the default feature list (making it opt-in) or document the build-time cost prominently in the README/Cargo.toml metadata, with a short note near the `grpc` feature explaining that it requires `protobuf-src` and can significantly increase build times.

src/endpoints/grpc.rs (2)
66-74: 1ms timeout for subsequent batch messages may be too aggressive.

Line 73 uses `Duration::from_millis(1)` for polling subsequent messages after the first. On loaded systems or with network jitter, this could cause batches to be consistently undersized (just 1 message). Consider making this configurable or using a slightly more generous default (e.g., 5-10 ms).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` around lines 66-74: the hardcoded `tokio::time::timeout(Duration::from_millis(1), stream.message())` poll is too aggressive and can cause undersized batches; replace the literal with a configurable timeout (e.g., a field on the config) with a larger 5-10 ms default, and update any related documentation/tests to use that constant/config value instead of the hardcoded 1.
26-30: `Mutex` on `client` and `stream` in `GrpcConsumer` is unnecessary.

`receive_batch` takes `&mut self`, which already guarantees exclusive access via the borrow checker. The `Mutex<Option<tonic::Streaming<BridgeMessage>>>` for `stream` and `Mutex<BridgeClient<Channel>>` for `client` add lock overhead without benefit. `client` isn't even used after construction.

Consider replacing with plain fields:

♻️ Suggested simplification

```diff
 pub struct GrpcConsumer {
-    client: Mutex<BridgeClient<Channel>>,
-    stream: Mutex<Option<tonic::Streaming<BridgeMessage>>>,
+    stream: Option<tonic::Streaming<BridgeMessage>>,
     topic: String,
 }
```

And in `receive_batch`:

```diff
-        let mut stream_guard = self.stream.lock().await;
-        if let Some(stream) = stream_guard.as_mut() {
+        if let Some(stream) = self.stream.as_mut() {
```

Also applies to: 56-60

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` around lines 26-30: the Mutexes on `client` and `stream` are redundant because `receive_batch` takes `&mut self`. Change `stream` to a plain `Option<tonic::Streaming<BridgeMessage>>`, remove the locking in `receive_batch`, and — since `client` is unused after construction — consider removing the `client` field entirely; update the constructor and all uses in the impl (including the other occurrence around lines 56-60) to match the new plain-field types.

src/models.rs (2)
667-708: Custom `FileConfig` deserializer: `extra` may not be an `Object`, leading to silent default.

On line 691, the code checks `if let serde_json::Value::Object(ref mut map) = extra`, but if `extra` happens to be a non-Object value (e.g., `Null` when no extra fields exist beyond `path`), the condition falls through without inserting `"mode"`, and then `serde_json::from_value(extra)` on lines 700-701 will attempt to deserialize a non-object into `FileConsumerMode` (which uses `#[serde(tag = "mode")]`), resulting in a confusing error.

In practice, `#[serde(flatten)] extra: serde_json::Value` on a struct with only one other named field should always produce an `Object`, so this is likely safe. But a defensive fallback or explicit error for the non-object case would be more robust.

🛡️ Add a defensive else-branch

```diff
     if let serde_json::Value::Object(ref mut map) = extra {
         if !map.contains_key("mode") {
             map.insert(
                 "mode".to_string(),
                 serde_json::Value::String("consume".to_string()),
             );
         }
+    } else {
+        extra = serde_json::json!({"mode": "consume"});
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/models.rs` around lines 667-708: the custom `Deserialize` impl for `FileConfig` assumes the flattened `extra` is an Object but it can be non-Object (e.g., `Null`), which leads to confusing errors when inserting the default `"mode"`. Handle the non-Object case defensively: if `extra` is not an Object, replace it with a new Object containing the default `"mode"` (or return a clear serde error), so the subsequent `serde_json::from_value(extra)` call always receives an Object and `FileConsumerMode` (which uses `#[serde(tag = "mode")]`) deserializes reliably.
998-1010: `GrpcConfig` has no TLS configuration field.

Other network-facing endpoint configs (Kafka, NATS, AMQP, MQTT, HTTP, IBM MQ) all carry a `tls: TlsConfig` field. `GrpcConfig` omits it entirely, meaning users cannot configure mTLS or server-certificate verification for gRPC connections. If this is intentional for an MVP, consider adding a TODO comment. Otherwise, adding `tls` support would align with the rest of the codebase.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/models.rs` around lines 998-1010: add a `tls` field (e.g., `pub tls: Option<TlsConfig>`, or `pub tls: TlsConfig` with a default) to the `GrpcConfig` struct so gRPC endpoints can be configured like other transports; update derives/serde attributes as needed (keep `#[serde(deny_unknown_fields)]` and add `#[serde(default)]` if you use an `Option` or defaulted `TlsConfig`), and ensure `GrpcConfig` remains `Clone`/`Default` by reusing the existing `TlsConfig` type so mTLS and server-certificate verification are configured consistently across transports.

src/endpoints/sled.rs (2)
302-311: `receive_batch` ignores `max_messages` and always returns exactly one message.

The method accepts `_max_messages` but makes no attempt to fill the batch up to that count. It delegates to `self.receive()` and wraps the single result. This undercuts the purpose of batch consumption and may cause unnecessary overhead in the routing layer if it expects batches.

Consider iterating up to `max_messages` (with a short timeout for subsequent messages, similar to the gRPC consumer's approach) to improve throughput.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/sled.rs` around lines 302-311: `receive_batch` currently ignores `_max_messages` and returns a single message; change it to loop up to `max_messages`, calling `self.receive()` repeatedly (with a short per-receive timeout or non-blocking variant for subsequent receives) to accumulate messages into a `Vec`, collect their commit functions (or wrap a combined commit via `crate::traits::into_batch_commit_func`), and return early on timeout or terminal error so batches can contain up to `max_messages` items.
28-40: Global `SLED_DBS` registry never removes entries.

`get_or_open_db` inserts into the global `HashMap` but there's no corresponding removal. For a process that opens a fixed set of DBs this is fine. However, the `Db` handle keeps the database open (including file locks and memory-mapped files), so DBs are never closed during the lifetime of the process.

This is acceptable for typical usage but worth documenting, especially for testing where `tempdir()`-based paths accumulate stale entries in the global map.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/sled.rs` around lines 28-40: the global `SLED_DBS` registry never removes entries, keeping `Db` handles (and file locks/mmaps) alive. Add a counterpart to `get_or_open_db` — e.g., `close_db(path: &str) -> anyhow::Result<()>` — that locks `SLED_DBS`, removes the `Db` for the given path, calls `db.flush()` (or any required sled cleanup) before dropping it, and returns an error if the registry is poisoned or the path is not found; update tests to call this cleanup for tempdir-based paths.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@build.rs`:
- Around line 26-34: Add a Cargo rerun directive so changes to the proto trigger
a rebuild: inside the #[cfg(feature = "grpc")] block in build.rs (the block that
calls protobuf_src::protoc() and
tonic_prost_build::configure().compile_protos(...)), emit a
println!("cargo:rerun-if-changed=src/endpoints/grpc.proto"); before compiling
the protos to ensure Cargo re-runs the build script when the proto file changes.
In `@src/endpoints/grpc.rs`:
- Around line 251-267: Tests bind to hardcoded ports (addr "[::1]:50051" and
similar later) which causes flakes; change the test to create a
tokio::net::TcpListener bound to "[::1]:0" (or "[::1]:0"/"127.0.0.1:0"), get the
assigned socket address via listener.local_addr(), use that address for clients,
and start the server with
Server::builder().serve_with_incoming(listener.incoming()) (or convert the
listener into a tonic incoming stream) instead of Server::serve; update the
test_grpc_publisher_and_consumer and the other test (lines ~327-340) to use this
dynamically assigned port and the listener-derived addr so tests won’t collide
in CI.
- Around line 33-51: GrpcConfig.timeout_ms is declared but not applied; update
GrpcConsumer::new and GrpcPublisher::new to honor it by constructing the tonic
transport Channel via Endpoint (use Endpoint::from_shared(config.url.clone())
and set the connect timeout from config.timeout_ms) instead of calling
BridgeClient::connect directly, and also apply per-RPC timeouts when performing
calls like client.subscribe(Request::new(SubscribeRequest { .. })) and other
RPCs (e.g., via tokio::time::timeout(Duration::from_millis(config.timeout_ms),
client.method(...))). Ensure you reference GrpcConfig.timeout_ms,
BridgeClient::connect, GrpcConsumer::new / GrpcPublisher::new, client.subscribe,
and Request::new(SubscribeRequest) when making the changes.
In `@src/endpoints/reader.rs`:
- Around line 49-84: The send_batch function currently returns
SentBatch::Partial even when all messages were successfully received and
committed; change the success path in send_batch (inside match on
consumer.receive_batch and after batch.commit) to return SentBatch::Ack when the
operation consumed and committed all messages (no failures) instead of
SentBatch::Partial with responses Some(...); keep the Partial/failed response
only for true partial/failure cases. Update the branch that inspects
received_count, batch.commit, batch.messages, and count to return SentBatch::Ack
on full success while preserving existing error handling for ConsumerError and
commit failures.
In `@src/endpoints/sled.rs`:
- Around line 55-69: The send() method currently calls self.tree.insert(...) but
does not call self.tree.flush_async().await like send_batch() does, causing
inconsistent durability; update send() to await self.tree.flush_async().await
after insert and convert any flush errors to PublisherError::Retryable (similar
to send_batch's error mapping), or if the intended behavior is weaker durability
for single sends, add a comment in send() documenting this trade-off and why
flush is omitted; reference the send(), send_batch(), insert(), and
tree.flush_async().await symbols when making the change.
- Around line 250-300: The receive function currently uses pop_min() in queue
mode which permanently deletes the entry before processing; change the flow so
messages are not removed until Ack to avoid loss: in receive (when
self.delete_after_read is true) replace the pop_min() usage with a
non-destructive read (e.g., peek/iter/range to fetch the min key+value) and move
the delete into the commit closure for the Ack path (or implement an "inflight"
marker by moving the key to a processing tree and only dropping it on Ack and
restoring on Nack); update the commit closure logic (the Boxed closure created
in receive) to perform the actual delete on Ack and to re-insert or clear the
inflight marker on Nack so crash between read and commit cannot permanently lose
messages.
- Around line 222-231: The spawned thread blocks forever on subscriber.next(),
causing orphaned threads if no sled events arrive; change the loop to poll with
a timeout and check whether the receiver has been dropped so the thread can exit
gracefully. Specifically, replace the blocking subscriber.next() loop in the
tree.watch_prefix(...)->Subscriber handling with a timed poll (e.g.,
Subscriber::next_timeout(Duration) or a loop that tries subscriber.try_next()
with a small sleep/timeout) and break the loop when tx.send_blocking would fail
or when rx.is_closed() (or equivalent) indicates the consumer is gone; update
the code surrounding subscriber, tx, and rx to use this timeout-based polling so
the thread can exit instead of lingering forever.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- Cargo.lock (excluded by !**/*.lock)

📒 Files selected for processing (11)
- Cargo.toml
- benches/performance_bench.rs
- build.rs
- mq-bridge.schema.json
- src/endpoints/file.rs
- src/endpoints/grpc.proto
- src/endpoints/grpc.rs
- src/endpoints/mod.rs
- src/endpoints/reader.rs
- src/endpoints/sled.rs
- src/models.rs
```rust
let commit: crate::traits::BatchCommitFunc = if let Some(offset_file) =
    &c.offset_file
{
    let offset_file = offset_file.clone();
    let captured_messages = messages.clone();

    Box::new(
        move |dispositions: Vec<crate::traits::MessageDisposition>| {
            Box::pin(async move {
                let max_offset = dispositions
                    .iter()
                    .zip(captured_messages.iter())
                    .filter_map(|(d, m)| match d {
                        crate::traits::MessageDisposition::Ack
                        | crate::traits::MessageDisposition::Reply(_) => m
                            .metadata
                            .get("file_offset")
                            .and_then(|s| s.parse::<u64>().ok()),
                        _ => None,
                    })
                    .max();

                if let Some(offset) = max_offset {
                    let mut file = offset_file.lock().await;
                    if let Err(e) = file.rewind().await {
                        tracing::error!("Failed to rewind offset file: {}", e);
                    } else if let Err(e) = file.set_len(0).await {
                        tracing::error!("Failed to truncate offset file: {}", e);
                    } else if let Err(e) =
                        file.write_all(offset.to_string().as_bytes()).await
                    {
                        tracing::error!("Failed to write offset file: {}", e);
                    } else if let Err(e) = file.flush().await {
                        tracing::error!("Failed to flush offset file: {}", e);
                    }
                }
                Ok(())
            })
                as crate::traits::BoxFuture<'static, anyhow::Result<()>>
        },
    )
```
Offset file write is not atomic — crash between set_len(0) and write_all loses the offset.
The sequence rewind → set_len(0) → write_all → flush at lines 741-751 truncates the file before writing the new offset. If the process crashes after truncation but before the write completes, the offset file will be empty, causing the consumer to restart from offset 0 (or tail).
For a file-based offset store, a safer pattern is write-to-temp-then-rename (like remove_lines_from_file already does). However, since the fallback (re-reading from start/tail) is safe and this is a lightweight local persistence mechanism, this is acceptable for now.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/models.rs (1)
144-166: ⚠️ Potential issue | 🟡 Minor

`"reader"` is missing from `is_known_endpoint_name`.

`EndpointType::Reader` is a valid variant (line 421) that serde will try to parse via `#[serde(rename_all = "lowercase")]` as `"reader"`. If a user provides a `reader:` config block that has a structural error, the fallback logic at line 250 won't recognize it as a known endpoint and will silently wrap it as `Custom` instead of surfacing the parse error. Add `"reader"` to `is_known_endpoint_name` for consistency with the other variants.

Proposed fix:

```diff
 matches!(
     name,
     "aws" | "kafka" | "nats" | "file" | "static" | "memory" | "sled"
         | "amqp" | "mongodb" | "mqtt" | "http" | "ibm-mq" | "ibmmq"
         | "zeromq" | "grpc" | "fanout" | "switch" | "response"
+        | "reader"
 )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/models.rs` around lines 144-166: `is_known_endpoint_name` currently omits `"reader"`, so `EndpointType::Reader` (serialized as `"reader"`) isn't recognized and malformed reader configs get wrapped as `Custom` instead of surfacing serde errors; add `"reader"` to the match list so the fallback logic correctly recognizes it and lets serde return parse errors.
♻️ Duplicate comments (6)
src/endpoints/grpc.rs (2)
249-265: Tests use hardcoded ports — prone to flakiness in CI.

Tests bind to fixed ports 50051 and 50052. If tests run in parallel or the port is in use, these will fail. Consider binding to port 0 and retrieving the assigned port.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` around lines 249-265: `test_grpc_publisher_and_consumer` binds fixed addresses (ports 50051/50052) which can conflict in CI; bind a `TcpListener` to `"[::1]:0"` (ephemeral port), obtain the assigned port via `listener.local_addr()`, use that address for the client, and start the server from the listener (e.g., `Server::builder().serve_with_incoming(...)` with a tonic incoming stream) so the test reliably uses an available port instead of 50051/50052.
32-49: `GrpcConfig::timeout_ms` is accepted in config but never applied.

Neither `GrpcConsumer::new` nor `GrpcPublisher::new` uses `config.timeout_ms`. A user setting `timeout_ms: 5000` will get no effect. Wire it into channel/request configuration or mark it as reserved/unimplemented in `GrpcConfig`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` around lines 32-49: `GrpcConfig::timeout_ms` is accepted but unused; update `GrpcConsumer::new` (and similarly `GrpcPublisher::new`) to apply it by wrapping async gRPC operations (e.g., `BridgeClient::connect(...).await` and `client.subscribe(...).await`) in `tokio::time::timeout(Duration::from_millis(config.timeout_ms), ...)` so connect/subscribe fail with a clear error rather than hang when the configured timeout elapses, mapping the timeout into the existing error type.

src/endpoints/sled.rs (3)
55-69: `send()` doesn't flush, but `send_batch()` does — inconsistent durability.

Already flagged in a prior review. Individual sends can be lost on crash while batches are durable.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/sled.rs` around lines 55-69: `send()` writes the record but doesn't flush like `send_batch()` does, so single-message sends are not durable; update `send()` to call the same flush used by `send_batch` after insert and map any flush error to `PublisherError::Retryable`, mirroring `send_batch()`'s durability guarantees — or, if weaker durability for single sends is intended, document that trade-off in `send()`.
250-269: Queue mode: message is at risk between `pop_min()` and Nack re-insert.

Already flagged in a prior review. If the process crashes after `pop_min()` but before the Nack-triggered re-insert, the message is permanently lost.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/sled.rs` around lines 250-269: queue mode calls `pop_min()` in `receive()`, risking permanent loss if the process crashes before the Nack re-insert; make the remove-and-mark-in-progress step atomic — when `delete_after_read` is true, use a sled transaction to atomically move the key/value from the main tree into an "inflight" tree instead of popping, have Nack move it back into the main tree and Ack remove it from the inflight tree, so items are not lost across crashes.
225-231: Blocking watcher thread may linger after consumer is dropped.

Already flagged in a prior review. The spawned thread blocks on `subscriber` iteration and only exits when `tx.send_blocking()` fails, which requires a new event to arrive after the consumer is dropped.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/sled.rs` around lines 225-231: the watcher thread blocks iterating `subscriber` and only exits when `tx.send_blocking()` fails; change the loop to poll `subscriber` with a timeout (e.g., `Subscriber::next_timeout(Duration)` or a small-sleep `try_next` loop) so the thread periodically wakes, and break when sending fails or the receiver is closed, so the thread exits promptly when the consumer is dropped instead of lingering forever.

src/endpoints/file.rs (1)
718-758: Offset file write is not atomic — known limitation.

The `rewind → set_len(0) → write_all → flush` sequence can lose the offset on crash between truncation and write. This was flagged in a prior review and the fallback (re-reading from start/tail) is safe. No change needed now.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/file.rs` around lines 718-758: make the offset update atomic by writing the new offset to a temporary file in the same directory (e.g., original path + ".tmp"), flushing and syncing it, then atomically renaming it over the original via `tokio::fs::rename` (syncing the directory if available), so a crash between truncate and write cannot leave an empty offset file; keep the existing error logging.
🧹 Nitpick comments (6)
Cargo.toml (2)
104-104: Consider whether `grpc` should be in the default feature set.
Making `grpc` the default pulls in tonic, prost, protoc-bin-vendored, and the proto compilation toolchain for every user, even those who only need lightweight in-memory or file endpoints. This significantly increases compile time and binary size. A leaner default (e.g., `default = []`) would let users opt in explicitly.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@Cargo.toml` at line 104, The Cargo.toml currently sets default = ["grpc"], which forces heavy dependencies (tonic, prost, protoc-bin-vendored) into every build; change the default feature set to an empty list (e.g., default = []) so grpc is opt-in, and ensure the "grpc" feature remains defined so users can enable it explicitly; update any README or feature docs that reference default behavior to instruct users to opt into the grpc feature if they need gRPC/proto compilation.
84-101: Document the gRPC feature's MSRV requirement.
The existing MSRV comment (lines 18–22) calls out the `http` and `aws` features requiring Rust 1.75+. The `grpc` feature (tonic 0.14) also requires Rust 1.75+. Add a similar note for the `grpc` feature to maintain consistency and help users on older toolchains understand the requirement.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@Cargo.toml` around lines 84 - 101, Add a short MSRV note stating that the gRPC feature requires Rust 1.75+ alongside the existing MSRV comment for `http` and `aws`; place this note near the gRPC-related dependencies (tonic, tonic-prost, prost, tokio-stream and build deps tonic-prost-build, protoc-bin-vendored) in Cargo.toml so users reading the `grpc` feature block see the Rust 1.75+ requirement.src/endpoints/grpc.rs (2)
26-49: `_client` field is stored but never used after initialization.
The `_client` field (prefixed with `_` to suppress warnings) is only used during `new()` to create the subscription stream. After that, it's kept alive in a `Mutex` but never accessed. If it's solely to keep the gRPC connection alive, the `stream` already holds a reference to the underlying channel. Consider removing the field or documenting why it needs to be retained.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` around lines 26 - 49, The GrpcConsumer struct currently stores a Mutex-wrapped _client that is only used in new() and never referenced afterwards; remove the unused field _client from GrpcConsumer and adjust the constructor new() to no longer create or store the client (keep only the subscribed stream in stream), or if you intend to retain the connection for a reason, add a clear comment explaining why _client must be held alive and use a non-underscore name; update any references to GrpcConsumer::new and the struct definition accordingly so the code compiles with only the stream field (or documents the rationale if you keep _client).
142-146: Redundant metadata conversion.
`msg.metadata` is already a `HashMap<String, String>` and `BridgeMessage.metadata` (from proto) is also `HashMap<String, String>`. The `.into_iter().collect()` rebuilds the map unnecessarily.

Proposed simplification

```diff
 let bridge_messages = messages.into_iter().map(|msg| BridgeMessage {
     payload: msg.payload.to_vec(),
     id: fast_uuid_v7::format_uuid(msg.message_id).to_string(),
-    metadata: msg.metadata.into_iter().collect(),
+    metadata: msg.metadata,
 });
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` around lines 142 - 146, The mapping recreates msg.metadata unnecessarily; since msg.metadata is already a HashMap<String,String> and BridgeMessage.metadata expects the same type, stop rebuilding it—use msg.metadata directly in the BridgeMessage construction (inside the bridge_messages mapping that builds BridgeMessage from msg) so replace the metadata: msg.metadata.into_iter().collect() expression with metadata: msg.metadata.src/endpoints/sled.rs (1)
28-40: Global `SLED_DBS` registry never evicts closed databases.
`get_or_open_db` inserts entries but never removes them. If the application dynamically creates and destroys endpoints over time, `Db` instances (and their underlying file handles) accumulate indefinitely. For a long-lived service with static configuration this is acceptable, but consider using `Weak<Db>` or a cleanup mechanism if dynamic endpoint creation is planned.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/sled.rs` around lines 28 - 40, The SLED_DBS registry retains strong Db instances forever, causing leaked file handles; change the registry to hold weak references and prune expired entries in get_or_open_db: replace static SLED_DBS: Lazy<Mutex<HashMap<String, Db>>> with a HashMap<String, Weak<Db>> (store and return Arc/Db clones), on get_or_open_db attempt to upgrade the Weak to a strong Arc and return it if present, otherwise open a new Db wrapped in Arc, insert its Weak into the map, and remove any entries whose Weak::upgrade() fails during lookup (or run a short cleanup pass) so closed DBs are evicted; reference symbols: SLED_DBS, get_or_open_db, Db.src/endpoints/file.rs (1)
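If dynamic endpoint creation does become a concern, the `Weak`-based registry might look like the following std-only sketch; `Handle` and `get_or_open` are illustrative stand-ins for `sled::Db` and `get_or_open_db`:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock, Weak};

// Stand-in for sled::Db; opening would really be sled::open(path).
struct Handle {
    path: String,
}

// The registry holds only Weak references, so a database is dropped (and its
// file handles closed) once every endpoint using it is gone.
static REGISTRY: OnceLock<Mutex<HashMap<String, Weak<Handle>>>> = OnceLock::new();

fn get_or_open(path: &str) -> Arc<Handle> {
    let registry = REGISTRY.get_or_init(|| Mutex::new(HashMap::new()));
    let mut map = registry.lock().unwrap();
    // Prune entries whose strong count dropped to zero.
    map.retain(|_, weak| weak.strong_count() > 0);
    if let Some(db) = map.get(path).and_then(Weak::upgrade) {
        return db;
    }
    let db = Arc::new(Handle { path: path.to_string() });
    map.insert(path.to_string(), Arc::downgrade(&db));
    db
}
```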
440-443: Offset propagation via metadata string is fragile.
The file offset is injected into message metadata as a string (`"file_offset"`) and later parsed back to `u64` in the commit function (line 734). If a middleware or user mutates/removes the `"file_offset"` metadata key, offset tracking silently breaks. Consider using a dedicated field or a constant for the key name.

Extract key name as a constant

```diff
+const FILE_OFFSET_META_KEY: &str = "file_offset";
+
 fn run_file_tail_task_sync(
     // ...
 ) {
     // ...
     if group_id.is_some() {
         msg.metadata
-            .insert("file_offset".to_string(), last_position.to_string());
+            .insert(FILE_OFFSET_META_KEY.to_string(), last_position.to_string());
     }
     // ...
 }
```

And similarly in the commit function (line 734):

```diff
-        .get("file_offset")
+        .get(FILE_OFFSET_META_KEY)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/file.rs` around lines 440 - 443, Extract the magic metadata key into a constant (e.g., const FILE_OFFSET_KEY: &str = "file_offset") and replace the literal usage in the insert call (msg.metadata.insert(...)) with FILE_OFFSET_KEY.to_string(); also update the corresponding commit function to read from msg.metadata using the same constant, validate/parse the value with proper error handling (log/warn and skip or fail instead of silently treating missing/invalid value), and consider adding an explicit optional field on your Message struct (e.g., file_offset: Option<u64>) to set when group_id.is_some() so offset propagation does not solely rely on a mutable metadata string.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/endpoints/grpc.rs`:
- Around line 391-395: The test loop that awaits messages currently can hang
indefinitely; wrap the receive loop that uses received_messages,
messages_to_send and mem_dest_consumer.receive_batch(...) inside a
tokio::time::timeout(Duration::from_secs(...)) so the test fails fast on missing
messages; if the timeout elapses, unwrap or propagate a clear error/panic (or
return Err) so the test stops rather than blocking forever. Ensure you import
tokio::time::Duration and tokio::time::timeout and apply the timeout around the
whole waiting logic (not just a single receive_batch call) so the total wait is
bounded.
In `@src/endpoints/sled.rs`:
- Around line 302-311: receive_batch currently ignores the _max_messages
parameter and always returns one message; change receive_batch to loop up to
max_messages calling self.receive().await repeatedly, collecting
Received.message into a Vec and their Received.commit functions into a list, and
return ReceivedBatch with messages set to the collected Vec and commit set to a
new batch commit function that invokes each collected commit (in order) and
aggregates/propagates errors; use crate::traits::into_batch_commit_func or
implement a wrapper that calls each individual commit closure to create the
combined commit behavior so callers get a proper batched commit for
ReceivedBatch.
---
Outside diff comments:
In `@src/models.rs`:
- Around line 144-166: The function is_known_endpoint_name currently omits the
"reader" token, so EndpointType::Reader (serialized as "reader") isn't
recognized and malformed reader configs get wrapped as Custom instead of
surfacing serde errors; update is_known_endpoint_name to include "reader" in its
matches list so the fallback logic treating known endpoint names (used when
parsing EndpointType) correctly recognizes reader and lets serde return parse
errors.
---
Duplicate comments:
In `@src/endpoints/file.rs`:
- Around line 718-758: The current non-atomic sequence in the commit closure
that updates the offset file (the closure created for commit:
crate::traits::BatchCommitFunc using the captured offset_file) can lose the
offset between truncate and write; make the write atomic by writing the new
offset to a temporary file in the same directory and then atomically renaming it
into place (or use a library that provides atomic file replace). Concretely: in
the Box::new move |...| closure that currently does
rewind/set_len/write_all/flush on offset_file, instead create a temp path (e.g.,
original + ".tmp"), write the offset bytes to that temp file, flush and sync it,
then rename/replace the original offset file with tokio::fs::rename (or
equivalent) and sync the directory if available; ensure errors are logged as
before.
In `@src/endpoints/grpc.rs`:
- Around line 249-265: The test test_grpc_publisher_and_consumer currently binds
to fixed addresses (addr using port 50051/50052) which can conflict in CI;
change to bind a TcpListener to "[::1]:0" (ephemeral port), obtain the assigned
port via listener.local_addr(), and use that address for the client rather than
hardcoded ports; then pass the listener (or an incoming stream derived from it)
into Server::builder().serve_with_incoming(...) (or similar tonic API) when
starting the server so the test reliably uses an available port instead of
50051/50052.
- Around line 32-49: GrpcConfig::timeout_ms is accepted but not used; update
GrpcConsumer::new (and similarly GrpcPublisher::new) to apply the timeout by
wrapping async gRPC operations (e.g., BridgeClient::connect(...).await and
client.subscribe(...).await) with
tokio::time::timeout(Duration::from_millis(config.timeout_ms)) so the
connect/subscribe awaits fail if they exceed the configured timeout, mapping the
timeout error into your existing Result error type; ensure you import Duration
and tokio::time::timeout and handle the Timeout error to return a clear Err
instead of hanging indefinitely.
In `@src/endpoints/sled.rs`:
- Around line 55-69: The send() implementation writes the record but doesn't
call a flush like send_batch() does, so single-message sends are not durable;
update send() (the async fn send) to call the same flush used by send_batch
(e.g., self.tree.flush() or self.db.flush() as used elsewhere) after insert and
map any flush error to PublisherError::Retryable, ensuring the insert+flush path
mirrors send_batch()'s durability guarantees.
- Around line 250-269: Queue-mode currently calls pop_min() in receive(),
risking permanent loss if the process crashes before the Nack re-insert; fix it
by making the remove-and-mark-in-progress atomic: in receive() when
delete_after_read is true, do not call pop_min() directly—instead perform a sled
transaction (using sled::transaction::TransactionalTree or Tree::transaction)
that atomically moves the key/value from the main tree into an "inflight" tree
(a new Tree member like self.inflight_tree) and return the item from that
transaction; update the Nack path (handle Nack/reinsert) to move the key back
from self.inflight_tree into the main tree and Ack to remove it from
self.inflight_tree so items are not lost across crashes.
- Around line 225-231: The watcher thread currently blocks on iterating
`subscriber` and only exits when `tx.send_blocking()` fails; change the loop to
poll `subscriber` with a timeout (e.g., use `recv_timeout` or a non-blocking
`try_recv`/`try_iter` if the subscriber API supports it) so the thread
periodically wakes and checks the send side state, and replace the blocking send
with a non-blocking check (`tx.try_send` or normal `send` but handle the error)
to break the loop when the consumer is dropped; update the closure created in
`std::thread::spawn(move || { ... })` around `subscriber` and `tx` to use
`recv_timeout`/`try_recv` plus `tx.try_send`/error handling so the thread can
exit promptly when the receiver is gone.
---
Nitpick comments:
In `@Cargo.toml`:
- Line 104: The Cargo.toml currently sets default = ["grpc"], which forces heavy
dependencies (tonic, prost, protoc-bin-vendored) into every build; change the
default feature set to an empty list (e.g., default = []) so grpc is opt-in, and
ensure the "grpc" feature remains defined so users can enable it explicitly;
update any README or feature docs that reference default behavior to instruct
users to opt into the grpc feature if they need gRPC/proto compilation.
- Around line 84-101: Add a short MSRV note stating that the gRPC feature
requires Rust 1.75+ alongside the existing MSRV comment for `http` and `aws`;
place this note near the gRPC-related dependencies (tonic, tonic-prost, prost,
tokio-stream and build deps tonic-prost-build, protoc-bin-vendored) in
Cargo.toml so users reading the `grpc` feature block see the Rust 1.75+
requirement.
In `@src/endpoints/file.rs`:
- Around line 440-443: Extract the magic metadata key into a constant (e.g.,
const FILE_OFFSET_KEY: &str = "file_offset") and replace the literal usage in
the insert call (msg.metadata.insert(...)) with FILE_OFFSET_KEY.to_string();
also update the corresponding commit function to read from msg.metadata using
the same constant, validate/parse the value with proper error handling (log/warn
and skip or fail instead of silently treating missing/invalid value), and
consider adding an explicit optional field on your Message struct (e.g.,
file_offset: Option<u64>) to set when group_id.is_some() so offset propagation
does not solely rely on a mutable metadata string.
In `@src/endpoints/grpc.rs`:
- Around line 26-49: The GrpcConsumer struct currently stores a Mutex-wrapped
_client that is only used in new() and never referenced afterwards; remove the
unused field _client from GrpcConsumer and adjust the constructor new() to no
longer create or store the client (keep only the subscribed stream in stream),
or if you intend to retain the connection for a reason, add a clear comment
explaining why _client must be held alive and use a non-underscore name; update
any references to GrpcConsumer::new and the struct definition accordingly so the
code compiles with only the stream field (or documents the rationale if you keep
_client).
- Around line 142-146: The mapping recreates msg.metadata unnecessarily; since
msg.metadata is already a HashMap<String,String> and BridgeMessage.metadata
expects the same type, stop rebuilding it—use msg.metadata directly in the
BridgeMessage construction (inside the bridge_messages mapping that builds
BridgeMessage from msg) so replace the metadata:
msg.metadata.into_iter().collect() expression with metadata: msg.metadata.
In `@src/endpoints/sled.rs`:
- Around line 28-40: The SLED_DBS registry retains strong Db instances forever,
causing leaked file handles; change the registry to hold weak references and
prune expired entries in get_or_open_db: replace static SLED_DBS:
Lazy<Mutex<HashMap<String, Db>>> with a HashMap<String, Weak<Db>> (store and
return Arc/Db clones), on get_or_open_db attempt to upgrade the Weak to a strong
Arc and return it if present, otherwise open a new Db wrapped in Arc, insert its
Weak into the map, and remove any entries whose Weak::upgrade() fails during
lookup (or run a short cleanup pass) so closed DBs are evicted; reference
symbols: SLED_DBS, get_or_open_db, Db.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (6)
- Cargo.toml
- build.rs
- src/endpoints/file.rs
- src/endpoints/grpc.rs
- src/endpoints/sled.rs
- src/models.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- build.rs
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
Cargo.toml (1)
84-101: ⚠️ Potential issue | 🟡 Minor
Add MSRV documentation for the grpc feature, aligned with existing notes for `http` and `aws`.
The `grpc` feature requires Rust 1.82+ (due to prost 0.14), which is 12 minor versions above the crate's declared MSRV of 1.70. Documentation for this MSRV delta is missing. The `http` and `aws` features already have MSRV notes in Cargo.toml; add a similar note for `grpc` to inform users who enable that feature:

```toml
# Note: The grpc feature requires Rust 1.82+ (due to prost 0.14), which is higher than the crate MSRV (1.70).
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@Cargo.toml` around lines 84 - 101, Add an MSRV note next to the grpc-related dependency entries in Cargo.toml (the tonic/prost/prost 0.14 / tonic-prost-build lines) explaining that enabling the grpc feature requires Rust 1.82+; insert a comment like: "# Note: The grpc feature requires Rust 1.82+ (due to prost 0.14), which is higher than the crate MSRV (1.70)." so users enabling the grpc feature see the higher MSRV requirement.
♻️ Duplicate comments (3)
src/endpoints/grpc.rs (3)
415-419: ⚠️ Potential issue | 🟡 Minor
Add a timeout around the e2e receive loop to prevent hangs.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` around lines 415 - 419, The receive loop using mem_dest_consumer.receive_batch(...) can hang indefinitely; wrap each await in a timeout (e.g., tokio::time::timeout) or use tokio::select with a sleep deadline and handle timeout errors instead of unwrap. Replace the await unwrap on mem_dest_consumer.receive_batch(5).await.unwrap() with a timeout call (e.g., timeout(Duration::from_secs(X), mem_dest_consumer.receive_batch(5)).await) and handle the Err(timeout) by failing the test or breaking out with a clear error, while preserving the existing logic that extends received_messages until received_messages.len() >= messages_to_send.len().
349-363: ⚠️ Potential issue | 🟡 Minor
Use ephemeral ports in gRPC tests to avoid CI flakes.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` around lines 349 - 363, The test test_grpc_route_end_to_end currently binds to a fixed port via addr and calls Server::builder().serve(...), which can cause CI port conflicts; change it to bind a TcpListener on an ephemeral port (e.g. tokio::net::TcpListener::bind("127.0.0.1:0").await), obtain the actual socket address via listener.local_addr(), spawn the server using Server::builder().serve_with_incoming or the appropriate tonic incoming stream built from that listener (instead of serve(addr,...)), and use the discovered local_addr for the test client connection so the MockBridge/bridge server runs on a safe ephemeral port.
273-289: ⚠️ Potential issue | 🟡 Minor
Use ephemeral ports in gRPC tests to avoid CI flakes.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` around lines 273 - 289, The test test_grpc_publisher_and_consumer currently hardcodes addr = "[::1]:50051", which causes CI flakes; change the test to bind to an ephemeral port instead: create a tokio::net::TcpListener bound to "[::1]:0", get the assigned socket address via listener.local_addr(), use tonic's incoming stream (or serve_with_incoming) to start Server::builder() with that listener, and use the same retrieved address for any client connections; update the code locations around addr, Server::builder(), and BridgeServer::new(bridge) to use the listener/incoming and the retrieved local_addr so the test uses a dynamic ephemeral port.
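The ephemeral-port pattern these comments recommend can be sketched with the standard library alone; with tonic, the bound listener would then feed `Server::builder().serve_with_incoming(...)` instead of `serve(addr, ...)`:

```rust
use std::net::{SocketAddr, TcpListener};

// Bind to port 0 so the OS assigns a free port, then discover the real
// address via local_addr(). Each test gets its own port, so parallel CI
// runs cannot collide on a hardcoded 50051/50052.
fn bind_ephemeral() -> (TcpListener, SocketAddr) {
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
    let addr = listener.local_addr().expect("local_addr");
    (listener, addr)
}
```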
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@Cargo.toml`:
- Around line 129-130: Update the comment that precedes the grpc feature
definition to remove the outdated reference to protobuf-src and instead mention
that the grpc feature uses protoc-bin-vendored (which affects build times);
locate the grpc feature line (grpc = ["tonic", "prost", "tokio-stream",
"tonic-prost", "tonic-prost-build", "protoc-bin-vendored"]) and replace the note
so it accurately documents the current dependency (protoc-bin-vendored) and its
impact on build time.
In `@src/endpoints/file.rs`:
- Around line 713-748: The commit closure currently uses .max() over
dispositions which can advance the offset past Nacked messages; change it to
"leading-ack only" semantics by scanning the zipped dispositions and
captured_messages in order and stopping at the first non-ack (or when a message
lacks a parsable "file_offset"), building a last_contiguous_offset from
consecutive leading Ack/Reply entries only; then use that last_contiguous_offset
(if any) to rewind/truncate/write the offset_file. Update the closure created in
the commit binding (the Box::new move |dispositions| { ... } block), replacing
the dispositions.iter().zip(...).filter_map(...).max() logic with an ordered
loop that breaks on the first non-ack/non-parsable offset so only contiguous
leading acks advance the file.
In `@src/endpoints/grpc.rs`:
- Around line 33-149: The GrpcConfig TLS fields aren't applied in
GrpcConsumer::new and GrpcPublisher::new; update both to build a
tonic::transport::ClientTlsConfig from config.tls (load CA from ca_file into
Certificate, load cert+key into Identity when present, set domain
validation/accept_invalid_certs) and call endpoint =
endpoint.tls_config(client_tls) when tls.required or any TLS data present, and
return an error if tls.required is true but the URL scheme is plaintext
(non-tls); use ClientTlsConfig::new(), Certificate, and Identity to wire
CA/cert/key and ensure endpoint.connect()/BridgeClient::new() use the configured
endpoint.
In `@src/endpoints/sled.rs`:
- Around line 235-371: SledConsumer currently leaves messages in inflight_tree
across restarts (in SledConsumer::new / inflight_tree), which strands messages;
fix by, during construction of SledConsumer (in SledConsumer::new) when
delete_after_read is true, scanning inflight_tree (e.g., inflight_tree.iter())
and atomically moving each entry back into tree (or re-inserting each key/value
into tree and removing from inflight_tree) so inflight entries are requeued on
startup; ensure this runs before returning the SledConsumer and handle sled
errors (map to anyhow) to avoid silent loss.
In `@src/models.rs`:
- Around line 676-706: The custom Deserialize for FileConfig currently defaults
a missing "mode" to "consume" by inserting into the flattened `extra` (in
FileConfig::deserialize using FileConfigHelper), which silently ignores a legacy
`subscribe_mode` field; change the logic in the deserialize impl to detect and
prefer legacy `subscribe_mode` when present (e.g., read `helper.extra`/map for
"subscribe_mode" and translate it to the corresponding FileConsumerMode before
inserting/setting "mode"), or else return an explicit error if neither is
present—update the path where `extra` is inspected and where `mode` is derived
(the code that constructs `mode: FileConsumerMode` from `extra`) to apply this
mapping so legacy configs are correctly honored.
---
Outside diff comments:
In `@Cargo.toml`:
- Around line 84-101: Add an MSRV note next to the grpc-related dependency
entries in Cargo.toml (the tonic/prost/prost 0.14 / tonic-prost-build lines)
explaining that enabling the grpc feature requires Rust 1.82+; insert a comment
like: "# Note: The grpc feature requires Rust 1.82+ (due to prost 0.14), which
is higher than the crate MSRV (1.70)." so users enabling the grpc feature see
the higher MSRV requirement.
---
Duplicate comments:
In `@src/endpoints/grpc.rs`:
- Around line 415-419: The receive loop using
mem_dest_consumer.receive_batch(...) can hang indefinitely; wrap each await in a
timeout (e.g., tokio::time::timeout) or use tokio::select with a sleep deadline
and handle timeout errors instead of unwrap. Replace the await unwrap on
mem_dest_consumer.receive_batch(5).await.unwrap() with a timeout call (e.g.,
timeout(Duration::from_secs(X), mem_dest_consumer.receive_batch(5)).await) and
handle the Err(timeout) by failing the test or breaking out with a clear error,
while preserving the existing logic that extends received_messages until
received_messages.len() >= messages_to_send.len().
- Around line 349-363: The test test_grpc_route_end_to_end currently binds to a
fixed port via addr and calls Server::builder().serve(...), which can cause CI
port conflicts; change it to bind a TcpListener on an ephemeral port (e.g.
tokio::net::TcpListener::bind("127.0.0.1:0").await), obtain the actual socket
address via listener.local_addr(), spawn the server using
Server::builder().serve_with_incoming or the appropriate tonic incoming stream
built from that listener (instead of serve(addr,...)), and use the discovered
local_addr for the test client connection so the MockBridge/bridge server runs
on a safe ephemeral port.
- Around line 273-289: The test test_grpc_publisher_and_consumer currently
hardcodes addr = "[::1]:50051", which causes CI flakes; change the test to bind
to an ephemeral port instead: create a tokio::net::TcpListener bound to
"[::1]:0", get the assigned socket address via listener.local_addr(), use
tonic's incoming stream (or serve_with_incoming) to start Server::builder() with
that listener, and use the same retrieved address for any client connections;
update the code locations around addr, Server::builder(), and
BridgeServer::new(bridge) to use the listener/incoming and the retrieved
local_addr so the test uses a dynamic ephemeral port.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- Cargo.toml
- build.rs
- src/endpoints/file.rs
- src/endpoints/grpc.rs
- src/endpoints/reader.rs
- src/endpoints/sled.rs
- src/models.rs
```toml
# Note: The grpc feature requires protobuf-src which can significantly increase build times.
grpc = ["tonic", "prost", "tokio-stream", "tonic-prost", "tonic-prost-build", "protoc-bin-vendored"]
```
Update grpc feature note — protobuf-src isn’t used anymore.
The comment mentions protobuf-src, but the feature now uses protoc-bin-vendored. This is minor but user-facing documentation should stay accurate.
✏️ Suggested update
```diff
-# Note: The grpc feature requires protobuf-src which can significantly increase build times.
+# Note: The grpc feature pulls in vendored protoc (protoc-bin-vendored), which can increase build times.
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```toml
# Note: The grpc feature pulls in vendored protoc (protoc-bin-vendored), which can increase build times.
grpc = ["tonic", "prost", "tokio-stream", "tonic-prost", "tonic-prost-build", "protoc-bin-vendored"]
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@Cargo.toml` around lines 129 - 130, Update the comment that precedes the grpc
feature definition to remove the outdated reference to protobuf-src and instead
mention that the grpc feature uses protoc-bin-vendored (which affects build
times); locate the grpc feature line (grpc = ["tonic", "prost", "tokio-stream",
"tonic-prost", "tonic-prost-build", "protoc-bin-vendored"]) and replace the note
so it accurately documents the current dependency (protoc-bin-vendored) and its
impact on build time.
```rust
let commit: crate::traits::BatchCommitFunc = if let Some(offset_file) =
    &c.offset_file
{
    let offset_file = offset_file.clone();
    let captured_messages = messages.clone();

    Box::new(
        move |dispositions: Vec<crate::traits::MessageDisposition>| {
            Box::pin(async move {
                let max_offset = dispositions
                    .iter()
                    .zip(captured_messages.iter())
                    .filter_map(|(d, m)| match d {
                        crate::traits::MessageDisposition::Ack
                        | crate::traits::MessageDisposition::Reply(_) => m
                            .metadata
                            .get("file_offset")
                            .and_then(|s| s.parse::<u64>().ok()),
                        _ => None,
                    })
                    .max();

                if let Some(offset) = max_offset {
                    let mut file = offset_file.lock().await;
                    if let Err(e) = file.rewind().await {
                        tracing::error!("Failed to rewind offset file: {}", e);
                    } else if let Err(e) = file.set_len(0).await {
                        tracing::error!("Failed to truncate offset file: {}", e);
                    } else if let Err(e) =
                        file.write_all(offset.to_string().as_bytes()).await
                    {
                        tracing::error!("Failed to write offset file: {}", e);
                    } else if let Err(e) = file.flush().await {
                        tracing::error!("Failed to flush offset file: {}", e);
                    }
                }
```
Avoid skipping Nacked messages when persisting group offsets.
The offset file is advanced to the max acked offset even if earlier messages are Nacked. That can skip unacked messages on restart (data loss). This needs “leading-ack only” semantics similar to queue mode.
✅ Suggested fix (advance only through contiguous Acks)
```diff
-                let max_offset = dispositions
-                    .iter()
-                    .zip(captured_messages.iter())
-                    .filter_map(|(d, m)| match d {
-                        crate::traits::MessageDisposition::Ack
-                        | crate::traits::MessageDisposition::Reply(_) => m
-                            .metadata
-                            .get("file_offset")
-                            .and_then(|s| s.parse::<u64>().ok()),
-                        _ => None,
-                    })
-                    .max();
+                let mut max_offset = None;
+                for (d, m) in dispositions.iter().zip(captured_messages.iter()) {
+                    match d {
+                        crate::traits::MessageDisposition::Ack
+                        | crate::traits::MessageDisposition::Reply(_) => {
+                            if let Some(off) = m
+                                .metadata
+                                .get("file_offset")
+                                .and_then(|s| s.parse::<u64>().ok())
+                            {
+                                max_offset = Some(off);
+                            }
+                        }
+                        crate::traits::MessageDisposition::Nack => break,
+                    }
+                }
```
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
let commit: crate::traits::BatchCommitFunc = if let Some(offset_file) =
    &c.offset_file
{
    let offset_file = offset_file.clone();
    let captured_messages = messages.clone();
    Box::new(
        move |dispositions: Vec<crate::traits::MessageDisposition>| {
            Box::pin(async move {
                let mut max_offset = None;
                for (d, m) in dispositions.iter().zip(captured_messages.iter()) {
                    match d {
                        crate::traits::MessageDisposition::Ack
                        | crate::traits::MessageDisposition::Reply(_) => {
                            if let Some(off) = m
                                .metadata
                                .get("file_offset")
                                .and_then(|s| s.parse::<u64>().ok())
                            {
                                max_offset = Some(off);
                            }
                        }
                        crate::traits::MessageDisposition::Nack => break,
                    }
                }
                if let Some(offset) = max_offset {
                    let mut file = offset_file.lock().await;
                    if let Err(e) = file.rewind().await {
                        tracing::error!("Failed to rewind offset file: {}", e);
                    } else if let Err(e) = file.set_len(0).await {
                        tracing::error!("Failed to truncate offset file: {}", e);
                    } else if let Err(e) =
                        file.write_all(offset.to_string().as_bytes()).await
                    {
                        tracing::error!("Failed to write offset file: {}", e);
                    } else if let Err(e) = file.flush().await {
                        tracing::error!("Failed to flush offset file: {}", e);
                    }
                }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/endpoints/file.rs` around lines 713 - 748, The commit closure currently
uses .max() over dispositions which can advance the offset past Nacked messages;
change it to "leading-ack only" semantics by scanning the zipped dispositions
and captured_messages in order and stopping at the first non-ack (or when a
message lacks a parsable "file_offset"), building a last_contiguous_offset from
consecutive leading Ack/Reply entries only; then use that last_contiguous_offset
(if any) to rewind/truncate/write the offset_file. Update the closure created in
the commit binding (the Box::new move |dispositions| { ... } block), replacing
the dispositions.iter().zip(...).filter_map(...).max() logic with an ordered
loop that breaks on the first non-ack/non-parsable offset so only contiguous
leading acks advance the file.
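The difference between `.max()` over all acked offsets and the "leading-ack only" scan can be isolated outside the codebase. This standalone sketch uses a simplified `Disposition` enum as a stand-in for `crate::traits::MessageDisposition`:

```rust
#[derive(Clone, Copy)]
enum Disposition {
    Ack,
    Nack,
}

/// Advance the persisted offset only through the contiguous leading run of Acks.
/// A Nack stops the scan, so the unacked message is re-read after a restart
/// instead of being skipped.
fn leading_ack_offset(dispositions: &[Disposition], offsets: &[u64]) -> Option<u64> {
    let mut max_offset = None;
    for (d, off) in dispositions.iter().zip(offsets.iter()) {
        match d {
            Disposition::Ack => max_offset = Some(*off),
            Disposition::Nack => break,
        }
    }
    max_offset
}
```

With dispositions `[Ack, Nack, Ack]` and offsets `[10, 20, 30]`, this returns `Some(10)`, whereas a `.max()` over all acked offsets would return `Some(30)` and silently skip the Nacked message at offset 20 on restart.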
```rust
    pub async fn new(config: &GrpcConfig) -> Result<Self> {
        let mut endpoint = tonic::transport::Endpoint::from_shared(config.url.clone())?;
        if let Some(timeout) = config.timeout_ms {
            endpoint = endpoint.connect_timeout(Duration::from_millis(timeout));
        }
        let channel = endpoint.connect().await?;
        let mut client = BridgeClient::new(channel);

        let topic = config
            .topic
            .clone()
            .unwrap_or_else(|| "default".to_string());

        // Eagerly subscribe to ensure the consumer is ready upon creation.
        let request = Request::new(SubscribeRequest { topic });
        let stream = if let Some(timeout) = config.timeout_ms {
            tokio::time::timeout(Duration::from_millis(timeout), client.subscribe(request))
                .await
                .map_err(|_| anyhow::anyhow!("gRPC subscribe timed out"))??
        } else {
            client.subscribe(request).await?
        }
        .into_inner();

        Ok(Self {
            _client: client,
            stream: Some(stream),
        })
    }
}

#[async_trait]
impl MessageConsumer for GrpcConsumer {
    async fn receive_batch(
        &mut self,
        max_messages: usize,
    ) -> Result<crate::outcomes::ReceivedBatch, ConsumerError> {
        let mut messages = Vec::with_capacity(max_messages);
        if let Some(stream) = self.stream.as_mut() {
            // The first message will block until it arrives. Subsequent messages will be
            // fetched with a short timeout to fill the batch without blocking indefinitely.
            loop {
                let msg_future = stream.message();
                let msg_result = if messages.is_empty() {
                    // Block for the first message
                    Ok(msg_future.await)
                } else {
                    // Poll for subsequent messages
                    tokio::time::timeout(Duration::from_millis(GRPC_BATCH_POLL_MS), msg_future)
                        .await
                };

                match msg_result {
                    Ok(Ok(Some(msg))) => {
                        let message_id = if msg.id.is_empty() {
                            None
                        } else if let Ok(uuid) = Uuid::parse_str(&msg.id) {
                            Some(uuid.as_u128())
                        } else if let Ok(n) =
                            u128::from_str_radix(msg.id.trim_start_matches("0x"), 16)
                        {
                            Some(n)
                        } else {
                            msg.id.parse::<u128>().ok()
                        };

                        let canonical = CanonicalMessage::new(msg.payload, message_id)
                            .with_metadata(msg.metadata);
                        messages.push(canonical);
                        if messages.len() >= max_messages {
                            break;
                        }
                    }
                    Ok(Ok(None)) => break, // Stream ended
                    Ok(Err(e)) => return Err(ConsumerError::Connection(e.into())),
                    Err(_) => break, // Timeout on subsequent message
                }
            }
        } else {
            // This case should now be impossible if `new` always initializes the stream.
            return Err(ConsumerError::Connection(anyhow::anyhow!(
                "gRPC stream not initialized. This is a bug."
            )));
        }

        if messages.is_empty() {
            Err(ConsumerError::EndOfStream)
        } else {
            Ok(crate::outcomes::ReceivedBatch {
                messages,
                commit: Box::new(|_| Box::pin(async { Ok(()) })), // Auto-ack for now
            })
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

pub struct GrpcPublisher {
    client: BridgeClient<Channel>,
    timeout: Option<Duration>,
}

impl GrpcPublisher {
    pub async fn new(config: &GrpcConfig) -> Result<Self> {
        let mut endpoint = tonic::transport::Endpoint::from_shared(config.url.clone())?;
        if let Some(timeout) = config.timeout_ms {
            endpoint = endpoint.connect_timeout(Duration::from_millis(timeout));
        }
        let client = BridgeClient::new(endpoint.connect().await?);
        Ok(Self {
            client,
            timeout: config.timeout_ms.map(Duration::from_millis),
        })
    }
```
🧩 Analysis chain
🏁 Script executed:

```shell
find . -type f -name "*.rs" | head -20
```

Repository: marcomq/mq-bridge

Length of output: 552

🏁 Script executed:

```shell
rg "struct GrpcConfig" -A 20
```

Length of output: 1044

🏁 Script executed:

```shell
rg "TlsConfig\|tls\|TLS" src/endpoints/grpc.rs
```

Length of output: 43

🏁 Script executed:

```shell
rg "struct TlsConfig" -A 15
```

Length of output: 900

🏁 Script executed:

```shell
rg "pub tls:" src/models.rs -B 5 -A 10
```

Length of output: 6357
🌐 Web query:
tonic 0.14 transport endpoint TLS configuration ClientTlsConfig
💡 Result:
In tonic 0.14, client TLS is configured by building a ClientTlsConfig and passing it to Endpoint::tls_config(...) (or Channel::from_static(...).tls_config(...), since Channel is built from an Endpoint). The tls_config method returns Result<Self, tonic::transport::Error>. [1][2]
Cargo features (required for TLS)
Enable a rustls backend feature (pick one) and (optionally) roots features, e.g.: tls-ring or tls-aws-lc, plus tls-native-roots / tls-webpki-roots if you want built-in roots. [3]
Typical usage
```rust
use tonic::transport::{Channel, ClientTlsConfig, Certificate};

let ca_pem = std::fs::read("ca.pem")?;
let channel = Channel::from_static("https://example.com")
    .tls_config(
        ClientTlsConfig::new()
            .ca_certificate(Certificate::from_pem(ca_pem))
            .domain_name("example.com")
    )?
    .connect()
    .await?;
```

This pattern (`Channel::from_static(...).tls_config(ClientTlsConfig::new()...)`) is shown in the tonic transport docs. [2]
What you can set on ClientTlsConfig
ClientTlsConfig is the TLS builder for endpoints; it supports (among others) new(), domain_name(...), ca_certificate(...), identity(...), plus options like with_native_roots(), with_enabled_roots(), and timeout(...) (handshake timeout). [1]
Sources: ClientTlsConfig implementation (tonic transport client TLS) [1], Endpoint::tls_config(...) source [4], tonic transport TLS example [2], tonic 0.14 feature flags list [3].
References:
[1] (docs.rs)
[2] (docs.rs)
[3] (lib.rs)
[4] (prisma.github.io)
Citations:
- 1: https://docs.rs/tonic/latest/src/tonic/transport/channel/tls.rs.html?utm_source=openai
- 2: https://docs.rs/tonic/latest/tonic/transport/?utm_source=openai
- 3: https://lib.rs/crates/tonic/features?utm_source=openai
- 4: https://prisma.github.io/prisma-engines/doc/src/tonic/transport/channel/endpoint.rs.html?utm_source=openai
Wire TlsConfig into gRPC endpoint initialization using ClientTlsConfig.
GrpcConfig exposes TLS settings (required, ca_file, cert_file, key_file, accept_invalid_certs), but GrpcConsumer::new() and GrpcPublisher::new() do not apply them. If tls.required is set, the client can still connect over plaintext, silently violating the config. In tonic 0.14, use endpoint.tls_config(ClientTlsConfig::new()) to apply CA certificates, client identity, and domain validation. Reject non-TLS URLs when tls.required is true.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/endpoints/grpc.rs` around lines 33 - 149, The GrpcConfig TLS fields
aren't applied in GrpcConsumer::new and GrpcPublisher::new; update both to build
a tonic::transport::ClientTlsConfig from config.tls (load CA from ca_file into
Certificate, load cert+key into Identity when present, set domain
validation/accept_invalid_certs) and call endpoint =
endpoint.tls_config(client_tls) when tls.required or any TLS data present, and
return an error if tls.required is true but the URL scheme is plaintext
(non-tls); use ClientTlsConfig::new(), Certificate, and Identity to wire
CA/cert/key and ensure endpoint.connect()/BridgeClient::new() use the configured
endpoint.
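One piece of the requested fix needs no tonic API at all: rejecting a plaintext URL when `tls.required` is set can be a plain scheme check before the endpoint is built. A minimal sketch (the function name and `String` error type are illustrative, not from the crate):

```rust
/// Fail fast when the config demands TLS but the URL is not an https endpoint.
/// tonic uses the `https` scheme for TLS-enabled channels.
fn check_tls_scheme(url: &str, tls_required: bool) -> Result<(), String> {
    if tls_required && !url.starts_with("https://") {
        return Err(format!("tls.required is set but URL is not https: {url}"));
    }
    Ok(())
}
```

Calling a check like this at the top of `GrpcConsumer::new` and `GrpcPublisher::new` would turn the silent-plaintext case into an explicit configuration error.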
```diff
 impl<'de> Deserialize<'de> for FileConfig {
     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     where
         D: Deserializer<'de>,
     {
-        struct FileConfigVisitor;
-        impl<'de> Visitor<'de> for FileConfigVisitor {
-            type Value = FileConfig;
-            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
-                formatter.write_str("string or map")
-            }
-            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
-            where
-                E: serde::de::Error,
-            {
-                Ok(FileConfig {
-                    path: value.to_string(),
-                    subscribe_mode: false,
-                    delete: None,
-                })
-            }
-            fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
-            where
-                M: MapAccess<'de>,
-            {
-                let mut path = None;
-                let mut consume = true;
-                let mut subscribe_mode = None;
-                let mut delete = None;
-                while let Some(key) = map.next_key::<String>()? {
-                    match key.as_str() {
-                        "path" => {
-                            if path.is_some() {
-                                return Err(serde::de::Error::duplicate_field("path"));
-                            }
-                            path = Some(map.next_value()?);
-                        }
-                        "consume" => {
-                            consume = map.next_value()?;
-                        }
-                        "subscribe_mode" => {
-                            if subscribe_mode.is_some() {
-                                return Err(serde::de::Error::duplicate_field("subscribe_mode"));
-                            }
-                            subscribe_mode = Some(map.next_value()?);
-                        }
-                        "delete" => {
-                            if delete.is_some() {
-                                return Err(serde::de::Error::duplicate_field("delete"));
-                            }
-                            delete = map.next_value()?;
-                        }
-                        _ => {
-                            let _ = map.next_value::<serde::de::IgnoredAny>()?;
-                        }
-                    }
-                }
-                let path = path.ok_or_else(|| serde::de::Error::missing_field("path"))?;
-                Ok(FileConfig {
-                    path,
-                    subscribe_mode: subscribe_mode.unwrap_or(!consume),
-                    delete,
-                })
+        #[derive(Deserialize)]
+        struct FileConfigHelper {
+            path: String,
+            #[serde(flatten)]
+            extra: serde_json::Value,
+        }
+
+        let helper = FileConfigHelper::deserialize(deserializer)?;
+        let mut extra = helper.extra;
+
+        if let serde_json::Value::Object(ref mut map) = extra {
+            if !map.contains_key("mode") {
+                map.insert(
+                    "mode".to_string(),
+                    serde_json::Value::String("consume".to_string()),
+                );
+            }
+        }
-        deserializer.deserialize_any(FileConfigVisitor)
+
+        let mode: FileConsumerMode =
+            serde_json::from_value(extra).map_err(serde::de::Error::custom)?;
+
+        Ok(FileConfig {
+            path: helper.path,
+            mode,
+        })
```
Back-compat bug: legacy subscribe_mode silently maps to Consume.
If a config omits mode but includes legacy subscribe_mode, the current logic inserts mode=consume, ignoring the legacy flag and changing behavior. That’s a breaking change in practice. Consider mapping legacy flags to the correct mode (or explicitly error).
🛠️ Suggested compatibility mapping

```diff
 if let serde_json::Value::Object(ref mut map) = extra {
     if !map.contains_key("mode") {
-        map.insert(
-            "mode".to_string(),
-            serde_json::Value::String("consume".to_string()),
-        );
+        let subscribe_mode = map.get("subscribe_mode").and_then(|v| v.as_bool());
+        let delete = map.get("delete").and_then(|v| v.as_bool()).unwrap_or(false);
+        if let Some(subscribe) = subscribe_mode {
+            let mode = if subscribe { "subscribe" } else { "consume" };
+            map.insert("mode".to_string(), serde_json::Value::String(mode.to_string()));
+            map.insert("delete".to_string(), serde_json::Value::Bool(delete));
+        } else {
+            map.insert(
+                "mode".to_string(),
+                serde_json::Value::String("consume".to_string()),
+            );
+        }
     }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/models.rs` around lines 676 - 706, The custom Deserialize for FileConfig
currently defaults a missing "mode" to "consume" by inserting into the flattened
`extra` (in FileConfig::deserialize using FileConfigHelper), which silently
ignores a legacy `subscribe_mode` field; change the logic in the deserialize
impl to detect and prefer legacy `subscribe_mode` when present (e.g., read
`helper.extra`/map for "subscribe_mode" and translate it to the corresponding
FileConsumerMode before inserting/setting "mode"), or else return an explicit
error if neither is present—update the path where `extra` is inspected and where
`mode` is derived (the code that constructs `mode: FileConsumerMode` from
`extra`) to apply this mapping so legacy configs are correctly honored.
Actionable comments posted: 3
♻️ Duplicate comments (4)
src/endpoints/sled.rs (1)
235-245: ⚠️ Potential issue | 🟠 Major

Recover inflight entries on startup to avoid stranded messages.

If the consumer crashes, entries remain in `inflight_tree` and are never re-queued, which can permanently hide messages. Consider requeuing inflight entries when `delete_after_read` is enabled.

🧯 Minimal recovery on startup

```diff
         let inflight_tree = db
             .open_tree(format!("{}_inflight", tree_name))
             .context("Failed to open Sled inflight tree")?;
+
+        if config.delete_after_read {
+            for item in inflight_tree.iter() {
+                let (k, v) = item.map_err(|e| anyhow!(e))?;
+                tree.insert(k, v).map_err(|e| anyhow!(e))?;
+            }
+            inflight_tree.clear().map_err(|e| anyhow!(e))?;
+        }
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/endpoints/sled.rs` around lines 235 - 245, SledConsumer::new currently opens both the main tree and the inflight_tree but does not recover leftover inflight entries; update SledConsumer::new to, when config.delete_after_read is true, iterate over all key/value pairs in inflight_tree on startup and requeue them back into the main tree (or restore the original state expected by dequeue), then remove those entries from inflight_tree; perform the move in a single sled transaction or use a batch to ensure atomicity and avoid duplicates, and reference the existing symbols tree, inflight_tree, get_or_open_db, and delete_after_read when locating where to add this startup recovery logic.

src/endpoints/grpc.rs (3)
418-421: ⚠️ Potential issue | 🟡 Minor

E2E receive loop still has no timeout — hangs indefinitely if messages are dropped.

The `while` loop will block forever when the gRPC-to-memory route fails to deliver messages (e.g., a race between route setup and sends). This was raised in a previous review and marked addressed but the fix is absent.

⏱️ Proposed fix

```diff
-        let mut received_messages = Vec::new();
-        while received_messages.len() < messages_to_send.len() {
-            let batch = mem_dest_consumer.receive_batch(5).await.unwrap();
-            received_messages.extend(batch.messages);
-        }
+        let mut received_messages = Vec::new();
+        tokio::time::timeout(std::time::Duration::from_secs(10), async {
+            while received_messages.len() < messages_to_send.len() {
+                if let Ok(batch) = mem_dest_consumer.receive_batch(5).await {
+                    received_messages.extend(batch.messages);
+                }
+            }
+        })
+        .await
+        .expect("Timed out waiting for e2e messages");
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` around lines 418 - 421, The receive loop using while received_messages.len() < messages_to_send.len() can hang indefinitely if mem_dest_consumer.receive_batch(...) never returns messages; wrap the receive logic in a timeout and fail the test (or break) when the overall wait exceeds a safe threshold. Concretely, use a timeout mechanism (e.g., tokio::time::timeout) around mem_dest_consumer.receive_batch(...) or around each await, track elapsed time and return an error or panic with a descriptive message when the cumulative wait passes a configured deadline; update the loop that references received_messages, messages_to_send, and mem_dest_consumer.receive_batch to use that timeout and surface a failure instead of hanging.
33-61: ⚠️ Potential issue | 🟠 Major

TLS configuration from `GrpcConfig` is still not applied to either constructor.

Both `GrpcConsumer::new` and `GrpcPublisher::new` construct a plain `Endpoint` and connect without inspecting any TLS fields. If `tls.required` is `true` in the config, the client will silently use plaintext, violating the caller's intent.

🔒 Proposed skeleton for TLS wiring

```diff
 use tonic::transport::Channel;
+use tonic::transport::{Certificate, ClientTlsConfig, Identity};
```

```diff
     pub async fn new(config: &GrpcConfig) -> Result<Self> {
         let mut endpoint = tonic::transport::Endpoint::from_shared(config.url.clone())?;
         if let Some(timeout) = config.timeout_ms {
             endpoint = endpoint.connect_timeout(Duration::from_millis(timeout));
         }
+        if let Some(tls) = &config.tls {
+            if tls.required {
+                let mut tls_config = ClientTlsConfig::new();
+                if let Some(ca_file) = &tls.ca_file {
+                    let ca_pem = std::fs::read(ca_file)?;
+                    tls_config = tls_config.ca_certificate(Certificate::from_pem(ca_pem));
+                }
+                if let (Some(cert_file), Some(key_file)) = (&tls.cert_file, &tls.key_file) {
+                    let cert = std::fs::read(cert_file)?;
+                    let key = std::fs::read(key_file)?;
+                    tls_config = tls_config.identity(Identity::from_pem(cert, key));
+                }
+                endpoint = endpoint.tls_config(tls_config)?;
+            }
+        }
         let channel = endpoint.connect().await?;
```

Apply the same pattern in `GrpcPublisher::new`.

Also applies to: 138-149
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` around lines 33 - 61, The constructor is ignoring TLS settings in GrpcConfig so the Endpoint is always plaintext; update GrpcConsumer::new (the pub async fn new(config: &GrpcConfig) -> Result<Self>) to build tonic::transport::ClientTlsConfig when config.tls.required is true (use tls.ca_cert / tls.client_cert / tls.client_key / tls.domain_name as available), apply it via endpoint.tls_config(...) before calling endpoint.connect(), and fall back to plaintext when not required; mirror the exact same TLS wiring in GrpcPublisher::new so both constructors honor GrpcConfig.tls.
275-275: ⚠️ Potential issue | 🟡 Minor

Hardcoded ports `50051`/`50052` are still present — port-collision flakiness in CI.

Both tests bind to fixed IPv6 loopback ports. If these ports are in use (parallel test runs, CI environment), the server `serve()` call will fail, aborting the test. Bind to port `0` and read back the assigned address instead.

🔧 Proposed fix (apply similarly to port 50052 in test_grpc_route_end_to_end)

```diff
-    let addr = "[::1]:50051".parse().unwrap();
+    let listener = tokio::net::TcpListener::bind("[::1]:0").await.unwrap();
+    let addr = listener.local_addr().unwrap();
 ...
     let server_handle = tokio::spawn(async move {
         Server::builder()
-            .serve(addr, BridgeServer::new(bridge))
+            .serve_with_incoming(
+                tokio_stream::wrappers::TcpListenerStream::new(listener),
+                BridgeServer::new(bridge),
+            )
             .await
             .unwrap();
     });
```

Also applies to: 353-353
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` at line 275, Replace the hardcoded "[::1]:50051" (and similarly "[::1]:50052" in test_grpc_route_end_to_end) with a bind-to-port-0 pattern: create a TcpListener bound to "[::1]:0" (or SocketAddr with port 0), call listener.local_addr() to read the assigned port, and use that address for the server serve() call (and for any client dial). Update the code that constructs addr (the addr variable) and the server start logic to accept the listener/local_addr instead of a fixed port to avoid CI port collisions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/endpoints/grpc.rs`:
- Around line 87-97: The parsing logic for message_id mis-parses plain decimal
strings as hex; update the branch that attempts hex parsing so it only runs when
the input actually has a hex prefix (e.g., check msg.id starts_with "0x" / "0X"
before calling u128::from_str_radix on msg.id.trim_start_matches("0x")). Keep
the existing Uuid::parse_str(&msg.id) branch first, then conditionally parse hex
only when a "0x"/"0X" prefix is present, and finally fall back to
msg.id.parse::<u128>().ok() so pure decimal strings are interpreted as decimal;
reference the message_id variable and the Uuid::parse_str/ u128::from_str_radix
branches in your change.
In `@src/endpoints/sled.rs`:
- Around line 385-403: The receive_batch function currently blocks and returns
one message when called with max_messages == 0; add an early guard at the start
of receive_batch (before allocating vectors and before calling self.receive) to
immediately return an empty ReceivedBatch (and no error) when max_messages is 0
to avoid blocking/over-delivery; update any return type usage of
ReceivedBatch/ConsumerError accordingly so callers receive an empty batch
instead of a single-message batch.
In `@src/traits.rs`:
- Around line 84-88: The AsyncHandler trait currently uses RPITIT (impl Trait in
return position) in AsyncHandler::handle which requires Rust ≥1.75; either raise
the crate MSRV to 1.75 or avoid RPITIT by changing the signature to return a
boxed future (e.g., use BoxFuture<Result<Handled, HandlerError>> from futures or
Pin<Box<dyn Future<Output = Result<Handled, HandlerError>> + Send>>) and import
the appropriate Future/BoxFuture types; update AsyncHandler::handle, add
necessary use statements and Cargo/MSRV if you choose the MSRV path, or gate the
AsyncHandler trait behind a feature to keep MSRV 1.70.
---
Duplicate comments:
In `@src/endpoints/grpc.rs`:
- Around line 418-421: The receive loop using while received_messages.len() <
messages_to_send.len() can hang indefinitely if
mem_dest_consumer.receive_batch(...) never returns messages; wrap the receive
logic in a timeout and fail the test (or break) when the overall wait exceeds a
safe threshold. Concretely, use a timeout mechanism (e.g., tokio::time::timeout)
around mem_dest_consumer.receive_batch(...) or around each await, track elapsed
time and return an error or panic with a descriptive message when the cumulative
wait passes a configured deadline; update the loop that references
received_messages, messages_to_send, and mem_dest_consumer.receive_batch to use
that timeout and surface a failure instead of hanging.
- Around line 33-61: The constructor is ignoring TLS settings in GrpcConfig so
the Endpoint is always plaintext; update GrpcConsumer::new (the pub async fn
new(config: &GrpcConfig) -> Result<Self>) to build
tonic::transport::ClientTlsConfig when config.tls.required is true (use
tls.ca_cert / tls.client_cert / tls.client_key / tls.domain_name as available),
apply it via endpoint.tls_config(...) before calling endpoint.connect(), and
fall back to plaintext when not required; mirror the exact same TLS wiring in
GrpcPublisher::new so both constructors honor GrpcConfig.tls.
- Line 275: Replace the hardcoded "[::1]:50051" (and similarly "[::1]:50052" in
test_grpc_route_end_to_end) with a bind-to-port-0 pattern: create a TcpListener
bound to "[::1]:0" (or SocketAddr with port 0), call listener.local_addr() to
read the assigned port, and use that address for the server serve() call (and
for any client dial). Update the code that constructs addr (the addr variable)
and the server start logic to accept the listener/local_addr instead of a fixed
port to avoid CI port collisions.
In `@src/endpoints/sled.rs`:
- Around line 235-245: SledConsumer::new currently opens both the main tree and
the inflight_tree but does not recover leftover inflight entries; update
SledConsumer::new to, when config.delete_after_read is true, iterate over all
key/value pairs in inflight_tree on startup and requeue them back into the main
tree (or restore the original state expected by dequeue), then remove those
entries from inflight_tree; perform the move in a single sled transaction or use
a batch to ensure atomicity and avoid duplicates, and reference the existing
symbols tree, inflight_tree, get_or_open_db, and delete_after_read when locating
where to add this startup recovery logic.
```rust
let message_id = if msg.id.is_empty() {
    None
} else if let Ok(uuid) = Uuid::parse_str(&msg.id) {
    Some(uuid.as_u128())
} else if let Ok(n) =
    u128::from_str_radix(msg.id.trim_start_matches("0x"), 16)
{
    Some(n)
} else {
    msg.id.parse::<u128>().ok()
};
```
Decimal ID strings are silently misinterpreted as hex.
u128::from_str_radix(msg.id.trim_start_matches("0x"), 16) matches any string whose characters are all 0-9 or a-f. A purely decimal ID like "12345" parses successfully as hex 0x12345 = 74565, so the decimal fallback at line 96 is unreachable for any numeric-only string.
If the intent is to handle 0x-prefixed hex and decimal as fallback, key the hex branch on the 0x prefix:
🐛 Proposed fix
```diff
-    } else if let Ok(n) =
-        u128::from_str_radix(msg.id.trim_start_matches("0x"), 16)
-    {
-        Some(n)
-    } else {
-        msg.id.parse::<u128>().ok()
-    };
+    } else if msg.id.starts_with("0x") || msg.id.starts_with("0X") {
+        u128::from_str_radix(&msg.id[2..], 16).ok()
+    } else {
+        msg.id.parse::<u128>().ok()
+    };
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Current:

```rust
let message_id = if msg.id.is_empty() {
    None
} else if let Ok(uuid) = Uuid::parse_str(&msg.id) {
    Some(uuid.as_u128())
} else if let Ok(n) =
    u128::from_str_radix(msg.id.trim_start_matches("0x"), 16)
{
    Some(n)
} else {
    msg.id.parse::<u128>().ok()
};
```

Suggested:

```rust
let message_id = if msg.id.is_empty() {
    None
} else if let Ok(uuid) = Uuid::parse_str(&msg.id) {
    Some(uuid.as_u128())
} else if msg.id.starts_with("0x") || msg.id.starts_with("0X") {
    u128::from_str_radix(&msg.id[2..], 16).ok()
} else {
    msg.id.parse::<u128>().ok()
};
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/endpoints/grpc.rs` around lines 87 - 97, The parsing logic for message_id
mis-parses plain decimal strings as hex; update the branch that attempts hex
parsing so it only runs when the input actually has a hex prefix (e.g., check
msg.id starts_with "0x" / "0X" before calling u128::from_str_radix on
msg.id.trim_start_matches("0x")). Keep the existing Uuid::parse_str(&msg.id)
branch first, then conditionally parse hex only when a "0x"/"0X" prefix is
present, and finally fall back to msg.id.parse::<u128>().ok() so pure decimal
strings are interpreted as decimal; reference the message_id variable and the
Uuid::parse_str/ u128::from_str_radix branches in your change.
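The misparse is reproducible with the standard library alone, and the prefix-keyed variant from the suggestion behaves as intended. A sketch (the `parse_message_id` helper is a stand-in for the inline logic; the UUID branch is omitted here since it needs the `uuid` crate):

```rust
/// Prefix-keyed parsing: hex only for explicit "0x"/"0X" IDs, decimal otherwise.
fn parse_message_id(id: &str) -> Option<u128> {
    if id.is_empty() {
        return None;
    }
    if let Some(hex) = id.strip_prefix("0x").or_else(|| id.strip_prefix("0X")) {
        return u128::from_str_radix(hex, 16).ok();
    }
    id.parse::<u128>().ok()
}
```

Note that `u128::from_str_radix("12345", 16)` succeeds and yields 74565 (0x12345), which is exactly why the original decimal fallback is unreachable for numeric-only IDs.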
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
Cargo.toml (1)
18-22: ⚠️ Potential issue | 🟡 Minor

MSRV note still references 1.70 after rust-version bump.

The comment should align with `rust-version = "1.75"` to avoid confusion.

✏️ Suggested update
```diff
-# The "http" feature (actix-web) and "aws" feature (aws-sdk-*) may require a newer Rust version than the crate MSRV (1.70).
+# The "http" feature (actix-web) and "aws" feature (aws-sdk-*) may require a newer Rust version than the crate MSRV (1.75).
```

🤖 Prompt for AI Agents
-# The "http" feature (actix-web) and "aws" feature (aws-sdk-*) may require a newer Rust version than the crate MSRV (1.70). +# The "http" feature (actix-web) and "aws" feature (aws-sdk-*) may require a newer Rust version than the crate MSRV (1.75).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@Cargo.toml` around lines 18 - 22, The MSRV note currently mentions "1.70" which is inconsistent with the crate's rust-version = "1.75"; update the comment text to reference Rust 1.75 (e.g., change "1.70" to "1.75" and any related phrasing) so the note about the "http" and "aws" features aligns with the declared rust-version; look for the comment block containing the MSRV note in Cargo.toml and update it accordingly.
♻️ Duplicate comments (6)
src/models.rs (1)
691-705:⚠️ Potential issue | 🟠 MajorBack-compat gap: legacy
subscribe_modeignored whenmodemissing.
Current deserialization insertsmode=consumewithout honoring older configs. This can change behavior silently.🛠️ Suggested compatibility mapping
if let serde_json::Value::Object(ref mut map) = extra { if !map.contains_key("mode") { - map.insert( - "mode".to_string(), - serde_json::Value::String("consume".to_string()), - ); + let subscribe_mode = map.get("subscribe_mode").and_then(|v| v.as_bool()); + let delete = map.get("delete").and_then(|v| v.as_bool()).unwrap_or(false); + if let Some(subscribe) = subscribe_mode { + let mode = if subscribe { "subscribe" } else { "consume" }; + map.insert("mode".to_string(), serde_json::Value::String(mode.to_string())); + map.insert("delete".to_string(), serde_json::Value::Bool(delete)); + } else { + map.insert( + "mode".to_string(), + serde_json::Value::String("consume".to_string()), + ); + } } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/models.rs` around lines 691 - 705, The deserializer currently injects mode = "consume" into the serde_json::Value `extra` before creating `mode: FileConsumerMode`, which ignores legacy `subscribe_mode`; update the logic around `extra` (the block handling serde_json::Value::Object(ref mut map)) to detect an existing legacy key (e.g., "subscribe_mode") and, if present and "mode" is absent, map/copy its value into "mode" (or translate its values if formats differ) before inserting the default, so `FileConfig`'s `mode` honors older configs when present.Cargo.toml (1)
129-130: ⚠️ Potential issue | 🟡 Minor

grpc feature note mentions protobuf-src, but the grpc feature uses protoc-bin-vendored.

Please update the comment to reflect the current dependency to keep docs accurate.

✏️ Suggested update
```diff
-# Note: The grpc feature requires protobuf-src which can significantly increase build times.
+# Note: The grpc feature pulls in vendored protoc (protoc-bin-vendored), which can increase build times.
```

```bash
#!/bin/bash
# Verify grpc feature dependencies referenced in comments
rg -n "grpc =|protobuf-src|protoc-bin-vendored" Cargo.toml
```
Verify each finding against the current code and only fix it if needed. In `@Cargo.toml` around lines 129 - 130, Update the comment above the grpc feature to accurately reflect that the feature uses protoc-bin-vendored rather than protobuf-src: find the grpc feature entry (the line containing grpc = ["tonic", "prost", "tokio-stream", "tonic-prost", "tonic-prost-build", "protoc-bin-vendored"]) and replace the note that mentions protobuf-src with one that references protoc-bin-vendored (and optionally notes it may increase build time), so the Cargo.toml comment matches the actual dependency.src/endpoints/grpc.rs (4)
283-299: ⚠️ Potential issue | 🟡 Minor

gRPC tests bind to fixed ports (50051/50052).

These can collide in CI or parallel runs. Consider binding to port 0 and using the OS-assigned port via a `TcpListener` (see tonic's `Server::serve_with_incoming`).

Also applies to: 361-374
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` around lines 283 - 299, The tests (e.g., test_grpc_publisher_and_consumer) are binding to fixed ports (50051/50052) which can collide; change the server startup to bind a TcpListener on port 0, retrieve its local_addr to construct the client target, and pass the listener into tonic's Server::serve_with_incoming (instead of Server::serve(addr,...)) so the OS assigns an available port; update the addr/client connect code to use the listener.local_addr() and apply the same TcpListener + serve_with_incoming approach to the other test(s) around lines 361-374.
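The port-0 pattern itself can be shown with std alone; the tokio/tonic wiring (converting via `tokio::net::TcpListener::from_std` and passing a `TcpListenerStream` to `serve_with_incoming`) is noted in comments as an assumption about the test setup:

```rust
use std::net::TcpListener;

fn main() {
    // Bind to port 0 so the OS assigns a free port; no hard-coded 50051/50052.
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind failed");
    let addr = listener.local_addr().expect("no local addr");
    assert_ne!(addr.port(), 0);
    println!("serving on {addr}");
    // In the tonic tests, this listener would be handed to the server via
    // tokio::net::TcpListener::from_std(listener) and
    // Server::builder().serve_with_incoming(TcpListenerStream::new(..)),
    // while the client connects to format!("http://{addr}").
}
```

Two tests that each bind their own port-0 listener can then run in parallel without colliding.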
33-59: ⚠️ Potential issue | 🟠 Major

`GrpcConfig.tls` is ignored; TLS-required configs always error.

TLS fields are accepted but never applied. If TLS is intended, wire up `ClientTlsConfig` and enforce TLS URLs; otherwise consider removing or flagging these config fields to avoid silent misconfiguration (see tonic 0.14's `ClientTlsConfig` / `Endpoint::tls_config` example).

Also applies to: 143-154
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` around lines 33 - 59, The GrpcConfig.tls fields are parsed but never applied in GrpcEndpoint::new (the new() function), causing TLS-enabled configs to always error; update new() to build and attach a tonic::transport::ClientTlsConfig to the Endpoint when GrpcConfig.tls.required or GrpcConfig.tls.settings are present (use Endpoint::tls_config with ClientTlsConfig::new(), set domain/ca/cert as provided), then call endpoint.connect().await to create the channel and pass it to BridgeClient::new as before; additionally validate the config URL scheme when tls.required (or error clearly) so TLS-required configs are enforced rather than ignored before calling client.subscribe(Request::new(SubscribeRequest{...})).
92-101: ⚠️ Potential issue | 🟡 Minor

Decimal IDs are parsed as hex when there is no `0x` prefix.

Pure numeric IDs like `"12345"` are treated as hex; the decimal fallback never triggers.

🐛 Suggested fix
```diff
-            } else if let Ok(n) =
-                u128::from_str_radix(msg.id.trim_start_matches("0x"), 16)
-            {
-                Some(n)
-            } else {
-                msg.id.parse::<u128>().ok()
-            };
+            } else if msg.id.starts_with("0x") || msg.id.starts_with("0X") {
+                u128::from_str_radix(&msg.id[2..], 16).ok()
+            } else {
+                msg.id.parse::<u128>().ok()
+            };
```
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` around lines 92 - 101, The current message_id parsing treats pure numeric strings like "12345" as hex because u128::from_str_radix(msg.id.trim_start_matches("0x"), 16) runs even when there is no "0x" prefix; update the logic in the message_id block (the branch that uses Uuid::parse_str, u128::from_str_radix and msg.id.parse::<u128>()) so that after checking UUID you only attempt hex parsing when the string has a "0x" prefix (e.g., check msg.id.starts_with("0x") before calling u128::from_str_radix) and otherwise fall back directly to decimal parsing via msg.id.parse::<u128>().ok().
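The prefix-gated behavior can be exercised with a hypothetical std-only `parse_message_id` helper (the crate's UUID branch is omitted, since it needs the `uuid` crate):

```rust
// Hypothetical helper mirroring the suggested fix: only IDs with an explicit
// 0x/0X prefix are treated as hex; plain digits fall through to decimal, and
// bare hex like "ff" is rejected rather than silently accepted.
fn parse_message_id(id: &str) -> Option<u128> {
    if let Some(hex) = id.strip_prefix("0x").or_else(|| id.strip_prefix("0X")) {
        u128::from_str_radix(hex, 16).ok()
    } else {
        id.parse::<u128>().ok()
    }
}

fn main() {
    assert_eq!(parse_message_id("12345"), Some(12345)); // decimal, not 0x12345
    assert_eq!(parse_message_id("0xff"), Some(255));
    assert_eq!(parse_message_id("0X1A"), Some(26));
    assert_eq!(parse_message_id("ff"), None); // no prefix, not valid decimal
    println!("ok");
}
```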
429-433: ⚠️ Potential issue | 🟡 Minor

E2E receive loop can hang without a timeout.

Wrap the loop to fail fast on missing messages (see the `tokio::time::timeout` usage example in the tokio docs).
Verify each finding against the current code and only fix it if needed. In `@src/endpoints/grpc.rs` around lines 429 - 433, The receive loop using received_messages and mem_dest_consumer.receive_batch(messages) can hang indefinitely; wrap the await in a timeout (use tokio::time::timeout) so each receive_batch call fails fast and the test/endpoint returns an error when messages are missing. Replace the direct await on mem_dest_consumer.receive_batch(5).await.unwrap() with a tokio::time::timeout(...) call (or wrap the whole loop in a timeout) and handle the timeout error path by bubbling up or failing the test, referencing received_messages, mem_dest_consumer.receive_batch, and messages_to_send so missing messages cause a deterministic failure instead of hanging.
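The fail-fast shape can be illustrated synchronously with std's `recv_timeout` (a stand-in, since `tokio::time::timeout` needs the tokio runtime; the async equivalent is noted in a comment):

```rust
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    // A sender that never delivers, standing in for a consumer with no messages.
    let (_tx, rx) = mpsc::channel::<String>();
    // Bounded wait instead of blocking forever; the async equivalent would be
    // tokio::time::timeout(Duration::from_secs(5), consumer.receive_batch(5)).await.
    match rx.recv_timeout(Duration::from_millis(50)) {
        Ok(msg) => println!("received: {msg}"),
        Err(mpsc::RecvTimeoutError::Timeout) => println!("timed out waiting for messages"),
        Err(mpsc::RecvTimeoutError::Disconnected) => println!("sender dropped"),
    }
    // prints "timed out waiting for messages"
}
```

A test wrapped this way fails deterministically after the bound instead of hanging CI.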
🧹 Nitpick comments (1)
tests/integration/mongodb.rs (1)
63-65: Consider making the docker-dependent test opt-in (not silently skipped).
`#[ignore]` removes default coverage. If the intent is to run in CI only when Docker is available, consider gating with a feature/env switch and ensure a CI job runs it so regressions don’t slip by.
Verify each finding against the current code and only fix it if needed. In `@tests/integration/mongodb.rs` around lines 63 - 65, The test annotated with #[tokio::test] named test_mongodb_subscriber_no_duplicates is currently marked #[ignore] which hides it by default; change this to an opt-in pattern so the test runs only when explicitly enabled (e.g., replace the unconditional #[ignore] with a conditional attribute such as #[cfg_attr(not(feature = "docker_integration"), ignore)] or gate the test with #[cfg(feature = "docker_integration")] so it is included only when the feature/env is enabled) and update CI to run the suite with that feature/environment enabled to ensure Docker-dependent tests are executed in CI.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- Cargo.toml
- src/endpoints/grpc.rs
- src/endpoints/sled.rs
- src/models.rs
- src/traits.rs
- tests/integration/mongodb.rs
Description
Type of Change
Testing
Checklist
Related Issues
Fixes #
Related to #
Additional Notes
Summary by CodeRabbit
New Features
Chores
Tests