Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
},
};

use futures_util::stream::FuturesUnordered;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

Concurrent handling via FuturesUnordered breaks subscribe→unsubscribe ordering assumptions and can leak subscriptions

The new FuturesUnordered-based loop makes handle_msg futures for all message types run concurrently. Because of how handle_msg is structured:

  • AccountSubscribe:

    • Calls Self::add_sub(...).await.
    • add_sub inserts the AccountSubscription into subs before its first .await (account_subscribe().await).
  • AccountUnsubscribe:

    • Has no .await; its entire branch is synchronous and completes on the first poll of its future.

With FuturesUnordered (and without any explicit ordering guarantees), there is now a valid interleaving where:

  1. A subscribe message for pubkey is received first and its handle_msg future is pushed into pending_messages.
  2. A subsequent unsubscribe message for the same pubkey is received and its handle_msg future is also pushed.
  3. When pending_messages.next() runs, it may poll the unsubscribe future before the subscribe future’s first poll.
  4. In that case:
    • The unsubscribe branch checks subs.lock().get(&pubkey) and finds no entry yet, so it returns AccountSubscriptionDoesNotExist.
    • Afterwards, the subscribe future is polled for the first time, and add_sub inserts the AccountSubscription into the map and sets up the real subscription.

This results in:

  • The caller logically issuing subscribe then unsubscribe, but:
    • Seeing an unsubscribe error (AccountSubscriptionDoesNotExist).
    • Still ending up with a live subscription (because add_sub completes later and spawns the update task).
  • A leaked subscription that the caller believes was removed.
  • The comment in add_sub about “messages to this actor are processed in the order they are sent” no longer being true under the concurrent FuturesUnordered processing.

This is a semantic correctness regression introduced by the concurrency change, not just a theoretical concern.

Consider one of the following fixes:

  • Restrict concurrency to subscription establishment only:

    • Keep the actor loop sequential for control messages (AccountUnsubscribe, Reconnect).
    • For AccountSubscribe, enqueue/track only the long‑running “establish subscription” work in FuturesUnordered (e.g., a dedicated async function that performs account_subscribe + spawn of the update loop), while the main handle_msg path that updates subs and returns the oneshot response preserves message ordering.
  • Explicitly track in‑flight subscriptions:

    • Maintain a per‑Pubkey state for “subscription being created”.
    • Treat an unsubscribe for an in‑flight subscription as success and arrange for the creation path to observe and honor that cancellation (e.g., cancel before or immediately after adding to subs).

Additionally, on shutdown or when messages_receiver is closed, the loop breaks and drops pending_messages, abruptly canceling in‑flight handle_msg futures. This is a behavior change from the previous sequential implementation, where the currently executing handle_msg would finish before exit. Please double‑check that it’s acceptable for callers to sometimes see pending subscribe/reconnect requests fail with dropped oneshots during shutdown.

Also applies to: 232-252, 257-257, 362-370, 376-381


🌐 Web query:

Does `futures_util::stream::FuturesUnordered` provide any guarantee about the order in which newly pushed futures are first polled, relative to each other? Specifically, can a future that was pushed later be polled and complete before a previously pushed future’s first poll?

💡 Result:

Short answer: No — there is no guarantee. FuturesUnordered is unordered: pushing a future does not poll it, and futures are driven only when they generate wake notifications, so a future pushed later can be polled (and even complete) before an earlier-pushed future’s first poll. Use FuturesOrdered if you need FIFO semantics. [1][2]

Sources:

  • futures-util/FuturesUnordered docs (behaviour and note that it “may complete in any order”). [1]
  • futures::stream::FuturesOrdered docs (contrast: preserves FIFO order). [2]

The web search confirms the review comment's analysis is correct. FuturesUnordered provides no guarantees about polling order—a future pushed later can be polled and complete before an earlier-pushed future's first poll; FuturesOrdered is the correct choice for FIFO semantics. This directly validates the identified race condition where an unsubscribe could complete before a preceding subscribe's critical section (insertion into subs) executes.

The review comment is accurate and comprehensive. The semantic correctness regression is real and actionable.


Concurrent handling via FuturesUnordered breaks subscribe→unsubscribe ordering assumptions and can leak subscriptions

The new FuturesUnordered-based loop makes handle_msg futures for all message types run concurrently. Because of how handle_msg is structured:

  • AccountSubscribe:

    • Calls Self::add_sub(...).await.
    • add_sub inserts the AccountSubscription into subs before its first .await (account_subscribe().await).
  • AccountUnsubscribe:

    • Has no .await; its entire branch is synchronous and completes on the first poll of its future.

With FuturesUnordered (which provides no ordering guarantees), there is now a valid interleaving where:

  1. A subscribe message for pubkey is received first and its handle_msg future is pushed into pending_messages.
  2. A subsequent unsubscribe message for the same pubkey is received and its handle_msg future is also pushed.
  3. When pending_messages.next() runs, it may poll the unsubscribe future before the subscribe future's first poll.
  4. In that case:
    • The unsubscribe branch checks subs.lock().get(&pubkey) and finds no entry yet, so it returns AccountSubscriptionDoesNotExist.
    • Afterwards, the subscribe future is polled for the first time, and add_sub inserts the AccountSubscription into the map and sets up the real subscription.

This results in:

  • The caller logically issuing subscribe then unsubscribe, but:
    • Seeing an unsubscribe error (AccountSubscriptionDoesNotExist).
    • Still ending up with a live subscription (because add_sub completes later and spawns the update task).
  • A leaked subscription that the caller believes was removed.
  • The comment in add_sub about "messages to this actor are processed in the order they are sent" no longer being true under the concurrent FuturesUnordered processing.

Consider one of the following fixes:

  • Restrict concurrency to subscription establishment only:

    • Keep the actor loop sequential for control messages (AccountUnsubscribe, Reconnect).
    • For AccountSubscribe, enqueue/track only the long‑running "establish subscription" work in FuturesUnordered (e.g., a dedicated async function that performs account_subscribe + spawn of the update loop), while the main handle_msg path that updates subs and returns the oneshot response preserves message ordering.
  • Explicitly track in‑flight subscriptions:

    • Maintain a per‑Pubkey state for "subscription being created".
    • Treat an unsubscribe for an in‑flight subscription as success and arrange for the creation path to observe and honor that cancellation (e.g., cancel before or immediately after adding to subs).

Additionally, on shutdown or when messages_receiver is closed, the loop breaks and drops pending_messages, abruptly canceling in‑flight handle_msg futures. This is a behavior change from the previous sequential implementation, where the currently executing handle_msg would finish before exit. Please double‑check that it's acceptable for callers to sometimes see pending subscribe/reconnect requests fail with dropped oneshots during shutdown.

Also applies to: lines 232-252, 257, 362-370, 376-381

use log::*;
use solana_account_decoder_client_types::{UiAccount, UiAccountEncoding};
use solana_pubkey::Pubkey;
Expand Down Expand Up @@ -228,24 +229,32 @@ impl ChainPubsubActor {
let is_connected = self.is_connected.clone();
let abort_sender = self.abort_sender.clone();
tokio::spawn(async move {
let mut pending_messages = FuturesUnordered::new();
loop {
tokio::select! {
msg = messages_receiver.recv() => {
if let Some(msg) = msg {
Self::handle_msg(
subs.clone(),
pubsub_connection.clone(),
subscription_updates_sender.clone(),
pubsub_client_config.clone(),
abort_sender.clone(),
let subs = subs.clone();
let pubsub_connection = pubsub_connection.clone();
let subscription_updates_sender = subscription_updates_sender.clone();
let pubsub_client_config = pubsub_client_config.clone();
let abort_sender = abort_sender.clone();
let is_connected = is_connected.clone();
pending_messages.push(Self::handle_msg(
subs,
pubsub_connection,
subscription_updates_sender,
pubsub_client_config,
abort_sender,
client_id,
is_connected.clone(),
is_connected,
msg
).await;
));
} else {
break;
}
}
_ = pending_messages.next(), if !pending_messages.is_empty() => {}
_ = shutdown_token.cancelled() => {
break;
}
Expand Down
Loading