Skip to content
3 changes: 3 additions & 0 deletions magicblock-chainlink/src/chainlink/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub enum ChainlinkError {
#[error("Failed to find account that was just resolved {0}")]
ResolvedAccountCouldNoLongerBeFound(Pubkey),

#[error("Failed to find companion account that was just resolved {0}")]
ResolvedCompanionAccountCouldNoLongerBeFound(Pubkey),

#[error("Failed to subscribe to account {0}: {1:?}")]
FailedToSubscribeToAccount(Pubkey, RemoteAccountProviderError),

Expand Down
235 changes: 159 additions & 76 deletions magicblock-chainlink/src/chainlink/fetch_cloner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -859,38 +859,103 @@ where

let (loaded_programs, program_data_subs, errors) = {
// For LoaderV3 accounts we fetch the program data account
let mut fetch_with_program_data_join_set = JoinSet::new();
let (loaderv3_programs, single_account_programs): (Vec<_>, Vec<_>) =
programs
.into_iter()
.partition(|(_, acc, _)| acc.owner().eq(&LOADER_V3));

let mut pubkeys_to_fetch =
Vec::with_capacity(loaderv3_programs.len() * 2);
let mut batch_min_context_slot = min_context_slot;

for (pubkey, _, account_slot) in &loaderv3_programs {
let effective_slot = if let Some(min_slot) = min_context_slot {
min_slot.max(*account_slot)
} else {
*account_slot
};
fetch_with_program_data_join_set.spawn(
self.task_to_fetch_with_program_data(
*pubkey,
effective_slot,
),
batch_min_context_slot = Some(
batch_min_context_slot.unwrap_or(0).max(effective_slot),
);

// We intentionally take the global max effective slot for the batch (not per-program)
// to enforce a consistent minimum slot across all LoaderV3 programs.
let program_data_pubkey =
get_loaderv3_get_program_data_address(pubkey);
pubkeys_to_fetch.push(*pubkey);
pubkeys_to_fetch.push(program_data_pubkey);
}
let joined = fetch_with_program_data_join_set.join_all().await;
let (mut errors, accounts_with_program_data) = joined
.into_iter()
.fold((vec![], vec![]), |(mut errors, mut successes), res| {
match res {
Ok(Ok(account_with_program_data)) => {
successes.push(account_with_program_data)

let fetch_result = if !pubkeys_to_fetch.is_empty() {
self.fetch_count.fetch_add(
pubkeys_to_fetch.len() as u64,
Ordering::Relaxed,
);
self.remote_account_provider
.try_get_multi_until_slots_match(
&pubkeys_to_fetch,
Some(MatchSlotsConfig {
min_context_slot: batch_min_context_slot,
..Default::default()
}),
)
.await
} else {
Ok(vec![])
};

let (mut errors, accounts_with_program_data) = match fetch_result {
Ok(remote_accounts) => {
if remote_accounts.len() != pubkeys_to_fetch.len() {
(
vec![ChainlinkError::ProgramAccountResolutionsFailed(
format!(
"LoaderV3 fetch: expected {} accounts, got {}",
pubkeys_to_fetch.len(),
remote_accounts.len()
)
)],
vec![],
)
} else {
let mut successes = Vec::new();
let mut errors = Vec::new();

for (program_info, (pubkey_pair, account_pair)) in
loaderv3_programs.into_iter().zip(
pubkeys_to_fetch
.chunks(2)
.zip(remote_accounts.chunks(2)),
)
{
if account_pair.len() != 2 {
errors.push(ChainlinkError::ProgramAccountResolutionsFailed(
format!("LoaderV3 fetch: expected 2 accounts (program + data) per pair, got {}", account_pair.len())
));
continue;
}
let (pubkey, _, _) = program_info;
let program_data_pubkey = pubkey_pair[1];

let account_program = account_pair[0].clone();
let account_data = account_pair[1].clone();
let result = Self::resolve_account_with_companion(
&self.accounts_bank,
pubkey,
program_data_pubkey,
account_program,
account_data,
);
match result {
Ok(res) => successes.push(res),
Err(err) => errors.push(err),
}
}
Ok(Err(err)) => errors.push(err),
Err(err) => errors.push(err.into()),
(errors, successes)
}
(errors, successes)
});
}
Err(err) => (vec![ChainlinkError::from(err)], vec![]),
};
let mut loaded_programs = vec![];

// Cancel subs for program data accounts
Expand Down Expand Up @@ -1243,25 +1308,25 @@ where
fn task_to_fetch_with_companion(
&self,
pubkey: Pubkey,
delegation_record_pubkey: Pubkey,
companion_pubkey: Pubkey,
slot: u64,
) -> task::JoinHandle<ChainlinkResult<AccountWithCompanion>> {
let provider = self.remote_account_provider.clone();
let bank = self.accounts_bank.clone();
let fetch_count = self.fetch_count.clone();
task::spawn(async move {
trace!("Fetching account {pubkey} with delegation record {delegation_record_pubkey} at slot {slot}");
trace!("Fetching account {pubkey} with companion {companion_pubkey} at slot {slot}");

// Increment fetch counter for testing deduplication (2 accounts: pubkey + delegation_record_pubkey)
fetch_count.fetch_add(2, Ordering::Relaxed);

provider
.try_get_multi_until_slots_match(
&[pubkey, delegation_record_pubkey],
&[pubkey, companion_pubkey],
Some(MatchSlotsConfig {
min_context_slot: Some(slot),
..Default::default()
}),
min_context_slot: Some(slot),
..Default::default()
}),
)
.await
// SAFETY: we always get two results here
Expand All @@ -1272,63 +1337,81 @@ where
})
.map_err(ChainlinkError::from)
.and_then(|(acc, deleg)| {
use RemoteAccount::*;
match (acc, deleg) {
// Account not found even though we found it previously - this is invalid,
// either way we cannot use it now
(NotFound(_), NotFound(_)) |
(NotFound(_), Found(_)) => Err(ChainlinkError::ResolvedAccountCouldNoLongerBeFound(
pubkey
)),
(Found(acc), NotFound(_)) => {
// Only account found without a delegation record, it is either invalid
// or a delegation record itself.
// Clone it as is (without changing the owner or flagging as delegated)
match acc.account.resolved_account_shared_data(&*bank) {
Some(account) =>
Ok(AccountWithCompanion {
pubkey,
account,
companion_pubkey: delegation_record_pubkey,
companion_account: None,
}),
None => Err(
ChainlinkError::ResolvedAccountCouldNoLongerBeFound(
pubkey
),
),
}
}
(Found(acc), Found(deleg)) => {
// Found the delegation record, we include it so that the caller can
// use it to add metadata to the account and use it for decision making
let Some(deleg_account) =
deleg.account.resolved_account_shared_data(&*bank)
else {
return Err(
ChainlinkError::ResolvedAccountCouldNoLongerBeFound(
pubkey
));
};
let Some(account) = acc.account.resolved_account_shared_data(&*bank) else {
return Err(
ChainlinkError::ResolvedAccountCouldNoLongerBeFound(
pubkey
),
);
};
Ok(AccountWithCompanion {
pubkey,
account,
companion_pubkey: delegation_record_pubkey,
companion_account: Some(deleg_account),
})
},
}
Self::resolve_account_with_companion(
&bank,
pubkey,
companion_pubkey,
acc,
deleg,
)
})
})
}

fn resolve_account_with_companion(
bank: &V,
pubkey: Pubkey,
companion_pubkey: Pubkey,
acc: RemoteAccount,
companion: RemoteAccount,
) -> ChainlinkResult<AccountWithCompanion> {
use RemoteAccount::*;
match (acc, companion) {
// Account not found even though we found it previously - this is invalid,
// either way we cannot use it now
(NotFound(_), NotFound(_)) | (NotFound(_), Found(_)) => {
Err(ChainlinkError::ResolvedAccountCouldNoLongerBeFound(pubkey))
}
(Found(acc), NotFound(_)) => {
// Only account found without a companion
// In case of delegation record fetch the account is either invalid
// or a delegation record itself.
// Clone it as is (without changing the owner or flagging as delegated)
match acc.account.resolved_account_shared_data(bank) {
Some(account) => Ok(AccountWithCompanion {
pubkey,
account,
companion_pubkey,
companion_account: None,
}),
None => Err(
ChainlinkError::ResolvedAccountCouldNoLongerBeFound(
pubkey,
),
),
}
}
(Found(acc), Found(comp)) => {
// Found the delegation record, we include it so that the caller can
// use it to add metadata to the account and use it for decision making
let Some(comp_account) =
comp.account.resolved_account_shared_data(bank)
else {
return Err(
ChainlinkError::ResolvedCompanionAccountCouldNoLongerBeFound(
companion_pubkey,
),
);
};
let Some(account) =
acc.account.resolved_account_shared_data(bank)
else {
return Err(
ChainlinkError::ResolvedAccountCouldNoLongerBeFound(
pubkey,
),
);
};
Ok(AccountWithCompanion {
pubkey,
account,
companion_pubkey,
companion_account: Some(comp_account),
})
}
}
}

/// Check if an account is currently being watched (subscribed to) by the
/// remote account provider
pub fn is_watching(&self, pubkey: &Pubkey) -> bool {
Expand Down
10 changes: 10 additions & 0 deletions test-integration/configs/cloning-conf.devnet.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ id = "MiniV31111111111111111111111111111111111111"
path = "../target/deploy/miniv3/program_mini.so"
auth = "MiniV3AUTH111111111111111111111111111111111"

[[program]]
id = "MiniV32111111111111111111111111111111111111"
path = "../target/deploy/miniv3/program_mini.so"
auth = "MiniV4AUTH211111111111111111111111111111111"

[[program]]
id = "MiniV33111111111111111111111111111111111111"
path = "../target/deploy/miniv3/program_mini.so"
auth = "MiniV4AUTH311111111111111111111111111111111"

[[program]]
id = "f1exzKGtdeVX3d6UXZ89cY7twiNJe9S5uq84RTA4Rq4"
path = "../target/deploy/program_flexi_counter.so"
Expand Down
15 changes: 15 additions & 0 deletions test-integration/test-chainlink/src/programs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ pub const MINIV3_AUTH: Pubkey =
pub const MINIV4_AUTH: Pubkey =
pubkey!("MiniV4AUTH111111111111111111111111111111111");

/// Additional v3 loader program for testing parallel cloning of multiple
/// programs. This program is cloned from devnet in ephemeral tests.
/// Note: In devnet tests, these programs are deployed with different IDs
/// but using the same MINIV3 binary. They're used to test batched fetching
/// of multiple LoaderV3 programs.
pub const PARALLEL_MINIV3_1: Pubkey =
pubkey!("MiniV32111111111111111111111111111111111111");
pub const PARALLEL_MINIV3_1_AUTH: Pubkey =
pubkey!("MiniV4AUTH211111111111111111111111111111111");

pub const PARALLEL_MINIV3_2: Pubkey =
pubkey!("MiniV33111111111111111111111111111111111111");
pub const PARALLEL_MINIV3_2_AUTH: Pubkey =
pubkey!("MiniV4AUTH311111111111111111111111111111111");

const CHUNK_SIZE: usize = 800;

pub async fn airdrop_sol(
Expand Down
Loading