diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs index a8259f71e..becdfb73e 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs @@ -279,8 +279,12 @@ impl ChainPubsubActor { match msg { ChainPubsubActorMessage::AccountSubscribe { pubkey, response } => { if !is_connected.load(Ordering::SeqCst) { - trace!("[client_id={client_id}] Ignoring subscribe request for {pubkey} because disconnected"); - send_ok(response, client_id); + warn!("[client_id={client_id}] Ignoring subscribe request for {pubkey} because disconnected"); + let _ = response.send(Err( + RemoteAccountProviderError::AccountSubscriptionsTaskFailed( + format!("Client {client_id} disconnected"), + ), + )); return; } let commitment_config = pubsub_client_config.commitment_config; @@ -294,7 +298,8 @@ impl ChainPubsubActor { is_connected, commitment_config, client_id, - ); + ) + .await; } ChainPubsubActorMessage::AccountUnsubscribe { pubkey, @@ -334,7 +339,7 @@ impl ChainPubsubActor { } #[allow(clippy::too_many_arguments)] - fn add_sub( + async fn add_sub( pubkey: Pubkey, sub_response: oneshot::Sender>, subs: Arc>>, @@ -375,33 +380,36 @@ impl ChainPubsubActor { ); } - tokio::spawn(async move { - let config = RpcAccountInfoConfig { - commitment: Some(commitment_config), - encoding: Some(UiAccountEncoding::Base64Zstd), - ..Default::default() - }; - let (mut update_stream, unsubscribe) = match pubsub_connection - .account_subscribe(&pubkey, config.clone()) - .await - { - Ok(res) => res, - Err(err) => { - error!("[client_id={client_id}] Failed to subscribe to account {pubkey} {err:?}"); - Self::abort_and_signal_connection_issue( - client_id, - subs.clone(), - abort_sender, - is_connected.clone(), - ); + let config = RpcAccountInfoConfig { + commitment: Some(commitment_config), + encoding: Some(UiAccountEncoding::Base64Zstd), + ..Default::default() + }; - return; - } - }; + // Perform the subscription + let (mut update_stream, unsubscribe) = match pubsub_connection + .account_subscribe(&pubkey, config.clone()) + .await + { + Ok(res) => res, + Err(err) => { + error!("[client_id={client_id}] Failed to subscribe to account {pubkey} {err:?}"); + Self::abort_and_signal_connection_issue( + client_id, + subs.clone(), + abort_sender, + is_connected.clone(), + ); + // RPC failed - inform the requester + let _ = sub_response.send(Err(err.into())); + return; + } + }; - // RPC succeeded - confirm to the requester that the subscription was made - let _ = sub_response.send(Ok(())); + // RPC succeeded - confirm to the requester that the subscription was made + let _ = sub_response.send(Ok(())); + tokio::spawn(async move { // Now keep listening for updates and relay them to the // subscription updates sender until it is cancelled loop { diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs index 719d12345..e8a014611 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs @@ -388,7 +388,7 @@ pub mod mock { ) -> RemoteAccountProviderResult<()> { if !*self.connected.lock().unwrap() { return Err( - RemoteAccountProviderError::AccountSubscriptionsFailed( + RemoteAccountProviderError::AccountSubscriptionsTaskFailed( "mock: subscribe while disconnected".to_string(), ), ); @@ -452,7 +452,7 @@ pub mod mock { if *to_fail > 0 { *to_fail -= 1; return Err( - RemoteAccountProviderError::AccountSubscriptionsFailed( + RemoteAccountProviderError::AccountSubscriptionsTaskFailed( "mock: forced resubscribe failure".to_string(), ), ); diff --git a/magicblock-chainlink/src/remote_account_provider/errors.rs b/magicblock-chainlink/src/remote_account_provider/errors.rs index db52b1c0a..c1ad41405 100644 --- a/magicblock-chainlink/src/remote_account_provider/errors.rs +++ b/magicblock-chainlink/src/remote_account_provider/errors.rs @@ -29,8 +29,8 @@ pub enum RemoteAccountProviderError { #[error("Failed to send message to pubsub actor: {0} ({1})")] ChainPubsubActorSendError(String, String), - #[error("Failed to setup an account subscription ({0})")] - AccountSubscriptionsFailed(String), + #[error("Failed to manage subscriptions ({0})")] + AccountSubscriptionsTaskFailed(String), #[error("Failed to resolve accounts ({0})")] AccountResolutionsFailed(String), diff --git a/magicblock-chainlink/src/remote_account_provider/mod.rs b/magicblock-chainlink/src/remote_account_provider/mod.rs index 3c3700a75..c41318767 100644 --- a/magicblock-chainlink/src/remote_account_provider/mod.rs +++ b/magicblock-chainlink/src/remote_account_provider/mod.rs @@ -345,7 +345,7 @@ impl RemoteAccountProvider { > { if endpoints.is_empty() { return Err( - RemoteAccountProviderError::AccountSubscriptionsFailed( + RemoteAccountProviderError::AccountSubscriptionsTaskFailed( "No endpoints provided".to_string(), ), ); diff --git a/magicblock-chainlink/src/submux/mod.rs b/magicblock-chainlink/src/submux/mod.rs index 8c4e1f9d6..ea5e29f8b 100644 --- a/magicblock-chainlink/src/submux/mod.rs +++ b/magicblock-chainlink/src/submux/mod.rs @@ -23,6 +23,9 @@ const DEBOUNCE_INTERVAL_MILLIS: u64 = 2_000; mod debounce_state; pub use self::debounce_state::DebounceState; +mod subscription_task; +pub use self::subscription_task::AccountSubscriptionTask; + #[derive(Debug, Clone, Copy, Default)] pub struct DebounceConfig { /// The deduplication window in milliseconds. If None, defaults to @@ -143,7 +146,10 @@ struct ForwarderParams { allowed_count: usize, } -impl SubMuxClient { +impl SubMuxClient +where + T: ChainPubsubClient + ReconnectableClient, +{ pub fn new( clients: Vec<(Arc, mpsc::Receiver<()>)>, dedupe_window_millis: Option, @@ -578,26 +584,24 @@ where &self, pubkey: Pubkey, ) -> RemoteAccountProviderResult<()> { - for client in &self.clients { - client.subscribe(pubkey).await?; - } - Ok(()) + AccountSubscriptionTask::Subscribe(pubkey) + .process(self.clients.clone()) + .await } async fn unsubscribe( &self, pubkey: Pubkey, ) -> RemoteAccountProviderResult<()> { - for client in &self.clients { - client.unsubscribe(pubkey).await?; - } - Ok(()) + AccountSubscriptionTask::Unsubscribe(pubkey) + .process(self.clients.clone()) + .await } async fn shutdown(&self) { - for client in &self.clients { - client.shutdown().await; - } + let _ = AccountSubscriptionTask::Shutdown + .process(self.clients.clone()) + .await; } fn take_updates(&self) -> mpsc::Receiver { diff --git a/magicblock-chainlink/src/submux/subscription_task.rs b/magicblock-chainlink/src/submux/subscription_task.rs new file mode 100644 index 000000000..233e227e9 --- /dev/null +++ b/magicblock-chainlink/src/submux/subscription_task.rs @@ -0,0 +1,100 @@ +use std::sync::Arc; + +use futures_util::stream::{FuturesUnordered, StreamExt}; +use log::*; +use solana_pubkey::Pubkey; +use tokio::sync::oneshot; + +use crate::remote_account_provider::{ + chain_pubsub_client::{ChainPubsubClient, ReconnectableClient}, + errors::{RemoteAccountProviderError, RemoteAccountProviderResult}, +}; + +#[derive(Clone)] +pub enum AccountSubscriptionTask { + Subscribe(Pubkey), + Unsubscribe(Pubkey), + Shutdown, +} + +impl AccountSubscriptionTask { + pub async fn process( + self, + clients: Vec>, + ) -> RemoteAccountProviderResult<()> + where + T: ChainPubsubClient + ReconnectableClient + Send + Sync + 'static, + { + use AccountSubscriptionTask::*; + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + let mut futures = FuturesUnordered::new(); + for (i, client) in clients.iter().enumerate() { + let client = client.clone(); + let task = self.clone(); + futures.push(async move { + let result = match task { + Subscribe(pubkey) => client.subscribe(pubkey).await, + Unsubscribe(pubkey) => client.unsubscribe(pubkey).await, + Shutdown => { + client.shutdown().await; + Ok(()) + } + }; + (i, result) + }); + } + + let mut errors = Vec::new(); + let mut tx = Some(tx); + let op_name = match self { + Subscribe(_) => "Subscribe", + Unsubscribe(_) => "Unsubscribe", + Shutdown => "Shutdown", + }; + + while let Some((i, result)) = futures.next().await { + match result { + Ok(_) => { + if let Some(tx) = tx.take() { + let _ = tx.send(Ok(())); + } + } + Err(e) => { + if tx.is_none() { + // If at least one client returned an `OK` response, ignore any `ERR` responses + // after that. These clients will also trigger the reconnection logic + // which takes care of fixing the RPC connection. + warn!( + "{} failed for client {}: {:?}", + op_name, i, e + ); + } else { + errors.push(format!("Client {}: {:?}", i, e)); + } + } + } + } + + if let Some(tx) = tx { + let msg = format!( + "All clients failed to {}: {}", + op_name.to_lowercase(), + errors.join(", ") + ); + let _ = tx.send(Err( + RemoteAccountProviderError::AccountSubscriptionsTaskFailed( + msg, + ), + )); + } + }); + + rx.await.unwrap_or_else(|_| { + Err(RemoteAccountProviderError::AccountSubscriptionsTaskFailed( + "Orchestration task panicked or dropped channel".to_string(), + )) + }) + } +}