
Conversation

@thlorenz
Collaborator

@thlorenz thlorenz commented Nov 20, 2025

Summary by CodeRabbit

  • Refactor
    • Optimized the message handling system to process multiple subscription updates concurrently rather than sequentially, allowing in-flight messages to execute in parallel for better throughput and resource efficiency.

✏️ Tip: You can customize this high-level summary in your review settings.

Allow pubsub actor messages to be handled in parallel instead of sequentially. This improves
throughput by processing multiple messages concurrently rather than waiting for each message to
complete before processing the next one.

Closes: #652

Details

magicblock-chainlink

Modified the message handling loop in ChainPubsubActor to use FuturesUnordered for
concurrent message processing:

  • Messages are no longer awaited immediately upon arrival; instead, each message's handler future
    is pushed into a FuturesUnordered collection (see the sketch after this list)
  • The tokio select loop now also polls pending message futures, allowing multiple messages to
    be processed concurrently
  • This prevents a single slow message from blocking the processing of subsequent messages,
    improving overall throughput and responsiveness of the pubsub actor
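
A minimal sketch of this pattern (illustrative only, not the actual ChainPubsubActor code; the message type, channel setup, and the simplified handle_msg signature are stand-ins):

```rust
use futures_util::stream::{FuturesUnordered, StreamExt};
use tokio::sync::mpsc;

#[derive(Debug)]
enum ActorMessage {
    // Stand-in for the real subscribe/unsubscribe/reconnect variants.
    Ping(u64),
}

// Simplified stand-in for the actor's handle_msg.
async fn handle_msg(msg: ActorMessage) {
    println!("handled {msg:?}");
}

async fn run_actor(mut messages_receiver: mpsc::Receiver<ActorMessage>) {
    // Handler futures that are still in flight.
    let mut pending_messages = FuturesUnordered::new();

    loop {
        tokio::select! {
            maybe_msg = messages_receiver.recv() => {
                match maybe_msg {
                    // Push the handler future instead of awaiting it inline,
                    // so one slow message no longer blocks the ones behind it.
                    Some(msg) => pending_messages.push(handle_msg(msg)),
                    // Channel closed: exit. Any still-pending handler futures are
                    // dropped here; to finish them instead, one could drain with
                    // `while pending_messages.next().await.is_some() {}`.
                    None => break,
                }
            }
            // Drive the in-flight handlers to completion.
            Some(()) = pending_messages.next() => {}
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    let actor = tokio::spawn(run_actor(rx));
    tx.send(ActorMessage::Ping(1)).await.unwrap();
    tx.send(ActorMessage::Ping(2)).await.unwrap();
    drop(tx); // closing the channel lets the actor loop exit
    actor.await.unwrap();
}
```

Everything still runs on a single task: the select loop keeps accepting new messages while earlier handlers are in flight, and only the futures pushed into FuturesUnordered make progress concurrently.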

@coderabbitai
Contributor

coderabbitai bot commented Nov 20, 2025

Walkthrough

The ChainPubsubActor now processes subscription messages concurrently using FuturesUnordered instead of awaiting each message handler sequentially. Messages are pushed as futures to a pending queue, and the main loop concurrently awaits their completion, improving subscription request throughput.

Changes

Cohort: Concurrent message handling in ChainPubsubActor
File(s): magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
Summary: Introduces a FuturesUnordered pending_messages queue to enable parallel processing of subscription updates. Messages are now pushed as futures instead of being awaited immediately, with the main select loop awaiting completion of in-flight futures to preserve correctness while improving concurrency.

Sequence Diagram

sequenceDiagram
    participant Main Loop
    participant Pending Messages
    participant Handler

    Note over Main Loop: Before: Sequential processing
    Main Loop->>Handler: Await handle_msg(msg1)
    Handler-->>Main Loop: Complete
    Main Loop->>Handler: Await handle_msg(msg2)
    Handler-->>Main Loop: Complete

    Note over Main Loop: After: Concurrent processing
    Main Loop->>Pending Messages: Push handle_msg(msg1)
    Main Loop->>Pending Messages: Push handle_msg(msg2)
    Main Loop->>Pending Messages: Await next() completion
    par Concurrent
        Pending Messages->>Handler: Process msg1
        Pending Messages->>Handler: Process msg2
    and
        Handler-->>Pending Messages: Complete msg1
        Handler-->>Pending Messages: Complete msg2
    end
    Pending Messages-->>Main Loop: Return completed future

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Async control flow modification involving FuturesUnordered and select! loop changes requires careful verification of concurrent correctness
  • Ensure that pending futures are properly polled to completion and shutdown behavior is maintained
  • Verify that variable rebindings for moved values don't introduce lifetime or borrow-checker issues
  • Confirm that error handling and message ordering guarantees are preserved in concurrent execution path

Possibly related PRs

Suggested reviewers

  • bmuddha

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 0.00%, which is below the required threshold of 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (4 passed)
  • Description Check: ✅ Passed. Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title accurately describes the main change: enabling parallel handling of pubsub actor messages by introducing concurrent futures processing instead of sequential awaiting.
  • Linked Issues Check: ✅ Passed. The code changes fully address issue #652 by implementing FuturesUnordered to process subscriptions concurrently while preserving the guarantee that initiating code waits for its specific subscription to complete.
  • Out of Scope Changes Check: ✅ Passed. All changes are scoped to the ChainPubsubActor message handling logic and directly address the performance bottleneck identified in issue #652; no unrelated modifications detected.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch thlorenz/chainlink-perf


@github-actions

github-actions bot commented Nov 20, 2025

Manual Deploy Available

You can trigger a manual deploy of this PR branch to testnet:

Deploy to Testnet 🚀

Alternative: Comment /deploy on this PR to trigger deployment directly.

⚠️ Note: Manual deploy requires authorization. Only authorized users can trigger deployments.

Comment updated automatically when the PR is synchronized.

Contributor

@bmuddha bmuddha left a comment


LGTM

@thlorenz thlorenz force-pushed the thlorenz/chainlink-perf branch from 9004162 to e9c3a8f on November 20, 2025 at 18:13
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9004162 and e9c3a8f.

📒 Files selected for processing (1)
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (2 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-11-07T14:20:31.457Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 621
File: magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs:457-495
Timestamp: 2025-11-07T14:20:31.457Z
Learning: In magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs, the unsubscribe closure returned by PubSubConnection::account_subscribe(...) resolves to () (unit), not a Result. Downstream code should not attempt to inspect an unsubscribe result and can optionally wrap it in a timeout to guard against hangs.

Applied to files:

  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
📚 Learning: 2025-11-19T09:34:37.917Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 621
File: test-integration/test-chainlink/tests/ix_remote_account_provider.rs:62-63
Timestamp: 2025-11-19T09:34:37.917Z
Learning: In test-integration/test-chainlink/tests/ix_remote_account_provider.rs and similar test files, the `_fwd_rx` receiver returned by `init_remote_account_provider()` is intentionally kept alive (but unused) to prevent "receiver dropped" errors on the sender side. The pattern `let (remote_account_provider, _fwd_rx) = init_remote_account_provider().await;` should NOT be changed to `let (remote_account_provider, _) = ...` because dropping the receiver would cause send() operations to fail.

Applied to files:

  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Build Project
  • GitHub Check: run_make_ci_test
  • GitHub Check: run_make_ci_lint

Inline review comment on magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (diff context ends at the new import `use futures_util::stream::FuturesUnordered;`):

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

Does `futures_util::stream::FuturesUnordered` provide any guarantee about the order in which newly pushed futures are first polled, relative to each other? Specifically, can a future that was pushed later be polled and complete before a previously pushed future’s first poll?

💡 Result:

Short answer: No — there is no guarantee. FuturesUnordered is unordered: pushing a future does not poll it, and futures are driven only when they generate wake notifications, so a future pushed later can be polled (and even complete) before an earlier-pushed future’s first poll. Use FuturesOrdered if you need FIFO semantics. [1][2]

Sources:

  • futures-util/FuturesUnordered docs (behaviour and note that it “may complete in any order”). [1]
  • futures::stream::FuturesOrdered docs (contrast: preserves FIFO order). [2]
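
To make the ordering difference concrete, here is a small illustrative example (hypothetical delays and labels, unrelated to the project code):

```rust
use std::time::Duration;

use futures_util::stream::{FuturesOrdered, FuturesUnordered, StreamExt};

async fn job(delay_ms: u64, label: &'static str) -> &'static str {
    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
    label
}

#[tokio::main]
async fn main() {
    // FuturesUnordered: results come back in completion order, so the
    // later-pushed but faster future is yielded first.
    let mut unordered = FuturesUnordered::new();
    unordered.push(job(50, "slow (pushed first)"));
    unordered.push(job(0, "fast (pushed second)"));
    while let Some(label) = unordered.next().await {
        println!("unordered: {label}"); // fast first, then slow
    }

    // FuturesOrdered: futures are still polled concurrently, but results
    // are yielded in push (FIFO) order, so "slow" comes out first.
    let mut ordered = FuturesOrdered::new();
    ordered.push_back(job(50, "slow (pushed first)"));
    ordered.push_back(job(0, "fast (pushed second)"));
    while let Some(label) = ordered.next().await {
        println!("ordered: {label}"); // slow first, then fast
    }
}
```

Note that FuturesOrdered only orders the yielded outputs; the futures themselves are still driven concurrently, so switching to it would not by itself serialize side effects such as insertions into subs.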

The web search confirms the review comment's analysis is correct. FuturesUnordered provides no guarantees about polling order—a future pushed later can be polled and complete before an earlier-pushed future's first poll; FuturesOrdered is the correct choice for FIFO semantics. This directly validates the identified race condition where an unsubscribe could complete before a preceding subscribe's critical section (insertion into subs) executes.

The review comment is accurate and comprehensive. The semantic correctness regression is real and actionable.


Concurrent handling via FuturesUnordered breaks subscribe→unsubscribe ordering assumptions and can leak subscriptions

The new FuturesUnordered-based loop makes handle_msg futures for all message types run concurrently. Because of how handle_msg is structured:

  • AccountSubscribe:

    • Calls Self::add_sub(...).await.
    • add_sub inserts the AccountSubscription into subs before its first .await (account_subscribe().await).
  • AccountUnsubscribe:

    • Has no .await; its entire branch is synchronous and completes on the first poll of its future.

With FuturesUnordered (which provides no ordering guarantees), there is now a valid interleaving where:

  1. A subscribe message for pubkey is received first and its handle_msg future is pushed into pending_messages.
  2. A subsequent unsubscribe message for the same pubkey is received and its handle_msg future is also pushed.
  3. When pending_messages.next() runs, it may poll the unsubscribe future before the subscribe future's first poll.
  4. In that case:
    • The unsubscribe branch checks subs.lock().get(&pubkey) and finds no entry yet, so it returns AccountSubscriptionDoesNotExist.
    • Afterwards, the subscribe future is polled for the first time, and add_sub inserts the AccountSubscription into the map and sets up the real subscription.

This results in:

  • The caller logically issuing subscribe then unsubscribe, but:
    • Seeing an unsubscribe error (AccountSubscriptionDoesNotExist).
    • Still ending up with a live subscription (because add_sub completes later and spawns the update task).
  • A leaked subscription that the caller believes was removed.
  • The comment in add_sub about "messages to this actor are processed in the order they are sent" no longer being true under the concurrent FuturesUnordered processing.

Consider one of the following fixes:

  • Restrict concurrency to subscription establishment only (see the sketch after this list):

    • Keep the actor loop sequential for control messages (AccountUnsubscribe, Reconnect).
    • For AccountSubscribe, enqueue/track only the long‑running "establish subscription" work in FuturesUnordered (e.g., a dedicated async function that performs account_subscribe + spawn of the update loop), while the main handle_msg path that updates subs and returns the oneshot response preserves message ordering.
  • Explicitly track in‑flight subscriptions:

    • Maintain a per‑Pubkey state for "subscription being created".
    • Treat an unsubscribe for an in‑flight subscription as success and arrange for the creation path to observe and honor that cancellation (e.g., cancel before or immediately after adding to subs).
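
A rough sketch of the first option, combined with the in-flight bookkeeping idea (the Msg enum, establish_subscription, and the plain HashSet standing in for the shared subs map are all illustrative, not the actual actor code):

```rust
use std::collections::HashSet;

use futures_util::stream::{FuturesUnordered, StreamExt};
use tokio::sync::{mpsc, oneshot};

// Illustrative stand-ins; the real actor has its own Pubkey, error, and
// subscription types, and keeps `subs` behind a shared lock.
type Pubkey = [u8; 32];

enum Msg {
    AccountSubscribe(Pubkey, oneshot::Sender<Result<(), String>>),
    AccountUnsubscribe(Pubkey, oneshot::Sender<Result<(), String>>),
}

// Stand-in for the long-running part: account_subscribe().await plus
// spawning the update-forwarding task.
async fn establish_subscription(_pubkey: Pubkey) -> Result<(), String> {
    Ok(())
}

async fn run(mut rx: mpsc::Receiver<Msg>) {
    let mut subs: HashSet<Pubkey> = HashSet::new();
    // Only the long-running subscription setup runs concurrently.
    let mut in_flight = FuturesUnordered::new();

    loop {
        tokio::select! {
            maybe_msg = rx.recv() => {
                let Some(msg) = maybe_msg else { break };
                // This match runs sequentially, so subscribe/unsubscribe for the
                // same pubkey are observed in the order they were sent.
                match msg {
                    Msg::AccountSubscribe(pubkey, respond_to) => {
                        subs.insert(pubkey);
                        in_flight.push(async move {
                            let res = establish_subscription(pubkey).await;
                            let _ = respond_to.send(res);
                        });
                    }
                    Msg::AccountUnsubscribe(pubkey, respond_to) => {
                        let existed = subs.remove(&pubkey);
                        // If the subscription is still being established, the
                        // establishment path must observe this removal and tear
                        // the remote subscription down (omitted here).
                        let _ = respond_to.send(if existed {
                            Ok(())
                        } else {
                            Err("AccountSubscriptionDoesNotExist".into())
                        });
                    }
                }
            }
            Some(()) = in_flight.next() => {}
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    let actor = tokio::spawn(run(rx));

    let pk = [1u8; 32];
    let (sub_tx, sub_rx) = oneshot::channel();
    tx.send(Msg::AccountSubscribe(pk, sub_tx)).await.unwrap();
    let (unsub_tx, unsub_rx) = oneshot::channel();
    tx.send(Msg::AccountUnsubscribe(pk, unsub_tx)).await.unwrap();

    println!("subscribe:   {:?}", sub_rx.await.unwrap());
    println!("unsubscribe: {:?}", unsub_rx.await.unwrap());

    drop(tx);
    actor.await.unwrap();
}
```

The point is that the oneshot replies and the subs bookkeeping for control messages stay on the sequential path, so send order is preserved, while only the slow network call runs concurrently inside FuturesUnordered; tearing down a subscription that was unsubscribed while still being established is deliberately left out of the sketch.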

Additionally, on shutdown or when messages_receiver is closed, the loop breaks and drops pending_messages, abruptly canceling in‑flight handle_msg futures. This is a behavior change from the previous sequential implementation, where the currently executing handle_msg would finish before exit. Please double‑check that it's acceptable for callers to sometimes see pending subscribe/reconnect requests fail with dropped oneshots during shutdown.
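
If completing in-flight work on shutdown is preferred over cancelling it, a drain step along these lines (a sketch against a FuturesUnordered of unit-returning handler futures, not the actual actor code) could run after the loop breaks:

```rust
use std::future::Future;

use futures_util::stream::{FuturesUnordered, StreamExt};

// Await every remaining in-flight handler future instead of dropping it, so
// pending subscribe/reconnect requests still receive their oneshot responses.
async fn drain_pending<F: Future<Output = ()>>(pending: &mut FuturesUnordered<F>) {
    while pending.next().await.is_some() {}
}
```

Whether this is desirable depends on whether callers should still receive responses during shutdown, which is exactly the question raised above.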

Also applies to: lines 232-252, 257, 362-370, 376-381

* master:
  feat: use latest svm version (#657)
  chore: update solana account (#660)
@thlorenz thlorenz force-pushed the thlorenz/chainlink-perf branch from 0fa1d7b to 950590b on November 21, 2025 at 07:08
@thlorenz thlorenz merged commit ca51651 into master Nov 21, 2025
18 checks passed
@thlorenz thlorenz deleted the thlorenz/chainlink-perf branch November 21, 2025 09:44
thlorenz added a commit that referenced this pull request Nov 21, 2025
* master:
  perf: allow pubsub actor messages to be handled in parallel (#659)


Development

Successfully merging this pull request may close these issues.

perf: each subscription is awaited making each of them sequential and possibly affecting performance
