
Conversation

@MasterPtato
Contributor

No description provided.

@vercel

vercel bot commented Oct 25, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

3 Skipped Deployments
Project Deployment Preview Comments Updated (UTC)
rivet-cloud Ignored Ignored Oct 29, 2025 1:53am
rivet-inspector Ignored Ignored Preview Oct 29, 2025 1:53am
rivet-site Ignored Ignored Preview Oct 29, 2025 1:53am

Contributor Author

MasterPtato commented Oct 25, 2025


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude

claude bot commented Oct 25, 2025

Pull Request Review: Improve Reconnection with Sleeping and Tunnel

Summary

This PR improves WebSocket reconnection reliability, particularly for sleeping actors and tunneled connections. The changes introduce message persistence, better reconnection handling, and synchronization improvements.

Positive Changes ✅

Architecture Improvements

  1. Concurrent HashMap (scc): Switching from Mutex<HashMap> to scc::HashMap in shared_state.rs is excellent for reducing lock contention in concurrent scenarios.

  2. WebSocket Handle Simplification: The refactoring of WebSocketHandle to eagerly accept connections (lines 23-31 in websocket_handle.rs) removes complex state machine logic and race conditions. Much cleaner!

  3. Pending Message Persistence: Storing pending messages by request ID (rather than by message ID) and replaying them on reconnection (send_reclaimed_messages) is a solid approach for handling actor sleep/wake cycles.

  4. Unique Request IDs: Using stable unique_request_id across retries (line 1794 in proxy_service.rs) ensures messages can be correlated correctly.

  5. Event Buffering in TypeScript: The WebSocketTunnelAdapter buffering events until listeners are attached is good defensive programming.

Critical Issues 🔴

1. Incorrect Into Implementation (shared_state.rs:34-37)

impl Into<protocol::ToClientTunnelMessage> for PendingMessage {
    fn into(self) -> protocol::ToClientTunnelMessage {
        self.payload
    }
}

Problem: This trait implementation is never used in the codebase, and if it were needed, it should be From rather than Into (the Rust convention is to implement From and get Into for free).

Fix: Remove this implementation entirely, or if needed, use:

impl From<PendingMessage> for protocol::ToClientTunnelMessage {
    fn from(msg: PendingMessage) -> Self {
        msg.payload
    }
}

2. Race Condition in Ack Handling (shared_state.rs:196-206)

if let Some(mut pending_msgs) = self.pending_messages.get_async(&msg.request_id).await {
    pending_msgs.retain(|m| m.payload.message_id != msg.message_id);
}

Problem: The retain call mutates through a temporary reference obtained from get_async, and it is not obvious that the change is committed back to the map. With scc::HashMap you need to either:

  • use update_async to modify the entry atomically (a sketch follows below), or
  • rely on the guard returned by get_async committing mutations when it is dropped.

Verification needed: Confirm that scc::HashMap::get_async() returns a guard that commits mutations on drop. If not, acks won't actually remove pending messages, causing memory leaks and incorrect replay behavior.
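
A minimal sketch of the update_async approach, assuming pending_messages is an scc::HashMap keyed by request ID with a Vec<PendingMessage> value (names taken from the snippets above):

self.pending_messages
    .update_async(&msg.request_id, |_, pending_msgs| {
        // The closure runs while the entry's internal lock is held, so the
        // pruned Vec is guaranteed to be what remains in the map.
        pending_msgs.retain(|m| m.payload.message_id != msg.message_id);
    })
    .await;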

3. Hardcoded Backoff (proxy_service.rs:1853-1857)

let backoff = ProxyService::calculate_backoff(attempts, initial_interval);
let backoff = Duration::from_millis(100);  // ⚠️ Immediately overwrites!

Problem: Line 1857 completely overwrites the calculated backoff, making it always 100ms regardless of attempt count.

Fix: Either use the calculated backoff OR use a fixed 100ms, but remove one of these lines.
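
A minimal sketch of the first option; the assumption here is that the computed value feeds a sleep before the next retry attempt (the surrounding retry loop is not shown in this diff):

let backoff = ProxyService::calculate_backoff(attempts, initial_interval);
// No hardcoded override, so the exponential backoff actually takes effect.
tokio::time::sleep(backoff).await;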

4. ACK Timeout Mismatch (Critical!)

  • Runner (TypeScript): MESSAGE_ACK_TIMEOUT = 5000 (5 seconds) - tunnel.ts:10
  • Gateway (Rust): MESSAGE_ACK_TIMEOUT = 30000 (30 seconds) - shared_state.rs:17

Problem: Runner will timeout and reject requests after 5 seconds, but gateway will keep messages pending for 30 seconds. This mismatch can cause:

  • Premature request failures on the runner side
  • Unnecessary pending message accumulation on gateway side
  • Inconsistent behavior under load

Fix: Align these values. Consider making 30 seconds standard for both, or making it configurable.

5. Debug Logging Left in Production (shared_state.rs:152)

tracing::info!(l=?pending_msgs_by_req_id.len(), message_id=?Uuid::from_bytes(payload.message_id), request_id=?Uuid::from_bytes(payload.request_id), "new msg -----------");

Problem: Multiple debug log statements with "-------" separators (lines 152, 196) are using tracing::info! instead of tracing::debug!.

Fix: Change to tracing::debug! and remove the separator dashes to follow CLAUDE.md logging conventions.
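
For example, combining this fix with the wording suggestions in item 11 below, the log line could become:

tracing::debug!(
    pending_count = pending_msgs_by_req_id.len(),
    message_id = ?Uuid::from_bytes(payload.message_id),
    request_id = ?Uuid::from_bytes(payload.request_id),
    "sending new message",
);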

Moderate Issues 🟡

6. Unnecessary Async Blocks Removed

The removal of redundant async blocks in epoxy/src/ops/kv/*.rs is good, but verify this doesn't change the instrumentation span behavior.

7. Missing Error Context

In proxy_service.rs:1799, the context message is good:

.context("failed initiating websocket handle")?;

However, similar error paths (for example around line 2015) lack context. Be consistent.

8. Potential Message Reordering

When send_reclaimed_messages replays pending messages (shared_state.rs:268-298), there's no guarantee they'll be delivered in order relative to new messages being sent concurrently. Consider if this is acceptable for your use case.

9. MAX_PENDING_MSGS_PER_REQ Limit (shared_state.rs:154-161)

Setting the limit to 1024 is reasonable, but when it is hit, you remove ALL pending messages for that request:

self.pending_messages.remove_async(&request_id).await;

This might be too aggressive - consider closing the connection more gracefully.

Minor Issues / Suggestions 🟢

10. WebSocket Receiver Mutex Contention (websocket_handle.rs:43-45)

pub fn recv(&self) -> Arc<Mutex<WebSocketReceiver>> {
    self.ws_rx.clone()
}

Returning Arc<Mutex<T>> means callers need to lock. Consider if this API could be improved by making recv take &mut self or by using a channel.

11. Logging Message Improvements

Per CLAUDE.md conventions:

  • Line 152: "new msg" → "sending new message"
  • Line 196: "ack" → "received ack"
  • Line 306: "tunnel sub closed" → "tunnel subscription closed"

12. Test Coverage

The test in actor_sleep.ts is good but only tests 10 iterations. Consider:

  • Testing boundary conditions (e.g., sleeping exactly at MESSAGE_ACK_TIMEOUT)
  • Testing concurrent WebSocket connections
  • Testing what happens when the pending message limit is reached

13. TypeScript Logging

Multiple debug logs in websocket-tunnel-adapter.ts and tunnel.ts use console-style messages. Consider if these should use structured logging like the Rust side.

Security Considerations 🔒

  1. DoS via Pending Messages: With 1024 max pending messages per request, a malicious client could potentially accumulate many slow requests. The limit helps, but consider rate limiting request creation too.

  2. WebSocket Protocol Validation: Ensure the rivet_actor.{actor_id} protocol extraction is properly validated against injection attacks (see the sketch below).
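
A minimal validation sketch, assuming actor IDs are UUIDs and the subprotocol arrives as a plain string; the helper name is illustrative, not an existing function:

use uuid::Uuid;

/// Accept only `rivet_actor.<uuid>`; anything else is rejected outright.
fn parse_actor_protocol(protocol: &str) -> Option<Uuid> {
    let id = protocol.strip_prefix("rivet_actor.")?;
    Uuid::parse_str(id).ok()
}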

Performance Considerations ⚡

  1. SCC HashMap: Good choice for high concurrency. Verify it's being used correctly with the async API.

  2. Message Cloning: In send_reclaimed_messages (line 288), messages are cloned via payload.clone(). For large messages, this could be expensive. Consider if you need a reference-counted approach.

Testing Recommendations 🧪

  1. Add integration test for message replay after actor wake
  2. Test ACK timeout scenarios on both runner and gateway
  3. Test the MAX_PENDING_MSGS_PER_REQ limit
  4. Test concurrent WebSocket reconnections
  5. Test behavior when messages arrive out of order

Final Recommendations

Before merging:

  1. ✅ Fix the ACK timeout mismatch (#4)
  2. ✅ Fix the hardcoded backoff override (#3)
  3. ✅ Verify the scc::HashMap ack removal actually works (#2)
  4. ✅ Clean up debug logging (#5)

Nice to have:

Overall Assessment

This is a well-thought-out improvement to WebSocket reconnection reliability. The core architecture changes (concurrent HashMap, message persistence, stable request IDs) are sound. The main concerns are around configuration mismatches and potential race conditions that need verification before merge.

Recommendation: Request changes for critical issues #2, #3, and #4, then approve after verification.

@MasterPtato force-pushed the 10-24-fix_improve_reconnection_with_sleeping_and_tunnel branch from 61ae6af to 049fe1f on October 27, 2025 at 23:59
@claude

claude bot commented Oct 28, 2025

Code Review: Improve reconnection with sleeping and tunnel

Thanks for this PR! I've reviewed the changes focusing on code quality, best practices, potential bugs, performance, security, and test coverage. Here's my feedback:

Overall Assessment

This PR makes significant improvements to WebSocket reconnection handling and tunnel message reliability. The changes are generally well-architected, but there are several areas that need attention.


Major Issues

1. Data Race and Concurrency Issues in shared_state.rs

Location: engine/packages/pegboard-gateway/src/shared_state.rs:145-163

The code modifies pending_msgs_by_req_id while holding a mutable reference from the scc::HashMap:

let mut pending_msgs_by_req_id = self
    .pending_messages
    .entry_async(request_id)
    .await
    .or_insert_with(Vec::new);
let pending_msgs_by_req_id = pending_msgs_by_req_id.get_mut();

if pending_msgs_by_req_id.len() >= MAX_PENDING_MSGS_PER_REQ {
    self.pending_messages.remove_async(&request_id).await;  // ❌ Potential issue
    // ...
}

Issue: remove_async() is called while the entry returned by entry_async() is still alive. That entry holds a lock on its hash bucket, so the call can deadlock rather than remove the key. The entry must be dropped before calling remove_async().

Recommendation: Bind the entry under its own name (separately from the get_mut() reference) so the guard can actually be released first:

if entry.get().len() >= MAX_PENDING_MSGS_PER_REQ {
    // Release the entry (and its bucket lock) before touching the map again.
    drop(entry);
    self.pending_messages.remove_async(&request_id).await;
    return Err(WebsocketPendingLimitReached { limit: MAX_PENDING_MSGS_PER_REQ }.build());
}

Note that dropping only the shadowed &mut Vec reference from the original snippet would not release the underlying entry.

2. Debug Logging in Production Code

Locations: Multiple files with debug statements

The code contains several verbose debug/info logging statements that appear to be debugging artifacts:

  • shared_state.rs:152 - tracing::info!(l=?pending_msgs_by_req_id.len(), message_id=?Uuid::from_bytes(payload.message_id), request_id=?Uuid::from_bytes(payload.request_id), "new msg -----------");
  • shared_state.rs:196 - tracing::info!(message_id=?Uuid::from_bytes(msg.message_id), request_id=?Uuid::from_bytes(msg.request_id), "ack -----------");
  • Multiple "------------ " prefixed logs in TypeScript files

Issue: These look like temporary debugging statements with informal formatting (e.g., "------------ ", "-----------").

Recommendation: Remove or downgrade to tracing::debug! with more professional formatting before merging.

3. ACK Timing Logic Asymmetry

Location: tunnel.ts:286-299 and shared_state.rs:195-206

The WebSocket message ACK is conditionally sent based on whether the message is "unhandled":

let unhandled = await this.#handleWebSocketMessage(
    message.requestId,
    message.messageKind.val,
);

if (!unhandled) {
    this.#sendAck(message.requestId, message.messageId);
}

Issue: The _handleMessage method returns true when the adapter doesn't exist, meaning no ACK is sent. However, this creates a subtle edge case where messages sent during a brief window (after WS close but before cleanup) won't be ACKed, causing unnecessary retries.

Recommendation: Consider ACKing even "unhandled" messages to prevent retry storms, or add explicit handling for this edge case.

4. Missing Error Handling for send_reclaimed_messages

Location: lib.rs:249-252

self.shared_state
    .send_reclaimed_messages(request_id)
    .await?;

Issue: If send_reclaimed_messages fails after the WebSocket handshake succeeds, the connection is aborted. However, the client has already been notified that the connection succeeded. This could leave the client in a bad state.

Recommendation: Add better error recovery - perhaps close the WebSocket with an appropriate error code rather than returning early.
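
A rough sketch of that recovery path, assuming the gateway still holds a handle to the client socket at this point; the close call and frame types are illustrative (tungstenite-style), not the crate's actual API:

if let Err(err) = self.shared_state.send_reclaimed_messages(request_id).await {
    tracing::warn!(?err, "failed to reclaim pending messages, closing websocket");
    // Hypothetical close helper; the real WebSocketHandle may expose this differently.
    ws_handle
        .close(Some(CloseFrame {
            code: CloseCode::Error, // 1011 lets the client reconnect cleanly
            reason: "failed to restore pending messages".into(),
        }))
        .await?;
    return Ok(());
}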


Medium Issues

5. MESSAGE_ACK_TIMEOUT Mismatch

Locations:

  • Rust: shared_state.rs:17 - const MESSAGE_ACK_TIMEOUT: Duration = Duration::from_secs(30);
  • TypeScript: tunnel.ts:10 - const MESSAGE_ACK_TIMEOUT = 5000; // 5 seconds

Issue: The timeout values are significantly different (30s vs 5s), which could lead to inconsistent behavior.

Recommendation: Align these timeouts or document why they differ.

6. WebSocketHandle State Management Simplification

Location: websocket_handle.rs:22-46

The refactor removes the state machine pattern in favor of always accepting the websocket immediately. While simpler, this changes the behavior significantly.

Previous: Lazy acceptance (accept on first use)
Current: Eager acceptance (accept immediately in constructor)

Concern: This means websockets are always accepted before any validation can occur in the caller. Ensure this is the intended behavior and doesn't introduce security issues.

7. Into Implementation Should Use From

Location: shared_state.rs:34-37

impl Into<protocol::ToClientTunnelMessage> for PendingMessage {
    fn into(self) -> protocol::ToClientTunnelMessage {
        self.payload
    }
}

Issue: The Rust API guidelines recommend implementing From instead of Into, as Into is automatically derived from From.

Recommendation:

impl From<PendingMessage> for protocol::ToClientTunnelMessage {
    fn from(msg: PendingMessage) -> Self {
        msg.payload
    }
}

Minor Issues

8. Unnecessary Async Block Removal

Locations: get_local.rs:32-40, get_optimistic.rs:55-74, etc.

The removal of inner async blocks looks correct but ensure these don't change instrumentation behavior in subtle ways.

9. Hardcoded Backoff Value

Location: proxy_service.rs:1857

let backoff = Duration::from_millis(100);

Issue: This appears to override any exponential backoff calculation (referenced but not shown). The comment mentions initial_interval but uses a hardcoded value.

Recommendation: Either use the calculated backoff or document why it's hardcoded.

10. Clone Before Checking Conditions

Location: shared_state.rs:111-124

Multiple clones happen before checking if the request is in flight. Consider checking first to avoid unnecessary work.


Performance Considerations

11. Lock-Free HashMap Usage

Location: shared_state.rs:4

Good choice using scc::HashMap for lock-free concurrent access! However:

Concern: The frequent get_async() followed by mutations could still cause contention under high load. Consider batching operations where possible.

12. Message Retention Strategy

Location: shared_state.rs:154-161

The code limits pending messages to 1024 per request and aborts the connection when exceeded.

Issue: This is a hard limit that could be hit legitimately under slow network conditions or burst traffic.

Recommendation: Consider:

  • Making this configurable
  • Adding metrics to monitor how often this limit is hit
  • Implementing a more graceful degradation strategy

Security Concerns

13. DOS via Pending Message Accumulation

Location: shared_state.rs:154

While the MAX_PENDING_MSGS_PER_REQ limit helps, there's no global limit on total pending messages across all requests.

Issue: An attacker could open many connections and accumulate up to 1024 messages each, potentially exhausting memory.

Recommendation: Add a global pending message limit or at least add monitoring/alerting.
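
One possible shape for a global cap, sketched with illustrative names (the real SharedState fields are not shown in this PR); an atomic counter avoids introducing another lock:

use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative budget; in practice this would likely come from config.
const MAX_PENDING_MSGS_GLOBAL: usize = 64 * 1024;

struct SharedStateInner {
    total_pending_msgs: AtomicUsize,
    // ...existing fields (pending_messages, requests_in_flight, ...)
}

impl SharedStateInner {
    /// Reserve one slot in the global pending-message budget, or report that it is full.
    fn try_reserve_pending(&self) -> bool {
        let prev = self.total_pending_msgs.fetch_add(1, Ordering::Relaxed);
        if prev >= MAX_PENDING_MSGS_GLOBAL {
            // Over budget: undo the increment and reject the message.
            self.total_pending_msgs.fetch_sub(1, Ordering::Relaxed);
            false
        } else {
            true
        }
    }

    /// Release slots when messages are acked or a request is cleaned up.
    fn release_pending(&self, count: usize) {
        self.total_pending_msgs.fetch_sub(count, Ordering::Relaxed);
    }
}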


Test Coverage

14. Missing Test Coverage

The PR modifies critical connection handling logic but doesn't include new tests.

Needed Tests:

  • Reconnection scenarios with pending messages
  • Message reclamation after reconnection
  • Pending message limit enforcement
  • ACK timeout handling
  • Concurrent request handling with the new lock-free HashMap

Recommendation: Add integration tests covering the reconnection flow, especially testing the message reclamation logic.


Positive Observations

  1. Good error handling: New error type WebsocketPendingLimitReached follows the project's error convention
  2. Logging improvements: Generally better structured logging with UUIDs
  3. Concurrency upgrade: Moving from Mutex<HashMap> to scc::HashMap is a smart performance improvement
  4. Unique request ID: Using a stable unique_request_id for WebSocket reconnection is a solid approach
  5. Event buffering: The TypeScript WebSocketTunnelAdapter event buffering (lines 22-26) elegantly handles race conditions

Recommendations Summary

Before merging:

  1. Fix the data race in shared_state.rs:155 (remove while holding entry reference)
  2. Remove or clean up debug logging statements
  3. Add tests for reconnection scenarios
  4. Align MESSAGE_ACK_TIMEOUT values or document difference
  5. Consider adding global pending message limits

Nice to have:
6. Replace the Into implementation with From
7. Review ACK timing for unhandled messages
8. Add error recovery for send_reclaimed_messages
9. Make message limits configurable


Overall, this is a solid improvement to the reconnection logic! The main concerns are around the data race and production readiness of the debug logging. Once those are addressed, this should be good to merge.

@pkg-pr-new

pkg-pr-new bot commented Oct 28, 2025

More templates

@rivetkit/actor

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/actor@3261

@rivetkit/cloudflare-workers

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/cloudflare-workers@3261

@rivetkit/core

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/core@3261

@rivetkit/db

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/db@3261

@rivetkit/framework-base

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/framework-base@3261

@rivetkit/next-js

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/next-js@3261

@rivetkit/react

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/react@3261

rivetkit

pnpm add https://pkg.pr.new/rivet-dev/rivet/rivetkit@3261

@rivetkit/sql-loader

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sql-loader@3261

@rivetkit/engine-runner

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner@3261

@rivetkit/engine-runner-protocol

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner-protocol@3261

commit: dafcccf

@MasterPtato force-pushed the 10-24-fix_improve_reconnection_with_sleeping_and_tunnel branch from 049fe1f to dafcccf on October 29, 2025 at 01:53
@claude

claude bot commented Oct 29, 2025

PR Review: Improve reconnection with sleeping and tunnel

Overview

This PR improves WebSocket reconnection handling for sleeping actors and tunnel connections. The changes address several important areas around connection lifecycle, message acknowledgment, and retry logic.

Positive Aspects

Improved WebSocket Handle Design - Simplifying WebSocketHandle by accepting the connection immediately and exposing split streams is much cleaner than the complex state machine approach.

Unique Request ID for Retries - Using a stable unique_request_id across retries (proxy_service.rs:1794, 1808) is a smart approach to maintain continuity.

Message Reclaiming - The new send_reclaimed_messages functionality (shared_state.rs:268-298) to replay pending messages after reconnection is well-designed.

Event Buffering in WebSocket Adapter - Buffering events when listeners aren't yet attached (websocket-tunnel-adapter.ts:22-26, 347-360) solves race conditions elegantly.

Critical Issues

1. Concurrency Safety with scc::HashMap (shared_state.rs)

Severity: High

The PR introduces scc::HashMap for lock-free concurrent access, but the implementation has several race conditions:

// shared_state.rs:111-119
if let Some(mut req) = self.requests_in_flight.get_async(&request_id).await {
    let receiver_subject = req.receiver_subject.clone();
    let include_reply_to = !req.opened;
    if include_reply_to {
        req.opened = true;  // ⚠️ Mutation through entry handle
    }
    (receiver_subject, include_reply_to)
}

Problem: The mutation of req.opened may not be visible to concurrent accessors with scc::HashMap. The entry is dropped after this scope, potentially losing the update.

Recommendation: Either:

  • Use update_async for atomic updates (see the sketch after this list)
  • Switch to tokio::sync::RwLock<HashMap> if stronger consistency is needed
  • Document the relaxed consistency guarantees
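
A sketch of the update_async variant, keeping the same return shape as the snippet above (it assumes the in-flight entry exposes receiver_subject and opened as shown):

let result = self
    .requests_in_flight
    .update_async(&request_id, |_, req| {
        // The flag flips while the entry's lock is held, so the update cannot be lost.
        let include_reply_to = !req.opened;
        req.opened = true;
        (req.receiver_subject.clone(), include_reply_to)
    })
    .await; // None if the request is no longer in flight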

2. Message Acknowledgment Logic Flaw (shared_state.rs:196-206)

Severity: High

if let Some(mut pending_msgs) = self.pending_messages.get_async(&msg.request_id).await {
    pending_msgs.retain(|m| m.payload.message_id != msg.message_id);
} else {
    tracing::warn!("pending message does not exist or ack received after message body")
};

Problems:

  1. retain modifies through the entry handle - the update may not persist after the Entry is dropped
  2. If pending_msgs becomes empty, it stays in the map forever (memory leak)
  3. The else warning is misleading - it could also mean a reconnection scenario

Recommendation: Use entry_async() with explicit mutation:

if let scc::hash_map::Entry::Occupied(mut entry) =
    self.pending_messages.entry_async(msg.request_id).await
{
    entry.get_mut().retain(|m| m.payload.message_id != msg.message_id);
    if entry.get().is_empty() {
        entry.remove();
    }
}

3. Hardcoded Backoff (proxy_service.rs:1853-1857)

Severity: Medium

let backoff = ProxyService::calculate_backoff(attempts, initial_interval);
let backoff = Duration::from_millis(100);  // ⚠️ Calculated backoff is immediately overwritten

Problem: The calculated backoff is unused. This defeats the exponential backoff pattern and could lead to thundering herd issues.

Recommendation: Remove one of these lines - likely the hardcoded 100ms value should be deleted.

4. Pending Message Limit Behavior (shared_state.rs:154-161)

Severity: Medium

if pending_msgs_by_req_id.len() >= MAX_PENDING_MSGS_PER_REQ {
    self.pending_messages.remove_async(&request_id).await;
    return Err(WebsocketPendingLimitReached { limit: MAX_PENDING_MSGS_PER_REQ }.build());
}

Problems:

  1. Removes ALL pending messages when limit is reached - could drop legitimate in-flight messages
  2. The connection is terminated but the client doesn't know which messages were lost
  3. No mechanism to apply backpressure before hitting the limit

Recommendations:

  • Keep pending messages and reject only the new message (see the sketch after this list)
  • Consider implementing backpressure at 80-90% capacity
  • Log which messages are being dropped for debugging
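
A sketch of the first option, keeping the buffered messages and failing only the incoming one (constants and error type as used in the snippet above):

if pending_msgs_by_req_id.len() >= MAX_PENDING_MSGS_PER_REQ {
    // Leave the already-buffered messages in place so they can still be
    // reclaimed; only the new message is rejected.
    return Err(WebsocketPendingLimitReached {
        limit: MAX_PENDING_MSGS_PER_REQ,
    }
    .build());
}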

5. Debug/Temporary Logging (shared_state.rs:152, 196)

Severity: Low

Multiple instances of debug logging with "-------" separators and tracing::info! for debug info:

tracing::info!(l=?pending_msgs_by_req_id.len(), message_id=?Uuid::from_bytes(payload.message_id), request_id=?Uuid::from_bytes(payload.request_id), "new msg -----------");

Problem: Appears to be temporary debug code that should be cleaned up or downgraded to tracing::debug!.

Recommendation: Change to tracing::debug! and remove the "-------" markers.

Code Quality Issues

6. Incorrect Trait Implementation (shared_state.rs:34-38)

impl Into<protocol::ToClientTunnelMessage> for PendingMessage {
    fn into(self) -> protocol::ToClientTunnelMessage {
        self.payload
    }
}

Problem: The Rust API guidelines recommend implementing From instead of Into (the reciprocal is auto-implemented).

Recommendation:

impl From<PendingMessage> for protocol::ToClientTunnelMessage {
    fn from(msg: PendingMessage) -> Self {
        msg.payload
    }
}

7. Missing Error Context (lib.rs:1797-1799)

let ws_handle = WebSocketHandle::new(client_ws)
    .await
    .context("failed initiating websocket handle")?;

Problem: Generic error message doesn't help debugging. Should include request details.

Recommendation:

.with_context(|| format!("failed initiating websocket handle for path: {}", req_path))?

8. Inconsistent ACK Timeout (tunnel.ts:10 vs shared_state.rs:17)

  • TypeScript: MESSAGE_ACK_TIMEOUT = 5000 (5 seconds)
  • Rust: MESSAGE_ACK_TIMEOUT = Duration::from_secs(30) (30 seconds)

Problem: Mismatch could cause the runner to timeout before the gateway, leading to confusing behavior.

Recommendation: Document why these differ or align them.

Performance Considerations

9. Clone in Hot Path (shared_state.rs:143)

payload: payload.clone(),

Problem: Cloning the entire payload for every pending message could be expensive for large WebSocket messages.

Recommendation: Consider using Arc<protocol::ToClientTunnelMessage> if the payload is large or frequently resent.
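
A sketch of the Arc-based variant; PendingMessage's other fields are not shown in the diff, so only the payload field is illustrated:

use std::sync::Arc;

struct PendingMessage {
    // Cloning an Arc is a pointer copy, so retries and reclaims no longer copy the body.
    payload: Arc<protocol::ToClientTunnelMessage>,
    // ...other fields unchanged
}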

10. Unnecessary Async Operations (shared_state.rs:277-278)

let Some(entry) = self.pending_messages.get_async(&request_id).await else {
    return Ok(());
};

Problem: This awaits but doesn't perform I/O. The _async variants of scc::HashMap methods yield occasionally but aren't actually async I/O.

Note: This is fine if intentional for cooperative scheduling, but consider profiling if this path is very hot.

Testing Concerns

11. Test Coverage

The test changes in scripts/tests/actor_sleep.ts are minimal - the main change is looping 10 times. Consider adding:

  • Tests for message loss during reconnection
  • Tests for the new pending message limit
  • Tests for concurrent WebSocket operations
  • Tests verifying message ordering after reconnection

Security Considerations

12. DoS via Pending Messages

While MAX_PENDING_MSGS_PER_REQ limits messages per request, there's no global limit across all requests. An attacker could create many requests, each with 1024 pending messages.

Recommendation: Consider adding a per-gateway or per-actor global limit.

Minor Issues

  1. Unused parameter in custom_serve.rs:29 - unique_request_id added but only used in one implementation
  2. TODO comment in pegboard-gateway/Cargo.toml:15 - "Doesn't match workspace version" for hyper dependency
  3. Redundant async blocks removed in get_local.rs and get_optimistic.rs - Good cleanup! ✅

Recommendations Summary

Must Fix Before Merge:

  1. Fix scc::HashMap race conditions (Issues #1, #2)
  2. Remove hardcoded backoff override (Issue #3)
  3. Fix pending message limit behavior (Issue #4)

Should Fix:
4. Clean up debug logging (Issue #5)
5. Align ACK timeouts or document difference (Issue #8)
6. Add error context (Issue #7)

Consider:
7. Implement From instead of Into (Issue #6)
8. Add comprehensive tests (Issue #11)
9. Add global pending message limits (Issue #12)
10. Profile payload cloning overhead (Issue #9)

Conclusion

This PR makes significant improvements to reconnection handling, but has several critical concurrency bugs around the scc::HashMap usage that must be addressed. The overall architecture is sound, but the implementation needs refinement before merge.

The concept of stable request IDs across retries and message reclaiming is excellent and will significantly improve reliability.


Review by Claude Code 🤖
