diff --git a/magicblock-chainlink/src/chainlink/errors.rs b/magicblock-chainlink/src/chainlink/errors.rs index 09e9c4cce..9998514ee 100644 --- a/magicblock-chainlink/src/chainlink/errors.rs +++ b/magicblock-chainlink/src/chainlink/errors.rs @@ -27,6 +27,9 @@ pub enum ChainlinkError { #[error("Failed to find account that was just resolved {0}")] ResolvedAccountCouldNoLongerBeFound(Pubkey), + #[error("Failed to find companion account that was just resolved {0}")] + ResolvedCompanionAccountCouldNoLongerBeFound(Pubkey), + #[error("Failed to subscribe to account {0}: {1:?}")] FailedToSubscribeToAccount(Pubkey, RemoteAccountProviderError), diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner.rs b/magicblock-chainlink/src/chainlink/fetch_cloner.rs index f5c6fa416..96c133191 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner.rs @@ -859,38 +859,103 @@ where let (loaded_programs, program_data_subs, errors) = { // For LoaderV3 accounts we fetch the program data account - let mut fetch_with_program_data_join_set = JoinSet::new(); let (loaderv3_programs, single_account_programs): (Vec<_>, Vec<_>) = programs .into_iter() .partition(|(_, acc, _)| acc.owner().eq(&LOADER_V3)); + let mut pubkeys_to_fetch = + Vec::with_capacity(loaderv3_programs.len() * 2); + let mut batch_min_context_slot = min_context_slot; + for (pubkey, _, account_slot) in &loaderv3_programs { let effective_slot = if let Some(min_slot) = min_context_slot { min_slot.max(*account_slot) } else { *account_slot }; - fetch_with_program_data_join_set.spawn( - self.task_to_fetch_with_program_data( - *pubkey, - effective_slot, - ), + batch_min_context_slot = Some( + batch_min_context_slot.unwrap_or(0).max(effective_slot), ); + + // We intentionally take the global max effective slot for the batch (not per-program) + // to enforce a consistent minimum slot across all LoaderV3 programs. + let program_data_pubkey = + get_loaderv3_get_program_data_address(pubkey); + pubkeys_to_fetch.push(*pubkey); + pubkeys_to_fetch.push(program_data_pubkey); } - let joined = fetch_with_program_data_join_set.join_all().await; - let (mut errors, accounts_with_program_data) = joined - .into_iter() - .fold((vec![], vec![]), |(mut errors, mut successes), res| { - match res { - Ok(Ok(account_with_program_data)) => { - successes.push(account_with_program_data) + + let fetch_result = if !pubkeys_to_fetch.is_empty() { + self.fetch_count.fetch_add( + pubkeys_to_fetch.len() as u64, + Ordering::Relaxed, + ); + self.remote_account_provider + .try_get_multi_until_slots_match( + &pubkeys_to_fetch, + Some(MatchSlotsConfig { + min_context_slot: batch_min_context_slot, + ..Default::default() + }), + ) + .await + } else { + Ok(vec![]) + }; + + let (mut errors, accounts_with_program_data) = match fetch_result { + Ok(remote_accounts) => { + if remote_accounts.len() != pubkeys_to_fetch.len() { + ( + vec![ChainlinkError::ProgramAccountResolutionsFailed( + format!( + "LoaderV3 fetch: expected {} accounts, got {}", + pubkeys_to_fetch.len(), + remote_accounts.len() + ) + )], + vec![], + ) + } else { + let mut successes = Vec::new(); + let mut errors = Vec::new(); + + for (program_info, (pubkey_pair, account_pair)) in + loaderv3_programs.into_iter().zip( + pubkeys_to_fetch + .chunks(2) + .zip(remote_accounts.chunks(2)), + ) + { + if account_pair.len() != 2 { + errors.push(ChainlinkError::ProgramAccountResolutionsFailed( + format!("LoaderV3 fetch: expected 2 accounts (program + data) per pair, got {}", account_pair.len()) + )); + continue; + } + let (pubkey, _, _) = program_info; + let program_data_pubkey = pubkey_pair[1]; + + let account_program = account_pair[0].clone(); + let account_data = account_pair[1].clone(); + let result = Self::resolve_account_with_companion( + &self.accounts_bank, + pubkey, + program_data_pubkey, + account_program, + account_data, + ); + match result { + Ok(res) => successes.push(res), + Err(err) => errors.push(err), + } } - Ok(Err(err)) => errors.push(err), - Err(err) => errors.push(err.into()), + (errors, successes) } - (errors, successes) - }); + } + Err(err) => (vec![ChainlinkError::from(err)], vec![]), + }; let mut loaded_programs = vec![]; // Cancel subs for program data accounts @@ -1243,25 +1308,25 @@ where fn task_to_fetch_with_companion( &self, pubkey: Pubkey, - delegation_record_pubkey: Pubkey, + companion_pubkey: Pubkey, slot: u64, ) -> task::JoinHandle> { let provider = self.remote_account_provider.clone(); let bank = self.accounts_bank.clone(); let fetch_count = self.fetch_count.clone(); task::spawn(async move { - trace!("Fetching account {pubkey} with delegation record {delegation_record_pubkey} at slot {slot}"); + trace!("Fetching account {pubkey} with companion {companion_pubkey} at slot {slot}"); // Increment fetch counter for testing deduplication (2 accounts: pubkey + delegation_record_pubkey) fetch_count.fetch_add(2, Ordering::Relaxed); provider .try_get_multi_until_slots_match( - &[pubkey, delegation_record_pubkey], + &[pubkey, companion_pubkey], Some(MatchSlotsConfig { - min_context_slot: Some(slot), - ..Default::default() - }), + min_context_slot: Some(slot), + ..Default::default() + }), ) .await // SAFETY: we always get two results here @@ -1272,63 +1337,81 @@ where }) .map_err(ChainlinkError::from) .and_then(|(acc, deleg)| { - use RemoteAccount::*; - match (acc, deleg) { - // Account not found even though we found it previously - this is invalid, - // either way we cannot use it now - (NotFound(_), NotFound(_)) | - (NotFound(_), Found(_)) => Err(ChainlinkError::ResolvedAccountCouldNoLongerBeFound( - pubkey - )), - (Found(acc), NotFound(_)) => { - // Only account found without a delegation record, it is either invalid - // or a delegation record itself. - // Clone it as is (without changing the owner or flagging as delegated) - match acc.account.resolved_account_shared_data(&*bank) { - Some(account) => - Ok(AccountWithCompanion { - pubkey, - account, - companion_pubkey: delegation_record_pubkey, - companion_account: None, - }), - None => Err( - ChainlinkError::ResolvedAccountCouldNoLongerBeFound( - pubkey - ), - ), - } - } - (Found(acc), Found(deleg)) => { - // Found the delegation record, we include it so that the caller can - // use it to add metadata to the account and use it for decision making - let Some(deleg_account) = - deleg.account.resolved_account_shared_data(&*bank) - else { - return Err( - ChainlinkError::ResolvedAccountCouldNoLongerBeFound( - pubkey - )); - }; - let Some(account) = acc.account.resolved_account_shared_data(&*bank) else { - return Err( - ChainlinkError::ResolvedAccountCouldNoLongerBeFound( - pubkey - ), - ); - }; - Ok(AccountWithCompanion { - pubkey, - account, - companion_pubkey: delegation_record_pubkey, - companion_account: Some(deleg_account), - }) - }, - } + Self::resolve_account_with_companion( + &bank, + pubkey, + companion_pubkey, + acc, + deleg, + ) }) }) } + fn resolve_account_with_companion( + bank: &V, + pubkey: Pubkey, + companion_pubkey: Pubkey, + acc: RemoteAccount, + companion: RemoteAccount, + ) -> ChainlinkResult { + use RemoteAccount::*; + match (acc, companion) { + // Account not found even though we found it previously - this is invalid, + // either way we cannot use it now + (NotFound(_), NotFound(_)) | (NotFound(_), Found(_)) => { + Err(ChainlinkError::ResolvedAccountCouldNoLongerBeFound(pubkey)) + } + (Found(acc), NotFound(_)) => { + // Only account found without a companion + // In case of delegation record fetch the account is either invalid + // or a delegation record itself. + // Clone it as is (without changing the owner or flagging as delegated) + match acc.account.resolved_account_shared_data(bank) { + Some(account) => Ok(AccountWithCompanion { + pubkey, + account, + companion_pubkey, + companion_account: None, + }), + None => Err( + ChainlinkError::ResolvedAccountCouldNoLongerBeFound( + pubkey, + ), + ), + } + } + (Found(acc), Found(comp)) => { + // Found the delegation record, we include it so that the caller can + // use it to add metadata to the account and use it for decision making + let Some(comp_account) = + comp.account.resolved_account_shared_data(bank) + else { + return Err( + ChainlinkError::ResolvedCompanionAccountCouldNoLongerBeFound( + companion_pubkey, + ), + ); + }; + let Some(account) = + acc.account.resolved_account_shared_data(bank) + else { + return Err( + ChainlinkError::ResolvedAccountCouldNoLongerBeFound( + pubkey, + ), + ); + }; + Ok(AccountWithCompanion { + pubkey, + account, + companion_pubkey, + companion_account: Some(comp_account), + }) + } + } + } + /// Check if an account is currently being watched (subscribed to) by the /// remote account provider pub fn is_watching(&self, pubkey: &Pubkey) -> bool { diff --git a/test-integration/configs/cloning-conf.devnet.toml b/test-integration/configs/cloning-conf.devnet.toml index 984c5e7e5..3fd4b146c 100644 --- a/test-integration/configs/cloning-conf.devnet.toml +++ b/test-integration/configs/cloning-conf.devnet.toml @@ -44,6 +44,16 @@ id = "MiniV31111111111111111111111111111111111111" path = "../target/deploy/miniv3/program_mini.so" auth = "MiniV3AUTH111111111111111111111111111111111" +[[program]] +id = "MiniV32111111111111111111111111111111111111" +path = "../target/deploy/miniv3/program_mini.so" +auth = "MiniV4AUTH211111111111111111111111111111111" + +[[program]] +id = "MiniV33111111111111111111111111111111111111" +path = "../target/deploy/miniv3/program_mini.so" +auth = "MiniV4AUTH311111111111111111111111111111111" + [[program]] id = "f1exzKGtdeVX3d6UXZ89cY7twiNJe9S5uq84RTA4Rq4" path = "../target/deploy/program_flexi_counter.so" diff --git a/test-integration/test-chainlink/src/programs.rs b/test-integration/test-chainlink/src/programs.rs index c6d3f480d..ee193b1c2 100644 --- a/test-integration/test-chainlink/src/programs.rs +++ b/test-integration/test-chainlink/src/programs.rs @@ -50,6 +50,21 @@ pub const MINIV3_AUTH: Pubkey = pub const MINIV4_AUTH: Pubkey = pubkey!("MiniV4AUTH111111111111111111111111111111111"); +/// Additional v3 loader program for testing parallel cloning of multiple +/// programs. This program is cloned from devnet in ephemeral tests. +/// Note: In devnet tests, these programs are deployed with different IDs +/// but using the same MINIV3 binary. They're used to test batched fetching +/// of multiple LoaderV3 programs. +pub const PARALLEL_MINIV3_1: Pubkey = + pubkey!("MiniV32111111111111111111111111111111111111"); +pub const PARALLEL_MINIV3_1_AUTH: Pubkey = + pubkey!("MiniV4AUTH211111111111111111111111111111111"); + +pub const PARALLEL_MINIV3_2: Pubkey = + pubkey!("MiniV33111111111111111111111111111111111111"); +pub const PARALLEL_MINIV3_2_AUTH: Pubkey = + pubkey!("MiniV4AUTH311111111111111111111111111111111"); + const CHUNK_SIZE: usize = 800; pub async fn airdrop_sol( diff --git a/test-integration/test-cloning/tests/08_multi_program_cloning.rs b/test-integration/test-cloning/tests/08_multi_program_cloning.rs new file mode 100644 index 000000000..533cb3ff1 --- /dev/null +++ b/test-integration/test-cloning/tests/08_multi_program_cloning.rs @@ -0,0 +1,73 @@ +use integration_test_tools::IntegrationTestContext; +use log::*; +use program_mini::sdk::MiniSdk; +use solana_sdk::{ + instruction::Instruction, native_token::LAMPORTS_PER_SOL, + signature::Keypair, signer::Signer, +}; +use test_chainlink::programs::{PARALLEL_MINIV3_1, PARALLEL_MINIV3_2}; +use test_kit::init_logger; + +/// This test verifies that we can clone two LoaderV3 programs together by +/// sending a transaction that invokes both programs. Both programs are +/// cloned from the local devnet, demonstrating that the batched fetching +/// optimization works correctly when multiple LoaderV3 programs need to be +/// fetched and cloned in a single batch. +#[test] +fn test_clone_two_programs_in_single_transaction() { + init_logger!(); + let ctx = IntegrationTestContext::try_new().unwrap(); + + // Create SDK instances for both parallel programs + let sdk_prog1 = MiniSdk::new(PARALLEL_MINIV3_1); + let sdk_prog2 = MiniSdk::new(PARALLEL_MINIV3_2); + + let payer = Keypair::new(); + ctx.airdrop_chain_escrowed(&payer, 2 * LAMPORTS_PER_SOL) + .unwrap(); + + // Create instructions for both programs + let msg_prog1 = "Hello from Parallel Program 1"; + let msg_prog2 = "Hello from Parallel Program 2"; + + let ix_prog1: Instruction = + sdk_prog1.log_msg_instruction(&payer.pubkey(), msg_prog1); + let ix_prog2: Instruction = + sdk_prog2.log_msg_instruction(&payer.pubkey(), msg_prog2); + + // Send a transaction that invokes both programs. + // This exercises the batched fetching optimization since both programs + // are LoaderV3 programs that need their program data accounts fetched. + debug!( + "Sending transaction with instructions for both parallel programs..." + ); + let (sig, found) = ctx + .send_and_confirm_instructions_with_payer_ephem( + &[ix_prog1, ix_prog2], + &payer, + ) + .unwrap(); + + debug!( + "Transaction sent with signature {}. Found on chain: {}", + sig, found + ); + assert!(found, "Transaction was not found on chain"); + + // Verify both programs executed correctly by checking logs + if let Some(logs) = ctx.fetch_ephemeral_logs(sig) { + debug!("Transaction logs: {:?}", logs); + assert!( + logs.contains(&format!("Program log: LogMsg: {}", msg_prog1)), + "First parallel program instruction did not execute correctly" + ); + assert!( + logs.contains(&format!("Program log: LogMsg: {}", msg_prog2)), + "Second parallel program instruction did not execute correctly" + ); + } else { + panic!("No logs found for transaction {}", sig); + } + + debug!("Test passed: Successfully cloned and executed two LoaderV3 programs in a single transaction"); +}