diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs index becdfb73e..4604eec3a 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs @@ -7,6 +7,7 @@ use std::{ }, }; +use futures_util::stream::FuturesUnordered; use log::*; use solana_account_decoder_client_types::{UiAccount, UiAccountEncoding}; use solana_pubkey::Pubkey; @@ -228,24 +229,32 @@ impl ChainPubsubActor { let is_connected = self.is_connected.clone(); let abort_sender = self.abort_sender.clone(); tokio::spawn(async move { + let mut pending_messages = FuturesUnordered::new(); loop { tokio::select! { msg = messages_receiver.recv() => { if let Some(msg) = msg { - Self::handle_msg( - subs.clone(), - pubsub_connection.clone(), - subscription_updates_sender.clone(), - pubsub_client_config.clone(), - abort_sender.clone(), + let subs = subs.clone(); + let pubsub_connection = pubsub_connection.clone(); + let subscription_updates_sender = subscription_updates_sender.clone(); + let pubsub_client_config = pubsub_client_config.clone(); + let abort_sender = abort_sender.clone(); + let is_connected = is_connected.clone(); + pending_messages.push(Self::handle_msg( + subs, + pubsub_connection, + subscription_updates_sender, + pubsub_client_config, + abort_sender, client_id, - is_connected.clone(), + is_connected, msg - ).await; + )); } else { break; } } + _ = pending_messages.next(), if !pending_messages.is_empty() => {} _ = shutdown_token.cancelled() => { break; }