diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs index 4604eec3a..4e5a9649e 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs @@ -12,7 +12,8 @@ use log::*; use solana_account_decoder_client_types::{UiAccount, UiAccountEncoding}; use solana_pubkey::Pubkey; use solana_rpc_client_api::{ - config::RpcAccountInfoConfig, response::Response as RpcResponse, + config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, + response::Response as RpcResponse, }; use solana_sdk::{commitment_config::CommitmentConfig, sysvar::clock}; use tokio::{ @@ -80,6 +81,8 @@ pub struct ChainPubsubActor { messages_sender: mpsc::Sender, /// Map of subscriptions we are holding subscriptions: Arc>>, + /// Map of program account subscriptions we are holding + program_subs: Arc>>, /// Sends updates for any account subscription that is received via /// the [Self::pubsub_connection] subscription_updates_sender: mpsc::Sender, @@ -97,14 +100,25 @@ pub struct ChainPubsubActor { #[derive(Debug)] pub enum ChainPubsubActorMessage { + /// Subscribe to account updates for the given pubkey AccountSubscribe { pubkey: Pubkey, response: oneshot::Sender>, }, + /// Unsubscribe from account updates for the given pubkey AccountUnsubscribe { pubkey: Pubkey, response: oneshot::Sender>, }, + /// Subscribe to program account updates for the given program pubkey. + /// NOTE: only updates to accounts also subscribed to directly and thus + /// part of [ChainPubsubActor::subscriptions] will be sent via the + /// [ChainPubsubActor::subscription_updates_sender]. + ProgramSubscribe { + pubkey: Pubkey, + response: oneshot::Sender>, + }, + /// Attempt to reconnect the pubsub connection Reconnect { response: oneshot::Sender>, }, @@ -145,6 +159,7 @@ impl ChainPubsubActor { pubsub_connection, messages_sender, subscriptions: Default::default(), + program_subs: Default::default(), subscription_updates_sender, shutdown_token, client_id: CLIENT_ID.fetch_add(1, Ordering::SeqCst), @@ -168,6 +183,7 @@ impl ChainPubsubActor { .lock() .unwrap() .drain() + .chain(self.program_subs.lock().unwrap().drain()) .collect::>(); for (_, sub) in subs { sub.cancellation_token.cancel(); @@ -220,6 +236,7 @@ impl ChainPubsubActor { mut messages_receiver: mpsc::Receiver, ) { let subs = self.subscriptions.clone(); + let program_subs = self.program_subs.clone(); let shutdown_token = self.shutdown_token.clone(); let pubsub_client_config = self.pubsub_client_config.clone(); let subscription_updates_sender = @@ -235,6 +252,7 @@ impl ChainPubsubActor { msg = messages_receiver.recv() => { if let Some(msg) = msg { let subs = subs.clone(); + let program_subs = program_subs.clone(); let pubsub_connection = pubsub_connection.clone(); let subscription_updates_sender = subscription_updates_sender.clone(); let pubsub_client_config = pubsub_client_config.clone(); @@ -242,6 +260,7 @@ impl ChainPubsubActor { let is_connected = is_connected.clone(); pending_messages.push(Self::handle_msg( subs, + program_subs, pubsub_connection, subscription_updates_sender, pubsub_client_config, @@ -266,6 +285,7 @@ impl ChainPubsubActor { #[allow(clippy::too_many_arguments)] async fn handle_msg( subscriptions: Arc>>, + program_subs: Arc>>, pubsub_connection: Arc, subscription_updates_sender: mpsc::Sender, pubsub_client_config: PubsubClientConfig, @@ -280,7 +300,7 @@ impl ChainPubsubActor { ) { let _ = response.send(Ok(())).inspect_err(|err| { warn!( - "[client_id={client_id}] Failed to send msg ack: {err:?}" + "[client_id={client_id}] Failed to send msg ack: {err:?}" ); }); } @@ -301,6 +321,7 @@ impl ChainPubsubActor { pubkey, response, subscriptions, + program_subs, pubsub_connection, subscription_updates_sender, abort_sender, @@ -326,7 +347,7 @@ impl ChainPubsubActor { .get(&pubkey) { cancellation_token.cancel(); - let _ = response.send(Ok(())); + send_ok(response, client_id); } else { let _ = response .send(Err(RemoteAccountProviderError::AccountSubscriptionDoesNotExist( @@ -334,6 +355,31 @@ impl ChainPubsubActor { ))); } } + ChainPubsubActorMessage::ProgramSubscribe { pubkey, response } => { + if !is_connected.load(Ordering::SeqCst) { + warn!("[client_id={client_id}] Ignoring subscribe request for program {pubkey} because disconnected"); + let _ = response.send(Err( + RemoteAccountProviderError::AccountSubscriptionsTaskFailed( + format!("Client {client_id} disconnected"), + ), + )); + return; + } + let commitment_config = pubsub_client_config.commitment_config; + Self::add_program_sub( + pubkey, + response, + subscriptions, + program_subs, + pubsub_connection, + subscription_updates_sender, + abort_sender, + is_connected, + commitment_config, + client_id, + ) + .await; + } ChainPubsubActorMessage::Reconnect { response } => { let result = Self::try_reconnect( pubsub_connection, @@ -352,6 +398,7 @@ impl ChainPubsubActor { pubkey: Pubkey, sub_response: oneshot::Sender>, subs: Arc>>, + program_subs: Arc>>, pubsub_connection: Arc, subscription_updates_sender: mpsc::Sender, abort_sender: mpsc::Sender<()>, @@ -406,6 +453,7 @@ impl ChainPubsubActor { Self::abort_and_signal_connection_issue( client_id, subs.clone(), + program_subs.clone(), abort_sender, is_connected.clone(), ); @@ -444,6 +492,7 @@ impl ChainPubsubActor { Self::abort_and_signal_connection_issue( client_id, subs.clone(), + program_subs.clone(), abort_sender.clone(), is_connected.clone(), ); @@ -470,6 +519,138 @@ impl ChainPubsubActor { .remove(&pubkey); }); } + #[allow(clippy::too_many_arguments)] + async fn add_program_sub( + pubkey: Pubkey, + sub_response: oneshot::Sender>, + subs: Arc>>, + program_subs: Arc>>, + pubsub_connection: Arc, + subscription_updates_sender: mpsc::Sender, + abort_sender: mpsc::Sender<()>, + is_connected: Arc, + commitment_config: CommitmentConfig, + client_id: u16, + ) { + if program_subs + .lock() + .expect("program subscriptions lock poisoned") + .contains_key(&pubkey) + { + trace!("[client_id={client_id}] Program subscription for {pubkey} already exists, ignoring add_program_sub request"); + let _ = sub_response.send(Ok(())); + return; + } + + trace!("[client_id={client_id}] Adding program subscription for {pubkey} with commitment {commitment_config:?}"); + + let cancellation_token = CancellationToken::new(); + + { + let mut program_subs_lock = program_subs + .lock() + .expect("program subscriptions lock poisoned"); + program_subs_lock.insert( + pubkey, + AccountSubscription { + cancellation_token: cancellation_token.clone(), + }, + ); + } + + let config = RpcProgramAccountsConfig { + account_config: RpcAccountInfoConfig { + commitment: Some(commitment_config), + encoding: Some(UiAccountEncoding::Base64Zstd), + ..Default::default() + }, + ..Default::default() + }; + + let (mut update_stream, unsubscribe) = match pubsub_connection + .program_subscribe(&pubkey, config.clone()) + .await + { + Ok(res) => res, + Err(err) => { + error!("[client_id={client_id}] Failed to subscribe to program {pubkey} {err:?}"); + Self::abort_and_signal_connection_issue( + client_id, + subs.clone(), + program_subs.clone(), + abort_sender, + is_connected.clone(), + ); + // RPC failed - inform the requester + let _ = sub_response.send(Err(err.into())); + return; + } + }; + + // RPC succeeded - confirm to the requester that the subscription was made + let _ = sub_response.send(Ok(())); + + tokio::spawn(async move { + // Now keep listening for updates and relay matching accounts to the + // subscription updates sender until it is cancelled + loop { + tokio::select! { + _ = cancellation_token.cancelled() => { + trace!("[client_id={client_id}] Subscription for program {pubkey} was cancelled"); + break; + } + update = update_stream.next() => { + if let Some(rpc_response) = update { + let pubkey = rpc_response.value.pubkey + .parse::().inspect_err(|err| { + warn!("[client_id={client_id}] Received invalid pubkey in program subscription update: {} {:?}", rpc_response.value.pubkey, err); + }); + if let Ok(pubkey) = pubkey { + if subs.lock().expect("subscriptions lock poisoned").contains_key(&pubkey) { + let _ = subscription_updates_sender.send(SubscriptionUpdate { + pubkey, + rpc_response: RpcResponse { + context: rpc_response.context, + value: rpc_response.value.account, + }, + }).await.inspect_err(|err| { + error!("[client_id={client_id}] Failed to send {pubkey} subscription update: {err:?}"); + }); + } + } + } else { + debug!("[client_id={client_id}] Subscription for program {pubkey} ended (EOF); signaling connection issue"); + Self::abort_and_signal_connection_issue( + client_id, + subs.clone(), + program_subs.clone(), + abort_sender.clone(), + is_connected.clone(), + ); + // Return early - abort_and_signal_connection_issue cancels all + // subscriptions, triggering cleanup via the cancellation path + // above. No need to run unsubscribe/cleanup here. + return; + } + } + } + } + + // Clean up subscription with timeout to prevent hanging on dead sockets + if tokio::time::timeout(Duration::from_secs(2), unsubscribe()) + .await + .is_err() + { + warn!( + "[client_id={client_id}] unsubscribe timed out for program {pubkey}" + ); + } + program_subs + .lock() + .expect("program_subs lock poisoned") + .remove(&pubkey); + }); + } async fn try_reconnect( pubsub_connection: Arc, @@ -514,6 +695,7 @@ impl ChainPubsubActor { fn abort_and_signal_connection_issue( client_id: u16, subscriptions: Arc>>, + program_subs: Arc>>, abort_sender: mpsc::Sender<()>, is_connected: Arc, ) { @@ -527,18 +709,28 @@ impl ChainPubsubActor { debug!("[client_id={client_id}] aborting"); - let drained = { - let mut subs_lock = subscriptions.lock().unwrap(); - std::mem::take(&mut *subs_lock) - }; - let drained_len = drained.len(); - for (_, AccountSubscription { cancellation_token }) in drained { - cancellation_token.cancel(); + fn drain_subscriptions( + client_id: u16, + subscriptions: Arc>>, + ) { + let drained_subs = { + let mut subs_lock = subscriptions.lock().unwrap(); + std::mem::take(&mut *subs_lock) + }; + let drained_len = drained_subs.len(); + for (_, AccountSubscription { cancellation_token }) in drained_subs + { + cancellation_token.cancel(); + } + debug!( + "[client_id={client_id}] canceled {} subscriptions", + drained_len + ); } - debug!( - "[client_id={client_id}] canceled {} subscriptions", - drained_len - ); + + drain_subscriptions(client_id, subscriptions); + drain_subscriptions(client_id, program_subs); + // Use try_send to avoid blocking and naturally coalesce signals let _ = abort_sender.try_send(()).inspect_err(|err| { // Channel full is expected when reconnect is already in progress diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs index ec1ba2e63..876e6b755 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs @@ -14,7 +14,10 @@ use solana_pubkey::Pubkey; use solana_pubsub_client::nonblocking::pubsub_client::{ PubsubClient, PubsubClientResult, }; -use solana_rpc_client_api::{config::RpcAccountInfoConfig, response::Response}; +use solana_rpc_client_api::{ + config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, + response::{Response, RpcKeyedAccount}, +}; use solana_sdk::commitment_config::CommitmentConfig; use tokio::{ sync::{mpsc, oneshot, Mutex as AsyncMutex}, @@ -33,6 +36,10 @@ type SubscribeResult = PubsubClientResult<( BoxStream<'static, Response>, UnsubscribeFn, )>; +type ProgramSubscribeResult = PubsubClientResult<( + BoxStream<'static, Response>, + UnsubscribeFn, +)>; const MAX_RECONNECT_ATTEMPTS: usize = 5; const RECONNECT_ATTEMPT_DELAY: Duration = Duration::from_millis(500); @@ -79,6 +86,29 @@ impl PubSubConnection { Ok((stream, unsub)) } + pub async fn program_subscribe( + &self, + program_id: &Pubkey, + config: RpcProgramAccountsConfig, + ) -> ProgramSubscribeResult { + let client = self.client.load(); + let config = Some(config.clone()); + let (stream, unsub) = + client.program_subscribe(program_id, config).await?; + + // SAFETY: + // the returned stream depends on the used client, which is only ever dropped + // if the connection has been terminated, at which point the stream is useless + // and will be discarded as well, thus it's safe lifetime extension to 'static + let stream = unsafe { + mem::transmute::< + BoxStream<'_, Response>, + BoxStream<'static, Response>, + >(stream) + }; + Ok((stream, unsub)) + } + pub async fn reconnect(&self) -> PubsubClientResult<()> { // Prevents multiple reconnect attempts running concurrently let _guard = match self.reconnect_guard.try_lock() { @@ -121,6 +151,10 @@ pub trait ChainPubsubClient: Send + Sync + Clone + 'static { &self, pubkey: Pubkey, ) -> RemoteAccountProviderResult<()>; + async fn subscribe_program( + &self, + program_id: Pubkey, + ) -> RemoteAccountProviderResult<()>; async fn unsubscribe( &self, pubkey: Pubkey, @@ -217,6 +251,24 @@ impl ChainPubsubClient for ChainPubsubClientImpl { })? } + async fn subscribe_program( + &self, + program_id: Pubkey, + ) -> RemoteAccountProviderResult<()> { + let (tx, rx) = oneshot::channel(); + self.actor + .send_msg(ChainPubsubActorMessage::ProgramSubscribe { + pubkey: program_id, + response: tx, + }) + .await?; + + rx.await + .inspect_err(|err| { + warn!("ChainPubsubClientImpl::subscribe_program - RecvError occurred while awaiting subscription response for {}: {err:?}. This indicates the actor sender was dropped without responding.", program_id); + })? + } + async fn unsubscribe( &self, pubkey: Pubkey, @@ -400,6 +452,22 @@ pub mod mock { Ok(()) } + async fn subscribe_program( + &self, + _program_id: Pubkey, + ) -> RemoteAccountProviderResult<()> { + if !*self.connected.lock().unwrap() { + return Err( + RemoteAccountProviderError::AccountSubscriptionsTaskFailed( + "mock: subscribe_program while disconnected" + .to_string(), + ), + ); + } + // Program subscriptions don't track individual accounts in the mock + Ok(()) + } + async fn unsubscribe( &self, pubkey: Pubkey, diff --git a/magicblock-chainlink/src/remote_account_provider/config.rs b/magicblock-chainlink/src/remote_account_provider/config.rs index 1b8516c5d..5c6646f5c 100644 --- a/magicblock-chainlink/src/remote_account_provider/config.rs +++ b/magicblock-chainlink/src/remote_account_provider/config.rs @@ -1,4 +1,7 @@ +use std::collections::HashSet; + use magicblock_config::config::LifecycleMode; +use solana_pubkey::Pubkey; use super::{RemoteAccountProviderError, RemoteAccountProviderResult}; @@ -8,9 +11,15 @@ pub const DEFAULT_SUBSCRIBED_ACCOUNTS_LRU_CAPACITY: usize = 10_000; #[derive(Debug, Clone)] pub struct RemoteAccountProviderConfig { + /// How many accounts to monitor for changes subscribed_accounts_lru_capacity: usize, + /// Lifecycle mode of the validator lifecycle_mode: LifecycleMode, + /// Whether to enable metrics for account subscriptions enable_subscription_metrics: bool, + /// Set of program accounts to always subscribe to as backup + /// for direct account subs + program_subs: HashSet, } impl RemoteAccountProviderConfig { @@ -39,6 +48,7 @@ impl RemoteAccountProviderConfig { subscribed_accounts_lru_capacity, lifecycle_mode, enable_subscription_metrics, + ..Default::default() }) } @@ -60,6 +70,10 @@ impl RemoteAccountProviderConfig { pub fn enable_subscription_metrics(&self) -> bool { self.enable_subscription_metrics } + + pub fn program_subs(&self) -> &HashSet { + &self.program_subs + } } impl Default for RemoteAccountProviderConfig { @@ -69,6 +83,7 @@ impl Default for RemoteAccountProviderConfig { DEFAULT_SUBSCRIBED_ACCOUNTS_LRU_CAPACITY, lifecycle_mode: LifecycleMode::default(), enable_subscription_metrics: true, + program_subs: vec![dlp::id()].into_iter().collect(), } } } diff --git a/magicblock-chainlink/src/remote_account_provider/mod.rs b/magicblock-chainlink/src/remote_account_provider/mod.rs index 93e6bd337..c15739224 100644 --- a/magicblock-chainlink/src/remote_account_provider/mod.rs +++ b/magicblock-chainlink/src/remote_account_provider/mod.rs @@ -381,6 +381,18 @@ impl RemoteAccountProvider { let submux = SubMuxClient::new(pubsubs, subscribed_accounts.clone(), None); + if !config.program_subs().is_empty() { + debug!( + "Subscribing to program accounts: [{}]", + pubkeys_str( + &config.program_subs().iter().cloned().collect::>() + ) + ); + for program_id in config.program_subs() { + submux.subscribe_program(*program_id).await?; + } + } + RemoteAccountProvider::< ChainRpcClientImpl, SubMuxClient, diff --git a/magicblock-chainlink/src/submux/mod.rs b/magicblock-chainlink/src/submux/mod.rs index 9870734cb..033503bc7 100644 --- a/magicblock-chainlink/src/submux/mod.rs +++ b/magicblock-chainlink/src/submux/mod.rs @@ -135,9 +135,10 @@ where /// Accounts that should never be debounced, namely the clock sysvar account /// which we use to track the latest remote slot. never_debounce: HashSet, - /// Number of clients that must confirm a subscription for it to be considered active. required_subscription_confirmations: usize, + /// Map of program account subscriptions we are holding inside the pubsub clients + program_subs: Arc>>, } // Parameters for the long-running forwarder loop, grouped to avoid @@ -201,8 +202,13 @@ where let never_debounce: HashSet = vec![solana_sdk::sysvar::clock::ID].into_iter().collect(); - let clients = - Self::spawn_reconnectors(clients, subscribed_accounts_tracker); + let program_subs: Arc>> = Default::default(); + + let clients = Self::spawn_reconnectors( + clients, + subscribed_accounts_tracker, + program_subs.clone(), + ); let required_subscription_confirmations = { let n = clients.len(); @@ -219,6 +225,7 @@ where debounce_states: debounce_states.clone(), never_debounce, required_subscription_confirmations, + program_subs, }; // Spawn background tasks @@ -233,12 +240,14 @@ where fn spawn_reconnectors( clients: Vec<(Arc, mpsc::Receiver<()>)>, subscribed_accounts_tracker: Arc, + program_subs: Arc>>, ) -> Vec> { let mut clients_only = Vec::with_capacity(clients.len()); for (client, mut abort_rx) in clients.into_iter() { clients_only.push(client.clone()); let subscribed_accounts_tracker = subscribed_accounts_tracker.clone(); + let program_subs = program_subs.clone(); tokio::spawn(async move { while abort_rx.recv().await.is_some() { // Drain any duplicate abort signals to coalesce reconnect attempts @@ -250,6 +259,7 @@ where Self::reconnect_client_with_backoff( client.clone(), subscribed_accounts_tracker.clone(), + program_subs.clone(), ) .await; } @@ -261,6 +271,7 @@ where async fn reconnect_client_with_backoff( client: Arc, accounts_tracker: Arc, + program_subs: Arc>>, ) { fn fib_with_max(n: u64) -> u64 { let (mut a, mut b) = (0u64, 1u64); @@ -274,7 +285,13 @@ where let mut attempt = 0; loop { attempt += 1; - if Self::reconnect_client(client.clone(), &accounts_tracker).await { + if Self::reconnect_client( + client.clone(), + &accounts_tracker, + &program_subs, + ) + .await + { debug!( "Successfully reconnected client after {} attempts", attempt @@ -294,6 +311,7 @@ where async fn reconnect_client( client: Arc, accounts_tracker: &Arc, + program_subs: &Arc>>, ) -> bool { if let Err(err) = client.try_reconnect().await { debug!("Failed to reconnect client: {:?}", err); @@ -302,18 +320,27 @@ where // Resubscribe all accounts from the authoritative tracker. // This ensures subscriptions are restored even if all clients lost their state // during disconnect/abort. - let subs = accounts_tracker.subscribed_accounts(); + let account_subs = accounts_tracker.subscribed_accounts(); + + if let Err(err) = client.resub_multiple(account_subs).await { + debug!("Failed to resubscribe accounts after reconnect: {:?}", err); + return false; + } - match client.resub_multiple(subs).await { - Err(err) => { + // Resubscribe all program subscriptions + let programs: HashSet = + program_subs.lock().unwrap().iter().copied().collect(); + for program_id in programs { + if let Err(err) = client.subscribe_program(program_id).await { debug!( - "Failed to resubscribe accounts after reconnect: {:?}", - err + "Failed to resubscribe program {} after reconnect: {:?}", + program_id, err ); - false + return false; } - Ok(_) => true, } + + true } fn spawn_dedup_pruner(&self) { @@ -611,6 +638,29 @@ where .await } + async fn subscribe_program( + &self, + program_id: Pubkey, + ) -> RemoteAccountProviderResult<()> { + // Check if we already have this program subscription + { + let mut subs = self.program_subs.lock().unwrap(); + if subs.contains(&program_id) { + warn!("Program subscription already exists for {}", program_id); + return Ok(()); + } + // Add to program_subs before subscribing to clients + subs.insert(program_id); + } + + AccountSubscriptionTask::SubscribeProgram( + program_id, + self.required_subscription_confirmations, + ) + .process(self.clients.clone()) + .await + } + async fn unsubscribe( &self, pubkey: Pubkey, diff --git a/magicblock-chainlink/src/submux/subscription_task.rs b/magicblock-chainlink/src/submux/subscription_task.rs index 4d99f9436..852e49176 100644 --- a/magicblock-chainlink/src/submux/subscription_task.rs +++ b/magicblock-chainlink/src/submux/subscription_task.rs @@ -13,6 +13,7 @@ use crate::remote_account_provider::{ #[derive(Clone)] pub enum AccountSubscriptionTask { Subscribe(Pubkey, usize), + SubscribeProgram(Pubkey, usize), Unsubscribe(Pubkey), Shutdown, } @@ -22,6 +23,7 @@ impl AccountSubscriptionTask { use AccountSubscriptionTask::*; match self { Subscribe(_, _) => "Subscribe", + SubscribeProgram(_, _) => "SubscribeProgram", Unsubscribe(_) => "Unsubscribe", Shutdown => "Shutdown", } @@ -40,7 +42,7 @@ impl AccountSubscriptionTask { let total_clients = clients.len(); let required_confirmations = match &self { - Subscribe(_, n) => *n, + Subscribe(_, n) | SubscribeProgram(_, n) => *n, _ => 1, }; @@ -54,15 +56,15 @@ impl AccountSubscriptionTask { ); } - if let Subscribe(_, _) = self { - if required_confirmations == 0 { - return Err( - RemoteAccountProviderError::AccountSubscriptionsTaskFailed( - "Required confirmations must be greater than zero" - .to_string(), - ), - ); - } + if matches!(self, Subscribe(_, _) | SubscribeProgram(_, _)) + && required_confirmations == 0 + { + return Err( + RemoteAccountProviderError::AccountSubscriptionsTaskFailed( + "Required confirmations must be greater than zero" + .to_string(), + ), + ); } let (tx, rx) = oneshot::channel(); @@ -77,6 +79,9 @@ impl AccountSubscriptionTask { futures.push(async move { let result = match task { Subscribe(pubkey, _) => client.subscribe(pubkey).await, + SubscribeProgram(program_id, _) => { + client.subscribe_program(program_id).await + } Unsubscribe(pubkey) => client.unsubscribe(pubkey).await, Shutdown => { client.shutdown().await;