@thlorenz thlorenz commented Nov 20, 2025

Summary

Ensure RemoteAccountProvider::subscribe waits until the subscription is actually established, and have the submux subscribe to its inner clients in parallel.

Problem

When RemoteAccountProvider::subscribe was called, the code had two critical issues:

  1. Premature response before RPC completion: The add_sub function was spawning the RPC subscription call asynchronously but responding to the caller immediately after spawn, not after the subscription actually succeeded.

  2. Sequential subscription to all clients: The SubMuxClient::subscribe method was subscribing to all clients sequentially and only returning when all succeeded, rather than waiting for just one client to succeed.

This meant that:

  • The caller would get an Ok() response without verifying the subscription actually worked
  • If the subscription failed, the error would not be propagated back to the caller
  • The LRU cache would be updated before verifying the subscription actually worked
  • Subscription would fail if any single client was disconnected
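
The cache-ordering point can be illustrated with a minimal sketch. Everything here is hypothetical: `Provider`, its `subscribed` set (a stand-in for the LRU cache) and the `rpc_subscribe` callback are not names from the codebase. The only idea shown is that the local bookkeeping is updated after the subscription call has returned Ok, never before.

```rust
use std::collections::HashSet;

/// Hypothetical provider; `subscribed` stands in for the LRU cache.
struct Provider {
    subscribed: HashSet<u64>,
}

impl Provider {
    /// Records `pubkey` only after `rpc_subscribe` has confirmed success.
    fn subscribe(
        &mut self,
        pubkey: u64,
        rpc_subscribe: impl FnOnce(u64) -> Result<(), String>,
    ) -> Result<(), String> {
        // Propagate the error first; the cache stays untouched on failure.
        rpc_subscribe(pubkey)?;
        self.subscribed.insert(pubkey);
        Ok(())
    }
}
```

With this ordering a failed subscription leaves no stale cache entry behind, and the caller sees the error.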

Root Causes

Issue 1: Sequential subscription in SubMuxClient

Original code in submux/mod.rs:

async fn subscribe(&self, pubkey: Pubkey) -> RemoteAccountProviderResult<()> {
    for client in &self.clients {
        client.subscribe(pubkey).await?; // Fails if ANY client fails
    }
    Ok(())
}

This required ALL clients to succeed, instead of waiting for the first success.

Issue 2: Response sent before RPC call in ChainPubsubActor

Original code in chain_pubsub_actor.rs:

fn add_sub(...) {
    tokio::spawn(async move {
        // Make RPC call here (async)
        let (mut update_stream, unsubscribe) = match pubsub_connection
            .account_subscribe(&pubkey, config.clone())
            .await
        {
            Ok(res) => res,
            // The RPC error is discarded here; nothing is sent on sub_response
            Err(_err) => return,
        };

        // Response is sent from inside the spawned task
        let _ = sub_response.send(Ok(()));

        // Listen for updates
    });
    // Function returns before RPC call completes
}

The response was sent inside the spawned task, so the function returned before the RPC call completed.

Issue 3: Disconnected actor returning success

When the actor was disconnected, it would still return Ok(()):

if !is_connected.load(Ordering::SeqCst) {
    send_ok(response, client_id);  // Returns success despite being disconnected
    return;
}

Solution

Change 1: Parallel subscription with first-success semantics

Created a new AccountSubscriptionTask enum and process() method in submux/subscription_task.rs that:

  • Spawns subscription tasks to all clients in parallel using FuturesUnordered
  • Returns Ok() as soon as ANY client succeeds
  • Collects errors from all clients only if ALL fail
  • Ignores errors from clients after the first success

pub enum AccountSubscriptionTask {
    Subscribe(Pubkey),
    Unsubscribe(Pubkey),
    Shutdown,
}

impl AccountSubscriptionTask {
    pub async fn process<T>(self, clients: Vec<Arc<T>>) -> RemoteAccountProviderResult<()> {
        // ... (elided: setup of `errors` and of the `tx`/`rx` response channel) ...
        tokio::spawn(async move {
            let mut futures = FuturesUnordered::new();
            // Queue one future per client; FuturesUnordered drives them concurrently
            for (i, client) in clients.iter().enumerate() {
                futures.push(async move {
                    // `task` stands for this AccountSubscriptionTask (`self`)
                    let result = match task {
                        Subscribe(pubkey) => client.subscribe(pubkey).await,
                        // ...
                    };
                    (i, result)
                });
            }

            let mut tx = Some(tx);
            while let Some((i, result)) = futures.next().await {
                match result {
                    Ok(_) => {
                        // First success: reply once, then keep draining the remaining futures
                        if let Some(tx) = tx.take() {
                            let _ = tx.send(Ok(()));
                        }
                    }
                    Err(e) => {
                        if tx.is_none() {
                            // Already succeeded once: log the error but do not report it
                            warn!("Error from client {}: {:?}", i, e);
                        } else {
                            errors.push(format!("Client {}: {:?}", i, e));
                        }
                    }
                }
            }
            // ... (elided: if no client succeeded, report the collected `errors` through `tx`) ...
        });
        // ... (elided: await the reply on `rx` and return it) ...
    }
}

Updated SubMuxClient to use this:

async fn subscribe(&self, pubkey: Pubkey) -> RemoteAccountProviderResult<()> {
    AccountSubscriptionTask::Subscribe(pubkey)
        .process(self.clients.clone())
        .await
}
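
The first-success semantics can also be tried out without an async runtime. The following is a minimal sketch using only std threads and channels, not the PR's FuturesUnordered implementation. `MockClient` and `subscribe_first_success` are hypothetical names, and a `u64` stands in for `Pubkey`. Unlike the PR code, late results are simply dropped rather than logged.

```rust
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for an inner pubsub client.
struct MockClient {
    connected: bool,
}

impl MockClient {
    fn subscribe(&self, _pubkey: u64) -> Result<(), String> {
        if self.connected {
            Ok(())
        } else {
            Err("disconnected".to_string())
        }
    }
}

/// Starts one subscribe attempt per client and returns on the first success.
/// The collected errors are returned only if every client fails.
fn subscribe_first_success(
    clients: &[Arc<MockClient>],
    pubkey: u64,
) -> Result<(), Vec<String>> {
    let (tx, rx) = mpsc::channel();
    for (i, client) in clients.iter().enumerate() {
        let tx = tx.clone();
        let client = Arc::clone(client);
        thread::spawn(move || {
            // The receiver may already be gone after the first success.
            let _ = tx.send((i, client.subscribe(pubkey)));
        });
    }
    // Drop the original sender so the loop ends once all workers are done.
    drop(tx);

    let mut errors = Vec::new();
    for (i, result) in rx {
        match result {
            Ok(()) => return Ok(()),
            Err(e) => errors.push(format!("Client {i}: {e}")),
        }
    }
    Err(errors)
}
```

One disconnected client no longer fails the whole call; the error list only surfaces when no client could subscribe.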

Change 2: Make add_sub async and verify RPC before responding

Changed add_sub from a synchronous function to an async one, and moved the RPC call out of the spawned task so that it completes before the task is spawned:

async fn add_sub(...) {
    // ... setup ...

    let config = RpcAccountInfoConfig { /* ... */ };

    // Perform the subscription BEFORE spawning
    let (mut update_stream, unsubscribe) = match pubsub_connection
        .account_subscribe(&pubkey, config.clone())
        .await
    {
        Ok(res) => res,
        Err(err) => {
            error!("[client_id={client_id}] Failed to subscribe to account {pubkey} {err:?}");
            Self::abort_and_signal_connection_issue(/* ... */);
            // RPC failed - inform the requester
            let _ = sub_response.send(Err(err.into()));
            return;
        }
    };

    // RPC succeeded - confirm to the requester BEFORE spawning
    let _ = sub_response.send(Ok(()));

    // NOW spawn the background task to listen for updates
    tokio::spawn(async move {
        // Listen for updates and relay them
        loop {
            tokio::select! {
                _ = cancellation_token.cancelled() => break,
                update = update_stream.next() => {
                    // Relay update
                }
            }
        }
        // Cleanup
    });
}

Updated caller to await it:

Self::add_sub(...).await;  // Now awaits completion of RPC call
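
The ordering "subscribe, then reply, then spawn the listener" is shown below as a blocking, std-only sketch. `MockConnection`, its `account_subscribe` method and the `u64` update type are assumptions made for illustration; the real code is async and uses the pubsub connection shown above.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical connection: subscribing either fails or yields an update stream.
struct MockConnection {
    fail: bool,
}

impl MockConnection {
    fn account_subscribe(&self) -> Result<Receiver<u64>, String> {
        if self.fail {
            return Err("rpc error".to_string());
        }
        let (tx, rx) = mpsc::channel();
        // Emit two updates, then close the stream by dropping `tx`.
        thread::spawn(move || {
            for slot in [1, 2] {
                let _ = tx.send(slot);
            }
        });
        Ok(rx)
    }
}

/// Replies on `sub_response` only after the subscribe call has returned,
/// and spawns the listener only on success.
fn add_sub(
    conn: &MockConnection,
    sub_response: Sender<Result<(), String>>,
    updates: Sender<u64>,
) -> Option<JoinHandle<()>> {
    // 1. Perform the subscription before spawning anything.
    let stream = match conn.account_subscribe() {
        Ok(stream) => stream,
        Err(err) => {
            // 2a. Failure: inform the requester and stop.
            let _ = sub_response.send(Err(err));
            return None;
        }
    };
    // 2b. Success: confirm to the requester.
    let _ = sub_response.send(Ok(()));
    // 3. Only now start the background listener that relays updates.
    Some(thread::spawn(move || {
        for update in stream {
            let _ = updates.send(update);
        }
    }))
}
```

The requester can no longer observe Ok(()) for a subscription that was never established, because the reply is produced after the subscribe call returns.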

Change 3: Return error when subscribing while disconnected

Changed the actor to return an error instead of success:

if !is_connected.load(Ordering::SeqCst) {
    warn!("[client_id={client_id}] Ignoring subscribe request for {pubkey} because disconnected");
    let _ = response.send(Err(
        RemoteAccountProviderError::AccountSubscriptionsTaskFailed(
            format!("Client {client_id} disconnected"),
        ),
    ));
    return;
}
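
A self-contained sketch of this guard follows. `ProviderError` and `handle_subscribe` are hypothetical stand-ins for the real error enum and actor message handler. The Display text follows the "Failed to manage subscriptions ({0})" message quoted in the review of errors.rs. The Ok(()) branch only marks where the real actor would continue with add_sub.

```rust
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for RemoteAccountProviderError.
#[derive(Debug, PartialEq)]
enum ProviderError {
    AccountSubscriptionsTaskFailed(String),
}

impl fmt::Display for ProviderError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ProviderError::AccountSubscriptionsTaskFailed(msg) => {
                write!(f, "Failed to manage subscriptions ({msg})")
            }
        }
    }
}

/// Rejects subscribe requests while disconnected instead of reporting success.
fn handle_subscribe(
    is_connected: &AtomicBool,
    client_id: u16,
    response: Sender<Result<(), ProviderError>>,
) {
    if !is_connected.load(Ordering::SeqCst) {
        let _ = response.send(Err(ProviderError::AccountSubscriptionsTaskFailed(
            format!("Client {client_id} disconnected"),
        )));
        return;
    }
    // Placeholder: the real actor would continue with add_sub here.
    let _ = response.send(Ok(()));
}
```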

Change 4: Error type consistency

Renamed the error variant AccountSubscriptionsFailed to AccountSubscriptionsTaskFailed across all files for consistency.

Subscription Flow (After Fix)

  1. Entry Point: RemoteAccountProvider::subscribe(pubkey) is called
  2. Calls register_subscription(pubkey) → SubMuxClient::subscribe(pubkey)
  3. SubMuxClient creates AccountSubscriptionTask::Subscribe(pubkey) and calls process()
  4. Parallel Subscription: process() spawns subscription tasks to ALL clients in parallel using FuturesUnordered
  5. First Success Wins: Returns Ok() as soon as ANY client succeeds
  6. Each client sends AccountSubscribe message to its ChainPubsubActor and awaits response
  7. Actor Validates Connection: Checks is_connected flag - returns error if disconnected
  8. RPC Call Happens Now (in the actor, not spawned): Calls account_subscribe() and awaits result
  9. On RPC Success: Sends Ok() response back (response goes all the way back to caller)
  10. On RPC Failure: Sends Err response back (process() keeps waiting on the remaining clients and reports errors only if all of them fail)
  11. After RPC Confirmed: Spawns background task to listen for update stream
  12. Completion: Caller receives response only after RPC subscription is actually established

Result

After these fixes, RemoteAccountProvider::subscribe will:

  • Wait until at least one client successfully establishes an RPC subscription
  • Return error if all clients fail (rather than succeeding without confirming)
  • Fail fast if a client is disconnected
  • Properly propagate RPC errors back to the caller

Summary by CodeRabbit

  • Bug Fixes

    • Immediate failure notification when subscription setup fails or a client disconnects; EOF and cancellation now trigger proper cleanup and error replies.
  • Refactor

    • Subscription setup now occurs before per-subscription listeners to avoid races; lifecycle and cancellation flows made more robust.
  • New Features

    • Centralized subscription task to coordinate subscribe/unsubscribe/shutdown across clients.
  • Other

    • Renamed and standardized subscription error variant and message for clearer reporting.


github-actions bot commented Nov 20, 2025

Manual Deploy Available

You can trigger a manual deploy of this PR branch to testnet:

Deploy to Testnet 🚀

Alternative: Comment /deploy on this PR to trigger deployment directly.

⚠️ Note: Manual deploy requires authorization. Only authorized users can trigger deployments.

Comment updated automatically when the PR is synchronized.

coderabbitai bot commented Nov 20, 2025

Walkthrough

Converts per-account subscription setup to async: add_sub now awaits account_subscribe before spawning the per-subscription listener, inserts subscriptions earlier to avoid races, reports subscribe failures explicitly, adds a task-based AccountSubscriptionTask for cross-client orchestration, and renames an error variant.

Changes

Cohort / File(s) Summary
Actor: async subscription flow
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
add_sub changed to async fn; subscription setup (account_subscribe) is awaited before spawning listener task; inserts AccountSubscription into map prior to await; explicit error reply on subscribe failure; listener loop moved into spawned task and now handles cancellation and EOF as connection issues.
Client & errors: variant rename
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs, magicblock-chainlink/src/remote_account_provider/errors.rs, magicblock-chainlink/src/remote_account_provider/mod.rs
Renamed public enum variant RemoteAccountProviderError::AccountSubscriptionsFailed(String) → AccountSubscriptionsTaskFailed(String) and updated associated message and mock-return sites.
Subscription orchestration: new task module
magicblock-chainlink/src/submux/mod.rs, magicblock-chainlink/src/submux/subscription_task.rs
Added subscription_task module and pub use export; introduced AccountSubscriptionTask enum (Subscribe/Unsubscribe/Shutdown) with pub async fn process to orchestrate subscribe/unsubscribe/shutdown across multiple ChainPubsubClient instances and delegated SubMuxClient methods to it.

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant ChainPubsubActor
    participant RemoteClient as PubsubClient
    participant SubTask as SpawnedListener

    rect rgba(200,230,255,0.6)
    Caller->>ChainPubsubActor: Subscribe(pubkey) request
    ChainPubsubActor->>ChainPubsubActor: insert AccountSubscription into map
    ChainPubsubActor->>RemoteClient: account_subscribe(config) (await)
    alt subscribe fails
        ChainPubsubActor-->>Caller: Err(AccountSubscriptionsTaskFailed)
        ChainPubsubActor->>ChainPubsubActor: abort/cleanup
    else subscribe succeeds
        ChainPubsubActor-->>Caller: Ok(())
        ChainPubsubActor->>SubTask: spawn(listener with update_stream + cancel token)
        loop updates
            RemoteClient-->>SubTask: Update
            SubTask->>ChainPubsubActor: forward update
        end
        alt EOF / cancellation
            SubTask->>ChainPubsubActor: signal connection issue / abort
        end
    end
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Pay special attention to:
    • async/await transitions, Send/Sync/lifetime bounds for spawned tasks and add_sub signature change
    • correctness of inserting subscription before awaiting and potential unsubscribe races
    • propagation and usage of the renamed error variant across public APIs and tests
    • cancellation, EOF handling, and cleanup ordering in spawned listener tasks

Possibly related PRs

Suggested reviewers

  • GabrielePicco
  • bmuddha

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 18.18% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly summarizes the main changes: making subscriptions awaited until established and performing them in parallel, which aligns with the core improvements described in the PR objectives.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 49c435d and be7479e.

📒 Files selected for processing (2)
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (4 hunks)
  • magicblock-chainlink/src/submux/mod.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 621
File: magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs:457-495
Timestamp: 2025-11-07T14:20:31.457Z
Learning: In magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs, the unsubscribe closure returned by PubSubConnection::account_subscribe(...) resolves to () (unit), not a Result. Downstream code should not attempt to inspect an unsubscribe result and can optionally wrap it in a timeout to guard against hangs.
Learnt from: bmuddha
Repo: magicblock-labs/magicblock-validator PR: 578
File: magicblock-aperture/src/requests/websocket/account_subscribe.rs:18-27
Timestamp: 2025-10-21T14:00:54.642Z
Learning: In magicblock-aperture account_subscribe handler (src/requests/websocket/account_subscribe.rs), the RpcAccountInfoConfig fields data_slice, commitment, and min_context_slot are currently ignored—only encoding is applied. This is tracked as technical debt in issue #579: https://github.com/magicblock-labs/magicblock-validator/issues/579
📚 Learning: 2025-11-07T14:20:31.457Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 621
File: magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs:457-495
Timestamp: 2025-11-07T14:20:31.457Z
Learning: In magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs, the unsubscribe closure returned by PubSubConnection::account_subscribe(...) resolves to () (unit), not a Result. Downstream code should not attempt to inspect an unsubscribe result and can optionally wrap it in a timeout to guard against hangs.

Applied to files:

  • magicblock-chainlink/src/submux/mod.rs
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
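
The timeout guard mentioned in this learning can be sketched with std primitives. `unsubscribe_with_timeout` is a hypothetical helper; the real unsubscribe closure is async and would be wrapped with the runtime's own timeout facility. Because the closure resolves to unit, the only thing to observe is whether it finished in time.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs a unit-returning `unsubscribe` closure on a worker thread and
/// reports whether it completed within `timeout`.
fn unsubscribe_with_timeout<F>(unsubscribe: F, timeout: Duration) -> bool
where
    F: FnOnce() + Send + 'static,
{
    let (done_tx, done_rx) = mpsc::channel();
    thread::spawn(move || {
        unsubscribe();
        // There is no result to inspect; just signal completion.
        let _ = done_tx.send(());
    });
    done_rx.recv_timeout(timeout).is_ok()
}
```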
📚 Learning: 2025-10-21T14:00:54.642Z
Learnt from: bmuddha
Repo: magicblock-labs/magicblock-validator PR: 578
File: magicblock-aperture/src/requests/websocket/account_subscribe.rs:18-27
Timestamp: 2025-10-21T14:00:54.642Z
Learning: In magicblock-aperture account_subscribe handler (src/requests/websocket/account_subscribe.rs), the RpcAccountInfoConfig fields data_slice, commitment, and min_context_slot are currently ignored—only encoding is applied. This is tracked as technical debt in issue #579: https://github.com/magicblock-labs/magicblock-validator/issues/579

Applied to files:

  • magicblock-chainlink/src/submux/mod.rs
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
📚 Learning: 2025-11-19T09:34:37.890Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 621
File: test-integration/test-chainlink/tests/ix_remote_account_provider.rs:62-63
Timestamp: 2025-11-19T09:34:37.890Z
Learning: In test-integration/test-chainlink/tests/ix_remote_account_provider.rs and similar test files, the `_fwd_rx` receiver returned by `init_remote_account_provider()` is intentionally kept alive (but unused) to prevent "receiver dropped" errors on the sender side. The pattern `let (remote_account_provider, _fwd_rx) = init_remote_account_provider().await;` should NOT be changed to `let (remote_account_provider, _) = ...` because dropping the receiver would cause send() operations to fail.

Applied to files:

  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
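
The difference between `_fwd_rx` and `_` can be reproduced with a std channel. `init_provider` is a hypothetical stand-in for init_remote_account_provider(), and std::sync::mpsc is assumed here in place of whatever channel type the tests actually use. A named binding keeps the receiver alive until the end of the scope, while the `_` pattern binds nothing, so the receiver is dropped at the end of the let statement.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for init_remote_account_provider().
fn init_provider() -> (Sender<u32>, Receiver<u32>) {
    channel()
}

/// `_fwd_rx` is unused but stays alive, so sending succeeds.
fn send_with_receiver_kept() -> bool {
    let (tx, _fwd_rx) = init_provider();
    tx.send(1).is_ok()
}

/// `_` does not bind the receiver, so it is dropped before `send` runs.
fn send_with_receiver_dropped() -> bool {
    let (tx, _) = init_provider();
    tx.send(1).is_ok()
}
```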
📚 Learning: 2025-10-26T16:53:29.820Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 587
File: magicblock-chainlink/src/remote_account_provider/mod.rs:134-0
Timestamp: 2025-10-26T16:53:29.820Z
Learning: In magicblock-chainlink/src/remote_account_provider/mod.rs, the `Endpoint::separate_pubsub_url_and_api_key()` method uses `split_once("?api-key=")` because the api-key parameter is always the only query parameter right after `?`. No additional query parameter parsing is needed for this use case.

Applied to files:

  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
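
A minimal sketch of the `split_once("?api-key=")` approach is given below. The free function and its return type are assumptions; the real method is `Endpoint::separate_pubsub_url_and_api_key()` and its signature is not shown in this thread.

```rust
/// Splits a pubsub URL into its base URL and optional API key, assuming
/// `?api-key=` is the only query parameter.
fn separate_pubsub_url_and_api_key(url: &str) -> (String, Option<String>) {
    match url.split_once("?api-key=") {
        Some((base, key)) => (base.to_string(), Some(key.to_string())),
        None => (url.to_string(), None),
    }
}
```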
📚 Learning: 2025-11-20T08:57:07.189Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 650
File: magicblock-chainlink/src/submux/subscription_task.rs:13-99
Timestamp: 2025-11-20T08:57:07.189Z
Learning: In the magicblock-validator repository, avoid posting review comments that merely confirm code is correct or matches intended behavior without providing actionable feedback, suggestions for improvement, or identifying potential issues. Such confirmatory comments are considered unhelpful noise by the maintainers.

Applied to files:

  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
🧬 Code graph analysis (2)
magicblock-chainlink/src/submux/mod.rs (4)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (6)
  • unsubscribe (123-126)
  • unsubscribe (219-235)
  • unsubscribe (402-410)
  • shutdown (127-127)
  • shutdown (185-187)
  • shutdown (412-412)
magicblock-chainlink/src/remote_account_provider/mod.rs (1)
  • unsubscribe (797-832)
magicblock-chainlink/src/testing/chain_pubsub.rs (1)
  • unsubscribe (45-57)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (1)
  • shutdown (160-175)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (2)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (3)
  • unsubscribe (123-126)
  • unsubscribe (219-235)
  • unsubscribe (402-410)
magicblock-chainlink/src/testing/chain_pubsub.rs (1)
  • unsubscribe (45-57)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Build Project
  • GitHub Check: run_make_ci_test
🔇 Additional comments (11)
magicblock-chainlink/src/submux/mod.rs (5)

26-27: LGTM: Module addition for subscription task orchestration.

The new subscription_task module and AccountSubscriptionTask re-export centralize subscription logic as described in the PR objectives.


149-152: LGTM: Generic constraints support the new delegation pattern.

The trait bounds ChainPubsubClient + ReconnectableClient are correctly applied to enable the AccountSubscriptionTask orchestration.


583-590: LGTM: Subscribe delegation enables parallel subscription attempts.

The delegation to AccountSubscriptionTask::Subscribe implements the PR objective of attempting subscriptions to all inner clients in parallel, returning on the first successful response.


592-599: LGTM: Unsubscribe delegation follows consistent pattern.

The delegation centralizes unsubscribe orchestration across multiple clients, maintaining consistency with the subscribe flow.


601-605: Verify: Shutdown result intentionally ignored?

The AccountSubscriptionTask::Shutdown.process() result is explicitly discarded with let _ =. While shutdown operations are typically best-effort, silently ignoring potential errors might hide issues during graceful shutdown. Consider whether at least logging failures would be beneficial for observability.

magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (6)

280-289: LGTM: Proper error propagation when disconnected.

The change from silent success to explicit error response (AccountSubscriptionsTaskFailed) ensures callers are correctly informed when subscription requests fail due to disconnection. This aligns with the PR objective of propagating RPC errors to callers.


291-302: LGTM: Awaiting async add_sub ensures subscription confirmation.

Awaiting add_sub implements the PR objective of waiting until the RPC subscription is established before returning to the caller.


342-342: LGTM: Async signature enables RPC confirmation.

Converting add_sub to async fn is essential for awaiting the RPC account_subscribe call and ensuring subscription confirmation before responding to the caller.


363-381: LGTM: Early insertion prevents race with unsubscribe.

Inserting the subscription into the map before awaiting the RPC call prevents a race condition where an unsubscribe message could be processed before the cancellation token is available. The comment clearly explains the ordering assumption. On RPC failure, the cleanup is handled via abort_and_signal_connection_issue.
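
The ordering argument can be made concrete with a small sketch. The `Subscriptions` map, the `AtomicBool` used as a cancellation flag and both functions are hypothetical simplifications. In particular, failure cleanup here simply removes the entry, whereas the PR routes it through abort_and_signal_connection_issue.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

type Subscriptions = Arc<Mutex<HashMap<u64, Arc<AtomicBool>>>>;

/// Registers the cancellation flag before the slow subscribe call, so an
/// unsubscribe arriving in the meantime can find and trigger it.
fn add_sub(
    subs: &Subscriptions,
    pubkey: u64,
    rpc_subscribe: impl FnOnce() -> Result<(), String>,
) -> Result<Arc<AtomicBool>, String> {
    let cancelled = Arc::new(AtomicBool::new(false));
    subs.lock().unwrap().insert(pubkey, Arc::clone(&cancelled));
    // The lock is not held while the subscribe call is in flight.
    if let Err(err) = rpc_subscribe() {
        // Simplified cleanup for this sketch.
        subs.lock().unwrap().remove(&pubkey);
        return Err(err);
    }
    Ok(cancelled)
}

/// Cancels and removes a subscription; returns false if it was unknown.
fn unsubscribe(subs: &Subscriptions, pubkey: u64) -> bool {
    match subs.lock().unwrap().remove(&pubkey) {
        Some(flag) => {
            flag.store(true, Ordering::SeqCst);
            true
        }
        None => false,
    }
}
```

If the insertion happened after the subscribe call instead, an unsubscribe arriving during that call would find no entry and the listener would never be cancelled.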


383-410: LGTM: RPC subscription errors properly propagated.

The refactor moves account_subscribe outside the spawned task, ensuring:

  1. RPC errors are propagated to the caller (lines 395-406)
  2. Success response is sent only after RPC confirmation (line 410)
  3. Connection failures trigger abort for all subscriptions, which is appropriate since the entire connection is likely compromised

This directly implements the PR objective of awaiting subscription establishment.


412-462: LGTM: Spawned listener task with proper cleanup.

The background task correctly handles:

  1. Cancellation (lines 417-420): Breaks on cancellation signal
  2. Updates (lines 422-432): Forwards updates to sender with appropriate logging
  3. EOF (lines 433-445): Treats stream end as connection failure, triggers abort for all subscriptions (appropriate since connection is likely compromised), and returns early to avoid duplicate cleanup
  4. Cleanup (lines 450-462): Unsubscribes with timeout guard (per learnings, unsubscribe returns unit) and removes from map

Based on learnings
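
The listener behaviour listed above can be sketched with std channels. `listen`, `ListenerExit` and the polling interval are assumptions made for this sketch; the real task uses tokio::select! with a cancellation token and also unsubscribes during cleanup. A closed update stream is reported as `Eof`, which the caller would treat as a connection issue.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum ListenerExit {
    Cancelled,
    Eof,
}

/// Relays updates until cancelled or until the update stream closes.
fn listen(
    stream: Receiver<u64>,
    forward: Sender<u64>,
    cancelled: Arc<AtomicBool>,
) -> ListenerExit {
    loop {
        if cancelled.load(Ordering::SeqCst) {
            return ListenerExit::Cancelled;
        }
        // Poll with a short timeout so cancellation is noticed promptly.
        match stream.recv_timeout(Duration::from_millis(10)) {
            Ok(update) => {
                let _ = forward.send(update);
            }
            Err(RecvTimeoutError::Timeout) => continue,
            // All senders are gone: the stream has ended.
            Err(RecvTimeoutError::Disconnected) => return ListenerExit::Eof,
        }
    }
}
```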


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 02780fa and 49c435d.

📒 Files selected for processing (6)
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (7 hunks)
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (2 hunks)
  • magicblock-chainlink/src/remote_account_provider/errors.rs (1 hunks)
  • magicblock-chainlink/src/remote_account_provider/mod.rs (1 hunks)
  • magicblock-chainlink/src/submux/mod.rs (4 hunks)
  • magicblock-chainlink/src/submux/subscription_task.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-11-07T14:20:31.457Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 621
File: magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs:457-495
Timestamp: 2025-11-07T14:20:31.457Z
Learning: In magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs, the unsubscribe closure returned by PubSubConnection::account_subscribe(...) resolves to () (unit), not a Result. Downstream code should not attempt to inspect an unsubscribe result and can optionally wrap it in a timeout to guard against hangs.

Applied to files:

  • magicblock-chainlink/src/remote_account_provider/mod.rs
  • magicblock-chainlink/src/remote_account_provider/errors.rs
  • magicblock-chainlink/src/submux/mod.rs
  • magicblock-chainlink/src/submux/subscription_task.rs
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs
📚 Learning: 2025-11-19T09:34:37.890Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 621
File: test-integration/test-chainlink/tests/ix_remote_account_provider.rs:62-63
Timestamp: 2025-11-19T09:34:37.890Z
Learning: In test-integration/test-chainlink/tests/ix_remote_account_provider.rs and similar test files, the `_fwd_rx` receiver returned by `init_remote_account_provider()` is intentionally kept alive (but unused) to prevent "receiver dropped" errors on the sender side. The pattern `let (remote_account_provider, _fwd_rx) = init_remote_account_provider().await;` should NOT be changed to `let (remote_account_provider, _) = ...` because dropping the receiver would cause send() operations to fail.

Applied to files:

  • magicblock-chainlink/src/remote_account_provider/mod.rs
  • magicblock-chainlink/src/remote_account_provider/errors.rs
  • magicblock-chainlink/src/submux/subscription_task.rs
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs
📚 Learning: 2025-10-26T16:53:29.820Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 587
File: magicblock-chainlink/src/remote_account_provider/mod.rs:134-0
Timestamp: 2025-10-26T16:53:29.820Z
Learning: In magicblock-chainlink/src/remote_account_provider/mod.rs, the `Endpoint::separate_pubsub_url_and_api_key()` method uses `split_once("?api-key=")` because the api-key parameter is always the only query parameter right after `?`. No additional query parameter parsing is needed for this use case.

Applied to files:

  • magicblock-chainlink/src/remote_account_provider/mod.rs
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs
📚 Learning: 2025-10-21T14:00:54.642Z
Learnt from: bmuddha
Repo: magicblock-labs/magicblock-validator PR: 578
File: magicblock-aperture/src/requests/websocket/account_subscribe.rs:18-27
Timestamp: 2025-10-21T14:00:54.642Z
Learning: In magicblock-aperture account_subscribe handler (src/requests/websocket/account_subscribe.rs), the RpcAccountInfoConfig fields data_slice, commitment, and min_context_slot are currently ignored—only encoding is applied. This is tracked as technical debt in issue #579: https://github.com/magicblock-labs/magicblock-validator/issues/579

Applied to files:

  • magicblock-chainlink/src/remote_account_provider/errors.rs
  • magicblock-chainlink/src/submux/mod.rs
  • magicblock-chainlink/src/submux/subscription_task.rs
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs
🧬 Code graph analysis (3)
magicblock-chainlink/src/submux/mod.rs (3)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (6)
  • unsubscribe (123-126)
  • unsubscribe (219-235)
  • unsubscribe (402-410)
  • shutdown (127-127)
  • shutdown (185-187)
  • shutdown (412-412)
magicblock-chainlink/src/testing/chain_pubsub.rs (1)
  • unsubscribe (45-57)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (1)
  • shutdown (160-175)
magicblock-chainlink/src/submux/subscription_task.rs (2)
magicblock-chainlink/src/submux/mod.rs (2)
  • clients (220-223)
  • new (153-164)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (2)
  • new (46-54)
  • new (311-322)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (3)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (3)
  • unsubscribe (123-126)
  • unsubscribe (219-235)
  • unsubscribe (402-410)
magicblock-chainlink/src/remote_account_provider/mod.rs (1)
  • unsubscribe (797-832)
magicblock-chainlink/src/testing/chain_pubsub.rs (1)
  • unsubscribe (45-57)
🪛 GitHub Actions: Run CI - Format
magicblock-chainlink/src/submux/mod.rs

[error] 1387-1387: Rust fmt check failed due to formatting changes detected by cargo fmt. Apply formatting with 'cargo +nightly fmt' to fix this file.

magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs

[error] 548-548: Rust fmt check failed due to formatting changes detected by cargo fmt. Apply formatting with 'cargo +nightly fmt' to fix this file.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Build Project
  • GitHub Check: run_make_ci_test
🔇 Additional comments (12)
magicblock-chainlink/src/remote_account_provider/mod.rs (1)

346-352: Error variant rename is consistent with new subscription orchestration

Using RemoteAccountProviderError::AccountSubscriptionsTaskFailed("No endpoints provided".to_string()) keeps the existing control flow while aligning with the new, more generic subscription-task error variant. No functional concerns here.

magicblock-chainlink/src/remote_account_provider/errors.rs (1)

32-33: Renamed error variant/message matches broader usage

AccountSubscriptionsTaskFailed with “Failed to manage subscriptions ({0})” better describes both orchestration failures (SubMux) and setup failures (e.g., no endpoints). Shape compatibility is preserved, so downstream handling remains intact.

magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (2)

385-400: Mock subscribe error now uses unified subscription-task variant

The disconnected-path error in ChainPubsubClientMock::subscribe now surfaces as AccountSubscriptionsTaskFailed("mock: subscribe while disconnected"), which matches the new unified subscription-task error variant and keeps tests’ intent intact.


444-458: Mock resubscribe failure wired to AccountSubscriptionsTaskFailed

fail_next_resubscriptions now triggers AccountSubscriptionsTaskFailed("mock: forced resubscribe failure"), which integrates cleanly with SubMuxClient::reconnect_client’s retry logic and keeps the failure mode explicit for tests.

magicblock-chainlink/src/submux/mod.rs (3)

26-28: New subscription_task module and re-export are straightforward

Re‑exporting AccountSubscriptionTask from subscription_task cleanly exposes the orchestration primitive to the rest of the module without altering existing public SubMuxClient APIs.


149-152: Generic impl header change is purely cosmetic

Switching to impl<T> SubMuxClient<T> where T: ChainPubsubClient + ReconnectableClient keeps the same trait bounds while improving readability; no behavioral impact.


583-605: SubMuxClient now delegates subscribe/unsubscribe/shutdown to AccountSubscriptionTask

subscribe, unsubscribe, and shutdown now call AccountSubscriptionTask::{Subscribe,Unsubscribe,Shutdown}.process(self.clients.clone()), which:

  • attempts the operation on all inner clients in parallel,
  • returns once the first client succeeds,
  • and bubbles up AccountSubscriptionsTaskFailed only if every client fails.

This is aligned with the PR goal of parallelizing subscriptions while still ensuring all inner clients eventually execute the operation.

magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (5)

282-288: LGTM - Improved error propagation on disconnect.

The change from trace logging to warning and explicit error response ensures that subscription attempts on disconnected clients fail fast with clear error signaling. This properly implements the PR objective of propagating RPC failures to callers.


301-302: LGTM - Correctly awaits async subscription.

The .await correctly handles the now-async add_sub function, ensuring subscription establishment completes before processing the next message. This is central to the PR's fix.


343-343: LGTM - Core change enabling synchronous subscription.

Converting add_sub to async fn is the key change that enables awaiting the RPC subscription before returning, directly addressing the PR's objective.


384-411: LGTM - Core fix ensures subscription confirmed before returning.

The refactored flow correctly addresses the PR objectives:

  1. RPC account_subscribe is awaited outside tokio::spawn
  2. Ok(()) is sent only after RPC succeeds (Line 411)
  3. Errors are propagated to the requester (Line 405)

This ensures callers receive confirmation only after the RPC subscription is established, preventing premature LRU cache updates mentioned in the PR description.

Note: Lines 398-403 treat any RPC subscription failure as a connection issue (via abort_and_signal_connection_issue), aborting all active subscriptions. This is aggressive—pubkey-specific errors (e.g., invalid pubkey) would also trigger a full abort. Based on the PR description this appears intentional, but verify this behavior is desired.


413-463: LGTM - Update listener correctly handles cancellation and EOF.

The spawned task properly:

  • Listens for updates until cancelled or EOF
  • Handles cancellation with cleanup (unsubscribe + map removal)
  • Handles EOF by signaling connection issue and returning early (Lines 434-446)

The early return on EOF is correct—abort_and_signal_connection_issue already removes from the map and cancels tokens. Not calling unsubscribe when the connection is dead avoids potential hangs. Based on learnings, the timeout wrapper at Lines 452-459 correctly guards against hanging on dead sockets.



@bmuddha bmuddha left a comment


LGTM

@thlorenz

@bmuddha pointed out a bottleneck that this PR introduces; it is tracked in this issue and will be fixed in a subsequent PR.

@thlorenz thlorenz merged commit 40d83a3 into master Nov 20, 2025
19 checks passed
@thlorenz thlorenz deleted the thlorenz/await-subs branch November 20, 2025 09:32
bmuddha pushed a commit that referenced this pull request Nov 20, 2025
## Summary

Ensure `RemoteAccountProvider::subscribe` waits for a successful
subscription and that the submux subscribes to its inner clients in
parallel.

## Problem

When `RemoteAccountProvider::subscribe` was called, the code had two
critical issues:

1. **Premature response before RPC completion**: The `add_sub` function
was spawning the RPC subscription call asynchronously but responding to
the caller immediately after spawn, not after the subscription actually
succeeded.

2. **Sequential subscription to all clients**: The
`SubMuxClient::subscribe` method was subscribing to all clients
sequentially and only returning when all succeeded, rather than waiting
for just one client to succeed.

This meant that:
- The caller would get an `Ok(())` response without verifying the
subscription actually worked
- If the subscription failed, the error would not be propagated back to
the caller
- The LRU cache would be updated before verifying the subscription
actually worked
- Subscription would fail if any single client was disconnected

## Root Causes

### Issue 1: Sequential subscription in SubMuxClient

Original code in `submux/mod.rs`:
```rust
async fn subscribe(&self, pubkey: Pubkey) -> RemoteAccountProviderResult<()> {
    for client in &self.clients {
        client.subscribe(pubkey).await?  // Fails if ANY client fails
    }
    Ok(())
}
```

This required ALL clients to succeed, instead of waiting for the first
success.
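
To make this failure mode concrete, here is a minimal, dependency-free
sketch. The `Client` struct, its `connected` flag, and the synchronous
`subscribe` are hypothetical stand-ins for the real async pubsub clients;
only the shape of the loop is taken from the snippet above.

```rust
/// Hypothetical stand-in for a pubsub client; not the real `ChainPubsubClient`.
struct Client {
    id: usize,
    connected: bool,
}

impl Client {
    /// Succeeds only while the (assumed) `connected` flag is set.
    fn subscribe(&self, _pubkey: &str) -> Result<(), String> {
        if self.connected {
            Ok(())
        } else {
            Err(format!("client {} disconnected", self.id))
        }
    }
}

/// Mirrors the original loop: the `?` returns on the first error,
/// so one disconnected client fails the whole call.
fn subscribe_sequential(clients: &[Client], pubkey: &str) -> Result<(), String> {
    for client in clients {
        client.subscribe(pubkey)?;
    }
    Ok(())
}
```

With one healthy and one disconnected client, `subscribe_sequential`
returns the disconnected client's error even though the healthy client
could have served the subscription.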

### Issue 2: Response sent before RPC call in ChainPubsubActor

Original code in `chain_pubsub_actor.rs`:
```rust
fn add_sub(...) {
    tokio::spawn(async move {
        // Make RPC call here (async)
        let (mut update_stream, unsubscribe) = match pubsub_connection
            .account_subscribe(&pubkey, config.clone())
            .await
        {
            Ok(res) => res,
            Err(err) => return,
        };

        // Send response AFTER spawn
        let _ = sub_response.send(Ok(()));

        // Listen for updates
    });
    // Function returns before RPC call completes
}
```

The response was sent inside the spawned task, so the function returned
before the RPC call completed.

### Issue 3: Disconnected actor returning success

When the actor was disconnected, it would still return `Ok(())`:
```rust
if !is_connected.load(Ordering::SeqCst) {
    send_ok(response, client_id);  // Returns success despite being disconnected
    return;
}
```

## Solution

### Change 1: Parallel subscription with first-success semantics

Created a new `AccountSubscriptionTask` enum and `process()` method in
`submux/subscription_task.rs` that:
- Spawns subscription tasks to all clients in parallel using
`FuturesUnordered`
- Returns `Ok(())` as soon as ANY client succeeds
- Collects errors from all clients only if ALL fail
- Ignores errors from clients after the first success

```rust
pub enum AccountSubscriptionTask {
    Subscribe(Pubkey),
    Unsubscribe(Pubkey),
    Shutdown,
}

impl AccountSubscriptionTask {
    pub async fn process<T>(self, clients: Vec<Arc<T>>) -> RemoteAccountProviderResult<()> {
        tokio::spawn(async move {
            let mut futures = FuturesUnordered::new();
            // Spawn all client subscriptions in parallel
            for (i, client) in clients.iter().enumerate() {
                futures.push(async move {
                    let result = match task {
                        Subscribe(pubkey) => client.subscribe(pubkey).await,
                        // ...
                    };
                    (i, result)
                });
            }

            let mut tx = Some(tx);
            while let Some((i, result)) = futures.next().await {
                match result {
                    Ok(_) => {
                        // First success - send and drop tx
                        if let Some(tx) = tx.take() {
                            let _ = tx.send(Ok(()));
                        }
                    }
                    Err(e) => {
                        if tx.is_none() {
                            // Already succeeded once, ignore subsequent errors
                            warn!("Error from client {}: {:?}", i, e);
                        } else {
                            errors.push(format!("Client {}: {:?}", i, e));
                        }
                    }
                }
            }
        });
    }
}
```

Updated `SubMuxClient` to use this:
```rust
async fn subscribe(&self, pubkey: Pubkey) -> RemoteAccountProviderResult<()> {
    AccountSubscriptionTask::Subscribe(pubkey)
        .process(self.clients.clone())
        .await
}
```
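
The first-success semantics can also be illustrated in isolation. The
sketch below is an assumption-laden analogue, not the code from
`subscription_task.rs`: it swaps `tokio` and `FuturesUnordered` for plain
threads and an `mpsc` channel so that it runs without dependencies, and
the `first_success` helper is a hypothetical name.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs every attempt on its own thread and returns the index of the first
/// one that succeeds. If all attempts fail, the collected errors are returned.
fn first_success<F>(attempts: Vec<F>) -> Result<usize, Vec<String>>
where
    F: FnOnce() -> Result<(), String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    for (i, attempt) in attempts.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            // The receiver may already be gone after the first success,
            // so a failed send is ignored.
            let _ = tx.send((i, attempt()));
        });
    }
    // Drop the original sender so `rx` ends once every worker has reported.
    drop(tx);

    let mut errors = Vec::new();
    for (i, result) in rx {
        match result {
            // First success wins; later results are never read.
            Ok(()) => return Ok(i),
            Err(e) => errors.push(format!("Client {i}: {e}")),
        }
    }
    // Every attempt failed.
    Err(errors)
}
```

As in the description above, errors only surface when no attempt
succeeds. Unlike the real task, this sketch does not log errors that
arrive after the first success; it simply stops listening.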

### Change 2: Make add_sub async and verify RPC before responding

Changed `add_sub` from a synchronous function to an async one, and moved
the RPC call outside the spawned task:

```rust
async fn add_sub(...) {
    // ... setup ...

    let config = RpcAccountInfoConfig { /* ... */ };

    // Perform the subscription BEFORE spawning
    let (mut update_stream, unsubscribe) = match pubsub_connection
        .account_subscribe(&pubkey, config.clone())
        .await
    {
        Ok(res) => res,
        Err(err) => {
            error!("[client_id={client_id}] Failed to subscribe to account {pubkey} {err:?}");
            Self::abort_and_signal_connection_issue(/* ... */);
            // RPC failed - inform the requester
            let _ = sub_response.send(Err(err.into()));
            return;
        }
    };

    // RPC succeeded - confirm to the requester BEFORE spawning
    let _ = sub_response.send(Ok(()));

    // NOW spawn the background task to listen for updates
    tokio::spawn(async move {
        // Listen for updates and relay them
        loop {
            tokio::select! {
                _ = cancellation_token.cancelled() => break,
                update = update_stream.next() => {
                    // Relay update
                }
            }
        }
        // Cleanup
    });
}
```

Updated caller to await it:
```rust
Self::add_sub(...).await;  // Now awaits completion of RPC call
```
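
The ordering (set up, confirm, then spawn) can be sketched without
`tokio`. This is a simplified analogue under stated assumptions: the
`connect` closure is a hypothetical stand-in for the RPC
`account_subscribe` call, `u64` values stand in for account updates, and
threads with `mpsc` channels replace the async task and oneshot response.

```rust
use std::sync::mpsc::{Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for `add_sub`: `connect` plays the role of the RPC
/// subscription call and yields a stream of updates on success.
fn add_sub(
    connect: impl FnOnce() -> Result<Receiver<u64>, String>,
    reply: Sender<Result<(), String>>,
    relay: Sender<u64>,
) -> Option<JoinHandle<()>> {
    // Run the fallible setup inline, before any background work is spawned.
    let updates = match connect() {
        Ok(updates) => updates,
        Err(err) => {
            // Setup failed: report the error and spawn nothing.
            let _ = reply.send(Err(err));
            return None;
        }
    };

    // Setup succeeded: confirm to the requester first.
    let _ = reply.send(Ok(()));

    // Only now start the listener, which relays updates until the stream
    // ends or the relay receiver goes away.
    Some(thread::spawn(move || {
        for update in updates {
            if relay.send(update).is_err() {
                break;
            }
        }
    }))
}
```

Because the reply is sent only after `connect` returns, a requester that
receives `Ok(())` knows the subscription exists, and a failed setup never
leaves a listener running.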

### Change 3: Return error when subscribing while disconnected

Changed the actor to return an error instead of success:
```rust
if !is_connected.load(Ordering::SeqCst) {
    warn!("[client_id={client_id}] Ignoring subscribe request for {pubkey} because disconnected");
    let _ = response.send(Err(
        RemoteAccountProviderError::AccountSubscriptionsTaskFailed(
            format!("Client {client_id} disconnected"),
        ),
    ));
    return;
}
```
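
The same guard, reduced to a standalone and testable form. The
`SubscribeError` enum and the `check_connected` helper are hypothetical,
trimmed-down stand-ins; the real variant lives in
`RemoteAccountProviderError` and the real check sits inside the actor's
message handler.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical, trimmed-down error type used only for this sketch.
#[derive(Debug, PartialEq)]
enum SubscribeError {
    AccountSubscriptionsTaskFailed(String),
}

/// Rejects a subscribe request up front when the client is disconnected,
/// instead of reporting success.
fn check_connected(is_connected: &AtomicBool, client_id: &str) -> Result<(), SubscribeError> {
    if !is_connected.load(Ordering::SeqCst) {
        return Err(SubscribeError::AccountSubscriptionsTaskFailed(format!(
            "Client {client_id} disconnected"
        )));
    }
    Ok(())
}
```

Returning an error here is what lets the parallel subscription task
treat a disconnected client as a failed attempt and rely on the
remaining clients.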

### Change 4: Error type consistency

Renamed and clarified error type from `AccountSubscriptionsFailed` to
`AccountSubscriptionsTaskFailed` across all files for consistency.

## Subscription Flow (After Fix)

1. **Entry Point**: `RemoteAccountProvider::subscribe(pubkey)` is called
2. Calls `register_subscription(pubkey)` →
`SubMuxClient::subscribe(pubkey)`
3. `SubMuxClient` creates `AccountSubscriptionTask::Subscribe(pubkey)`
and calls `process()`
4. **Parallel Subscription**: `process()` spawns subscription tasks to
ALL clients in parallel using `FuturesUnordered`
5. **First Success Wins**: Returns `Ok(())` as soon as ANY client succeeds
6. Each client sends `AccountSubscribe` message to its
`ChainPubsubActor` and awaits response
7. **Actor Validates Connection**: Checks `is_connected` flag - returns
error if disconnected
8. **RPC Call Happens Now** (in the actor, not spawned): Calls
`account_subscribe()` and awaits result
9. **On RPC Success**: Sends `Ok(())` response back (response goes all the
way back to caller)
10. **On RPC Failure**: Sends `Err` response back (the submux keeps
waiting for the results of the remaining clients)
11. **After RPC Confirmed**: Spawns background task to listen for update
stream
12. **Completion**: Caller receives response only after RPC subscription
is actually established

## Result

After these fixes, `RemoteAccountProvider::subscribe` will:
- Wait until at least one client successfully establishes an RPC
subscription
- Return error if all clients fail (rather than succeeding without
confirming)
- Fail fast if a client is disconnected
- Properly propagate RPC errors back to the caller



## <!-- This is an auto-generated comment: release notes by
coderabbit.ai -->
## Summary by CodeRabbit

* **Bug Fixes**
  * Immediate failure notification when subscription setup fails or a
    client disconnects; EOF and cancellation now trigger proper cleanup and
    error replies.

* **Refactor**
  * Subscription setup now occurs before per-subscription listeners to
    avoid races; lifecycle and cancellation flows made more robust.

* **New Features**
  * Centralized subscription task to coordinate
    subscribe/unsubscribe/shutdown across clients.

* **Other**
  * Renamed and standardized subscription error variant and message for
    clearer reporting.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
thlorenz added a commit that referenced this pull request Nov 21, 2025
* master:
  feat: use latest svm version (#657)
  chore: update solana account (#660)
  fix: better transaction diagnostics & rent exemption check (#642)
  chore: add access-control-max-age header to cors (#654)
  fix(aperture): prevent racy getLatestBlockhash (#649)
  fix: await until sub is established and perform them in parallel (#650)
  feat: persist all accounts (#648)