From e9c3a8f7f358a7aad205a8d5b8d41e6a0737750b Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Thu, 20 Nov 2025 21:40:04 +0400 Subject: [PATCH] perf: allow pubsub actor messages to be handled in parallel --- .../chain_pubsub_actor.rs | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs index becdfb73e..4604eec3a 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs @@ -7,6 +7,7 @@ use std::{ }, }; +use futures_util::stream::FuturesUnordered; use log::*; use solana_account_decoder_client_types::{UiAccount, UiAccountEncoding}; use solana_pubkey::Pubkey; @@ -228,24 +229,32 @@ impl ChainPubsubActor { let is_connected = self.is_connected.clone(); let abort_sender = self.abort_sender.clone(); tokio::spawn(async move { + let mut pending_messages = FuturesUnordered::new(); loop { tokio::select! { msg = messages_receiver.recv() => { if let Some(msg) = msg { - Self::handle_msg( - subs.clone(), - pubsub_connection.clone(), - subscription_updates_sender.clone(), - pubsub_client_config.clone(), - abort_sender.clone(), + let subs = subs.clone(); + let pubsub_connection = pubsub_connection.clone(); + let subscription_updates_sender = subscription_updates_sender.clone(); + let pubsub_client_config = pubsub_client_config.clone(); + let abort_sender = abort_sender.clone(); + let is_connected = is_connected.clone(); + pending_messages.push(Self::handle_msg( + subs, + pubsub_connection, + subscription_updates_sender, + pubsub_client_config, + abort_sender, client_id, - is_connected.clone(), + is_connected, msg - ).await; + )); } else { break; } } + _ = pending_messages.next(), if !pending_messages.is_empty() => {} _ = shutdown_token.cancelled() => { break; }