Skip to content

Conversation

@datagutt
Copy link
Member

@datagutt datagutt commented Sep 1, 2025

Summary by CodeRabbit

  • New Features

    • Deferred listener bind after uplink validation, 1ms housekeeping timer, queued uplink reloads (Unix SIGHUP), periodic connection status reporting, and safer per-packet send tracking.
  • Bug Fixes

    • Stronger flow control and NAK/ACK forwarding, refined timeout/recovery semantics, reduced noisy RTT/logging, and larger default socket buffers.
  • Refactor

    • Expanded per-connection state with sequence tracking and stickiness; recovery-only handling replaces disruptive reconnects.
  • Tests

    • Added zero-sequence test and updated tests for optional receive timestamps, NAK handling, and recovery mode.
  • Chores / Docs / CI

    • .gitignore updated; README and CI workflows now require/document Rust nightly.

- Fix sequence number validation to allow sequence 0, matching C
implementation
- Forward NAK packets from SRTLA receiver back to SRT client
- Increase receive buffer size to 100MB for better large packet handling
- Update tests to reflect sequence 0 validation changes

Fixes issues with packet tracking and NAK attribution that were causing
sharp bitrate drops not present in the original C implementation.
@coderabbitai
Copy link

coderabbitai bot commented Sep 1, 2025

Warning

Rate limit exceeded

@datagutt has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 10 minutes and 2 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between e592244 and f6cf1aa.

📒 Files selected for processing (5)
  • .github/workflows/ci.yml (1 hunks)
  • src/connection.rs (13 hunks)
  • src/sender.rs (16 hunks)
  • src/tests/connection_tests.rs (3 hunks)
  • src/tests/sender_tests.rs (9 hunks)

Walkthrough

Switches SrtlaConnection.last_received to Option, revises timeout/recovery and ACK/NAK accounting, tightens flow-control scoring and window growth, adds packet-log-wrap diagnostics and socket buffer tuning; introduces sender housekeeping, pending-connection changes, sequence tracking and SIGHUP reload; tests, CI, and docs updated.

Changes

Cohort / File(s) Summary
Connection timing, ACK/NAK, flow control & recovery
src/connection.rs
last_received: InstantOption<Instant>; set to None on init/recovery and Some(now) on receive/reconnect; is_timed_out() treats None as timed out; get_score() enforces max_in_flight gating (may return 0); ACK handling recomputes unacked counts from packet_log; NAKs forwarded to SRT client path; mark_disconnected() renamed to mark_for_recovery() and resets window/in_flight/packet_log; packet_log-wrap diagnostics; RTT and major-window logs gated by thresholds.
Socket tuning
src/connection.rs (bind_from_ip)
Set socket send buffer to 32MB and receive buffer to 100MB; emit warnings and report actual sizes on failures.
Send API & tracking
src/connection.rs
Added send_data_with_tracking(&mut self, data: &[u8], seq: Option<u32>) -> Result<()>; sending failures now integrated with recovery flow; packet registration and tracking updated.
Sender loop, housekeeping, seq-tracking & API
src/sender.rs
Defer UDP bind until uplinks validated; add ~1ms housekeeping tick and status_counter; introduce PendingConnectionChanges and Unix SIGHUP enqueue; add seq_to_conn mapping and seq_order (VecDeque) for NAK attribution; add last_selected_idx, last_switch_time, all_failed_at; on send failures mark connection for recovery (REG2-like) instead of disruptive reconnect; extend apply_connection_changes to accept seq_order; add log_connection_status(...) and GLOBAL_TIMEOUT_MS.
Test helpers
src/test_helpers.rs
Fixtures updated for Option<Instant>: create_test_connection uses Some(Instant::now()); create_test_connections uses None where appropriate.
Connection tests
src/tests/connection_tests.rs
Tests updated to set last_received as Some(tokio::time::Instant::now() - …); added test_nak_handling_with_logged_packet and test_connection_recovery_mode; some mark_disconnected() usages replaced/adapted to new API/state.
Protocol tests
src/tests/protocol_tests.rs
Added test verifying buffer [0x00,0x00,0x00,0x00] yields Some(0x0000) for sequence extraction.
Sender tests
src/tests/sender_tests.rs
Import VecDeque, make connections mutable, initialize seq_order (VecDeque), pass it to apply_connection_changes, and assert seq_order.len() == seq_to_conn.len(); updated select_connection_idx calls to pass an additional Instant parameter.
Repo & CI docs
.gitignore, .github/workflows/*, README.md
.gitignore adds *.c, *.h, bond-bunny-main/; README updated to require Rust nightly and nightly commands; workflows updated to request nightly toolchain (note: one workflow contains duplicated with: entries causing malformed YAML).

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Main as sender::main_loop
  participant Pending as PendingConnectionChanges
  participant Udp as UDP Listener
  participant Conns as SrtlaConnection[]

  rect rgb(250,250,255)
    Note over Main: Startup — read uplink IPs, create uplinks
    Main->>Main: create connections
    Main->>Udp: bind UDP listener (deferred until uplinks exist)
  end

  rect rgb(245,255,245)
    Note over Main: ~1ms housekeeping tick + main processing loop
    loop every ~1ms
      Main->>Main: housekeeping_tick()
      alt pending changes exist
        Main->>Pending: pop changes
        Main->>Conns: apply_connection_changes(seq_order)
      end
      Main->>Udp: recv SRT packets -> handle_srt_packet()
      loop send path
        Main->>Conns: send_data_with_tracking()
        alt send error
          Main->>Conns: mark_for_recovery()
        end
      end
    end
  end

  OS-->>Main: SIGHUP
  Main->>Pending: enqueue reload (applied on next tick)
Loading
sequenceDiagram
  autonumber
  participant Peer as Remote
  participant Conn as SrtlaConnection
  participant Timer as last_received (Option)
  participant Flow as Window/Score

  Peer-->>Conn: DATA / ACK / NAK
  Conn->>Timer: set last_received = Some(now)
  alt ACK received
    Conn->>Conn: recompute acked_count from packet_log
    Conn->>Conn: in_flight_packets = in_flight.saturating_sub(acked_count)
    alt eligible for growth (connected && last_received.is_some())
      Conn->>Flow: increase window (WINDOW_INCR adjusted)
    end
  else NAK received
    Conn->>Conn: forward NAK to SRT client path
  end

  Conn->>Flow: get_score()
  Flow->>Conn: if in_flight >= max_in_flight -> score = 0 else score = window/(in_flight+1)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • Fix bitrate drops? #2 — Direct overlap on src/connection.rs changes (last_received → Option, ACK/NAK and window logic) and src/sender.rs sequencing/state changes; likely highly related.
  • Test suite #1 — Overlaps on connection/test scaffolding and visibility changes for testing; likely related.

Poem

I’m a rabbit with a tiny clock,
I made last_received optional — what a shock.
I hop through housekeeping, milliseconds bright,
SIGHUP nudges uplinks into the night.
ACKs, NAKs, windows dance — hop, hop, delight! 🐇✨

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch refactor/housekeeping

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/sender.rs (3)

76-89: Blocking std::sync::mpsc::Receiver::recv inside tokio task will block the runtime.

Switch to non-blocking try_recv loop (or tokio::mpsc). Minimal change below avoids blocking.

Apply:

-        tokio::spawn(async move {
-            while let Ok(ack_packet) = instant_rx.recv() {
-                let client_addr = {
-                    match shared_client_addr_clone.lock() {
-                        Ok(addr_guard) => *addr_guard,
-                        _ => None,
-                    }
-                };
-                if let Some(client) = client_addr {
-                    let _ = local_listener_clone.send_to(&ack_packet, client).await;
-                }
-            }
-        });
+        tokio::spawn(async move {
+            loop {
+                match instant_rx.try_recv() {
+                    Ok(ack_packet) => {
+                        let client_addr = match shared_client_addr_clone.lock() {
+                            Ok(addr_guard) => *addr_guard,
+                            _ => None,
+                        };
+                        if let Some(client) = client_addr {
+                            let _ = local_listener_clone.send_to(&ack_packet, client).await;
+                        }
+                    }
+                    Err(std::sync::mpsc::TryRecvError::Empty) => {
+                        tokio::task::yield_now().await;
+                        tokio::time::sleep(Duration::from_millis(1)).await;
+                    }
+                    Err(std::sync::mpsc::TryRecvError::Disconnected) => break,
+                }
+            }
+        });

502-502: Exploration window condition is broken; always near-true.

Instant::now().elapsed() is ~0; modulo check makes exploration almost always on. Use a monotonic wall-clock (e.g., now_ms()).

Apply:

+use crate::utils::now_ms;
@@
-    let explore_now = enable_explore && (Instant::now().elapsed().as_millis() % 5000) < 300;
+    let explore_now = enable_explore && (now_ms() % 5000) < 300;

592-603: Sequence→connection map becomes invalid after removals; clear it.

Retaining entries by conn_idx < connections.len() still points to shifted indices. Without reverse mapping, safest is to clear.

Apply:

-        // Clean up sequence tracking for removed connections
-        seq_to_conn.retain(|_, &mut conn_idx| conn_idx < connections.len());
-
-        // Rebuild sequence tracking with correct indices
-        let mut new_seq_to_conn = HashMap::with_capacity(seq_to_conn.len());
-        for (seq, &old_idx) in seq_to_conn.iter() {
-            if old_idx < connections.len() {
-                new_seq_to_conn.insert(*seq, old_idx);
-            }
-        }
-        *seq_to_conn = new_seq_to_conn;
+        // Sequence indices are no longer reliable after removals; clear mapping
+        seq_to_conn.clear();
🧹 Nitpick comments (3)
src/connection.rs (1)

312-333: ACK accounting adds cumulative handling; consider seq wrap-around.

The O(PKT_LOG_SIZE) backward scan is fine, but val > 0 && val < ack will break near sequence wrap. If SRT seqs can wrap in your workload, implement modular comparison (e.g., seq_lt/seq_le with half-range rule) to avoid mis-acking.

Would you like a small helper (seq_lt/seq_le) patch to make these comparisons wrap-safe?

src/tests/protocol_tests.rs (1)

31-34: Add max-valid boundary case (0x7FFF_FFFF) for completeness

Consider asserting the highest data SN right below the control-bit threshold.

         assert_eq!(get_srt_sequence_number(&buf), Some(0x0000));
 
+        // Max valid sequence number (control bit still clear)
+        let buf = [0x7F, 0xFF, 0xFF, 0xFF];
+        assert_eq!(get_srt_sequence_number(&buf), Some(0x7FFF_FFFF));
+
src/tests/connection_tests.rs (1)

221-221: Consider explicitly testing None semantics for last_received

If None has special timeout behavior (unknown/unset), add an assertion to lock that in (e.g., timed-out vs not). This prevents regressions if the policy changes.

Would you like me to draft a small test addition once you confirm the intended None behavior?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 06c9c76 and ba06244.

📒 Files selected for processing (5)
  • src/connection.rs (11 hunks)
  • src/sender.rs (5 hunks)
  • src/test_helpers.rs (2 hunks)
  • src/tests/connection_tests.rs (1 hunks)
  • src/tests/protocol_tests.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
src/tests/protocol_tests.rs (1)
src/protocol.rs (1)
  • get_srt_sequence_number (49-59)
src/test_helpers.rs (1)
src/tests/sender_tests.rs (1)
  • test_select_connection_idx_stickiness (32-43)
src/tests/connection_tests.rs (1)
src/tests/sender_tests.rs (1)
  • test_select_connection_idx_stickiness (32-43)
src/sender.rs (1)
src/tests/sender_tests.rs (2)
  • test_apply_connection_changes_remove_stale (141-174)
  • tests (2-327)
🪛 GitHub Check: Test
src/connection.rs

[warning] 468-468:
Diff in /home/runner/work/srtla_send/srtla_send/src/connection.rs


[warning] 468-468:
Diff in /home/runner/work/srtla_send/srtla_send/src/connection.rs

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test (Windows)
🔇 Additional comments (18)
src/connection.rs (8)

63-66: Option-izing last_received looks good.

Clearer semantics for “unset/unknown.” Callers appear updated in tests.


150-150: Initialize last_received to None on connect.

Matches new timeout semantics and reconnection flow.


229-229: Timestamp on inbound receive is correct.

Keeps timeout logic simple and enables window-growth gating.


260-262: Forwarding NAKs to the SRT client is aligned with the C implementation.

Good parity improvement.


467-472: Specific SRTLA ACK recovery mirrors C (+WINDOW_INCR-1) with cap.

Looks correct.


487-505: Global SRTLA ACK window +1 gated on having received data.

Reasonable guard; prevents growth before any inbound.


611-614: Timeout logic with Option is clear and conservative.

None → timed out makes reconnection behavior explicit.


686-686: Reset last_received to None on reconnect.

Consistent with new semantics.

src/sender.rs (6)

58-62: Bind SRT UDP listener after uplinks validation.

Sensibly orders setup; good.


135-151: Per-iteration housekeeping mirrors C behavior.

Reasonable approach; scoped error handling is fine.


157-166: Queued changes applied on a lightweight tick.

Looks good; avoids mid-iteration mutation hazards.


183-199: Non-Unix loop parity maintained.

Consistent structure; good.


205-214: Apply pending changes on non-Unix tick as well.

Keeps behavior consistent cross-platform.


284-290: Disable connection on send failure.

Good fail-fast behavior that prevents repeated errors.

src/test_helpers.rs (2)

29-29: Set last_received=Some(...) for single test connection.

Matches new Option API; avoids immediate timeout for tests that expect “connected.”


69-69: Initialize last_received=None for multi-conn tests.

Good for exercising timeout/first-traffic paths. If any existing test assumes “active” immediately, set Some(Instant::now()) in that specific test.

src/tests/protocol_tests.rs (1)

31-34: Good edge-case addition for SN=0

Covers the lower-bound data-packet SN with control bit clear. Looks correct.

src/tests/connection_tests.rs (1)

221-221: Option migration handled correctly in timeout test

Using Some(Instant::now() - Duration::from_secs(CONN_TIMEOUT + 1)) aligns with the new Option<Instant> semantics and should deterministically trip the timeout.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
src/connection.rs (2)

173-177: Clamp max_in_flight to at least 1 to avoid hard stall when window < WINDOW_MULT

If window/WINDOW_MULT is 0, get_score will return 0 forever. Clamp to 1.

Apply:

-        let max_in_flight = self.window / WINDOW_MULT;
+        let max_in_flight = (self.window / WINDOW_MULT).max(1);

721-729: Best-effort socket buffer tuning — don’t fail connect when OS caps/denies

Current .context(...)? will abort on permission/cap limits. Warn and continue; optionally log effective sizes.

Apply:

-    // Set send buffer size (32MB)
-    const SEND_BUF_SIZE: usize = 32 * 1024 * 1024;
-    sock.set_send_buffer_size(SEND_BUF_SIZE)
-        .context("set send buffer size")?;
+    // Best-effort: set send buffer size (32MB)
+    const SEND_BUF_SIZE: usize = 32 * 1024 * 1024;
+    if let Err(e) = sock.set_send_buffer_size(SEND_BUF_SIZE) {
+        warn!("could not set send buffer size to {}: {}", SEND_BUF_SIZE, e);
+    }
@@
-    // Set receive buffer size to handle large SRT packets (100MB)
-    const RECV_BUF_SIZE: usize = 100 * 1024 * 1024;
-    sock.set_recv_buffer_size(RECV_BUF_SIZE)
-        .context("set receive buffer size")?;
+    // Best-effort: set receive buffer size (100MB)
+    const RECV_BUF_SIZE: usize = 100 * 1024 * 1024;
+    if let Err(e) = sock.set_recv_buffer_size(RECV_BUF_SIZE) {
+        warn!("could not set recv buffer size to {}: {}", RECV_BUF_SIZE, e);
+    }
+    if let (Ok(tx), Ok(rx)) = (sock.send_buffer_size(), sock.recv_buffer_size()) {
+        debug!("socket buffers effective: send={}B recv={}B", tx, rx);
+    }
🧹 Nitpick comments (3)
src/connection.rs (3)

260-262: Forwarding NAKs to the SRT client — confirm duplication risk

You now forward NAKs (and ACKs both instantly and batched). Ensure the downstream client won’t double-handle these or regress timing. If both paths hit the same sink, consider forwarding only once or deduping.


312-333: ACK accounting looks correct; add quick exits to cut scan cost

  • Early-return if in_flight_packets == 0 to skip the full PKT_LOG_SIZE scan.
  • Optionally limit the backward scan to min(PKT_LOG_SIZE, in_flight_packets as usize + small_slack).

Low-risk perf win on large PKT_LOG_SIZE.

Apply:

-    pub fn handle_srt_ack(&mut self, ack: i32) {
+    pub fn handle_srt_ack(&mut self, ack: i32) {
+        if self.in_flight_packets == 0 {
+            return;
+        }
         let mut acked_count = 0;

467-479: Inconsistent window increment (WINDOW_INCR - 1 here vs WINDOW_INCR elsewhere)

If intentional (parity with C), add a comment tying constants together; otherwise unify increments to avoid drift between ACK paths.

Apply:

-                    self.window += WINDOW_INCR - 1; // Note: WINDOW_INCR - 1 in C code
+                    // Use a dedicated constant to document intent and avoid silent drift.
+                    const SRTLA_WINDOW_INCR: i32 = WINDOW_INCR - 1;
+                    self.window += SRTLA_WINDOW_INCR;
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between ba06244 and acbcb98.

📒 Files selected for processing (3)
  • src/connection.rs (11 hunks)
  • src/test_helpers.rs (2 hunks)
  • src/tests/connection_tests.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/test_helpers.rs
  • src/tests/connection_tests.rs
🧰 Additional context used
🧬 Code graph analysis (1)
src/connection.rs (1)
src/sender.rs (2)
  • None (67-67)
  • apply_connection_changes (509-568)
🪛 GitHub Check: Test
src/connection.rs

[failure] 615-615:
this map_or can be simplified

🪛 GitHub Actions: CI
src/connection.rs

[error] 615-615: Clippy: this map_or can be simplified. Command: cargo clippy -- -D warnings.

🔇 Additional comments (5)
src/connection.rs (5)

63-66: Good change: last_received as Option

This avoids panics on elapsed() and lets you encode “never received.” Call sites appear updated.


150-150: Init last_received to None on connect

Sane default aligning with the new semantics.


229-229: Touch last_received on any inbound packet

Matches timeout logic and global recovery gating.


488-506: Gate global SRTLA ACK window recovery on last_received.is_some()

Reasonable safeguard to avoid boosting dormant links.


690-690: Reset last_received on reconnect

Good: prevents premature global recovery until new traffic arrives.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/sender.rs (1)

504-505: Exploration toggle is effectively always-on (logic bug)

Instant::now().elapsed() is ~0 at call time, so (… % 5000) < 300 is almost always true. Tie exploration to last_switch (or another persistent epoch).

Apply:

-    let explore_now = enable_explore && (Instant::now().elapsed().as_millis() % 5000) < 300;
+    let explore_now = enable_explore
+        && last_switch
+            .map(|ts| ts.elapsed().as_millis() % 5_000 < 300)
+            .unwrap_or(false);
♻️ Duplicate comments (1)
src/sender.rs (1)

91-126: Fix duplicate state redeclarations to unblock compilation

seq_to_conn, seq_order, last_selected_idx, last_switch_time, all_failed_at, and pending_changes are declared twice in this scope. Keep a single canonical set (the later one) and remove the earlier block with the #[allow(...)] attributes.

Apply:

@@
-    #[allow(unused_variables, unused_mut)]
-    let mut recv_buf = vec![0u8; MTU];
-    #[allow(unused_variables, unused_mut)]
-    let mut housekeeping_timer = time::interval(Duration::from_millis(1));
-    #[allow(unused_variables, unused_mut)]
-    let mut last_client_addr: Option<SocketAddr> = None;
-    // Sequence → connection index mapping for correct NAK attribution
-    #[allow(unused_variables, unused_mut)]
-    let mut seq_to_conn: HashMap<u32, usize> = HashMap::with_capacity(MAX_SEQUENCE_TRACKING);
-    #[allow(unused_variables, unused_mut)]
-    let mut seq_order: VecDeque<u32> = VecDeque::with_capacity(MAX_SEQUENCE_TRACKING);
-    // Stickiness
-    #[allow(unused_variables, unused_mut)]
-    let mut last_selected_idx: Option<usize> = None;
-    #[allow(unused_variables, unused_mut)]
-    let mut last_switch_time: Option<Instant> = None;
-    // Connection failure tracking
-    #[allow(unused_variables, unused_mut)]
-    let mut all_failed_at: Option<Instant> = None;
-
-    // Pending connection changes (applied safely between processing cycles)
-    #[allow(unused_variables, unused_mut)]
-    let mut pending_changes: Option<PendingConnectionChanges> = None;
+    let mut recv_buf = vec![0u8; MTU];
+    let mut housekeeping_timer = time::interval(Duration::from_millis(1));
+    let mut last_client_addr: Option<SocketAddr> = None;
🧹 Nitpick comments (4)
src/sender.rs (4)

72-89: Avoid blocking std::sync::mpsc::Receiver::recv() in an async task

recv() blocks a runtime worker thread. Prefer tokio::sync::mpsc and async recv, or move this loop to a dedicated std::thread and use a tokio::sync::mpsc to hand off to the async sender.

I can generate a follow-up patch switching drain_incoming and this path to tokio::sync::mpsc end-to-end. Want me to?


284-290: Optional: try a fallback uplink on send failure to reduce drops

If send_data_with_tracking fails, we disable the connection and drop the packet. Consider reselecting another active connection and retrying once to mitigate bitrate drops.

Apply:

-                    if let Err(e) = conn.send_data_with_tracking(pkt, seq).await {
+                    if let Err(e) = conn.send_data_with_tracking(pkt, seq).await {
                         warn!(
                             "{}: sendto() failed, disabling the connection: {}",
                             conn.label, e
                         );
                         conn.mark_disconnected();
+                        // Best-effort single retry on an alternate uplink
+                        if let Some(alt_idx) = select_connection_idx(
+                            connections,
+                            None,
+                            None,
+                            effective_enable_quality,
+                            false,
+                            classic,
+                        ) {
+                            if alt_idx != sel_idx && alt_idx < connections.len() {
+                                let _ = connections[alt_idx].send_data_with_tracking(pkt, seq).await;
+                            }
+                        }
                     }

552-566: Use async file I/O in async context

read_ip_list uses std::fs::read_to_string in an async fn. Switch to tokio::fs to avoid blocking the runtime thread.

Apply:

-pub async fn read_ip_list(path: &str) -> Result<Vec<IpAddr>> {
-    let text = std::fs::read_to_string(Path::new(path)).context("read IPs file")?;
+pub async fn read_ip_list(path: &str) -> Result<Vec<IpAddr>> {
+    let text = tokio::fs::read_to_string(Path::new(path))
+        .await
+        .context("read IPs file")?;

157-166: Nit: collapse nested if-let when applying pending changes

Minor readability tweak suggested by static analysis: pattern-match PendingConnectionChanges in a single if-let.

Apply:

-                if let Some(changes) = pending_changes.take() {
-                    if let Some(new_ips) = changes.new_ips {
+                if let Some(PendingConnectionChanges { new_ips: Some(new_ips), receiver_host, receiver_port }) = pending_changes.take() {
                         info!("applying queued connection changes: {} IPs", new_ips.len());
-                        apply_connection_changes(&mut connections, &new_ips, &changes.receiver_host, changes.receiver_port, &mut last_selected_idx, &mut seq_to_conn).await;
+                        apply_connection_changes(&mut connections, &new_ips, &receiver_host, receiver_port, &mut last_selected_idx, &mut seq_to_conn, &mut seq_order).await;
                         info!("connection changes applied successfully");
-                    }
-                }
+                }

Also applies to: 205-214

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between acbcb98 and ebb3740.

📒 Files selected for processing (1)
  • src/sender.rs (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/sender.rs (1)
src/connection.rs (3)
  • SrtlaConnection (22-129)
  • SrtlaConnection (131-696)
  • handle_srt_ack (304-376)
🪛 GitHub Check: Test
src/sender.rs

[failure] 159-159:
this if statement can be collapsed

🔇 Additional comments (1)
src/sender.rs (1)

58-62: Listener binding ordering LGTM

Binding after validating and creating uplinks is a sane flow; error context added is helpful.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/sender.rs (2)

65-89: Blocking std::sync::mpsc::Receiver::recv inside tokio task.

This can block a runtime worker thread. Use tokio::sync::mpsc.

Apply:

-    let (instant_tx, instant_rx) = std::sync::mpsc::channel::<Vec<u8>>();
+    let (instant_tx, mut instant_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
@@
-        tokio::spawn(async move {
-            while let Ok(ack_packet) = instant_rx.recv() {
+        tokio::spawn(async move {
+            while let Some(ack_packet) = instant_rx.recv().await {
                 let client_addr = {
                     match shared_client_addr_clone.lock() {
                         Ok(addr_guard) => *addr_guard,
                         _ => None,
                     }
                 };
                 if let Some(client) = client_addr {
                     let _ = local_listener_clone.send_to(&ack_packet, client).await;
                 }
             }
         });

520-521: Exploration timing bug: Instant::now().elapsed() ~ 0ms ⇒ always exploring.

Use a stable clock (e.g., now_ms) or track a start Instant.

Apply:

-    let explore_now = enable_explore && (Instant::now().elapsed().as_millis() % 5000) < 300;
+    use crate::utils::now_ms;
+    let explore_now = enable_explore && (now_ms() % 5000) < 300;
♻️ Duplicate comments (2)
src/connection.rs (1)

173-179: Window clamp avoids hard stall.

max_in_flight.max(1) addresses the zero-window edge case.

src/sender.rs (1)

100-121: Duplicate variable declarations (compile error).

seq_to_conn, seq_order, last_selected_idx, last_switch_time are declared twice. Remove the earlier copies.

Apply:

-    #[allow(unused_variables, unused_mut)]
-    let mut seq_to_conn: HashMap<u32, usize> = HashMap::with_capacity(MAX_SEQUENCE_TRACKING);
-    #[allow(unused_variables, unused_mut)]
-    let mut seq_order: VecDeque<u32> = VecDeque::with_capacity(MAX_SEQUENCE_TRACKING);
-    // Stickiness
-    #[allow(unused_variables, unused_mut)]
-    let mut last_selected_idx: Option<usize> = None;
-    #[allow(unused_variables, unused_mut)]
-    let mut last_switch_time: Option<Instant> = None;
-    // Connection failure tracking
-    #[allow(unused_variables, unused_mut)]
-    let mut all_failed_at: Option<Instant> = None;
-
-    // Pending connection changes (applied safely between processing cycles)
-    #[allow(unused_variables, unused_mut)]
-    let mut pending_changes: Option<PendingConnectionChanges> = None;
🧹 Nitpick comments (3)
src/connection.rs (1)

716-744: Best-effort socket buffer tuning is right; fix rustfmt nits.

The warn-and-continue approach is correct; CI shows a rustfmt change around here—please run cargo fmt.

src/tests/sender_tests.rs (1)

176-181: Fix rustfmt failure (trailing whitespace/blank lines).

CI flags formatting at/near these lines. Remove extra blank lines/spaces.

Apply:

-        
-
-        // Should have cleaned up seq_order to match seq_to_conn
+        // Should have cleaned up seq_order to match seq_to_conn
         assert_eq!(seq_order.len(), seq_to_conn.len());
src/sender.rs (1)

95-97: 1 ms housekeeping tick may be CPU-heavy.

Consider 5–10 ms or adaptive interval; pending changes don’t need 1 ms granularity.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between ebb3740 and a26f76f.

📒 Files selected for processing (3)
  • src/connection.rs (11 hunks)
  • src/sender.rs (8 hunks)
  • src/tests/sender_tests.rs (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/connection.rs (1)
src/tests/connection_tests.rs (3)
  • tests (2-314)
  • test_srt_ack_handling (50-74)
  • test_srtla_ack_handling (107-131)
🪛 GitHub Actions: CI
src/tests/sender_tests.rs

[error] 175-175: Rustfmt: formatting change detected in src/tests/sender_tests.rs (line 175). Run 'cargo fmt' to fix formatting.

src/connection.rs

[error] 732-732: Rustfmt: formatting change detected in src/connection.rs (line 732). Run 'cargo fmt' to fix formatting.

🪛 GitHub Check: Test
src/connection.rs

[warning] 732-732:
Diff in /home/runner/work/srtla_send/srtla_send/src/connection.rs


[warning] 732-732:
Diff in /home/runner/work/srtla_send/srtla_send/src/connection.rs

🔇 Additional comments (5)
src/connection.rs (3)

223-231: Good: update last_received on every inbound read.

This integrates cleanly with timeout/ACK logic.


311-333: ACK handling improvements look correct.

Counting cumulative/exact ACKs and saturating_sub on in_flight_packets is sound.


487-506: Reasonable gating for global SRTLA ACK window bumps.

Requiring connected and last_received.is_some() matches intent.

src/sender.rs (2)

512-516: Stickiness bounds check is solid.

Prevents out-of-range/disabled-target issues.


611-624: Sequence-order cleanup on connection removal looks good.

Retaining only keys present in seq_to_conn prevents stale growth.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
src/connection.rs (2)

173-177: Nice: clamp max_in_flight to at least 1.

Prevents hard stall when window/WINDOW_MULT is 0.


612-619: Fix: fresh connections read as “timed out” (reconnect thrash risk).

Treating None as timed out will immediately flag just-connected links before first RX. Make None => not timed out (or add a grace period).

Apply:

 pub fn is_timed_out(&self) -> bool {
-        !self.connected
-            || if let Some(lr) = self.last_received {
-                lr.elapsed().as_secs() >= CONN_TIMEOUT
-            } else {
-                true
-            }
+        if !self.connected {
+            return true;
+        }
+        match self.last_received {
+            Some(lr) => lr.elapsed().as_secs() >= CONN_TIMEOUT,
+            None => false, // allow grace until first receive
+        }
 }
🧹 Nitpick comments (1)
src/connection.rs (1)

467-472: Confirm WINDOW_INCR - 1 intent.

The -1 matches your C reference; consider a brief comment with the source line or rationale to avoid future “off-by-one” regressions.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between a0c6c68 and eca1c22.

📒 Files selected for processing (1)
  • src/connection.rs (11 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/connection.rs (2)
src/sender.rs (2)
  • None (67-67)
  • apply_connection_changes (509-568)
src/tests/connection_tests.rs (3)
  • tests (2-314)
  • test_srt_ack_handling (50-74)
  • test_srtla_ack_handling (107-131)
🪛 GitHub Check: Test
src/connection.rs

[warning] 732-732:
Diff in /home/runner/work/srtla_send/srtla_send/src/connection.rs


[warning] 732-732:
Diff in /home/runner/work/srtla_send/srtla_send/src/connection.rs

🪛 GitHub Actions: CI
src/connection.rs

[error] 732-732: Cargo fmt --check failed. Formatting changes are required in this file. Run 'cargo fmt' to fix.

🔇 Additional comments (11)
src/connection.rs (11)

63-66: Good change: make last_received optional.

Switching to Option avoids invalid elapsed() calls before first receive. Ensure all timeout checks handle None correctly (see comment on is_timed_out).


150-150: Init to None is correct, but update timeout logic accordingly.

Fresh connections shouldn’t be considered timed out solely because last_received is None (see is_timed_out fix below).


229-229: Correct: stamp last_received on any inbound read.

This anchors timeout/window recovery to real traffic.


260-262: Forwarding NAKs to client matches C behavior.

Keeps the SRT client informed for its own recovery logic.


312-312: ACK bookkeeping looks good.

Local acked_count + later saturating_sub avoids underflow.


318-321: Exact ACK handling is correct.

Marking the specific entry -1 and capturing send timestamp is sound.


325-329: Cumulative ACK handling is correct.

Older (< ack) entries marked acknowledged; consistent with SRT semantics.


331-333: Good: saturating_sub on in_flight_packets.

Prevents negative counts when logs contain duplicates/holes.


488-506: SRTLA global ACK gating is sensible.

Only recovering window if connected and we’ve actually received data prevents unfair boosts.


692-692: Good: seed last_received on reconnect.

Prevents immediate timeout after a successful reconnect.


722-739: Lower log noise and fix formatting

  • Change severity to debug! for effective buffer size logs:
-        if let Ok(actual_size) = sock.send_buffer_size() {
-            warn!("Effective send buffer size: {}", actual_size);
-        }
+        if let Ok(actual_size) = sock.send_buffer_size() {
+            debug!("effective send buffer size: {}", actual_size);
+        }
@@
-        if let Ok(actual_size) = sock.recv_buffer_size() {
-            warn!("Effective receive buffer size: {}", actual_size);
-        }
+        if let Ok(actual_size) = sock.recv_buffer_size() {
+            debug!("effective receive buffer size: {}", actual_size);
+        }
  • Run cargo fmt -- --check locally and apply cargo fmt to ensure CI passes.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/connection.rs (1)

209-215: Use Tokio mpsc for instant forwarding; avoid blocking in async code.

drain_incoming takes &std::sync::mpsc::Sender<Vec<u8>>, forcing a blocking recv() on the consumer side. Switch to tokio::sync::mpsc::UnboundedSender here and in sender.rs to make the forwarding task async and non-blocking.

Example signature change:

-    pub async fn drain_incoming(
+    pub async fn drain_incoming(
         &mut self,
         conn_idx: usize,
         reg: &mut SrtlaRegistrationManager,
-        instant_forwarder: &std::sync::mpsc::Sender<Vec<u8>>,
+        instant_forwarder: &tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
     ) -> Result<SrtlaIncoming> {

And keep instant_forwarder.send(...) unchanged (it returns a Result).

Also applies to: 236-244

src/sender.rs (2)

67-90: Blocking std::sync::mpsc in async task (can stall the runtime).

instant_rx.recv() blocks inside tokio::spawn. Switch to tokio::sync::mpsc::unbounded_channel() and recv().await, and update connection.rs::drain_incoming sender type accordingly.

Apply (paired with the connection.rs change):

-    let (instant_tx, instant_rx) = std::sync::mpsc::channel::<Vec<u8>>();
+    let (instant_tx, mut instant_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
@@
-        tokio::spawn(async move {
-            while let Ok(ack_packet) = instant_rx.recv() {
+        tokio::spawn(async move {
+            while let Some(ack_packet) = instant_rx.recv().await {
                 let client_addr = {
                     match shared_client_addr_clone.lock() {
                         Ok(addr_guard) => *addr_guard,
                         _ => None,
                     }
                 };
                 if let Some(client) = client_addr {
                     let _ = local_listener_clone.send_to(&ack_packet, client).await;
                 }
             }
         });

531-532: Exploration is effectively “always on.”

Instant::now().elapsed() is ~0, so as_millis() % 5000 < 300 is always true at call time. Gate exploration using a real time base.

+use crate::utils::now_ms;
@@
-    let explore_now = enable_explore && (Instant::now().elapsed().as_millis() % 5000) < 300;
+    let explore_now = enable_explore && (now_ms() % 5_000) < 300;
♻️ Duplicate comments (1)
src/connection.rs (1)

620-628: Fresh connections are treated as timed out (reconnect thrash).

When last_received is None (e.g., right after connect), is_timed_out returns true, causing immediate recovery/reconnect behaviors and skewed status metrics. Treat None as “not timed out” (or add a grace period).

Apply:

 pub fn is_timed_out(&self) -> bool {
-        !self.connected
-            || if let Some(lr) = self.last_received {
-                lr.elapsed().as_secs() >= CONN_TIMEOUT
-            } else {
-                true
-            }
+        if !self.connected {
+            return true;
+        }
+        match self.last_received {
+            Some(lr) => lr.elapsed().as_secs() >= CONN_TIMEOUT,
+            None => false, // grace until first receive
+        }
 }
🧹 Nitpick comments (3)
src/connection.rs (2)

347-360: RTT “significant change” compares against the updated EMA.

You compute rtt_change_pct after updating estimated_rtt_ms, which understates spikes. Capture the previous EMA first and compare against that.

-            if rtt > 0 && rtt <= 10_000 {
-                self.estimated_rtt_ms = if self.estimated_rtt_ms == 0.0 {
+            if rtt > 0 && rtt <= 10_000 {
+                let prev_ema = self.estimated_rtt_ms;
+                self.estimated_rtt_ms = if prev_ema == 0.0 {
                     rtt as f64
                 } else {
-                    (self.estimated_rtt_ms * 0.875) + (rtt as f64 * 0.125)
+                    (prev_ema * 0.875) + (rtt as f64 * 0.125)
                 };
                 self.last_rtt_measurement_ms = now;
-                let rtt_change_pct = if self.estimated_rtt_ms > 0.0 {
-                    ((rtt as f64 - self.estimated_rtt_ms) / self.estimated_rtt_ms * 100.0).abs()
+                let rtt_change_pct = if prev_ema > 0.0 {
+                    ((rtt as f64 - prev_ema) / prev_ema * 100.0).abs()
                 } else {
                     100.0
                 };

477-482: Confirm WINDOW_INCR - 1 matches the C reference.

The “-1” is subtle and easy to regress. Please confirm it’s intentional and add a short comment citing the exact C line/commit.

src/sender.rs (1)

198-241: Status logging not compiled on non-Unix.

Mirror the Unix branch’s status_counter/log_connection_status in the non-Unix loop so diagnostics work cross-platform and the function is used.

         tokio::select! {
             res = local_listener.recv_from(&mut recv_buf) => {
                 handle_srt_packet(res, &mut recv_buf, &mut connections, &mut last_selected_idx, &mut last_switch_time, &mut seq_to_conn, &mut seq_order, &mut last_client_addr, &shared_client_addr, &toggles).await;
             }
             _ = housekeeping_timer.tick() => {
                 // Apply any pending connection changes at a safe point
                 if let Some(changes) = pending_changes.take() {
                     if let Some(new_ips) = changes.new_ips {
                         info!("applying queued connection changes: {} IPs", new_ips.len());
                         apply_connection_changes(
                             &mut connections,
                             &new_ips,
                             &changes.receiver_host,
                             changes.receiver_port,
                             &mut last_selected_idx,
                             &mut seq_to_conn,
                             &mut seq_order,
                         ).await;
                         info!("connection changes applied successfully");
                     }
                 }
+                // Periodic status reporting parity with Unix branch
+                status_counter += 1;
+                if status_counter % 30000 == 0 {
+                    log_connection_status(&connections, &seq_to_conn, &seq_order, last_selected_idx, &toggles);
+                }
             }
         }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between eca1c22 and ae4c217.

📒 Files selected for processing (3)
  • src/connection.rs (14 hunks)
  • src/sender.rs (14 hunks)
  • src/tests/connection_tests.rs (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
src/tests/connection_tests.rs (2)
src/test_helpers.rs (1)
  • create_test_connection (12-46)
src/connection.rs (2)
  • get_score (169-179)
  • is_timed_out (621-628)
src/sender.rs (3)
src/connection.rs (1)
  • nak_burst_count (652-654)
src/registration.rs (1)
  • active_connections (184-186)
src/tests/sender_tests.rs (2)
  • test_apply_connection_changes_remove_stale (141-174)
  • tests (2-327)
🪛 GitHub Actions: CI
src/tests/connection_tests.rs

[warning] 117-117: Rustfmt diff detected at line 117 in tests/connection_tests.rs.


[warning] 267-267: Rustfmt diff detected at line 267 in tests/connection_tests.rs.


[warning] 280-280: Rustfmt diff detected at line 280 in tests/connection_tests.rs.


[warning] 285-285: Rustfmt diff detected at line 285 in tests/connection_tests.rs.

src/connection.rs

[warning] 380-380: Rustfmt diff detected at line 380.


[warning] 513-513: Rustfmt diff detected at line 513.


[warning] 746-746: Rustfmt diff detected at line 746.

src/sender.rs

[warning] 2-2: Rustfmt diff detected at line 2.


[warning] 309-309: Rustfmt diff detected at line 309.


[warning] 424-424: Rustfmt diff detected at line 424.


[warning] 558-558: Rustfmt diff detected at line 558.


[warning] 696-696: Rustfmt diff detected at line 696.


[warning] 705-705: Rustfmt diff detected at line 705.


[warning] 720-720: Rustfmt diff detected at line 720.


[warning] 736-736: Rustfmt diff detected at line 736.


[warning] 751-751: Rustfmt diff detected at line 751.


[warning] 756-756: Rustfmt diff detected at line 756.

🪛 GitHub Check: Test
src/connection.rs

[warning] 380-380:
Diff in /home/runner/work/srtla_send/srtla_send/src/connection.rs


[warning] 513-513:
Diff in /home/runner/work/srtla_send/srtla_send/src/connection.rs


[warning] 746-746:
Diff in /home/runner/work/srtla_send/srtla_send/src/connection.rs

src/sender.rs

[warning] 309-309:
Diff in /home/runner/work/srtla_send/srtla_send/src/sender.rs


[warning] 424-424:
Diff in /home/runner/work/srtla_send/srtla_send/src/sender.rs


[warning] 558-558:
Diff in /home/runner/work/srtla_send/srtla_send/src/sender.rs


[warning] 720-720:
Diff in /home/runner/work/srtla_send/srtla_send/src/sender.rs


[warning] 705-705:
Diff in /home/runner/work/srtla_send/srtla_send/src/sender.rs


[warning] 696-696:
Diff in /home/runner/work/srtla_send/srtla_send/src/sender.rs

🪛 GitHub Check: Test (Windows)
src/sender.rs

[failure] 686-686:
function log_connection_status is never used

🔇 Additional comments (9)
src/connection.rs (3)

173-179: Flow-control clamp LGTM.

Clamping max_in_flight to at least 1 prevents hard stalls when window / WINDOW_MULT is 0.


737-754: Socket buffer tuning: best-effort handling LGTM.

Graceful fallback with effective-size logging is appropriate; avoids aborting on OS caps.


380-380: Rustfmt diffs reported by CI.

Run cargo fmt to resolve formatting diffs at the noted lines.

Also applies to: 513-513, 746-746

src/tests/connection_tests.rs (4)

106-123: Good regression test for “NAK with logged packet”.

Validates the early-return path in handle_nak and guards against accidental window/NAK-count penalties.


261-289: Recovery-mode test covers key invariants.

Confirms reset window, zero in-flight, and timeout perception via old timestamp. This remains compatible with treating None as non-timeout.


239-241: Time-based timeout assertion LGTM.

Explicit absolute timeout makes the test robust.


117-117: Fix rustfmt warnings in tests.

Please run cargo fmt to clear CI’s rustfmt diffs at these lines.

Also applies to: 267-267, 280-280, 285-285

src/sender.rs (2)

309-309: Rustfmt diffs reported by CI.

Please run cargo fmt to resolve formatting warnings at these lines.

Also applies to: 424-424, 558-558, 696-696, 705-705, 720-720, 736-736, 751-751, 756-756


685-767: Fix score_desc type mismatch
Use a String for all match arms to resolve the compile error and avoid borrowing a temporary:

- let score_desc = match score {
-     -1 => "DISCONNECTED",
-     0 => "AT_CAPACITY",
-     _ => &format!("{}", score),
- };
+ let score_desc: String = match score {
+     -1 => "DISCONNECTED".to_string(),
+     0 => "AT_CAPACITY".to_string(),
+     _ => score.to_string(),
+ };

This function is invoked at line 181 of src/sender.rs, so it won’t trigger an “unused” warning on any platform.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
src/sender.rs (4)

76-90: Don’t block the Tokio runtime with std::sync::mpsc::Receiver::recv().

recv() blocks a runtime worker thread inside tokio::spawn. Use a dedicated OS thread with try_send_to, or switch to tokio::sync::mpsc.

Minimal change (no API ripple) — move to std::thread and use non-async send:

-        tokio::spawn(async move {
-            while let Ok(ack_packet) = instant_rx.recv() {
+        std::thread::spawn(move || {
+            while let Ok(ack_packet) = instant_rx.recv() {
                 let client_addr = {
                     match shared_client_addr_clone.lock() {
                         Ok(addr_guard) => *addr_guard,
                         _ => None,
                     }
                 };
                 if let Some(client) = client_addr {
-                    let _ = local_listener_clone.send_to(&ack_packet, client).await;
+                    let _ = local_listener_clone.try_send_to(&ack_packet, client);
                 }
             }
-        });
+        });

Alternative (cleaner): migrate to tokio::sync::mpsc end-to-end.


538-539: Exploration scheduling bug: Instant::now().elapsed() is ~0 → always explores.

Using elapsed() on a fresh Instant yields ~0ms, so explore_now becomes true every time. Use wall-clock millis.

-    let explore_now = enable_explore && (Instant::now().elapsed().as_millis() % 5000) < 300;
+    let explore_now = enable_explore && (now_ms() % 5000) < 300;

544-576: Avoid selecting disconnected links due to clamping.

Clamping to max(1.0) can boost disconnected connections (base -1) to 1.0 when quality is enabled. Skip disconnected conns and clamp to ≥0.0 only.

-    for (i, c) in conns.iter().enumerate() {
+    for (i, c) in conns.iter().enumerate() {
+        if !c.connected {
+            continue;
+        }
         let base = c.get_score() as f64;
         let score = if !enable_quality {
             base
         } else {
             let quality_mult = calculate_quality_multiplier(c);
-            let final_score = (base * quality_mult).max(1.0);
+            let final_score = (base * quality_mult).max(0.0);

636-649: Fix seq_to_conn remap after retain(): indices shift and mis-attribute NAKs.

retain() reindexes the vector; copying old indices back keeps wrong mappings. Rebuild using labels (old index → label → new index) or clear mappings.

-        // Clean up sequence tracking for removed connections
-        seq_to_conn.retain(|_, &mut conn_idx| conn_idx < connections.len());
-
-        // Rebuild sequence tracking with correct indices
-        let mut new_seq_to_conn = HashMap::with_capacity(seq_to_conn.len());
-        for (seq, &old_idx) in seq_to_conn.iter() {
-            if old_idx < connections.len() {
-                new_seq_to_conn.insert(*seq, old_idx);
-            }
-        }
-        *seq_to_conn = new_seq_to_conn;
-        // Drop stale sequence IDs from the order queue
-        seq_order.retain(|seq| seq_to_conn.contains_key(seq));
+        // Build old index → label map before changes
+        // (We need to place this map before `retain`; shown inline for brevity)
+        // let old_index_to_label: Vec<String> = /* capture before retain */ ;
+        // After `retain`, build new label → index map
+        let new_label_to_index: HashMap<String, usize> =
+            connections.iter().enumerate().map(|(i, c)| (c.label.clone(), i)).collect();
+
+        // Rebuild sequence tracking with correct indices via labels
+        let mut rebuilt: HashMap<u32, usize> = HashMap::with_capacity(seq_to_conn.len());
+        for (seq, old_idx) in std::mem::take(seq_to_conn) {
+            if let Some(label) = old_index_to_label.get(old_idx) {
+                if let Some(&new_idx) = new_label_to_index.get(label) {
+                    rebuilt.insert(seq, new_idx);
+                }
+            }
+        }
+        *seq_to_conn = rebuilt;
+        // Drop stale sequence IDs
+        seq_order.retain(|seq| seq_to_conn.contains_key(seq));

If capturing old_index_to_label pre-retain is awkward here, simplest safe fallback:

+        seq_to_conn.clear();
+        seq_order.clear();
♻️ Duplicate comments (1)
src/sender.rs (1)

103-114: Fix duplicate state declarations (compile error).

seq_to_conn, seq_order, last_selected_idx, last_switch_time, all_failed_at, pending_changes are declared twice in the same scope. Keep the later canonical block (Lines 119-129) and remove the earlier one (Lines 103-118).

Apply:

-    #[allow(unused_variables, unused_mut)]
-    let mut seq_to_conn: HashMap<u32, usize> = HashMap::with_capacity(MAX_SEQUENCE_TRACKING);
-    #[allow(unused_variables, unused_mut)]
-    let mut seq_order: VecDeque<u32> = VecDeque::with_capacity(MAX_SEQUENCE_TRACKING);
-    // Stickiness
-    #[allow(unused_variables, unused_mut)]
-    let mut last_selected_idx: Option<usize> = None;
-    #[allow(unused_variables, unused_mut)]
-    let mut last_switch_time: Option<Instant> = None;
-    // Connection failure tracking
-    #[allow(unused_variables, unused_mut)]
-    let mut all_failed_at: Option<Instant> = None;
-
-    // Pending connection changes (applied safely between processing cycles)
-    #[allow(unused_variables, unused_mut)]
-    let mut pending_changes: Option<PendingConnectionChanges> = None;

Also applies to: 116-129

🧹 Nitpick comments (3)
src/sender.rs (3)

71-73: Prefer tokio::sync::Mutex in async contexts.

std::sync::Mutex can block the runtime. The locks here are brief, but switching avoids priority inversions under load.

Sketch:

-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
+use tokio::sync::Mutex;

-    let shared_client_addr = Arc::new(Mutex::new(None::<SocketAddr>));
+    let shared_client_addr = Arc::new(Mutex::new(None::<SocketAddr>));

-            if let Ok(mut addr_guard) = shared_client_addr.lock() {
-                *addr_guard = Some(src);
-            }
+            if let Ok(mut addr_guard) = shared_client_addr.try_lock() {
+                *addr_guard = Some(src);
+            }

Or, if switching fully, adopt .lock().await in async sites and keep the std::thread ack forwarder using try_lock().

Also applies to: 345-347


161-177: Call apply_connection_changes with consistent formatting.

Calls look good; ensure tests updated to pass seq_order as added parameter.

If any older test call sites remain, update them:

-    apply_connection_changes(&mut connections, &new_ips, host, port, &mut last_selected_idx, &mut seq_to_conn).await;
+    apply_connection_changes(&mut connections, &new_ips, host, port, &mut last_selected_idx, &mut seq_to_conn, &mut seq_order).await;

Also applies to: 223-238


97-101: 1ms housekeeping/status cadence: verify intended overhead.

A 1ms tick with per-iteration housekeeping can be hot under load. If bitrate drops persist, consider 5–10ms interval or conditionally gating heavier work.

Also applies to: 179-184, 223-238

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between ae4c217 and 4203998.

📒 Files selected for processing (1)
  • src/sender.rs (14 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/sender.rs (3)
src/utils.rs (1)
  • now_ms (7-12)
src/connection.rs (3)
  • nak_burst_count (652-654)
  • SrtlaConnection (22-129)
  • SrtlaConnection (131-696)
src/tests/sender_tests.rs (1)
  • tests (2-327)
🪛 GitHub Check: Test (Windows)
src/sender.rs

[failure] 695-695:
function log_connection_status is never used

🪛 GitHub Actions: CI
src/sender.rs

[error] 695-695: Rust compile error: function log_connection_status is never used.

datagutt and others added 2 commits September 1, 2025 23:46
Replace incremental in-flight counting with Bond Bunny's exact approach:
recalculate from scratch on each SRT ACK to eliminate drift errors.

This fixes 'funky bitrate behavior' where quality scoring amplified
incorrect base scores, causing traffic to concentrate on one connection.
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/sender.rs (2)

74-91: Blocking std::sync::mpsc::Receiver::recv() inside async task.

This blocks a Tokio worker thread. Use spawn_blocking + Handle::block_on (or switch to tokio::sync::mpsc).

-        tokio::spawn(async move {
-            while let Ok(ack_packet) = instant_rx.recv() {
+        let handle = tokio::runtime::Handle::current();
+        tokio::task::spawn_blocking(move || {
+            while let Ok(ack_packet) = instant_rx.recv() {
                 let client_addr = {
                     match shared_client_addr_clone.lock() {
                         Ok(addr_guard) => *addr_guard,
                         _ => None,
                     }
                 };
                 if let Some(client) = client_addr {
-                    let _ = local_listener_clone.send_to(&ack_packet, client).await;
+                    let _ = handle.block_on(local_listener_clone.send_to(&ack_packet, client));
                 }
             }
-        });
+        });

If acceptable, consider migrating to tokio::sync::mpsc end-to-end to avoid any blocking.


616-656: Reindexing seq_to_conn after retain is incorrect (index drift).

Retaining connections shifts indices; current rebuild only drops OOB but doesn’t remap to new indices → NAKs can be misattributed.

 pub async fn apply_connection_changes(
@@
-    let current_labels: HashSet<String> = connections.iter().map(|c| c.label.clone()).collect();
+    let current_labels: HashSet<String> = connections.iter().map(|c| c.label.clone()).collect();
+    // Preserve old index→label map before mutation
+    let old_labels: Vec<String> = connections.iter().map(|c| c.label.clone()).collect();
@@
-    if connections.len() != old_len {
+    if connections.len() != old_len {
         info!("removed {} stale connections", old_len - connections.len());
         *last_selected_idx = None; // Reset selection to prevent index issues
@@
-        // Rebuild sequence tracking with correct indices
-        let mut new_seq_to_conn = HashMap::with_capacity(seq_to_conn.len());
-        for (seq, &old_idx) in seq_to_conn.iter() {
-            if old_idx < connections.len() {
-                new_seq_to_conn.insert(*seq, old_idx);
-            }
-        }
-        *seq_to_conn = new_seq_to_conn;
+        // Rebuild sequence tracking using stable labels to remap indices
+        use std::collections::HashMap as Map;
+        let label_to_new_idx: Map<&str, usize> =
+            connections.iter().enumerate().map(|(i, c)| (c.label.as_str(), i)).collect();
+        let mut new_seq_to_conn = HashMap::with_capacity(seq_to_conn.len());
+        for (seq, &old_idx) in seq_to_conn.iter() {
+            if let Some(old_label) = old_labels.get(old_idx) {
+                if let Some(&new_idx) = label_to_new_idx.get(old_label.as_str()) {
+                    new_seq_to_conn.insert(*seq, new_idx);
+                }
+            }
+        }
+        *seq_to_conn = new_seq_to_conn;
         // Drop stale sequence IDs from the order queue
         seq_order.retain(|seq| seq_to_conn.contains_key(seq));
     }
♻️ Duplicate comments (4)
src/connection.rs (1)

624-632: Timeout treats None as timed-out (reconnect thrash on fresh links).

Freshly connected links (last_received=None) immediately time out until first packet. Make None = not timed-out.

 pub fn is_timed_out(&self) -> bool {
-        !self.connected
-            || if let Some(lr) = self.last_received {
-                lr.elapsed().as_secs() >= CONN_TIMEOUT
-            } else {
-                true
-            }
+        if !self.connected {
+            return true;
+        }
+        match self.last_received {
+            Some(lr) => lr.elapsed().as_secs() >= CONN_TIMEOUT,
+            None => false, // grace until first receive
+        }
 }
src/sender.rs (3)

93-129: Duplicate variable declarations (compile error).

State is declared twice (recv_buf, housekeeping_timer, status_counter, last_selected_idx, last_switch_time, seq_to_conn, seq_order, all_failed_at, pending_changes). Remove the first block.

-    // Main packet processing loop variables
-    #[allow(unused_variables, unused_mut)]
-    let mut recv_buf = vec![0u8; MTU];
-    #[allow(unused_variables, unused_mut)]
-    let mut housekeeping_timer = time::interval(Duration::from_millis(1)); // Frequent timer for periodic tasks
-    #[allow(unused_variables, unused_mut)]
-    let mut status_counter = 0;
-    #[allow(unused_variables, unused_mut)]
-    let mut last_client_addr: Option<SocketAddr> = None;
-    // Sequence → connection index mapping for correct NAK attribution
-    #[allow(unused_variables, unused_mut)]
-    let mut seq_to_conn: HashMap<u32, usize> = HashMap::with_capacity(MAX_SEQUENCE_TRACKING);
-    #[allow(unused_variables, unused_mut)]
-    let mut seq_order: VecDeque<u32> = VecDeque::with_capacity(MAX_SEQUENCE_TRACKING);
-    // Stickiness
-    #[allow(unused_variables, unused_mut)]
-    let mut last_selected_idx: Option<usize> = None;
-    #[allow(unused_variables, unused_mut)]
-    let mut last_switch_time: Option<Instant> = None;
-    // Connection failure tracking
-    #[allow(unused_variables, unused_mut)]
-    let mut all_failed_at: Option<Instant> = None;
-
-    // Pending connection changes (applied safely between processing cycles)
-    #[allow(unused_variables, unused_mut)]
-    let mut pending_changes: Option<PendingConnectionChanges> = None;

701-809: Temporaries in score_desc create dangling reference.

Use owned String to avoid referencing &format!(...) temporary.

-        let score = conn.get_score();
-        let score_desc = match score {
-            -1 => "DISCONNECTED",
-            0 => "AT_CAPACITY",
-            _ => &format!("{}", score),
-        };
+        let score = conn.get_score();
+        let score_desc: String = match score {
+            -1 => "DISCONNECTED".to_string(),
+            0 => "AT_CAPACITY".to_string(),
+            _ => score.to_string(),
+        };
@@
-            i, status, conn.label, score_desc, last_recv, conn.window, conn.in_flight_packets
+            i, status, conn.label, score_desc, last_recv, conn.window, conn.in_flight_packets

223-240: Windows parity + fix “never used” warning for log_connection_status.

Replicate periodic status logging in non-Unix loop.

             _ = housekeeping_timer.tick() => {
                 // Apply any pending connection changes at a safe point
                 if let Some(changes) = pending_changes.take() {
                     if let Some(new_ips) = changes.new_ips {
                         info!("applying queued connection changes: {} IPs", new_ips.len());
                         apply_connection_changes(
                             &mut connections,
                             &new_ips,
                             &changes.receiver_host,
                             changes.receiver_port,
                             &mut last_selected_idx,
                             &mut seq_to_conn,
                             &mut seq_order,
                         ).await;
                         info!("connection changes applied successfully");
                     }
                 }
+                // Periodic status reporting (~30s)
+                status_counter += 1;
+                if status_counter % 30000 == 0 {
+                    log_connection_status(&connections, &seq_to_conn, &seq_order, last_selected_idx, &toggles);
+                }
             }
🧹 Nitpick comments (4)
src/connection.rs (3)

498-501: Optional: trace when SRTLA ACK seq not found.

Low-level trace can help during investigations without spamming logs.

-        if !found {
-            // Not found in packet log
-        }
+        if !found {
+            debug!("{}: SRTLA ACK specific not found in packet log (seq={})", self.label, seq);
+        }

741-758: Best-effort socket buffer tuning LGTM.

Doesn’t fail the connection; logs effective sizes. Consider downgrading “Effective … size” to debug to reduce noise.


313-313: Rustfmt diffs detected.

Run cargo fmt to satisfy CI.

Also applies to: 333-333, 384-384, 517-517, 750-750

src/sender.rs (1)

528-528: Rustfmt diffs detected.

Run cargo fmt to appease CI.

Also applies to: 583-583, 590-590

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 4203998 and 2ada245.

📒 Files selected for processing (2)
  • src/connection.rs (14 hunks)
  • src/sender.rs (15 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
src/connection.rs (1)
src/tests/connection_tests.rs (3)
  • tests (2-314)
  • test_srt_ack_handling (50-74)
  • test_srtla_ack_handling (107-131)
src/sender.rs (4)
src/utils.rs (1)
  • now_ms (7-12)
src/connection.rs (1)
  • nak_burst_count (656-658)
src/registration.rs (1)
  • active_connections (184-186)
src/tests/sender_tests.rs (2)
  • test_apply_connection_changes_remove_stale (141-174)
  • tests (2-327)
🪛 GitHub Check: Test
src/connection.rs

[warning] 333-333:
Diff in /home/runner/work/srtla_send/srtla_send/src/connection.rs


[warning] 313-313:
Diff in /home/runner/work/srtla_send/srtla_send/src/connection.rs


[warning] 384-384:
Diff in /home/runner/work/srtla_send/srtla_send/src/connection.rs


[warning] 517-517:
Diff in /home/runner/work/srtla_send/srtla_send/src/connection.rs


[warning] 750-750:
Diff in /home/runner/work/srtla_send/srtla_send/src/connection.rs

src/sender.rs

[warning] 528-528:
Diff in /home/runner/work/srtla_send/srtla_send/src/sender.rs


[warning] 590-590:
Diff in /home/runner/work/srtla_send/srtla_send/src/sender.rs

🪛 GitHub Actions: CI
src/connection.rs

[warning] 313-313: Rustfmt: formatting diff detected at line 313. Run 'cargo fmt' to fix formatting.


[warning] 333-333: Rustfmt: formatting diff detected at line 333. Run 'cargo fmt' to fix formatting.


[warning] 384-384: Rustfmt: formatting diff detected at line 384. Run 'cargo fmt' to fix formatting.


[warning] 517-517: Rustfmt: formatting diff detected at line 517. Run 'cargo fmt' to fix formatting.


[warning] 750-750: Rustfmt: formatting diff detected at line 750. Run 'cargo fmt' to fix formatting.

src/sender.rs

[warning] 528-528: Rustfmt: formatting diff detected at line 528. Run 'cargo fmt' to fix formatting.


[warning] 583-583: Rustfmt: formatting diff detected at line 583. Run 'cargo fmt' to fix formatting.


[warning] 590-590: Rustfmt: formatting diff detected at line 590. Run 'cargo fmt' to fix formatting.

🪛 GitHub Check: Test (Windows)
src/sender.rs

[failure] 702-702:
function log_connection_status is never used

🔇 Additional comments (20)
src/connection.rs (11)

63-66: Good change: last_received → Option.

This unlocks sane “not-yet-received” semantics and simplifies recovery logic. Verify all callers handle None.


150-150: Init as None is fine, but guard timeout logic.

With last_received = None at connect, ensure is_timed_out does not treat None as timed-out (see fix below).


173-177: Flow control guard LGTM.

Clamping max_in_flight to ≥1 and returning 0 when saturated prevents hard stalls and overrun.


228-229: Correct: touch last_received on any inbound.

This keeps timeout detection meaningful.


256-257: Forwarding NAKs upward matches C impl.

Reasonable choice for parity and attribution.


303-309: Helpful wrap diagnostics.

Low-noise, aids in investigating sequence churn.


407-408: Early return on matched NAK avoids double-penalty.

This matches intended behavior. Nice.


481-493: Window bump mirrors C logic.

Only grow when in_flight exceeds window; uses WINDOW_INCR - 1. Looks good.


506-524: Global +1 gated on having received data is sane.

Prevents cold links from inflating window spuriously.


634-642: Recovery sentinel via “old Instant” is OK.

Given the fix above, this will correctly appear timed-out until activity resumes; resets are thorough.


710-710: Re-seeding last_received on reconnect is correct.

src/sender.rs (9)

5-5: Import Ordering is appropriate.


17-17: PKT_LOG_SIZE import aligns with status reporting.


20-20: now_ms import used consistently across logs and timing.


60-64: Binding listener early is fine; logs are helpful.


136-156: Housekeeping each loop iteration preserves C behavior.


162-184: Unix: periodic status logging is great.

Provides observability every ~30s; see Windows parity fix below.


305-317: Switch logging with bounds check looks solid.


322-328: Graceful send failure handling is good.

Marking for recovery instead of tearing down reduces thrash.


553-572: Quality degradation/recovery logs are useful and bounded.

datagutt and others added 5 commits September 2, 2025 00:26
- Add periodic status reporting to non-Unix housekeeping loop for parity
- Add cfg_attr gate to prevent 'dead_code' warning on Windows
- Ensures log_connection_status is called on all platforms every 30 seconds
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/sender.rs (1)

93-129: Duplicate variable declarations — won’t compile.

Remove the first block of temporaries; keep the single canonical declarations.

-    // Main packet processing loop variables
-    #[allow(unused_variables, unused_mut)]
-    let mut recv_buf = vec![0u8; MTU];
-    #[allow(unused_variables, unused_mut)]
-    let mut housekeeping_timer = time::interval(Duration::from_millis(1)); // Frequent timer for periodic tasks
-    #[allow(unused_variables, unused_mut)]
-    let mut status_counter = 0;
-    #[allow(unused_variables, unused_mut)]
-    let mut last_client_addr: Option<SocketAddr> = None;
-    // Sequence → connection index mapping for correct NAK attribution
-    #[allow(unused_variables, unused_mut)]
-    let mut seq_to_conn: HashMap<u32, usize> = HashMap::with_capacity(MAX_SEQUENCE_TRACKING);
-    #[allow(unused_variables, unused_mut)]
-    let mut seq_order: VecDeque<u32> = VecDeque::with_capacity(MAX_SEQUENCE_TRACKING);
-    // Stickiness
-    #[allow(unused_variables, unused_mut)]
-    let mut last_selected_idx: Option<usize> = None;
-    #[allow(unused_variables, unused_mut)]
-    let mut last_switch_time: Option<Instant> = None;
-    // Connection failure tracking
-    #[allow(unused_variables, unused_mut)]
-    let mut all_failed_at: Option<Instant> = None;
-
-    // Pending connection changes (applied safely between processing cycles)
-    #[allow(unused_variables, unused_mut)]
-    let mut pending_changes: Option<PendingConnectionChanges> = None;
+    // Main packet processing loop variables
+    let mut recv_buf = vec![0u8; MTU];
+    let mut housekeeping_timer = time::interval(Duration::from_millis(1)); // Frequent timer for periodic tasks
+    let mut status_counter = 0;
+    let mut last_client_addr: Option<SocketAddr> = None;
♻️ Duplicate comments (4)
src/connection.rs (2)

352-364: RTT log compares against the updated estimate (misleading).

Capture prev estimate before updating and log delta vs previous.

-                self.estimated_rtt_ms = if self.estimated_rtt_ms == 0.0 {
-                    rtt as f64
-                } else {
-                    (self.estimated_rtt_ms * 0.875) + (rtt as f64 * 0.125)
-                };
-                self.last_rtt_measurement_ms = now;
-                // Log significant RTT changes (>20% difference)
-                let rtt_change_pct = if self.estimated_rtt_ms > 0.0 {
-                    ((rtt as f64 - self.estimated_rtt_ms) / self.estimated_rtt_ms * 100.0).abs()
-                } else {
-                    100.0
-                };
+                let prev = self.estimated_rtt_ms;
+                let new_est = if prev == 0.0 { rtt as f64 } else { (prev * 0.875) + (rtt as f64 * 0.125) };
+                self.last_rtt_measurement_ms = now;
+                let rtt_change_pct = if prev > 0.0 {
+                    ((rtt as f64 - prev) / prev * 100.0).abs()
+                } else {
+                    100.0
+                };
                 if rtt_change_pct > 20.0 {
                     debug!(
-                        "{}: RTT changed significantly: {} ms (was {:.1} ms, +{:.1}%)",
-                        self.label, rtt, self.estimated_rtt_ms, rtt_change_pct
+                        "{}: RTT changed significantly: {} ms (was {:.1} ms, +{:.1}%)",
+                        self.label, rtt, prev, rtt_change_pct
                     );
                 }
+                self.estimated_rtt_ms = new_est;

632-637: Fresh connections treated as timed out (thrash risk).

None should not mean timeout; newly connected links will be immediately “timed out”.

 pub fn is_timed_out(&self) -> bool {
-        !self.connected
-            || if let Some(lr) = self.last_received {
-                lr.elapsed().as_secs() >= CONN_TIMEOUT
-            } else {
-                true
-            }
+        if !self.connected {
+            return true;
+        }
+        match self.last_received {
+            Some(lr) => lr.elapsed().as_secs() >= CONN_TIMEOUT,
+            None => false, // grace until first receive
+        }
 }
src/sender.rs (2)

528-539: Exploration gate is effectively always true.

Instant::now().elapsed() ≈ 0 at call time; use a real clock.

-    let explore_now = enable_explore && (Instant::now().elapsed().as_millis() % 5000) < 300;
+    let explore_now = enable_explore && (now_ms() % 5000) < 300;

779-783: Temporary String reference escapes scope (lifetime error).

Own the String.

-        let score_desc = match score {
-            -1 => "DISCONNECTED",
-            0 => "AT_CAPACITY",
-            _ => &format!("{}", score),
-        };
+        let score_desc: String = match score {
+            -1 => "DISCONNECTED".to_string(),
+            0 => "AT_CAPACITY".to_string(),
+            _ => score.to_string(),
+        };
🧹 Nitpick comments (7)
.github/workflows/build-debian.yml (1)

19-21: Nightly selection is OK; consider pinning for reproducibility.

Using “nightly” floats and can break builds unpredictably. Suggest pinning to a dated toolchain (e.g., nightly-YYYY-MM-DD) to make Debian packages reproducible across runs.

Also applies to: 85-87

.github/workflows/ci.yml (2)

108-110: Align toolchain on Windows with README’s nightly requirement.

Add toolchain: nightly so tests run on the same compiler as Linux.

       - name: Install Rust toolchain
         uses: actions-rust-lang/setup-rust-toolchain@v1
+        with:
+          toolchain: nightly

121-123: Align toolchain on macOS with README’s nightly requirement.

Mirror Linux job for consistency.

       - name: Install Rust toolchain
         uses: actions-rust-lang/setup-rust-toolchain@v1
+        with:
+          toolchain: nightly
README.md (2)

49-58: Prefer project-scoped override over global default.

-rustup default nightly  # Set nightly as default for this project
+rustup override set nightly  # Pin nightly only in this workspace

37-42: Minor phrasing fixes.

  • “Windows runs without that arm” → “Windows runs without that feature”
  • Consider: “This project requires Rust nightly (for rustfmt configuration).”
src/connection.rs (1)

520-527: Unreachable “Major window recovery” log condition.

old and self.window differ by 1 here; (>100) never triggers. Either lower threshold or remove.

-            if old < self.window && (self.window - old) > 100 {
+            if (self.window - old) >= 100 {
src/sender.rs (1)

161-184: Status logging only on Unix loop.

Either also log on non-Unix (mirror block in the not(unix) loop) or gate function with cfg to avoid dead_code when running clippy on Windows.

+                // Periodic status reporting (every 30 seconds)
+                status_counter += 1;
+                if status_counter % 30000 == 0 {
+                    log_connection_status(&connections, &seq_to_conn, &seq_order, last_selected_idx, &toggles);
+                }

And optionally:

- pub fn log_connection_status(
+ #[cfg_attr(not(unix), allow(dead_code))]
+ pub fn log_connection_status(
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between a0c0d67 and f96fcbc.

📒 Files selected for processing (7)
  • .github/workflows/build-debian.yml (2 hunks)
  • .github/workflows/ci.yml (1 hunks)
  • README.md (1 hunks)
  • src/connection.rs (14 hunks)
  • src/sender.rs (15 hunks)
  • src/tests/connection_tests.rs (3 hunks)
  • src/tests/sender_tests.rs (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/tests/sender_tests.rs
  • src/tests/connection_tests.rs
🧰 Additional context used
🧬 Code graph analysis (1)
src/sender.rs (2)
src/utils.rs (1)
  • now_ms (7-12)
src/connection.rs (1)
  • nak_burst_count (663-665)
🪛 actionlint (1.7.7)
.github/workflows/ci.yml

21-21: key "with" is duplicated in element of "steps" section. previously defined at line:19,col:9

(syntax-check)


23-23: key "with" is duplicated in element of "steps" section. previously defined at line:19,col:9

(syntax-check)

🪛 YAMLlint (1.37.1)
.github/workflows/ci.yml

[error] 21-21: duplication of key "with" in mapping

(key-duplicates)


[error] 23-23: duplication of key "with" in mapping

(key-duplicates)

🪛 LanguageTool
README.md

[grammar] ~37-~37: There might be a mistake here.
Context: ... - Rust nightly toolchain and Cargo - Unix (Linux/macOS) or Windows - Note: ...

(QB_NEW_EN)


[grammar] ~38-~38: There might be a mistake here.
Context: ...nd Cargo - Unix (Linux/macOS) or Windows - Note: SIGHUP-based IP reload is Unix-onl...

(QB_NEW_EN)

🔇 Additional comments (3)
.github/workflows/build-debian.yml (1)

56-64: Verify Debian package naming/metadata.

Package is “srtla” while the installed binary is “srtla_send”. Ensure this name won’t conflict with other packages and matches user expectations; consider Description including the binary name or renaming Package to “srtla-send”.

src/connection.rs (1)

173-177: Good: prevents hard stall when window/WINDOW_MULT is 0.

src/sender.rs (1)

422-436: Timeout handling path is sensible; relies on is_timed_out fix.

After fixing is_timed_out(None), this recovery/REG2 flow should behave as intended.

…ng lifetime issue

- Fix stickiness test to match Bond Bunny behavior (only sticky when last == best)
- Add test case for non-sticky behavior when last selection isn't best
- Fix temporary String reference in score_desc by using owned String values
- Capture old RTT estimate before updating self.estimated_rtt_ms
- Use old_rtt_estimate for change percentage calculation and logging
- Fixes misleading RTT change deltas in debug logs that showed updated values
- Add 'now' parameter to select_connection_idx for consistent timing
- Replace ts.elapsed() with now.duration_since(ts) for proper stickiness window check
- Fix exploration window to use now.elapsed() instead of Instant::now().elapsed()
- Update all call sites and tests to pass consistent time reference
- Prevents stickiness being always true due to near-zero elapsed time comparison
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/tests/sender_tests.rs (1)

166-178: Strengthen expectations to catch misindexed seq_to_conn after removals.
With current apply_connection_changes, a sequence can incorrectly remap to a different (new) connection index. Make the test assert full cleanup when original mappings point to removed connections.

Apply this diff:

-        // Should have cleaned up sequence tracking
-        assert!(seq_to_conn.len() < 2);
+        // All previous mappings (100→1, 200→2) must be dropped because those conns were removed
+        assert!(seq_to_conn.is_empty());

You can keep the seq_order vs seq_to_conn length equality check as-is.

Also applies to: 189-191

src/sender.rs (2)

68-91: Blocking std::sync::mpsc in async task can stall the runtime. Use Tokio mpsc.
recv() blocks a Tokio worker thread. Switch to tokio::sync::mpsc.

Apply:

-    let (instant_tx, instant_rx) = std::sync::mpsc::channel::<Vec<u8>>();
+    let (instant_tx, mut instant_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
@@
-        tokio::spawn(async move {
-            while let Ok(ack_packet) = instant_rx.recv() {
+        tokio::spawn(async move {
+            while let Some(ack_packet) = instant_rx.recv().await {

641-664: seq_to_conn reindexing is incorrect after retain().
Indices shift; current code only drops OOB entries but leaves surviving indices pointing at wrong connections. Remap by labels captured pre-filter.

Apply:

@@
-    use std::collections::HashSet;
+    use std::collections::{HashMap, HashSet};
@@
-    let current_labels: HashSet<String> = connections.iter().map(|c| c.label.clone()).collect();
+    let old_labels: Vec<String> = connections.iter().map(|c| c.label.clone()).collect();
+    let current_labels: HashSet<String> = connections.iter().map(|c| c.label.clone()).collect();
@@
-        // Clean up sequence tracking for removed connections
-        seq_to_conn.retain(|_, &mut conn_idx| conn_idx < connections.len());
-
-        // Rebuild sequence tracking with correct indices
-        let mut new_seq_to_conn = HashMap::with_capacity(seq_to_conn.len());
-        for (seq, &old_idx) in seq_to_conn.iter() {
-            if old_idx < connections.len() {
-                new_seq_to_conn.insert(*seq, old_idx);
-            }
-        }
-        *seq_to_conn = new_seq_to_conn;
-        // Drop stale sequence IDs from the order queue
-        seq_order.retain(|seq| seq_to_conn.contains_key(seq));
+        // Rebuild sequence tracking by mapping old idx → old label → new idx
+        let index_by_label: HashMap<String, usize> =
+            connections.iter().enumerate().map(|(i, c)| (c.label.clone(), i)).collect();
+        let mut new_seq_to_conn = HashMap::with_capacity(seq_to_conn.len());
+        for (seq, &old_idx) in seq_to_conn.iter() {
+            if let Some(label) = old_labels.get(old_idx) {
+                if let Some(&new_idx) = index_by_label.get(label) {
+                    new_seq_to_conn.insert(*seq, new_idx);
+                }
+            }
+        }
+        *seq_to_conn = new_seq_to_conn;
+        // Drop stale sequence IDs from the order queue
+        seq_order.retain(|seq| seq_to_conn.contains_key(seq));
♻️ Duplicate comments (4)
src/sender.rs (4)

179-184: Periodic status logging hook: nice parity fix for non-Unix.
This addresses the earlier dead_code warning for Windows and adds useful telemetry.

Also applies to: 242-246


786-796: Good fix: avoid temporary reference in score_desc.
Owning String removes lifetime hazard from &format!.


103-118: Duplicate state declarations (won’t compile / shadows).
seq_to_conn, seq_order, last_selected_idx, last_switch_time, all_failed_at, pending_changes are declared twice. Remove the first block with #[allow(...)] and keep the canonical block below.

Apply:

-    #[allow(unused_variables, unused_mut)]
-    let mut seq_to_conn: HashMap<u32, usize> = HashMap::with_capacity(MAX_SEQUENCE_TRACKING);
-    #[allow(unused_variables, unused_mut)]
-    let mut seq_order: VecDeque<u32> = VecDeque::with_capacity(MAX_SEQUENCE_TRACKING);
-    // Stickiness
-    #[allow(unused_variables, unused_mut)]
-    let mut last_selected_idx: Option<usize> = None;
-    #[allow(unused_variables, unused_mut)]
-    let mut last_switch_time: Option<Instant> = None;
-    // Connection failure tracking
-    #[allow(unused_variables, unused_mut)]
-    let mut all_failed_at: Option<Instant> = None;
-
-    // Pending connection changes (applied safely between processing cycles)
-    #[allow(unused_variables, unused_mut)]
-    let mut pending_changes: Option<PendingConnectionChanges> = None;

544-545: Exploration gate is effectively always true.
Instant::now().elapsed() ≈ 0; modulo check biases to true. Use a real clock.

Apply:

-    let explore_now = enable_explore && (Instant::now().elapsed().as_millis() % 5000) < 300;
+    let explore_now = enable_explore && (now_ms() % 5000) < 300;
🧹 Nitpick comments (3)
src/sender.rs (3)

69-73: Prefer async-aware lock for shared_client_addr.
std::sync::Mutex in async paths can block the scheduler; tokio::sync::Mutex (or RwLock) is safer.

Apply minimally:

-    let shared_client_addr = Arc::new(Mutex::new(None::<SocketAddr>));
+    let shared_client_addr = Arc::new(tokio::sync::Mutex::new(None::<SocketAddr>));
@@
-            if let Ok(mut addr_guard) = shared_client_addr.lock() {
+            if let Ok(mut addr_guard) = shared_client_addr.lock().await {
                 *addr_guard = Some(src);
             }

Note: adjust other .lock() sites accordingly.

Also applies to: 351-353


97-101: Avoid eventual overflow of status_counter.
Make it u64 for long-running processes.

-    let mut status_counter = 0;
+    let mut status_counter: u64 = 0;

97-99: 1ms housekeeping tick may be unnecessarily hot.
Consider 5–10ms with explicit now_ms() scheduling if you need sub-100ms tasks.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between f96fcbc and e592244.

📒 Files selected for processing (2)
  • src/sender.rs (15 hunks)
  • src/tests/sender_tests.rs (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
src/sender.rs (2)
src/utils.rs (1)
  • now_ms (7-12)
src/connection.rs (4)
  • nak_burst_count (663-665)
  • SrtlaConnection (22-129)
  • SrtlaConnection (131-696)
  • handle_srtla_ack_global (476-493)
src/tests/sender_tests.rs (2)
src/sender.rs (2)
  • std (68-68)
  • select_connection_idx (511-606)
src/test_helpers.rs (1)
  • create_test_connections (49-90)
🔇 Additional comments (2)
src/tests/sender_tests.rs (2)

4-4: Import addition looks good.
VecDeque import aligns with new seq_order usage.


34-53: Good stickiness coverage.
Test clearly verifies “stay on best within stickiness window” and “switch when last isn’t best.”

datagutt and others added 5 commits September 2, 2025 01:20
- Consolidate three duplicate 'with:' blocks into single mapping
- Merge toolchain and components keys under one 'with:' block
- Fixes YAML parsing error that prevented CI from running
@datagutt datagutt merged commit 6da565b into main Sep 1, 2025
5 checks passed
@datagutt datagutt deleted the refactor/housekeeping branch September 1, 2025 23:35
@coderabbitai coderabbitai bot mentioned this pull request Oct 15, 2025
@coderabbitai coderabbitai bot mentioned this pull request Jan 7, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants