diff --git a/magicblock-accounts/src/scheduled_commits_processor.rs b/magicblock-accounts/src/scheduled_commits_processor.rs index da5a81a14..b709c5be2 100644 --- a/magicblock-accounts/src/scheduled_commits_processor.rs +++ b/magicblock-accounts/src/scheduled_commits_processor.rs @@ -280,8 +280,8 @@ impl ScheduledCommitsProcessorImpl { }, Err(err) => { error!( - "Failed to commit in slot: {}, blockhash: {}. {:?}", - intent_meta.slot, intent_meta.blockhash, err + "Failed to commit intent: {}, slot: {}, blockhash: {}. {:?}", + intent_id, intent_meta.slot, intent_meta.blockhash, err ); err.signatures() .map(|(commit, finalize)| { diff --git a/magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs b/magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs index 5b1b359b7..16c0a3d16 100644 --- a/magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs +++ b/magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs @@ -431,6 +431,7 @@ mod tests { let msg = create_test_intent( 1, &[pubkey!("1111111111111111111111111111111111111111111")], + false, ); sender.send(msg.clone()).await.unwrap(); @@ -448,8 +449,8 @@ mod tests { // Send two conflicting messages let pubkey = pubkey!("1111111111111111111111111111111111111111111"); - let msg1 = create_test_intent(1, &[pubkey]); - let msg2 = create_test_intent(2, &[pubkey]); + let msg1 = create_test_intent(1, &[pubkey], false); + let msg2 = create_test_intent(2, &[pubkey], false); sender.send(msg1.clone()).await.unwrap(); sender.send(msg2.clone()).await.unwrap(); @@ -475,6 +476,7 @@ mod tests { let msg = create_test_intent( 1, &[pubkey!("1111111111111111111111111111111111111111111")], + false, ); sender.send(msg.clone()).await.unwrap(); @@ -501,6 +503,7 @@ mod tests { let msg = create_test_intent( 1, &[pubkey!("1111111111111111111111111111111111111111111")], + false, ); worker.db.store_base_intent(msg.clone()).await.unwrap(); @@ -535,6 +538,7 @@ mod tests { let msg = create_test_intent( i as u64, &[pubkey!("1111111111111111111111111111111111111111111")], + false, ); sender.send(msg).await.unwrap(); } @@ -570,6 +574,7 @@ mod tests { let msg = create_test_intent( i as u64, &[pubkey!("1111111111111111111111111111111111111111111")], + false, ); sender.send(msg).await.unwrap(); } @@ -600,7 +605,7 @@ mod tests { let mut received_ids = HashSet::new(); for i in 0..NUM_MESSAGES { let unique_pubkey = Pubkey::new_unique(); // Each message gets unique key - let msg = create_test_intent(i, &[unique_pubkey]); + let msg = create_test_intent(i, &[unique_pubkey], false); received_ids.insert(i); sender.send(msg).await.unwrap(); @@ -666,7 +671,7 @@ mod tests { vec![Pubkey::new_unique()] }; - let msg = create_test_intent(i as u64, &pubkeys); + let msg = create_test_intent(i as u64, &pubkeys, false); sender.send(msg).await.unwrap(); } diff --git a/magicblock-committor-service/src/intent_execution_manager/intent_scheduler.rs b/magicblock-committor-service/src/intent_execution_manager/intent_scheduler.rs index 27eca6a0d..cb5335d0f 100644 --- a/magicblock-committor-service/src/intent_execution_manager/intent_scheduler.rs +++ b/magicblock-committor-service/src/intent_execution_manager/intent_scheduler.rs @@ -312,10 +312,12 @@ mod simple_test { let msg1 = create_test_intent( 1, &[pubkey!("1111111111111111111111111111111111111111111")], + false, ); let msg2 = create_test_intent( 2, &[pubkey!("22222222222222222222222222222222222222222222")], + false, ); // First intent should execute immediately @@ -333,12 +335,12 @@ mod simple_test { let mut scheduler = IntentScheduler::new(); let pubkey = pubkey!("1111111111111111111111111111111111111111111"); - let msg1 = create_test_intent(1, &[pubkey]); + let msg1 = create_test_intent(1, &[pubkey], false); // First message executes immediately assert!(scheduler.schedule(msg1).is_some()); for id in 2..=NUM_INTENTS { - let msg = create_test_intent(id, &[pubkey]); + let msg = create_test_intent(id, &[pubkey], false); // intent gets blocked assert!(scheduler.schedule(msg).is_none()); } @@ -359,8 +361,8 @@ mod completion_simple_test { fn test_completion_unblocks_intents() { let mut scheduler = IntentScheduler::new(); let pubkey = pubkey!("1111111111111111111111111111111111111111111"); - let msg1 = create_test_intent(1, &[pubkey]); - let msg2 = create_test_intent(2, &[pubkey]); + let msg1 = create_test_intent(1, &[pubkey], false); + let msg2 = create_test_intent(2, &[pubkey], false); // First intent executes immediately let executed = scheduler.schedule(msg1.clone()).unwrap(); @@ -380,9 +382,9 @@ mod completion_simple_test { fn test_multiple_blocked_intents() { let mut scheduler = IntentScheduler::new(); let pubkey = pubkey!("1111111111111111111111111111111111111111111"); - let msg1 = create_test_intent(1, &[pubkey]); - let msg2 = create_test_intent(2, &[pubkey]); - let msg3 = create_test_intent(3, &[pubkey]); + let msg1 = create_test_intent(1, &[pubkey], false); + let msg2 = create_test_intent(2, &[pubkey], false); + let msg3 = create_test_intent(3, &[pubkey], false); // First intent executes immediately let executed = scheduler.schedule(msg1.clone()).unwrap(); @@ -431,25 +433,25 @@ mod complex_blocking_test { // intent 1: [a1, a2, a3] let msg1_keys = vec![a1, a2, a3]; - let msg1 = create_test_intent(1, &msg1_keys); + let msg1 = create_test_intent(1, &msg1_keys, false); assert!(scheduler.schedule(msg1.clone()).is_some()); assert_eq!(scheduler.intents_blocked(), 0); // intent 2: [b1, b2, b3] let msg2_keys = vec![b1, b2, b3]; - let msg2 = create_test_intent(2, &msg2_keys); + let msg2 = create_test_intent(2, &msg2_keys, false); assert!(scheduler.schedule(msg2.clone()).is_some()); assert_eq!(scheduler.intents_blocked(), 0); // intent 3: [a1, b1] - blocked by msg1 & msg2 let msg3_keys = vec![a1, b1]; - let msg3 = create_test_intent(3, &msg3_keys); + let msg3 = create_test_intent(3, &msg3_keys, false); assert!(scheduler.schedule(msg3.clone()).is_none()); assert_eq!(scheduler.intents_blocked(), 1); // intent 4: [a1, a3] - blocked by msg1 & msg3 let msg4_keys = vec![a1, a3]; - let msg4 = create_test_intent(4, &msg4_keys); + let msg4 = create_test_intent(4, &msg4_keys, false); assert!(scheduler.schedule(msg4.clone()).is_none()); assert_eq!(scheduler.intents_blocked(), 2); @@ -491,15 +493,15 @@ mod complex_blocking_test { // intent 1: [a1, a2, a3] (executing) let msg1_keys = vec![a1, a2, a3]; - let msg1 = create_test_intent(1, &msg1_keys); + let msg1 = create_test_intent(1, &msg1_keys, false); // intent 2: [c1, a1] (blocked by msg1) let msg2_keys = vec![c1, a1]; - let msg2 = create_test_intent(2, &msg2_keys); + let msg2 = create_test_intent(2, &msg2_keys, false); // intent 3: [c2, c1] (arriving later) let msg3_keys = vec![c2, c1]; - let msg3 = create_test_intent(3, &msg3_keys); + let msg3 = create_test_intent(3, &msg3_keys, false); // Schedule msg1 (executes immediately) let executed_msg1 = scheduler.schedule(msg1.clone()).unwrap(); @@ -540,11 +542,11 @@ mod complex_blocking_test { let c = pubkey!("31111111111111111111111111111111111111111111"); // intents with various key combinations - let msg1 = create_test_intent(1, &[a, b]); - let msg2 = create_test_intent(2, &[a, c]); - let msg3 = create_test_intent(3, &[c]); - let msg4 = create_test_intent(4, &[b]); - let msg5 = create_test_intent(5, &[a]); + let msg1 = create_test_intent(1, &[a, b], false); + let msg2 = create_test_intent(2, &[a, c], false); + let msg3 = create_test_intent(3, &[c], false); + let msg4 = create_test_intent(4, &[b], false); + let msg5 = create_test_intent(5, &[a], false); // msg1 executes immediately let executed1 = scheduler.schedule(msg1.clone()).unwrap(); @@ -589,7 +591,7 @@ mod edge_cases_test { #[test] fn test_intent_without_pubkeys() { let mut scheduler = IntentScheduler::new(); - let mut msg = create_test_intent(1, &[]); + let mut msg = create_test_intent(1, &[], false); msg.inner.base_intent = MagicBaseIntent::BaseActions(vec![]); // Should execute immediately since it has no pubkeys @@ -612,6 +614,7 @@ mod complete_error_test { let msg = create_test_intent( 1, &[pubkey!("1111111111111111111111111111111111111111111")], + false, ); // Attempt to complete message that was never scheduled @@ -629,11 +632,11 @@ mod complete_error_test { let pubkey2 = pubkey!("21111111111111111111111111111111111111111111"); // Schedule first intent - let mut msg1 = create_test_intent(1, &[pubkey1, pubkey2]); + let mut msg1 = create_test_intent(1, &[pubkey1, pubkey2], false); assert!(scheduler.schedule(msg1.clone()).is_some()); // Schedule second intent that conflicts with first - let msg2 = create_test_intent(2, &[pubkey1]); + let msg2 = create_test_intent(2, &[pubkey1], false); assert!(scheduler.schedule(msg2.clone()).is_none()); msg1.inner.get_committed_accounts_mut().unwrap().pop(); @@ -654,7 +657,7 @@ mod complete_error_test { let pubkey3 = pubkey!("31111111111111111111111111111111111111111111"); // Schedule first intent - let mut msg1 = create_test_intent(1, &[pubkey1, pubkey2]); + let mut msg1 = create_test_intent(1, &[pubkey1, pubkey2], false); assert!(scheduler.schedule(msg1.clone()).is_some()); msg1.inner @@ -681,11 +684,11 @@ mod complete_error_test { let pubkey2 = pubkey!("21111111111111111111111111111111111111111111"); // Schedule first intent for pubkey1 only - let msg1 = create_test_intent(1, &[pubkey1]); + let msg1 = create_test_intent(1, &[pubkey1], false); assert!(scheduler.schedule(msg1.clone()).is_some()); // Create second intent using both pubkeys - let msg2 = create_test_intent(2, &[pubkey1, pubkey2]); + let msg2 = create_test_intent(2, &[pubkey1, pubkey2], false); // Manually add to blocked_keys without proper scheduling scheduler.schedule(msg2.clone()); @@ -703,8 +706,8 @@ mod complete_error_test { let pubkey = pubkey!("1111111111111111111111111111111111111111111"); // Schedule two intents for same pubkey - let msg1 = create_test_intent(1, &[pubkey]); - let msg2 = create_test_intent(2, &[pubkey]); + let msg1 = create_test_intent(1, &[pubkey], false); + let msg2 = create_test_intent(2, &[pubkey], false); // First executes immediately assert!(scheduler.schedule(msg1.clone()).is_some()); @@ -725,9 +728,11 @@ mod complete_error_test { pub(crate) fn create_test_intent( id: u64, pubkeys: &[Pubkey], + is_undelegate: bool, ) -> ScheduledBaseIntentWrapper { use magicblock_program::magic_scheduled_base_intent::{ - CommitType, CommittedAccount, MagicBaseIntent, + CommitAndUndelegate, CommitType, CommittedAccount, MagicBaseIntent, + ScheduledBaseIntent, UndelegateType, }; use solana_account::Account; use solana_sdk::{hash::Hash, transaction::Transaction}; @@ -753,8 +758,16 @@ pub(crate) fn create_test_intent( }) .collect(); - intent.base_intent = - MagicBaseIntent::Commit(CommitType::Standalone(committed_accounts)); + let commit_type = CommitType::Standalone(committed_accounts); + if is_undelegate { + intent.base_intent = + MagicBaseIntent::CommitAndUndelegate(CommitAndUndelegate { + commit_action: commit_type, + undelegate_action: UndelegateType::Standalone, + }) + } else { + intent.base_intent = MagicBaseIntent::Commit(commit_type); + } } ScheduledBaseIntentWrapper { diff --git a/magicblock-committor-service/src/intent_executor/mod.rs b/magicblock-committor-service/src/intent_executor/mod.rs index d37346c19..3afbecf48 100644 --- a/magicblock-committor-service/src/intent_executor/mod.rs +++ b/magicblock-committor-service/src/intent_executor/mod.rs @@ -7,7 +7,7 @@ pub mod two_stage_executor; use std::{mem, ops::ControlFlow, sync::Arc, time::Duration}; use async_trait::async_trait; -use futures_util::future::try_join_all; +use futures_util::future::{join, try_join_all}; use log::{trace, warn}; use magicblock_metrics::metrics; use magicblock_program::{ @@ -26,7 +26,7 @@ use solana_pubkey::Pubkey; use solana_rpc_client_api::config::RpcTransactionConfig; use solana_sdk::{ message::VersionedMessage, - signature::{Keypair, Signature, Signer, SignerError}, + signature::{Keypair, Signature, Signer}, transaction::VersionedTransaction, }; @@ -45,7 +45,7 @@ use crate::{ tasks::{ task_builder::{TaskBuilderError, TaskBuilderImpl, TasksBuilder}, task_strategist::{ - TaskStrategist, TaskStrategistError, TransactionStrategy, + StrategyExecutionMode, TaskStrategist, TransactionStrategy, }, task_visitors::utility_visitor::TaskVisitorUtils, BaseTask, TaskType, @@ -137,40 +137,6 @@ where } } - /// Checks if it is possible to unite Commit & Finalize stages in 1 transaction - /// Returns corresponding `TransactionStrategy` if possible, otherwise `None` - fn try_unite_tasks( - commit_tasks: &[Box], - finalize_task: &[Box], - authority: &Pubkey, - persister: &Option

, - ) -> Result, SignerError> { - const MAX_UNITED_TASKS_LEN: usize = 22; - - // We can unite in 1 tx a lot of commits - // but then there's a possibility of hitting CPI limit, aka - // MaxInstructionTraceLengthExceeded error. - // So we limit tasks len with 22 total tasks - // In case this fails as well, it will be retried with TwoStage approach - // on retry, once retries are introduced - if commit_tasks.len() + finalize_task.len() > MAX_UNITED_TASKS_LEN { - return Ok(None); - } - - // Clone tasks since strategies applied to united case maybe suboptimal for regular one - let mut commit_tasks = commit_tasks.to_owned(); - let finalize_task = finalize_task.to_owned(); - - // Unite tasks to attempt running as single tx - commit_tasks.extend(finalize_task); - match TaskStrategist::build_strategy(commit_tasks, authority, persister) - { - Ok(strategy) => Ok(Some(strategy)), - Err(TaskStrategistError::FailedToFitError) => Ok(None), - Err(TaskStrategistError::SignerError(err)) => Err(err), - } - } - async fn execute_inner( &mut self, base_intent: ScheduledBaseIntent, @@ -191,17 +157,17 @@ where ); } - // Build tasks for commit stage - let commit_tasks = TaskBuilderImpl::commit_tasks( - &self.task_info_fetcher, - &base_intent, - persister, - ) - .await?; - let committed_pubkeys = match base_intent.get_committed_pubkeys() { Some(value) => value, None => { + // Build tasks for commit stage + let commit_tasks = TaskBuilderImpl::commit_tasks( + &self.task_info_fetcher, + &base_intent, + persister, + ) + .await?; + // Standalone actions executed in single stage let strategy = TaskStrategist::build_strategy( commit_tasks, @@ -218,55 +184,52 @@ where } }; - let finalize_tasks = TaskBuilderImpl::finalize_tasks( - &self.task_info_fetcher, - &base_intent, - ) - .await?; + // Build tasks for commit & finalize stages + let (commit_tasks, finalize_tasks) = { + let commit_tasks_fut = TaskBuilderImpl::commit_tasks( + &self.task_info_fetcher, + &base_intent, + persister, + ); + let finalize_tasks_fut = TaskBuilderImpl::finalize_tasks( + &self.task_info_fetcher, + &base_intent, + ); + let (commit_tasks, finalize_tasks) = + join(commit_tasks_fut, finalize_tasks_fut).await; + + (commit_tasks?, finalize_tasks?) + }; - // See if we can squeeze them in one tx - if let Some(single_tx_strategy) = Self::try_unite_tasks( - &commit_tasks, - &finalize_tasks, + // Build execution strategy + match TaskStrategist::build_execution_strategy( + commit_tasks, + finalize_tasks, &self.authority.pubkey(), persister, )? { - trace!("Executing intent in single stage"); - let output = self - .single_stage_execution_flow( + StrategyExecutionMode::SingleStage(strategy) => { + trace!("Executing intent in single stage"); + self.single_stage_execution_flow( base_intent, - single_tx_strategy, + strategy, persister, ) - .await?; - - Ok(output) - } else { - // Build strategy for Commit stage - let commit_strategy = TaskStrategist::build_strategy( - commit_tasks, - &self.authority.pubkey(), - persister, - )?; - - // Build strategy for Finalize stage - let finalize_strategy = TaskStrategist::build_strategy( - finalize_tasks, - &self.authority.pubkey(), - persister, - )?; - - trace!("Executing intent in two stages"); - let output = self - .two_stage_execution_flow( + .await + } + StrategyExecutionMode::TwoStage { + commit_stage, + finalize_stage, + } => { + trace!("Executing intent in two stages"); + self.two_stage_execution_flow( &committed_pubkeys, - commit_strategy, - finalize_strategy, + commit_stage, + finalize_stage, persister, ) - .await?; - - Ok(output) + .await + } } } @@ -844,79 +807,3 @@ where try_join_all(cleanup_futs).await.map(|_| ()) } } - -#[cfg(test)] -mod tests { - use std::{collections::HashMap, sync::Arc}; - - use solana_pubkey::Pubkey; - - use crate::{ - intent_execution_manager::intent_scheduler::create_test_intent, - intent_executor::{ - task_info_fetcher::{ - ResetType, TaskInfoFetcher, TaskInfoFetcherResult, - }, - IntentExecutorImpl, - }, - persist::IntentPersisterImpl, - tasks::task_builder::{TaskBuilderImpl, TasksBuilder}, - transaction_preparator::TransactionPreparatorImpl, - }; - - struct MockInfoFetcher; - #[async_trait::async_trait] - impl TaskInfoFetcher for MockInfoFetcher { - async fn fetch_next_commit_ids( - &self, - pubkeys: &[Pubkey], - ) -> TaskInfoFetcherResult> { - Ok(pubkeys.iter().map(|pubkey| (*pubkey, 0)).collect()) - } - - async fn fetch_rent_reimbursements( - &self, - pubkeys: &[Pubkey], - ) -> TaskInfoFetcherResult> { - Ok(pubkeys.iter().map(|_| Pubkey::new_unique()).collect()) - } - - fn peek_commit_id(&self, _pubkey: &Pubkey) -> Option { - Some(0) - } - - fn reset(&self, _: ResetType) {} - } - - #[tokio::test] - async fn test_try_unite() { - let pubkey = [Pubkey::new_unique()]; - let intent = create_test_intent(0, &pubkey); - - let info_fetcher = Arc::new(MockInfoFetcher); - let commit_task = TaskBuilderImpl::commit_tasks( - &info_fetcher, - &intent, - &None::, - ) - .await - .unwrap(); - let finalize_task = - TaskBuilderImpl::finalize_tasks(&info_fetcher, &intent) - .await - .unwrap(); - - let result = IntentExecutorImpl::< - TransactionPreparatorImpl, - MockInfoFetcher, - >::try_unite_tasks( - &commit_task, - &finalize_task, - &Pubkey::new_unique(), - &None::, - ); - - let strategy = result.unwrap().unwrap(); - assert!(strategy.lookup_tables_keys.is_empty()); - } -} diff --git a/magicblock-committor-service/src/tasks/task_strategist.rs b/magicblock-committor-service/src/tasks/task_strategist.rs index 406ba1a9d..11e369ac3 100644 --- a/magicblock-committor-service/src/tasks/task_strategist.rs +++ b/magicblock-committor-service/src/tasks/task_strategist.rs @@ -41,11 +41,131 @@ impl TransactionStrategy { ) } } + + pub fn uses_alts(&self) -> bool { + !self.lookup_tables_keys.is_empty() + } +} + +pub enum StrategyExecutionMode { + SingleStage(TransactionStrategy), + TwoStage { + commit_stage: TransactionStrategy, + finalize_stage: TransactionStrategy, + }, +} + +impl StrategyExecutionMode { + pub fn uses_alts(&self) -> bool { + match self { + Self::SingleStage(value) => value.uses_alts(), + Self::TwoStage { + commit_stage, + finalize_stage, + } => commit_stage.uses_alts() || finalize_stage.uses_alts(), + } + } } +/// Takes [`BaseTask`]s and chooses the best way to fit them in TX +/// It may change Task execution strategy so all task would fit in tx pub struct TaskStrategist; impl TaskStrategist { - /// Returns [`TaskDeliveryStrategy`] for every [`Task`] + /// Builds execution strategy from [`BaseTask`]s + /// 1. Optimizes tasks to fit in TX + /// 2. Chooses the fastest execution mode for Tasks + pub fn build_execution_strategy( + commit_tasks: Vec>, + finalize_tasks: Vec>, + authority: &Pubkey, + persister: &Option

, + ) -> TaskStrategistResult { + const MAX_UNITED_TASKS_LEN: usize = 22; + + // We can unite in 1 tx a lot of commits + // but then there's a possibility of hitting CPI limit, aka + // MaxInstructionTraceLengthExceeded error. + // So we limit tasks len with 22 total tasks + // In case this fails as well, it will be retried with TwoStage approach + // on retry, once retries are introduced + if commit_tasks.len() + finalize_tasks.len() > MAX_UNITED_TASKS_LEN { + return Self::build_two_stage( + commit_tasks, + finalize_tasks, + authority, + persister, + ); + } + + // Clone tasks since strategies applied to united case maybe suboptimal for regular one + // Unite tasks to attempt running as single tx + let single_stage_tasks = + [commit_tasks.clone(), finalize_tasks.clone()].concat(); + let single_stage_strategy = match TaskStrategist::build_strategy( + single_stage_tasks, + authority, + persister, + ) { + Ok(strategy) => StrategyExecutionMode::SingleStage(strategy), + Err(TaskStrategistError::FailedToFitError) => { + // If Tasks can't fit in SingleStage - use TwpStage execution + return Self::build_two_stage( + commit_tasks, + finalize_tasks, + authority, + persister, + ); + } + Err(TaskStrategistError::SignerError(err)) => { + return Err(err.into()) + } + }; + + // If ALTs aren't used then we sure this will be optimal - return + if !single_stage_strategy.uses_alts() { + return Ok(single_stage_strategy); + } + + // As ALTs take a very long time to activate + // it is actually faster to execute in TwoStage mode + // unless TwoStage also uses ALTs + let two_stage = Self::build_two_stage( + commit_tasks, + finalize_tasks, + authority, + persister, + )?; + if two_stage.uses_alts() { + Ok(single_stage_strategy) + } else { + Ok(two_stage) + } + } + + fn build_two_stage( + commit_tasks: Vec>, + finalize_tasks: Vec>, + authority: &Pubkey, + persister: &Option

, + ) -> TaskStrategistResult { + // Build strategy for Commit stage + let commit_strategy = + TaskStrategist::build_strategy(commit_tasks, authority, persister)?; + + // Build strategy for Finalize stage + let finalize_strategy = TaskStrategist::build_strategy( + finalize_tasks, + authority, + persister, + )?; + + Ok(StrategyExecutionMode::TwoStage { + commit_stage: commit_strategy, + finalize_stage: finalize_strategy, + }) + } + + /// Returns [`TransactionStrategy`] for tasks /// Returns Error if all optimizations weren't enough pub fn build_strategy( mut tasks: Vec>, @@ -249,18 +369,52 @@ pub type TaskStrategistResult = Result; #[cfg(test)] mod tests { + use std::{collections::HashMap, sync::Arc}; + use magicblock_program::magic_scheduled_base_intent::{ BaseAction, CommittedAccount, ProgramArgs, }; use solana_account::Account; + use solana_pubkey::Pubkey; use solana_sdk::system_program; use super::*; use crate::{ + intent_execution_manager::intent_scheduler::create_test_intent, + intent_executor::task_info_fetcher::{ + ResetType, TaskInfoFetcher, TaskInfoFetcherResult, + }, persist::IntentPersisterImpl, - tasks::{BaseActionTask, CommitTask, TaskStrategy, UndelegateTask}, + tasks::{ + task_builder::{TaskBuilderImpl, TasksBuilder}, + BaseActionTask, CommitTask, TaskStrategy, UndelegateTask, + }, }; + struct MockInfoFetcher; + #[async_trait::async_trait] + impl TaskInfoFetcher for MockInfoFetcher { + async fn fetch_next_commit_ids( + &self, + pubkeys: &[Pubkey], + ) -> TaskInfoFetcherResult> { + Ok(pubkeys.iter().map(|pubkey| (*pubkey, 0)).collect()) + } + + async fn fetch_rent_reimbursements( + &self, + pubkeys: &[Pubkey], + ) -> TaskInfoFetcherResult> { + Ok(pubkeys.iter().map(|_| Pubkey::new_unique()).collect()) + } + + fn peek_commit_id(&self, _pubkey: &Pubkey) -> Option { + Some(0) + } + + fn reset(&self, _: ResetType) {} + } + // Helper to create a simple commit task fn create_test_commit_task(commit_id: u64, data_size: usize) -> ArgsTask { ArgsTask::new(ArgsTaskType::Commit(CommitTask { @@ -500,4 +654,105 @@ mod tests { // As expected assert!(!strategy.lookup_tables_keys.is_empty()); } + + #[tokio::test] + async fn test_build_single_stage_mode() { + let pubkey = [Pubkey::new_unique()]; + let intent = create_test_intent(0, &pubkey, false); + + let info_fetcher = Arc::new(MockInfoFetcher); + let commit_task = TaskBuilderImpl::commit_tasks( + &info_fetcher, + &intent, + &None::, + ) + .await + .unwrap(); + let finalize_task = + TaskBuilderImpl::finalize_tasks(&info_fetcher, &intent) + .await + .unwrap(); + + let execution_mode = TaskStrategist::build_execution_strategy( + commit_task, + finalize_task, + &Pubkey::new_unique(), + &None::, + ) + .expect("Execution mode created"); + + let StrategyExecutionMode::SingleStage(value) = execution_mode else { + panic!("Unexpected execution mode"); + }; + assert!(!value.uses_alts()); + } + + #[tokio::test] + async fn test_build_two_stage_mode_no_alts() { + let pubkeys: [_; 3] = std::array::from_fn(|_| Pubkey::new_unique()); + let intent = create_test_intent(0, &pubkeys, true); + + let info_fetcher = Arc::new(MockInfoFetcher); + let commit_task = TaskBuilderImpl::commit_tasks( + &info_fetcher, + &intent, + &None::, + ) + .await + .unwrap(); + let finalize_task = + TaskBuilderImpl::finalize_tasks(&info_fetcher, &intent) + .await + .unwrap(); + + let execution_mode = TaskStrategist::build_execution_strategy( + commit_task, + finalize_task, + &Pubkey::new_unique(), + &None::, + ) + .expect("Execution mode created"); + + let StrategyExecutionMode::TwoStage { + commit_stage, + finalize_stage, + } = execution_mode + else { + panic!("Unexpected execution mode"); + }; + assert!(!commit_stage.uses_alts()); + assert!(!finalize_stage.uses_alts()); + } + + #[tokio::test] + async fn test_build_single_stage_mode_with_alts() { + let pubkeys: [_; 8] = std::array::from_fn(|_| Pubkey::new_unique()); + let intent = create_test_intent(0, &pubkeys, false); + + let info_fetcher = Arc::new(MockInfoFetcher); + let commit_task = TaskBuilderImpl::commit_tasks( + &info_fetcher, + &intent, + &None::, + ) + .await + .unwrap(); + let finalize_task = + TaskBuilderImpl::finalize_tasks(&info_fetcher, &intent) + .await + .unwrap(); + + let execution_mode = TaskStrategist::build_execution_strategy( + commit_task, + finalize_task, + &Pubkey::new_unique(), + &None::, + ) + .expect("Execution mode created"); + + let StrategyExecutionMode::SingleStage(value) = execution_mode else { + panic!("Unexpected execution mode"); + }; + assert!(value.uses_alts()); + } } diff --git a/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs b/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs index 28a550668..2b35a3cf7 100644 --- a/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs +++ b/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs @@ -1,6 +1,5 @@ use std::{collections::HashSet, ops::ControlFlow, time::Duration}; -use borsh::BorshDeserialize; use futures_util::future::{join, join_all, try_join_all}; use log::{error, info}; use magicblock_committor_program::{ @@ -16,7 +15,6 @@ use magicblock_rpc_client::{ MagicBlockSendTransactionOutcome, MagicblockRpcClient, }; use magicblock_table_mania::{error::TableManiaError, TableMania}; -use solana_account::ReadableAccount; use solana_pubkey::Pubkey; use solana_sdk::{ compute_budget::ComputeBudgetInstruction, @@ -67,28 +65,20 @@ impl DeliveryPreparator { ) -> DeliveryPreparatorResult> { let preparation_futures = strategy.optimized_tasks.iter_mut().map(|task| async move { - let timer = + let _timer = metrics::observe_committor_intent_task_preparation_time( task.as_ref(), ); - let res = self - .prepare_task_handling_errors(authority, task, persister) - .await; - timer.stop_and_record(); - - res + self.prepare_task_handling_errors(authority, task, persister) + .await }); let task_preparations = join_all(preparation_futures); let alts_preparations = async { - let timer = + let _timer = metrics::observe_committor_intent_alt_preparation_time(); - let res = self - .prepare_lookup_tables(authority, &strategy.lookup_tables_keys) - .await; - timer.stop_and_record(); - - res + self.prepare_lookup_tables(authority, &strategy.lookup_tables_keys) + .await }; let (res1, res2) = join(task_preparations, alts_preparations).await; @@ -247,30 +237,22 @@ impl DeliveryPreparator { Ok(()) } + /// Fills up initialized buffer async fn write_buffer_with_retries( &self, authority: &Keypair, preparation_task: &PreparationTask, ) -> DeliveryPreparatorResult<(), InternalError> { let authority_pubkey = authority.pubkey(); - let chunks_pda = preparation_task.chunks_pda(&authority_pubkey); let write_instructions = preparation_task.write_instructions(&authority_pubkey); - let chunks = if let Some(account) = - self.rpc_client.get_account(&chunks_pda).await? - { - Ok(Chunks::try_from_slice(account.data())?) - } else { - error!( - "Chunks PDA does not exist for writing. pda: {}", - chunks_pda - ); - Err(InternalError::ChunksPDAMissingError(chunks_pda)) - }?; - - self.write_missing_chunks(authority, &chunks, &write_instructions) - .await + self.write_missing_chunks( + authority, + &preparation_task.chunks, + &write_instructions, + ) + .await } /// Extract & write missing chunks asynchronously @@ -415,6 +397,10 @@ impl DeliveryPreparator { lookup_table_keys: &[Pubkey], ) -> DeliveryPreparatorResult, InternalError> { + if lookup_table_keys.is_empty() { + return Ok(vec![]); + } + let pubkeys = HashSet::from_iter(lookup_table_keys.iter().copied()); self.table_mania .reserve_pubkeys(authority, &pubkeys) diff --git a/test-integration/test-committor-service/tests/test_ix_commit_local.rs b/test-integration/test-committor-service/tests/test_ix_commit_local.rs index 6aed3a4bd..16ea10441 100644 --- a/test-integration/test-committor-service/tests/test_ix_commit_local.rs +++ b/test-integration/test-committor-service/tests/test_ix_commit_local.rs @@ -247,11 +247,7 @@ async fn test_commit_5_accounts_1kb_bundle_size_3() { async fn test_commit_5_accounts_1kb_bundle_size_3_undelegate_all() { commit_5_accounts_1kb( 3, - expect_strategies(&[ - // Intent fits in 1 TX only with ALT, see IntentExecutorImpl::try_unite_tasks - (CommitStrategy::FromBufferWithLookupTable, 3), - (CommitStrategy::FromBuffer, 2), - ]), + expect_strategies(&[(CommitStrategy::FromBuffer, 5)]), true, ) .await;