Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,12 @@ impl ChainPubsubActor {
match msg {
ChainPubsubActorMessage::AccountSubscribe { pubkey, response } => {
if !is_connected.load(Ordering::SeqCst) {
trace!("[client_id={client_id}] Ignoring subscribe request for {pubkey} because disconnected");
send_ok(response, client_id);
warn!("[client_id={client_id}] Ignoring subscribe request for {pubkey} because disconnected");
let _ = response.send(Err(
RemoteAccountProviderError::AccountSubscriptionsTaskFailed(
format!("Client {client_id} disconnected"),
),
));
return;
}
let commitment_config = pubsub_client_config.commitment_config;
Expand All @@ -294,7 +298,8 @@ impl ChainPubsubActor {
is_connected,
commitment_config,
client_id,
);
)
.await;
}
ChainPubsubActorMessage::AccountUnsubscribe {
pubkey,
Expand Down Expand Up @@ -334,7 +339,7 @@ impl ChainPubsubActor {
}

#[allow(clippy::too_many_arguments)]
fn add_sub(
async fn add_sub(
pubkey: Pubkey,
sub_response: oneshot::Sender<RemoteAccountProviderResult<()>>,
subs: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
Expand Down Expand Up @@ -375,33 +380,36 @@ impl ChainPubsubActor {
);
}

tokio::spawn(async move {
let config = RpcAccountInfoConfig {
commitment: Some(commitment_config),
encoding: Some(UiAccountEncoding::Base64Zstd),
..Default::default()
};
let (mut update_stream, unsubscribe) = match pubsub_connection
.account_subscribe(&pubkey, config.clone())
.await
{
Ok(res) => res,
Err(err) => {
error!("[client_id={client_id}] Failed to subscribe to account {pubkey} {err:?}");
Self::abort_and_signal_connection_issue(
client_id,
subs.clone(),
abort_sender,
is_connected.clone(),
);
let config = RpcAccountInfoConfig {
commitment: Some(commitment_config),
encoding: Some(UiAccountEncoding::Base64Zstd),
..Default::default()
};

return;
}
};
// Perform the subscription
let (mut update_stream, unsubscribe) = match pubsub_connection
.account_subscribe(&pubkey, config.clone())
.await
{
Ok(res) => res,
Err(err) => {
error!("[client_id={client_id}] Failed to subscribe to account {pubkey} {err:?}");
Self::abort_and_signal_connection_issue(
client_id,
subs.clone(),
abort_sender,
is_connected.clone(),
);
// RPC failed - inform the requester
let _ = sub_response.send(Err(err.into()));
return;
}
};

// RPC succeeded - confirm to the requester that the subscription was made
let _ = sub_response.send(Ok(()));
// RPC succeeded - confirm to the requester that the subscription was made
let _ = sub_response.send(Ok(()));

tokio::spawn(async move {
// Now keep listening for updates and relay them to the
// subscription updates sender until it is cancelled
loop {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ pub mod mock {
) -> RemoteAccountProviderResult<()> {
if !*self.connected.lock().unwrap() {
return Err(
RemoteAccountProviderError::AccountSubscriptionsFailed(
RemoteAccountProviderError::AccountSubscriptionsTaskFailed(
"mock: subscribe while disconnected".to_string(),
),
);
Expand Down Expand Up @@ -452,7 +452,7 @@ pub mod mock {
if *to_fail > 0 {
*to_fail -= 1;
return Err(
RemoteAccountProviderError::AccountSubscriptionsFailed(
RemoteAccountProviderError::AccountSubscriptionsTaskFailed(
"mock: forced resubscribe failure".to_string(),
),
);
Expand Down
4 changes: 2 additions & 2 deletions magicblock-chainlink/src/remote_account_provider/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ pub enum RemoteAccountProviderError {
#[error("Failed to send message to pubsub actor: {0} ({1})")]
ChainPubsubActorSendError(String, String),

#[error("Failed to setup an account subscription ({0})")]
AccountSubscriptionsFailed(String),
#[error("Failed to manage subscriptions ({0})")]
AccountSubscriptionsTaskFailed(String),

#[error("Failed to resolve accounts ({0})")]
AccountResolutionsFailed(String),
Expand Down
2 changes: 1 addition & 1 deletion magicblock-chainlink/src/remote_account_provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl<T: ChainRpcClient, U: ChainPubsubClient> RemoteAccountProvider<T, U> {
> {
if endpoints.is_empty() {
return Err(
RemoteAccountProviderError::AccountSubscriptionsFailed(
RemoteAccountProviderError::AccountSubscriptionsTaskFailed(
"No endpoints provided".to_string(),
),
);
Expand Down
28 changes: 16 additions & 12 deletions magicblock-chainlink/src/submux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ const DEBOUNCE_INTERVAL_MILLIS: u64 = 2_000;
mod debounce_state;
pub use self::debounce_state::DebounceState;

mod subscription_task;
pub use self::subscription_task::AccountSubscriptionTask;

#[derive(Debug, Clone, Copy, Default)]
pub struct DebounceConfig {
/// The deduplication window in milliseconds. If None, defaults to
Expand Down Expand Up @@ -143,7 +146,10 @@ struct ForwarderParams {
allowed_count: usize,
}

impl<T: ChainPubsubClient + ReconnectableClient> SubMuxClient<T> {
impl<T> SubMuxClient<T>
where
T: ChainPubsubClient + ReconnectableClient,
{
pub fn new(
clients: Vec<(Arc<T>, mpsc::Receiver<()>)>,
dedupe_window_millis: Option<u64>,
Expand Down Expand Up @@ -578,26 +584,24 @@ where
&self,
pubkey: Pubkey,
) -> RemoteAccountProviderResult<()> {
for client in &self.clients {
client.subscribe(pubkey).await?;
}
Ok(())
AccountSubscriptionTask::Subscribe(pubkey)
.process(self.clients.clone())
.await
}

async fn unsubscribe(
&self,
pubkey: Pubkey,
) -> RemoteAccountProviderResult<()> {
for client in &self.clients {
client.unsubscribe(pubkey).await?;
}
Ok(())
AccountSubscriptionTask::Unsubscribe(pubkey)
.process(self.clients.clone())
.await
}

async fn shutdown(&self) {
for client in &self.clients {
client.shutdown().await;
}
let _ = AccountSubscriptionTask::Shutdown
.process(self.clients.clone())
.await;
}

fn take_updates(&self) -> mpsc::Receiver<SubscriptionUpdate> {
Expand Down
100 changes: 100 additions & 0 deletions magicblock-chainlink/src/submux/subscription_task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use std::sync::Arc;

use futures_util::stream::{FuturesUnordered, StreamExt};
use log::*;
use solana_pubkey::Pubkey;
use tokio::sync::oneshot;

use crate::remote_account_provider::{
chain_pubsub_client::{ChainPubsubClient, ReconnectableClient},
errors::{RemoteAccountProviderError, RemoteAccountProviderResult},
};

#[derive(Clone)]
pub enum AccountSubscriptionTask {
Subscribe(Pubkey),
Unsubscribe(Pubkey),
Shutdown,
}

impl AccountSubscriptionTask {
pub async fn process<T>(
self,
clients: Vec<Arc<T>>,
) -> RemoteAccountProviderResult<()>
where
T: ChainPubsubClient + ReconnectableClient + Send + Sync + 'static,
{
use AccountSubscriptionTask::*;
let (tx, rx) = oneshot::channel();

tokio::spawn(async move {
let mut futures = FuturesUnordered::new();
for (i, client) in clients.iter().enumerate() {
let client = client.clone();
let task = self.clone();
futures.push(async move {
let result = match task {
Subscribe(pubkey) => client.subscribe(pubkey).await,
Unsubscribe(pubkey) => client.unsubscribe(pubkey).await,
Shutdown => {
client.shutdown().await;
Ok(())
}
};
(i, result)
});
}

let mut errors = Vec::new();
let mut tx = Some(tx);
let op_name = match self {
Subscribe(_) => "Subscribe",
Unsubscribe(_) => "Unsubscribe",
Shutdown => "Shutdown",
};

while let Some((i, result)) = futures.next().await {
match result {
Ok(_) => {
if let Some(tx) = tx.take() {
let _ = tx.send(Ok(()));
}
}
Err(e) => {
if tx.is_none() {
// If at least one client returned an `OK` response, ignore any `ERR` responses
// after that. These clients will also trigger the reconnection logic
// which takes care of fixing the RPC connection.
warn!(
"{} failed for client {}: {:?}",
op_name, i, e
);
} else {
errors.push(format!("Client {}: {:?}", i, e));
}
}
}
}

if let Some(tx) = tx {
let msg = format!(
"All clients failed to {}: {}",
op_name.to_lowercase(),
errors.join(", ")
);
let _ = tx.send(Err(
RemoteAccountProviderError::AccountSubscriptionsTaskFailed(
msg,
),
));
}
});

rx.await.unwrap_or_else(|_| {
Err(RemoteAccountProviderError::AccountSubscriptionsTaskFailed(
"Orchestration task panicked or dropped channel".to_string(),
))
})
}
}