Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 206 additions & 14 deletions magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use log::*;
use solana_account_decoder_client_types::{UiAccount, UiAccountEncoding};
use solana_pubkey::Pubkey;
use solana_rpc_client_api::{
config::RpcAccountInfoConfig, response::Response as RpcResponse,
config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
response::Response as RpcResponse,
};
use solana_sdk::{commitment_config::CommitmentConfig, sysvar::clock};
use tokio::{
Expand Down Expand Up @@ -80,6 +81,8 @@ pub struct ChainPubsubActor {
messages_sender: mpsc::Sender<ChainPubsubActorMessage>,
/// Map of subscriptions we are holding
subscriptions: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
/// Map of program account subscriptions we are holding
program_subs: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
/// Sends updates for any account subscription that is received via
/// the [Self::pubsub_connection]
subscription_updates_sender: mpsc::Sender<SubscriptionUpdate>,
Expand All @@ -97,14 +100,25 @@ pub struct ChainPubsubActor {

#[derive(Debug)]
pub enum ChainPubsubActorMessage {
/// Subscribe to account updates for the given pubkey
AccountSubscribe {
pubkey: Pubkey,
response: oneshot::Sender<RemoteAccountProviderResult<()>>,
},
/// Unsubscribe from account updates for the given pubkey
AccountUnsubscribe {
pubkey: Pubkey,
response: oneshot::Sender<RemoteAccountProviderResult<()>>,
},
/// Subscribe to program account updates for the given program pubkey.
/// NOTE: only updates to accounts also subscribed to directly and thus
/// part of [ChainPubsubActor::subscriptions] will be sent via the
/// [ChainPubsubActor::subscription_updates_sender].
ProgramSubscribe {
pubkey: Pubkey,
response: oneshot::Sender<RemoteAccountProviderResult<()>>,
},
/// Attempt to reconnect the pubsub connection
Reconnect {
response: oneshot::Sender<RemoteAccountProviderResult<()>>,
},
Expand Down Expand Up @@ -145,6 +159,7 @@ impl ChainPubsubActor {
pubsub_connection,
messages_sender,
subscriptions: Default::default(),
program_subs: Default::default(),
subscription_updates_sender,
shutdown_token,
client_id: CLIENT_ID.fetch_add(1, Ordering::SeqCst),
Expand All @@ -168,6 +183,7 @@ impl ChainPubsubActor {
.lock()
.unwrap()
.drain()
.chain(self.program_subs.lock().unwrap().drain())
.collect::<Vec<_>>();
for (_, sub) in subs {
sub.cancellation_token.cancel();
Expand Down Expand Up @@ -220,6 +236,7 @@ impl ChainPubsubActor {
mut messages_receiver: mpsc::Receiver<ChainPubsubActorMessage>,
) {
let subs = self.subscriptions.clone();
let program_subs = self.program_subs.clone();
let shutdown_token = self.shutdown_token.clone();
let pubsub_client_config = self.pubsub_client_config.clone();
let subscription_updates_sender =
Expand All @@ -235,13 +252,15 @@ impl ChainPubsubActor {
msg = messages_receiver.recv() => {
if let Some(msg) = msg {
let subs = subs.clone();
let program_subs = program_subs.clone();
let pubsub_connection = pubsub_connection.clone();
let subscription_updates_sender = subscription_updates_sender.clone();
let pubsub_client_config = pubsub_client_config.clone();
let abort_sender = abort_sender.clone();
let is_connected = is_connected.clone();
pending_messages.push(Self::handle_msg(
subs,
program_subs,
pubsub_connection,
subscription_updates_sender,
pubsub_client_config,
Expand All @@ -266,6 +285,7 @@ impl ChainPubsubActor {
#[allow(clippy::too_many_arguments)]
async fn handle_msg(
subscriptions: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
program_subs: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
pubsub_connection: Arc<PubSubConnection>,
subscription_updates_sender: mpsc::Sender<SubscriptionUpdate>,
pubsub_client_config: PubsubClientConfig,
Expand All @@ -280,7 +300,7 @@ impl ChainPubsubActor {
) {
let _ = response.send(Ok(())).inspect_err(|err| {
warn!(
"[client_id={client_id}] Failed to send msg ack: {err:?}"
"[client_id={client_id}] Failed to send msg ack: {err:?}"
);
});
}
Expand All @@ -301,6 +321,7 @@ impl ChainPubsubActor {
pubkey,
response,
subscriptions,
program_subs,
pubsub_connection,
subscription_updates_sender,
abort_sender,
Expand All @@ -326,14 +347,39 @@ impl ChainPubsubActor {
.get(&pubkey)
{
cancellation_token.cancel();
let _ = response.send(Ok(()));
send_ok(response, client_id);
} else {
let _ = response
.send(Err(RemoteAccountProviderError::AccountSubscriptionDoesNotExist(
pubkey.to_string(),
)));
}
}
ChainPubsubActorMessage::ProgramSubscribe { pubkey, response } => {
if !is_connected.load(Ordering::SeqCst) {
warn!("[client_id={client_id}] Ignoring subscribe request for program {pubkey} because disconnected");
let _ = response.send(Err(
RemoteAccountProviderError::AccountSubscriptionsTaskFailed(
format!("Client {client_id} disconnected"),
),
));
return;
}
let commitment_config = pubsub_client_config.commitment_config;
Self::add_program_sub(
pubkey,
response,
subscriptions,
program_subs,
pubsub_connection,
subscription_updates_sender,
abort_sender,
is_connected,
commitment_config,
client_id,
)
.await;
}
ChainPubsubActorMessage::Reconnect { response } => {
let result = Self::try_reconnect(
pubsub_connection,
Expand All @@ -352,6 +398,7 @@ impl ChainPubsubActor {
pubkey: Pubkey,
sub_response: oneshot::Sender<RemoteAccountProviderResult<()>>,
subs: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
program_subs: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
pubsub_connection: Arc<PubSubConnection>,
subscription_updates_sender: mpsc::Sender<SubscriptionUpdate>,
abort_sender: mpsc::Sender<()>,
Expand Down Expand Up @@ -406,6 +453,7 @@ impl ChainPubsubActor {
Self::abort_and_signal_connection_issue(
client_id,
subs.clone(),
program_subs.clone(),
abort_sender,
is_connected.clone(),
);
Expand Down Expand Up @@ -444,6 +492,7 @@ impl ChainPubsubActor {
Self::abort_and_signal_connection_issue(
client_id,
subs.clone(),
program_subs.clone(),
abort_sender.clone(),
is_connected.clone(),
);
Expand All @@ -470,6 +519,138 @@ impl ChainPubsubActor {
.remove(&pubkey);
});
}
#[allow(clippy::too_many_arguments)]
async fn add_program_sub(
pubkey: Pubkey,
sub_response: oneshot::Sender<RemoteAccountProviderResult<()>>,
subs: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
program_subs: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
pubsub_connection: Arc<PubSubConnection>,
subscription_updates_sender: mpsc::Sender<SubscriptionUpdate>,
abort_sender: mpsc::Sender<()>,
is_connected: Arc<AtomicBool>,
commitment_config: CommitmentConfig,
client_id: u16,
) {
if program_subs
.lock()
.expect("program subscriptions lock poisoned")
.contains_key(&pubkey)
{
trace!("[client_id={client_id}] Program subscription for {pubkey} already exists, ignoring add_program_sub request");
let _ = sub_response.send(Ok(()));
return;
}

trace!("[client_id={client_id}] Adding program subscription for {pubkey} with commitment {commitment_config:?}");

let cancellation_token = CancellationToken::new();

{
let mut program_subs_lock = program_subs
.lock()
.expect("program subscriptions lock poisoned");
program_subs_lock.insert(
pubkey,
AccountSubscription {
cancellation_token: cancellation_token.clone(),
},
);
}

let config = RpcProgramAccountsConfig {
account_config: RpcAccountInfoConfig {
commitment: Some(commitment_config),
encoding: Some(UiAccountEncoding::Base64Zstd),
..Default::default()
},
..Default::default()
};

let (mut update_stream, unsubscribe) = match pubsub_connection
.program_subscribe(&pubkey, config.clone())
.await
{
Ok(res) => res,
Err(err) => {
error!("[client_id={client_id}] Failed to subscribe to program {pubkey} {err:?}");
Self::abort_and_signal_connection_issue(
client_id,
subs.clone(),
program_subs.clone(),
abort_sender,
is_connected.clone(),
);
// RPC failed - inform the requester
let _ = sub_response.send(Err(err.into()));
return;
}
};

// RPC succeeded - confirm to the requester that the subscription was made
let _ = sub_response.send(Ok(()));

tokio::spawn(async move {
// Now keep listening for updates and relay matching accounts to the
// subscription updates sender until it is cancelled
loop {
tokio::select! {
_ = cancellation_token.cancelled() => {
trace!("[client_id={client_id}] Subscription for program {pubkey} was cancelled");
break;
}
update = update_stream.next() => {
if let Some(rpc_response) = update {
let pubkey = rpc_response.value.pubkey
.parse::<Pubkey>().inspect_err(|err| {
warn!("[client_id={client_id}] Received invalid pubkey in program subscription update: {} {:?}", rpc_response.value.pubkey, err);
});
if let Ok(pubkey) = pubkey {
if subs.lock().expect("subscriptions lock poisoned").contains_key(&pubkey) {
let _ = subscription_updates_sender.send(SubscriptionUpdate {
pubkey,
rpc_response: RpcResponse {
context: rpc_response.context,
value: rpc_response.value.account,
},
}).await.inspect_err(|err| {
error!("[client_id={client_id}] Failed to send {pubkey} subscription update: {err:?}");
});
}
}
} else {
debug!("[client_id={client_id}] Subscription for program {pubkey} ended (EOF); signaling connection issue");
Self::abort_and_signal_connection_issue(
client_id,
subs.clone(),
program_subs.clone(),
abort_sender.clone(),
is_connected.clone(),
);
// Return early - abort_and_signal_connection_issue cancels all
// subscriptions, triggering cleanup via the cancellation path
// above. No need to run unsubscribe/cleanup here.
return;
}
}
}
}

// Clean up subscription with timeout to prevent hanging on dead sockets
if tokio::time::timeout(Duration::from_secs(2), unsubscribe())
.await
.is_err()
{
warn!(
"[client_id={client_id}] unsubscribe timed out for program {pubkey}"
);
}
program_subs
.lock()
.expect("program_subs lock poisoned")
.remove(&pubkey);
});
}

async fn try_reconnect(
pubsub_connection: Arc<PubSubConnection>,
Expand Down Expand Up @@ -514,6 +695,7 @@ impl ChainPubsubActor {
fn abort_and_signal_connection_issue(
client_id: u16,
subscriptions: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
program_subs: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
abort_sender: mpsc::Sender<()>,
is_connected: Arc<AtomicBool>,
) {
Expand All @@ -527,18 +709,28 @@ impl ChainPubsubActor {

debug!("[client_id={client_id}] aborting");

let drained = {
let mut subs_lock = subscriptions.lock().unwrap();
std::mem::take(&mut *subs_lock)
};
let drained_len = drained.len();
for (_, AccountSubscription { cancellation_token }) in drained {
cancellation_token.cancel();
fn drain_subscriptions(
client_id: u16,
subscriptions: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
) {
let drained_subs = {
let mut subs_lock = subscriptions.lock().unwrap();
std::mem::take(&mut *subs_lock)
};
let drained_len = drained_subs.len();
for (_, AccountSubscription { cancellation_token }) in drained_subs
{
cancellation_token.cancel();
}
debug!(
"[client_id={client_id}] canceled {} subscriptions",
drained_len
);
}
debug!(
"[client_id={client_id}] canceled {} subscriptions",
drained_len
);

drain_subscriptions(client_id, subscriptions);
drain_subscriptions(client_id, program_subs);

// Use try_send to avoid blocking and naturally coalesce signals
let _ = abort_sender.try_send(()).inspect_err(|err| {
// Channel full is expected when reconnect is already in progress
Expand Down
Loading