From d34fac7c0bf704d06eb99372f787d9cc5b29dc91 Mon Sep 17 00:00:00 2001 From: Sarfaraz Nawaz Date: Tue, 18 Nov 2025 18:04:49 +0400 Subject: [PATCH] refactor: Simplify Task types --- magicblock-committor-service/README.md | 10 +- .../src/intent_executor/error.rs | 6 +- .../src/intent_executor/mod.rs | 12 +- .../src/tasks/args_task.rs | 154 ----- .../src/tasks/buffer_lifecycle.rs | 215 +++++++ .../src/tasks/buffer_task.rs | 127 ---- .../src/tasks/commit_task.rs | 280 +++++++++ .../src/tasks/commit_task_builder.rs | 52 ++ magicblock-committor-service/src/tasks/mod.rs | 595 +++--------------- .../src/tasks/task.rs | 158 +++++ .../src/tasks/task_builder.rs | 57 +- .../src/tasks/task_strategist.rs | 92 ++- .../tasks/task_visitors/persistor_visitor.rs | 36 +- .../tasks/task_visitors/utility_visitor.rs | 20 +- .../src/tasks/utils.rs | 14 +- .../src/tasks/visitor.rs | 5 +- .../delivery_preparator.rs | 18 +- .../src/transaction_preparator/mod.rs | 8 +- 18 files changed, 900 insertions(+), 959 deletions(-) delete mode 100644 magicblock-committor-service/src/tasks/args_task.rs create mode 100644 magicblock-committor-service/src/tasks/buffer_lifecycle.rs delete mode 100644 magicblock-committor-service/src/tasks/buffer_task.rs create mode 100644 magicblock-committor-service/src/tasks/commit_task.rs create mode 100644 magicblock-committor-service/src/tasks/commit_task_builder.rs create mode 100644 magicblock-committor-service/src/tasks/task.rs diff --git a/magicblock-committor-service/README.md b/magicblock-committor-service/README.md index 714dfcb0e..891d7ad5a 100644 --- a/magicblock-committor-service/README.md +++ b/magicblock-committor-service/README.md @@ -23,18 +23,18 @@ IntentExecutor - responsible for execution of Intent. Calls **TransactionPrepar TransactionPreparator - is an entity that handles all of the above "Transaction preparation" calling **TaskBuilderV1**, **TaskStrategist**, **DeliveryPreparator** and then assempling it all and passing to **MessageExecutor** ## DeliveryPreparator -After our **BaseTask**s are ready we need to prepare eveything for their successful execution. **DeliveryPreparator** - handles ALTs and commit buffers +After our **Task**s are ready we need to prepare eveything for their successful execution. **DeliveryPreparator** - handles ALTs and commit buffers ## TaskBuilder First, lets build atomic tasks from scheduled message/intent. -High level: TaskBuilder responsible for creating BaseTasks(to be renamed...) from ScheduledBaseIntent(to be renamed...). +High level: TaskBuilder responsible for creating Tasks(to be renamed...) from ScheduledBaseIntent(to be renamed...). Details: To do that is requires additional information from DelegationMetadata, it is provided **CommitIdFetcher** -### BaseTask -High level: BaseTask - is an atomic operation that is to be performed on the Base layer, like: Commit, Undelegate, Finalize, Action. +### Task +High level: Task - is an atomic operation that is to be performed on the Base layer, like: Commit, Undelegate, Finalize, Action. -Details: There's to implementation of BaseTask: ArgsTask, BufferTask. ArgsTask - gives instruction using args. BufferTask - gives instruction using buffer. BufferTask at the moment supports only commits +Details: There's to implementation of Task: ArgsTask, BufferTask. ArgsTask - gives instruction using args. BufferTask - gives instruction using buffer. BufferTask at the moment supports only commits ### TaskInfoFetcher High level: for account to be accepted by `dlp` it needs to have incremental commit ids. TaskInfoFetcher provides a user with the correct ids/nonces for set of committees diff --git a/magicblock-committor-service/src/intent_executor/error.rs b/magicblock-committor-service/src/intent_executor/error.rs index 62199ac56..989ea71a3 100644 --- a/magicblock-committor-service/src/intent_executor/error.rs +++ b/magicblock-committor-service/src/intent_executor/error.rs @@ -12,7 +12,7 @@ use solana_sdk::{ use crate::{ tasks::{ task_builder::TaskBuilderError, task_strategist::TaskStrategistError, - BaseTask, TaskType, + Task, TaskType, }, transaction_preparator::error::TransactionPreparatorError, }; @@ -138,7 +138,7 @@ impl TransactionStrategyExecutionError { pub fn try_from_transaction_error( err: TransactionError, signature: Option, - tasks: &[Box], + tasks: &[Task], ) -> Result { // There's always 2 budget instructions in front const OFFSET: u8 = 2; @@ -196,7 +196,7 @@ impl TransactionStrategyExecutionError { } pub(crate) struct IntentTransactionErrorMapper<'a> { - pub tasks: &'a [Box], + pub tasks: &'a [Task], } impl TransactionErrorMapper for IntentTransactionErrorMapper<'_> { type ExecutionError = TransactionStrategyExecutionError; diff --git a/magicblock-committor-service/src/intent_executor/mod.rs b/magicblock-committor-service/src/intent_executor/mod.rs index 0f7a59fea..ae707f813 100644 --- a/magicblock-committor-service/src/intent_executor/mod.rs +++ b/magicblock-committor-service/src/intent_executor/mod.rs @@ -53,7 +53,7 @@ use crate::{ TaskStrategist, TaskStrategistError, TransactionStrategy, }, task_visitors::utility_visitor::TaskVisitorUtils, - BaseTask, TaskType, + Task, TaskType, }, transaction_preparator::{ error::TransactionPreparatorError, TransactionPreparator, @@ -127,8 +127,8 @@ where /// Checks if it is possible to unite Commit & Finalize stages in 1 transaction /// Returns corresponding `TransactionStrategy` if possible, otherwise `None` fn try_unite_tasks( - commit_tasks: &[Box], - finalize_task: &[Box], + commit_tasks: &[Task], + finalize_task: &[Task], authority: &Pubkey, persister: &Option

, ) -> Result, SignerError> { @@ -179,7 +179,7 @@ where } // Build tasks for commit stage - let commit_tasks = TaskBuilderImpl::commit_tasks( + let commit_tasks = TaskBuilderImpl::create_commit_tasks( &self.task_info_fetcher, &base_intent, persister, @@ -626,7 +626,7 @@ where async fn execute_message_with_retries( &self, prepared_message: VersionedMessage, - tasks: &[Box], + tasks: &[Task], ) -> IntentExecutorResult { struct IntentErrorMapper { @@ -837,7 +837,7 @@ mod tests { let intent = create_test_intent(0, &pubkey); let info_fetcher = Arc::new(MockInfoFetcher); - let commit_task = TaskBuilderImpl::commit_tasks( + let commit_task = TaskBuilderImpl::create_commit_tasks( &info_fetcher, &intent, &None::, diff --git a/magicblock-committor-service/src/tasks/args_task.rs b/magicblock-committor-service/src/tasks/args_task.rs deleted file mode 100644 index a2a0342ec..000000000 --- a/magicblock-committor-service/src/tasks/args_task.rs +++ /dev/null @@ -1,154 +0,0 @@ -use dlp::args::CallHandlerArgs; -use solana_pubkey::Pubkey; -use solana_sdk::instruction::{AccountMeta, Instruction}; - -#[cfg(test)] -use crate::tasks::TaskStrategy; -use crate::tasks::{ - buffer_task::{BufferTask, BufferTaskType}, - visitor::Visitor, - BaseActionTask, BaseTask, BaseTaskError, BaseTaskResult, CommitTask, - FinalizeTask, PreparationState, TaskType, UndelegateTask, -}; - -/// Task that will be executed on Base layer via arguments -#[derive(Clone)] -pub enum ArgsTaskType { - Commit(CommitTask), - Finalize(FinalizeTask), - Undelegate(UndelegateTask), // Special action really - BaseAction(BaseActionTask), -} - -#[derive(Clone)] -pub struct ArgsTask { - preparation_state: PreparationState, - pub task_type: ArgsTaskType, -} - -impl From for ArgsTask { - fn from(value: ArgsTaskType) -> Self { - Self::new(value) - } -} - -impl ArgsTask { - pub fn new(task_type: ArgsTaskType) -> Self { - Self { - preparation_state: PreparationState::NotNeeded, - task_type, - } - } -} - -impl BaseTask for ArgsTask { - fn instruction(&self, validator: &Pubkey) -> Instruction { - match &self.task_type { - ArgsTaskType::Commit(value) => value.create_commit_ix(validator), - ArgsTaskType::Finalize(value) => { - dlp::instruction_builder::finalize( - *validator, - value.delegated_account, - ) - } - ArgsTaskType::Undelegate(value) => { - dlp::instruction_builder::undelegate( - *validator, - value.delegated_account, - value.owner_program, - value.rent_reimbursement, - ) - } - ArgsTaskType::BaseAction(value) => { - let action = &value.action; - let account_metas = action - .account_metas_per_program - .iter() - .map(|short_meta| AccountMeta { - pubkey: short_meta.pubkey, - is_writable: short_meta.is_writable, - is_signer: false, - }) - .collect(); - dlp::instruction_builder::call_handler( - *validator, - action.destination_program, - action.escrow_authority, - account_metas, - CallHandlerArgs { - data: action.data_per_program.data.clone(), - escrow_index: action.data_per_program.escrow_index, - }, - ) - } - } - } - - fn try_optimize_tx_size( - self: Box, - ) -> Result, Box> { - match self.task_type { - ArgsTaskType::Commit(value) => { - Ok(Box::new(BufferTask::new_preparation_required( - BufferTaskType::Commit(value.switch_to_buffer_strategy()), - ))) - } - ArgsTaskType::BaseAction(_) - | ArgsTaskType::Finalize(_) - | ArgsTaskType::Undelegate(_) => Err(self), - } - } - - /// Nothing to prepare for [`ArgsTaskType`] type - fn preparation_state(&self) -> &PreparationState { - &self.preparation_state - } - - fn switch_preparation_state( - &mut self, - new_state: PreparationState, - ) -> BaseTaskResult<()> { - if !matches!(new_state, PreparationState::NotNeeded) { - Err(BaseTaskError::PreparationStateTransitionError) - } else { - // Do nothing - Ok(()) - } - } - - fn compute_units(&self) -> u32 { - match &self.task_type { - ArgsTaskType::Commit(_) => 70_000, - ArgsTaskType::BaseAction(task) => task.action.compute_units, - ArgsTaskType::Undelegate(_) => 70_000, - ArgsTaskType::Finalize(_) => 70_000, - } - } - - #[cfg(test)] - fn strategy(&self) -> TaskStrategy { - TaskStrategy::Args - } - - fn task_type(&self) -> TaskType { - match &self.task_type { - ArgsTaskType::Commit(_) => TaskType::Commit, - ArgsTaskType::BaseAction(_) => TaskType::Action, - ArgsTaskType::Undelegate(_) => TaskType::Undelegate, - ArgsTaskType::Finalize(_) => TaskType::Finalize, - } - } - - /// For tasks using Args strategy call corresponding `Visitor` method - fn visit(&self, visitor: &mut dyn Visitor) { - visitor.visit_args_task(self); - } - - fn reset_commit_id(&mut self, commit_id: u64) { - let ArgsTaskType::Commit(commit_task) = &mut self.task_type else { - return; - }; - - commit_task.commit_id = commit_id; - } -} diff --git a/magicblock-committor-service/src/tasks/buffer_lifecycle.rs b/magicblock-committor-service/src/tasks/buffer_lifecycle.rs new file mode 100644 index 000000000..9272f0445 --- /dev/null +++ b/magicblock-committor-service/src/tasks/buffer_lifecycle.rs @@ -0,0 +1,215 @@ +use magicblock_committor_program::{ + instruction_builder::{ + close_buffer::{create_close_ix, CreateCloseIxArgs}, + init_buffer::{create_init_ix, CreateInitIxArgs}, + realloc_buffer::{ + create_realloc_buffer_ixs, CreateReallocBufferIxArgs, + }, + write_buffer::{create_write_ix, CreateWriteIxArgs}, + }, + pdas, ChangesetChunks, Chunks, +}; +use magicblock_program::magic_scheduled_base_intent::CommittedAccount; +use solana_account::Account; +use solana_pubkey::Pubkey; +use solana_sdk::instruction::Instruction; + +use crate::consts::MAX_WRITE_CHUNK_SIZE; + +#[derive(Debug, Clone)] +pub struct BufferLifecycle { + // TODO (snawaz): rename + // PreparationTask -> CreateBufferTask + // CleanupTask -> DestroyBufferTask + pub preparation: PreparationTask, + pub cleanup: CleanupTask, +} + +impl BufferLifecycle { + pub fn new( + commit_id: u64, + account: &CommittedAccount, + base_account: Option<&Account>, + ) -> BufferLifecycle { + let data = if let Some(base_account) = base_account { + dlp::compute_diff(&base_account.data, &account.account.data) + .to_vec() + } else { + account.account.data.clone() + }; + + BufferLifecycle { + preparation: PreparationTask { + commit_id, + pubkey: account.pubkey, + chunks: Chunks::from_data_length( + data.len(), + MAX_WRITE_CHUNK_SIZE, + ), + state_or_diff: data, + }, + cleanup: CleanupTask { + pubkey: account.pubkey, + commit_id, + }, + } + } +} + +#[derive(Clone, Debug)] +pub struct PreparationTask { + pub commit_id: u64, + pub pubkey: Pubkey, + pub chunks: Chunks, + pub state_or_diff: Vec, +} + +impl PreparationTask { + /// Returns initialization [`Instruction`] + pub fn init_instruction(&self, authority: &Pubkey) -> Instruction { + // // SAFETY: as object_length internally uses only already allocated or static buffers, + // // and we don't use any fs writers, so the only error that may occur here is of kind + // // OutOfMemory or WriteZero. This is impossible due to: + // // Chunks::new panics if its size exceeds MAX_ACCOUNT_ALLOC_PER_INSTRUCTION_SIZE or 10_240 + // // https://github.com/near/borsh-rs/blob/f1b75a6b50740bfb6231b7d0b1bd93ea58ca5452/borsh/src/ser/helpers.rs#L59 + let chunks_account_size = + borsh::object_length(&self.chunks).unwrap() as u64; + let buffer_account_size = self.state_or_diff.len() as u64; + + let (instruction, _, _) = create_init_ix(CreateInitIxArgs { + authority: *authority, + pubkey: self.pubkey, + chunks_account_size, + buffer_account_size, + commit_id: self.commit_id, + chunk_count: self.chunks.count(), + chunk_size: self.chunks.chunk_size(), + }); + + instruction + } + + /// Returns compute units required for realloc instruction + pub fn init_compute_units(&self) -> u32 { + 12_000 + } + + /// Returns realloc instruction required for Buffer preparation + #[allow(clippy::let_and_return)] + pub fn realloc_instructions(&self, authority: &Pubkey) -> Vec { + let buffer_account_size = self.state_or_diff.len() as u64; + let realloc_instructions = + create_realloc_buffer_ixs(CreateReallocBufferIxArgs { + authority: *authority, + pubkey: self.pubkey, + buffer_account_size, + commit_id: self.commit_id, + }); + + realloc_instructions + } + + /// Returns compute units required for realloc instruction + pub fn realloc_compute_units(&self) -> u32 { + 6_000 + } + + /// Returns realloc instruction required for Buffer preparation + #[allow(clippy::let_and_return)] + pub fn write_instructions(&self, authority: &Pubkey) -> Vec { + let chunks_iter = + ChangesetChunks::new(&self.chunks, self.chunks.chunk_size()) + .iter(&self.state_or_diff); + let write_instructions = chunks_iter + .map(|chunk| { + create_write_ix(CreateWriteIxArgs { + authority: *authority, + pubkey: self.pubkey, + offset: chunk.offset, + data_chunk: chunk.data_chunk, + commit_id: self.commit_id, + }) + }) + .collect::>(); + + write_instructions + } + + pub fn write_compute_units(&self, bytes_count: usize) -> u32 { + const PER_BYTE: u32 = 3; + + u32::try_from(bytes_count) + .ok() + .and_then(|bytes_count| bytes_count.checked_mul(PER_BYTE)) + .unwrap_or(u32::MAX) + } + + pub fn chunks_pda(&self, authority: &Pubkey) -> Pubkey { + pdas::chunks_pda( + authority, + &self.pubkey, + self.commit_id.to_le_bytes().as_slice(), + ) + .0 + } + + pub fn buffer_pda(&self, authority: &Pubkey) -> Pubkey { + pdas::buffer_pda( + authority, + &self.pubkey, + self.commit_id.to_le_bytes().as_slice(), + ) + .0 + } + + pub fn cleanup_task(&self) -> CleanupTask { + CleanupTask { + pubkey: self.pubkey, + commit_id: self.commit_id, + } + } +} + +#[derive(Clone, Debug)] +pub struct CleanupTask { + pub pubkey: Pubkey, + pub commit_id: u64, +} + +impl CleanupTask { + pub fn instruction(&self, authority: &Pubkey) -> Instruction { + create_close_ix(CreateCloseIxArgs { + authority: *authority, + pubkey: self.pubkey, + commit_id: self.commit_id, + }) + } + + /// Returns compute units required to execute [`CleanupTask`] + pub fn compute_units(&self) -> u32 { + 30_000 + } + + /// Returns a number of [`CleanupTask`]s that is possible to fit in single + pub const fn max_tx_fit_count_with_budget() -> usize { + 8 + } + + pub fn chunks_pda(&self, authority: &Pubkey) -> Pubkey { + pdas::chunks_pda( + authority, + &self.pubkey, + self.commit_id.to_le_bytes().as_slice(), + ) + .0 + } + + pub fn buffer_pda(&self, authority: &Pubkey) -> Pubkey { + pdas::buffer_pda( + authority, + &self.pubkey, + self.commit_id.to_le_bytes().as_slice(), + ) + .0 + } +} diff --git a/magicblock-committor-service/src/tasks/buffer_task.rs b/magicblock-committor-service/src/tasks/buffer_task.rs deleted file mode 100644 index caeb77f50..000000000 --- a/magicblock-committor-service/src/tasks/buffer_task.rs +++ /dev/null @@ -1,127 +0,0 @@ -use magicblock_committor_program::Chunks; -use solana_pubkey::Pubkey; -use solana_sdk::instruction::Instruction; - -#[cfg(test)] -use crate::tasks::TaskStrategy; -use crate::{ - consts::MAX_WRITE_CHUNK_SIZE, - tasks::{ - visitor::Visitor, BaseTask, BaseTaskError, BaseTaskResult, CommitTask, - PreparationState, PreparationTask, TaskType, - }, -}; - -/// Tasks that could be executed using buffers -#[derive(Clone)] -pub enum BufferTaskType { - Commit(CommitTask), - // Action in the future -} - -#[derive(Clone)] -pub struct BufferTask { - preparation_state: PreparationState, - pub task_type: BufferTaskType, -} - -impl BufferTask { - pub fn new_preparation_required(task_type: BufferTaskType) -> Self { - Self { - preparation_state: Self::preparation_required(&task_type), - task_type, - } - } - - pub fn new( - preparation_state: PreparationState, - task_type: BufferTaskType, - ) -> Self { - Self { - preparation_state, - task_type, - } - } - - fn preparation_required(task_type: &BufferTaskType) -> PreparationState { - let BufferTaskType::Commit(ref commit_task) = task_type; - let state_or_diff = if let Some(diff) = commit_task.compute_diff() { - diff.to_vec() - } else { - commit_task.committed_account.account.data.clone() - }; - let chunks = - Chunks::from_data_length(state_or_diff.len(), MAX_WRITE_CHUNK_SIZE); - - PreparationState::Required(PreparationTask { - commit_id: commit_task.commit_id, - pubkey: commit_task.committed_account.pubkey, - committed_data: state_or_diff, - chunks, - }) - } -} - -impl BaseTask for BufferTask { - fn instruction(&self, validator: &Pubkey) -> Instruction { - let BufferTaskType::Commit(ref value) = self.task_type; - value.create_commit_ix(validator) - } - - /// No further optimizations - fn try_optimize_tx_size( - self: Box, - ) -> Result, Box> { - // Since the buffer in BufferTask doesn't contribute to the size of - // transaction, there is nothing we can do here to optimize/reduce the size. - Err(self) - } - - fn preparation_state(&self) -> &PreparationState { - &self.preparation_state - } - - fn switch_preparation_state( - &mut self, - new_state: PreparationState, - ) -> BaseTaskResult<()> { - if matches!(new_state, PreparationState::NotNeeded) { - Err(BaseTaskError::PreparationStateTransitionError) - } else { - self.preparation_state = new_state; - Ok(()) - } - } - - fn compute_units(&self) -> u32 { - match self.task_type { - BufferTaskType::Commit(_) => 70_000, - } - } - - #[cfg(test)] - fn strategy(&self) -> TaskStrategy { - TaskStrategy::Buffer - } - - fn task_type(&self) -> TaskType { - match self.task_type { - BufferTaskType::Commit(_) => TaskType::Commit, - } - } - - /// For tasks using Args strategy call corresponding `Visitor` method - fn visit(&self, visitor: &mut dyn Visitor) { - visitor.visit_buffer_task(self); - } - - fn reset_commit_id(&mut self, commit_id: u64) { - let BufferTaskType::Commit(commit_task) = &mut self.task_type; - if commit_id == commit_task.commit_id { - return; - } - - commit_task.commit_id = commit_id; - self.preparation_state = Self::preparation_required(&self.task_type) - } -} diff --git a/magicblock-committor-service/src/tasks/commit_task.rs b/magicblock-committor-service/src/tasks/commit_task.rs new file mode 100644 index 000000000..9e76166d0 --- /dev/null +++ b/magicblock-committor-service/src/tasks/commit_task.rs @@ -0,0 +1,280 @@ +use core::panic; +use std::sync::Arc; + +use dlp::{ + args::{CommitDiffArgs, CommitStateArgs, CommitStateFromBufferArgs}, + compute_diff, +}; +use magicblock_program::magic_scheduled_base_intent::CommittedAccount; +use solana_account::{Account, ReadableAccount}; +use solana_pubkey::Pubkey; +use solana_sdk::instruction::Instruction; + +use crate::intent_executor::task_info_fetcher::TaskInfoFetcher; + +use super::{BufferLifecycle, TaskStrategy}; + +#[derive(Debug, Clone)] +pub enum CommitStrategy { + StateInArgs, + StateInBuffer { + lifecycle: BufferLifecycle, + }, + DiffInArgs { + base_account: Account, + }, + DiffInBuffer { + base_account: Account, + lifecycle: BufferLifecycle, + }, +} + +// CommitTask owns both "what to commit" (committed_account) and "how to commit" (strategy). +#[derive(Debug, Clone)] +pub struct CommitTask { + pub commit_id: u64, + pub allow_undelegation: bool, + pub committed_account: CommittedAccount, + pub strategy: CommitStrategy, +} + +impl CommitTask { + // Accounts larger than COMMIT_STATE_SIZE_THRESHOLD, use CommitDiff to + // reduce instruction size. Below this, commit is sent as CommitState. + // Chose 256 as thresold seems good enough as it could hold 8 u32 fields + // or 4 u64 fields! + pub const COMMIT_STATE_SIZE_THRESHOLD: usize = 256; + + pub async fn new( + commit_id: u64, + allow_undelegation: bool, + committed_account: CommittedAccount, + task_info_fetcher: &Arc, + ) -> Self { + let base_account = if committed_account.account.data.len() + > CommitTask::COMMIT_STATE_SIZE_THRESHOLD + { + match task_info_fetcher + .get_base_account(&committed_account.pubkey) + .await + { + Ok(Some(account)) => Some(account), + Ok(None) => { + log::warn!("AccountNotFound for commit_diff, pubkey: {}, commit_id: {}, Falling back to commit_state.", + committed_account.pubkey, commit_id); + None + } + Err(e) => { + log::warn!("Failed to fetch base account for commit diff, pubkey: {}, commit_id: {}, error: {}. Falling back to commit_state.", + committed_account.pubkey, commit_id, e); + None + } + } + } else { + None + }; + + Self { + commit_id, + allow_undelegation, + committed_account, + strategy: match base_account { + Some(base_account) => { + CommitStrategy::DiffInArgs { base_account } + } + None => CommitStrategy::StateInArgs, + }, + } + } + + pub fn lifecycle(&self) -> Option<&BufferLifecycle> { + match &self.strategy { + CommitStrategy::StateInArgs => None, + CommitStrategy::StateInBuffer { lifecycle } => Some(&lifecycle), + CommitStrategy::DiffInArgs { base_account: _ } => None, + CommitStrategy::DiffInBuffer { + lifecycle, + base_account: _, + } => Some(&lifecycle), + } + } + + pub fn task_strategy(&self) -> TaskStrategy { + match &self.strategy { + CommitStrategy::StateInArgs => TaskStrategy::Args, + CommitStrategy::StateInBuffer { .. } => TaskStrategy::Buffer, + CommitStrategy::DiffInArgs { base_account: _ } => { + TaskStrategy::Args + } + CommitStrategy::DiffInBuffer { .. } => TaskStrategy::Buffer, + } + } + + pub fn reset_commit_id(&mut self, commit_id: u64) { + if self.commit_id == commit_id { + return; + } + + self.commit_id = commit_id; + let lifecycle = match &mut self.strategy { + CommitStrategy::StateInArgs => None, + CommitStrategy::StateInBuffer { lifecycle } => Some(lifecycle), + CommitStrategy::DiffInArgs { base_account: _ } => None, + CommitStrategy::DiffInBuffer { + base_account: _, + lifecycle, + } => Some(lifecycle), + }; + + if let Some(lifecycle) = lifecycle { + lifecycle.preparation.commit_id = commit_id; + lifecycle.cleanup.commit_id = commit_id; + } + } + + pub fn create_commit_ix(&self, validator: &Pubkey) -> Instruction { + match &self.strategy { + CommitStrategy::StateInArgs => { + self.create_commit_state_ix(validator) + } + CommitStrategy::StateInBuffer { lifecycle: _ } => { + self.create_commit_state_from_buffer_ix(validator) + } + CommitStrategy::DiffInArgs { base_account } => { + self.create_commit_diff_ix(validator, base_account) + } + CommitStrategy::DiffInBuffer { + base_account: _, + lifecycle: _, + } => self.create_commit_diff_from_buffer_ix(validator), + } + } + + // FIXME: DELETE + pub fn compute_diff(&self) -> Option { + panic!() + } + + fn create_commit_state_ix(&self, validator: &Pubkey) -> Instruction { + let args = CommitStateArgs { + nonce: self.commit_id, + lamports: self.committed_account.account.lamports, + data: self.committed_account.account.data.clone(), + allow_undelegation: self.allow_undelegation, + }; + dlp::instruction_builder::commit_state( + *validator, + self.committed_account.pubkey, + self.committed_account.account.owner, + args, + ) + } + + fn create_commit_diff_ix( + &self, + validator: &Pubkey, + base_account: &Account, + ) -> Instruction { + let args = CommitDiffArgs { + nonce: self.commit_id, + lamports: self.committed_account.account.lamports, + diff: compute_diff( + base_account.data(), + self.committed_account.account.data(), + ) + .to_vec(), + allow_undelegation: self.allow_undelegation, + }; + + dlp::instruction_builder::commit_diff( + *validator, + self.committed_account.pubkey, + self.committed_account.account.owner, + args, + ) + } + + fn create_commit_state_from_buffer_ix( + &self, + validator: &Pubkey, + ) -> Instruction { + let commit_id_slice = self.commit_id.to_le_bytes(); + let (commit_buffer_pubkey, _) = + magicblock_committor_program::pdas::buffer_pda( + validator, + &self.committed_account.pubkey, + &commit_id_slice, + ); + + dlp::instruction_builder::commit_state_from_buffer( + *validator, + self.committed_account.pubkey, + self.committed_account.account.owner, + commit_buffer_pubkey, + CommitStateFromBufferArgs { + nonce: self.commit_id, + lamports: self.committed_account.account.lamports, + allow_undelegation: self.allow_undelegation, + }, + ) + } + + fn create_commit_diff_from_buffer_ix( + &self, + validator: &Pubkey, + ) -> Instruction { + let commit_id_slice = self.commit_id.to_le_bytes(); + let (commit_buffer_pubkey, _) = + magicblock_committor_program::pdas::buffer_pda( + validator, + &self.committed_account.pubkey, + &commit_id_slice, + ); + + dlp::instruction_builder::commit_diff_from_buffer( + *validator, + self.committed_account.pubkey, + self.committed_account.account.owner, + commit_buffer_pubkey, + CommitStateFromBufferArgs { + nonce: self.commit_id, + lamports: self.committed_account.account.lamports, + allow_undelegation: self.allow_undelegation, + }, + ) + } + + /// + /// In order to reduce the transition size, this function + /// flips *_InArgs to *_InBuffer and attach a LifecycleTask. + /// + pub fn try_optimize_tx_size(mut self) -> Result { + // The only way to optimize for tx size is to use buffer strategy. + // If the task is already using buffer strategy, then it cannot optimize further. + match self.strategy { + CommitStrategy::StateInArgs => { + self.strategy = CommitStrategy::StateInBuffer { + lifecycle: BufferLifecycle::new( + self.commit_id, + &self.committed_account, + None, + ), + }; + Ok(self) + } + CommitStrategy::StateInBuffer { .. } => Err(self), + CommitStrategy::DiffInArgs { base_account } => { + self.strategy = CommitStrategy::DiffInBuffer { + lifecycle: BufferLifecycle::new( + self.commit_id, + &self.committed_account, + Some(&base_account), + ), + base_account, + }; + Ok(self) + } + CommitStrategy::DiffInBuffer { .. } => Err(self), + } + } +} diff --git a/magicblock-committor-service/src/tasks/commit_task_builder.rs b/magicblock-committor-service/src/tasks/commit_task_builder.rs new file mode 100644 index 000000000..a6cf3b803 --- /dev/null +++ b/magicblock-committor-service/src/tasks/commit_task_builder.rs @@ -0,0 +1,52 @@ +use std::sync::Arc; + +use magicblock_program::magic_scheduled_base_intent::CommittedAccount; + +use super::{CommitStrategy, CommitTask}; +use crate::intent_executor::task_info_fetcher::TaskInfoFetcher; + +pub struct CommitTaskBuilder; + +impl CommitTaskBuilder { + pub async fn create_commit_task( + commit_id: u64, + allow_undelegation: bool, + committed_account: CommittedAccount, + task_info_fetcher: &Arc, + ) -> CommitTask { + let base_account = if committed_account.account.data.len() + > CommitTask::COMMIT_STATE_SIZE_THRESHOLD + { + match task_info_fetcher + .get_base_account(&committed_account.pubkey) + .await + { + Ok(Some(account)) => Some(account), + Ok(None) => { + log::warn!("AccountNotFound for commit_diff, pubkey: {}, commit_id: {}, Falling back to commit_state.", + committed_account.pubkey, commit_id); + None + } + Err(e) => { + log::warn!("Failed to fetch base account for commit diff, pubkey: {}, commit_id: {}, error: {}. Falling back to commit_state.", + committed_account.pubkey, commit_id, e); + None + } + } + } else { + None + }; + + CommitTask { + commit_id, + allow_undelegation, + committed_account, + strategy: match base_account { + Some(base_account) => { + CommitStrategy::DiffInArgs { base_account } + } + None => CommitStrategy::StateInArgs, + }, + } + } +} diff --git a/magicblock-committor-service/src/tasks/mod.rs b/magicblock-committor-service/src/tasks/mod.rs index 4bd7aaf21..3278e365c 100644 --- a/magicblock-committor-service/src/tasks/mod.rs +++ b/magicblock-committor-service/src/tasks/mod.rs @@ -1,42 +1,31 @@ -use std::sync::Arc; - -use dlp::{ - args::{CommitDiffArgs, CommitStateArgs, CommitStateFromBufferArgs}, - compute_diff, -}; -use dyn_clone::DynClone; -use magicblock_committor_program::{ - instruction_builder::{ - close_buffer::{create_close_ix, CreateCloseIxArgs}, - init_buffer::{create_init_ix, CreateInitIxArgs}, - realloc_buffer::{ - create_realloc_buffer_ixs, CreateReallocBufferIxArgs, - }, - write_buffer::{create_write_ix, CreateWriteIxArgs}, - }, - pdas, ChangesetChunks, Chunks, -}; -use magicblock_program::magic_scheduled_base_intent::{ - BaseAction, CommittedAccount, -}; -use solana_account::{Account, ReadableAccount}; +use std::fmt::Debug; + +use magicblock_program::magic_scheduled_base_intent::BaseAction; use solana_pubkey::Pubkey; -use solana_sdk::instruction::Instruction; use thiserror::Error; -use crate::{ - intent_executor::task_info_fetcher::TaskInfoFetcher, - tasks::visitor::Visitor, -}; - -pub mod args_task; -pub mod buffer_task; +pub mod task; pub mod task_builder; pub mod task_strategist; pub(crate) mod task_visitors; pub mod utils; pub mod visitor; +mod buffer_lifecycle; +mod commit_task; +mod commit_task_builder; + +pub use buffer_lifecycle::*; +pub use commit_task::*; +pub use commit_task_builder::*; +pub use task::*; +// +// TODO (snawaz): Ideally, TaskType should not exist. +// Instead we should have Task, an enum with all its variants. +// +// Also, instead of TaskStrategy, we can have requires_buffer() -> bool? +// + #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum TaskType { Commit, @@ -58,441 +47,45 @@ pub enum TaskStrategy { Buffer, } -/// A trait representing a task that can be executed on Base layer -pub trait BaseTask: Send + Sync + DynClone { - /// Gets all pubkeys that involved in Task's instruction - fn involved_accounts(&self, validator: &Pubkey) -> Vec { - // TODO (snawaz): rewrite it. - // currently it is slow as it discards heavy computations and memory allocations. - self.instruction(validator) - .accounts - .iter() - .map(|meta| meta.pubkey) - .collect() - } - - /// Gets instruction for task execution - fn instruction(&self, validator: &Pubkey) -> Instruction; - - /// Optimize for transaction size so that more instructions can be buddled together in a single - /// transaction. Return Ok(new_tx_optimized_task), else Err(self) if task cannot be optimized. - fn try_optimize_tx_size( - self: Box, - ) -> Result, Box>; - - /// Returns [`PreparationTask`] if task needs to be prepared before executing, - /// otherwise returns None - fn preparation_state(&self) -> &PreparationState; - - /// Switched [`PreparationTask`] to a new one - fn switch_preparation_state( - &mut self, - new_state: PreparationState, - ) -> BaseTaskResult<()>; - - /// Returns [`Task`] budget - fn compute_units(&self) -> u32; - - /// Returns current [`TaskStrategy`] - #[cfg(test)] - fn strategy(&self) -> TaskStrategy; - - /// Returns [`TaskType`] - fn task_type(&self) -> TaskType; - - /// Calls [`Visitor`] with specific task type - fn visit(&self, visitor: &mut dyn Visitor); - - /// Resets commit id - fn reset_commit_id(&mut self, commit_id: u64); -} - -dyn_clone::clone_trait_object!(BaseTask); - -pub struct CommitTaskBuilder; - -impl CommitTaskBuilder { - // Accounts larger than COMMIT_STATE_SIZE_THRESHOLD, use CommitDiff to - // reduce instruction size. Below this, commit is sent as CommitState. - // Chose 256 as thresold seems good enough as it could hold 8 u32 fields - // or 4 u64 fields. - const COMMIT_STATE_SIZE_THRESHOLD: usize = 256; - - pub async fn create_commit_task( - commit_id: u64, - allow_undelegation: bool, - committed_account: CommittedAccount, - task_info_fetcher: &Arc, - ) -> CommitTask { - let base_account = if committed_account.account.data.len() - > CommitTaskBuilder::COMMIT_STATE_SIZE_THRESHOLD - { - match task_info_fetcher - .get_base_account(&committed_account.pubkey) - .await - { - Ok(Some(account)) => Some(account), - Ok(None) => { - log::warn!("AccountNotFound for commit_diff, pubkey: {}, commit_id: {}, Falling back to commit_state.", - committed_account.pubkey, commit_id); - None - } - Err(e) => { - log::warn!("Failed to fetch base account for commit diff, pubkey: {}, commit_id: {}, error: {}. Falling back to commit_state.", - committed_account.pubkey, commit_id, e); - None - } - } - } else { - None - }; - - CommitTask { - commit_id, - allow_undelegation, - committed_account, - base_account, - strategy: TaskStrategy::Args, - } - } -} - -#[derive(Clone)] -pub struct CommitTask { - pub commit_id: u64, - pub allow_undelegation: bool, - pub committed_account: CommittedAccount, - base_account: Option, - strategy: TaskStrategy, -} - -impl CommitTask { - pub fn switch_to_buffer_strategy(mut self) -> Self { - self.strategy = TaskStrategy::Buffer; - self - } - - pub fn create_commit_ix(&self, validator: &Pubkey) -> Instruction { - match self.strategy { - TaskStrategy::Args => { - if let Some(base_account) = self.base_account.as_ref() { - self.create_commit_diff_ix(validator, base_account) - } else { - self.create_commit_state_ix(validator) - } - } - TaskStrategy::Buffer => { - if let Some(base_account) = self.base_account.as_ref() { - self.create_commit_diff_from_buffer_ix( - validator, - base_account, - ) - } else { - self.create_commit_state_from_buffer_ix(validator) - } - } - } - } - - pub fn compute_diff(&self) -> Option { - self.base_account.as_ref().map(|base_account| { - compute_diff( - base_account.data(), - self.committed_account.account.data(), - ) - }) - } - - fn create_commit_state_ix(&self, validator: &Pubkey) -> Instruction { - let args = CommitStateArgs { - nonce: self.commit_id, - lamports: self.committed_account.account.lamports, - data: self.committed_account.account.data.clone(), - allow_undelegation: self.allow_undelegation, - }; - dlp::instruction_builder::commit_state( - *validator, - self.committed_account.pubkey, - self.committed_account.account.owner, - args, - ) - } - - fn create_commit_diff_ix( - &self, - validator: &Pubkey, - base_account: &Account, - ) -> Instruction { - let args = CommitDiffArgs { - nonce: self.commit_id, - lamports: self.committed_account.account.lamports, - diff: compute_diff( - base_account.data(), - self.committed_account.account.data(), - ) - .to_vec(), - allow_undelegation: self.allow_undelegation, - }; - - dlp::instruction_builder::commit_diff( - *validator, - self.committed_account.pubkey, - self.committed_account.account.owner, - args, - ) - } - - fn create_commit_state_from_buffer_ix( - &self, - validator: &Pubkey, - ) -> Instruction { - let commit_id_slice = self.commit_id.to_le_bytes(); - let (commit_buffer_pubkey, _) = - magicblock_committor_program::pdas::buffer_pda( - validator, - &self.committed_account.pubkey, - &commit_id_slice, - ); - - dlp::instruction_builder::commit_state_from_buffer( - *validator, - self.committed_account.pubkey, - self.committed_account.account.owner, - commit_buffer_pubkey, - CommitStateFromBufferArgs { - nonce: self.commit_id, - lamports: self.committed_account.account.lamports, - allow_undelegation: self.allow_undelegation, - }, - ) - } - - fn create_commit_diff_from_buffer_ix( - &self, - validator: &Pubkey, - _fetched_account: &Account, - ) -> Instruction { - let commit_id_slice = self.commit_id.to_le_bytes(); - let (commit_buffer_pubkey, _) = - magicblock_committor_program::pdas::buffer_pda( - validator, - &self.committed_account.pubkey, - &commit_id_slice, - ); - - dlp::instruction_builder::commit_diff_from_buffer( - *validator, - self.committed_account.pubkey, - self.committed_account.account.owner, - commit_buffer_pubkey, - CommitStateFromBufferArgs { - nonce: self.commit_id, - lamports: self.committed_account.account.lamports, - allow_undelegation: self.allow_undelegation, - }, - ) - } -} - -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct UndelegateTask { pub delegated_account: Pubkey, pub owner_program: Pubkey, pub rent_reimbursement: Pubkey, } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct FinalizeTask { pub delegated_account: Pubkey, } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct BaseActionTask { pub action: BaseAction, } -#[derive(Clone, Debug)] -pub struct PreparationTask { - pub commit_id: u64, - pub pubkey: Pubkey, - pub chunks: Chunks, - - // TODO(edwin): replace with reference once done - pub committed_data: Vec, -} - -impl PreparationTask { - /// Returns initialization [`Instruction`] - pub fn init_instruction(&self, authority: &Pubkey) -> Instruction { - // // SAFETY: as object_length internally uses only already allocated or static buffers, - // // and we don't use any fs writers, so the only error that may occur here is of kind - // // OutOfMemory or WriteZero. This is impossible due to: - // // Chunks::new panics if its size exceeds MAX_ACCOUNT_ALLOC_PER_INSTRUCTION_SIZE or 10_240 - // // https://github.com/near/borsh-rs/blob/f1b75a6b50740bfb6231b7d0b1bd93ea58ca5452/borsh/src/ser/helpers.rs#L59 - let chunks_account_size = - borsh::object_length(&self.chunks).unwrap() as u64; - let buffer_account_size = self.committed_data.len() as u64; - - let (instruction, _, _) = create_init_ix(CreateInitIxArgs { - authority: *authority, - pubkey: self.pubkey, - chunks_account_size, - buffer_account_size, - commit_id: self.commit_id, - chunk_count: self.chunks.count(), - chunk_size: self.chunks.chunk_size(), - }); - - instruction - } - - /// Returns compute units required for realloc instruction - pub fn init_compute_units(&self) -> u32 { - 12_000 - } - - /// Returns realloc instruction required for Buffer preparation - #[allow(clippy::let_and_return)] - pub fn realloc_instructions(&self, authority: &Pubkey) -> Vec { - let buffer_account_size = self.committed_data.len() as u64; - let realloc_instructions = - create_realloc_buffer_ixs(CreateReallocBufferIxArgs { - authority: *authority, - pubkey: self.pubkey, - buffer_account_size, - commit_id: self.commit_id, - }); - - realloc_instructions - } - - /// Returns compute units required for realloc instruction - pub fn realloc_compute_units(&self) -> u32 { - 6_000 - } - - /// Returns realloc instruction required for Buffer preparation - #[allow(clippy::let_and_return)] - pub fn write_instructions(&self, authority: &Pubkey) -> Vec { - let chunks_iter = - ChangesetChunks::new(&self.chunks, self.chunks.chunk_size()) - .iter(&self.committed_data); - let write_instructions = chunks_iter - .map(|chunk| { - create_write_ix(CreateWriteIxArgs { - authority: *authority, - pubkey: self.pubkey, - offset: chunk.offset, - data_chunk: chunk.data_chunk, - commit_id: self.commit_id, - }) - }) - .collect::>(); - - write_instructions - } - - pub fn write_compute_units(&self, bytes_count: usize) -> u32 { - const PER_BYTE: u32 = 3; - - u32::try_from(bytes_count) - .ok() - .and_then(|bytes_count| bytes_count.checked_mul(PER_BYTE)) - .unwrap_or(u32::MAX) - } - - pub fn chunks_pda(&self, authority: &Pubkey) -> Pubkey { - pdas::chunks_pda( - authority, - &self.pubkey, - self.commit_id.to_le_bytes().as_slice(), - ) - .0 - } - - pub fn buffer_pda(&self, authority: &Pubkey) -> Pubkey { - pdas::buffer_pda( - authority, - &self.pubkey, - self.commit_id.to_le_bytes().as_slice(), - ) - .0 - } - - pub fn cleanup_task(&self) -> CleanupTask { - CleanupTask { - pubkey: self.pubkey, - commit_id: self.commit_id, - } - } -} - -#[derive(Clone, Debug)] -pub struct CleanupTask { - pub pubkey: Pubkey, - pub commit_id: u64, -} - -impl CleanupTask { - pub fn instruction(&self, authority: &Pubkey) -> Instruction { - create_close_ix(CreateCloseIxArgs { - authority: *authority, - pubkey: self.pubkey, - commit_id: self.commit_id, - }) - } - - /// Returns compute units required to execute [`CleanupTask`] - pub fn compute_units(&self) -> u32 { - 30_000 - } - - /// Returns a number of [`CleanupTask`]s that is possible to fit in single - pub const fn max_tx_fit_count_with_budget() -> usize { - 8 - } - - pub fn chunks_pda(&self, authority: &Pubkey) -> Pubkey { - pdas::chunks_pda( - authority, - &self.pubkey, - self.commit_id.to_le_bytes().as_slice(), - ) - .0 - } - - pub fn buffer_pda(&self, authority: &Pubkey) -> Pubkey { - pdas::buffer_pda( - authority, - &self.pubkey, - self.commit_id.to_le_bytes().as_slice(), - ) - .0 - } -} - #[derive(Error, Debug)] -pub enum BaseTaskError { +pub enum TaskError { #[error("Invalid preparation state transition")] PreparationStateTransitionError, } -pub type BaseTaskResult = Result; +pub type TaskResult = Result; #[cfg(test)] mod serialization_safety_test { + use std::sync::Arc; + use magicblock_program::{ - args::ShortAccountMeta, magic_scheduled_base_intent::ProgramArgs, + args::ShortAccountMeta, + magic_scheduled_base_intent::{CommittedAccount, ProgramArgs}, }; use solana_account::Account; + use solana_sdk::instruction::Instruction; use crate::{ intent_executor::NullTaskInfoFetcher, - tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - buffer_task::{BufferTask, BufferTaskType}, - *, - }, + tasks::{Task, *}, }; // Test all ArgsTask variants @@ -501,8 +94,8 @@ mod serialization_safety_test { let validator = Pubkey::new_unique(); // Test Commit variant - let commit_task: ArgsTask = ArgsTaskType::Commit( - CommitTaskBuilder::create_commit_task( + let commit_task = Task::Commit( + CommitTask::new( 123, true, CommittedAccount { @@ -518,29 +111,25 @@ mod serialization_safety_test { &Arc::new(NullTaskInfoFetcher), ) .await, - ) - .into(); + ); assert_serializable(&commit_task.instruction(&validator)); // Test Finalize variant - let finalize_task = - ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { - delegated_account: Pubkey::new_unique(), - })); + let finalize_task = Task::Finalize(FinalizeTask { + delegated_account: Pubkey::new_unique(), + }); assert_serializable(&finalize_task.instruction(&validator)); // Test Undelegate variant - let undelegate_task: ArgsTask = - ArgsTaskType::Undelegate(UndelegateTask { - delegated_account: Pubkey::new_unique(), - owner_program: Pubkey::new_unique(), - rent_reimbursement: Pubkey::new_unique(), - }) - .into(); + let undelegate_task = Task::Undelegate(UndelegateTask { + delegated_account: Pubkey::new_unique(), + owner_program: Pubkey::new_unique(), + rent_reimbursement: Pubkey::new_unique(), + }); assert_serializable(&undelegate_task.instruction(&validator)); // Test BaseAction variant - let base_action: ArgsTask = ArgsTaskType::BaseAction(BaseActionTask { + let base_action = Task::BaseAction(BaseActionTask { action: BaseAction { destination_program: Pubkey::new_unique(), escrow_authority: Pubkey::new_unique(), @@ -554,69 +143,70 @@ mod serialization_safety_test { }, compute_units: 10_000, }, - }) - .into(); + }); assert_serializable(&base_action.instruction(&validator)); } // Test BufferTask variants #[tokio::test] - async fn test_buffer_task_instruction_serialization() { + async fn test_task_instruction_serialization() { let validator = Pubkey::new_unique(); - let buffer_task = - BufferTask::new_preparation_required(BufferTaskType::Commit( - CommitTaskBuilder::create_commit_task( - 456, - false, - CommittedAccount { - pubkey: Pubkey::new_unique(), - account: Account { - lamports: 2000, - data: vec![7, 8, 9], - owner: Pubkey::new_unique(), - executable: false, - rent_epoch: 0, - }, + let task = Task::Commit( + CommitTask::new( + 456, + false, + CommittedAccount { + pubkey: Pubkey::new_unique(), + account: Account { + lamports: 2000, + data: vec![7, 8, 9], + owner: Pubkey::new_unique(), + executable: false, + rent_epoch: 0, }, - &Arc::new(NullTaskInfoFetcher), - ) - .await, - )); - assert_serializable(&buffer_task.instruction(&validator)); + }, + &Arc::new(NullTaskInfoFetcher), + ) + .await, + ); + assert_serializable(&task.instruction(&validator)); } - // Test preparation instructions + // // Test preparation instructions #[tokio::test] async fn test_preparation_instructions_serialization() { let authority = Pubkey::new_unique(); - // Test BufferTask preparation - let buffer_task = - BufferTask::new_preparation_required(BufferTaskType::Commit( - CommitTaskBuilder::create_commit_task( - 789, - true, - CommittedAccount { - pubkey: Pubkey::new_unique(), - account: Account { - lamports: 3000, - data: vec![0; 1024], // Larger data to test chunking - owner: Pubkey::new_unique(), - executable: false, - rent_epoch: 0, - }, + // Test buffer strategy preparation + let task = Task::Commit( + CommitTask::new( + 789, + true, + CommittedAccount { + pubkey: Pubkey::new_unique(), + account: Account { + lamports: 3000, + data: vec![0; 1024], // Larger data to test chunking + owner: Pubkey::new_unique(), + executable: false, + rent_epoch: 0, }, - &Arc::new(NullTaskInfoFetcher), - ) - .await, - )); - - let PreparationState::Required(preparation_task) = - buffer_task.preparation_state() - else { - panic!("invalid preparation state on creation!"); - }; + }, + &Arc::new(NullTaskInfoFetcher), + ) + .await, + ); + + assert_eq!(task.strategy(), TaskStrategy::Args); + + let task = task.try_optimize_tx_size().unwrap(); + + assert_eq!(task.strategy(), TaskStrategy::Buffer); + + let lifecycle = task.lifecycle().unwrap(); + let preparation_task = &lifecycle.preparation; + assert_serializable(&preparation_task.init_instruction(&authority)); for ix in preparation_task.realloc_instructions(&authority) { assert_serializable(&ix); @@ -624,6 +214,9 @@ mod serialization_safety_test { for ix in preparation_task.write_instructions(&authority) { assert_serializable(&ix); } + + let cleanup = &lifecycle.cleanup; + assert_serializable(&cleanup.instruction(&authority)); } // Helper function to assert serialization succeeds diff --git a/magicblock-committor-service/src/tasks/task.rs b/magicblock-committor-service/src/tasks/task.rs new file mode 100644 index 000000000..74fd26e1f --- /dev/null +++ b/magicblock-committor-service/src/tasks/task.rs @@ -0,0 +1,158 @@ +use dlp::args::CallHandlerArgs; +use solana_pubkey::Pubkey; +use solana_sdk::instruction::{AccountMeta, Instruction}; + +use crate::tasks::TaskStrategy; +use crate::tasks::{ + visitor::Visitor, BaseActionTask, CommitTask, FinalizeTask, + PreparationState, TaskError, TaskResult, TaskType, UndelegateTask, +}; + +use super::BufferLifecycle; + +/// Task that will be executed on Base layer via arguments +#[derive(Debug, Clone)] +pub enum Task { + Commit(CommitTask), + Finalize(FinalizeTask), + Undelegate(UndelegateTask), // Special action really + BaseAction(BaseActionTask), +} + +impl Task { + pub fn involved_accounts(&self, validator: &Pubkey) -> Vec { + // TODO (snawaz): rewrite it. + // currently it is slow as it discards heavy computations and memory allocations. + self.instruction(validator) + .accounts + .iter() + .map(|meta| meta.pubkey) + .collect() + } + + pub fn instruction(&self, validator: &Pubkey) -> Instruction { + match &self { + Task::Commit(value) => value.create_commit_ix(validator), + Task::Finalize(value) => dlp::instruction_builder::finalize( + *validator, + value.delegated_account, + ), + Task::Undelegate(value) => dlp::instruction_builder::undelegate( + *validator, + value.delegated_account, + value.owner_program, + value.rent_reimbursement, + ), + Task::BaseAction(value) => { + let action = &value.action; + let account_metas = action + .account_metas_per_program + .iter() + .map(|short_meta| AccountMeta { + pubkey: short_meta.pubkey, + is_writable: short_meta.is_writable, + is_signer: false, + }) + .collect(); + dlp::instruction_builder::call_handler( + *validator, + action.destination_program, + action.escrow_authority, + account_metas, + CallHandlerArgs { + data: action.data_per_program.data.clone(), + escrow_index: action.data_per_program.escrow_index, + }, + ) + } + } + } + + pub fn try_optimize_tx_size(self) -> Result { + // TODO (snawaz): do two things: + // 1. this does not properly handle preparation state as both ArgsTask + // and CommitTask have this. Only CommitTask needs to have this. + // 3. Remove PreparationState. + // 4. Instead have enum LifecycleTask { PreparationTask, CleanupTask } or struct (ee + // [2]. + // 5. NotNeeded is not needed (pun not intended). Instead use Option + // + // ref: + // 1: https://chatgpt.com/s/t_691e1c39f47081919efcc73a2f599cf9 + // 2: https://chatgpt.com/s/t_691e1d7e82a08191963b43c6c8ad7a96 + match self { + Task::Commit(value) => value + .try_optimize_tx_size() + .map(Task::Commit) + .map_err(Task::Commit), + Task::BaseAction(_) | Task::Finalize(_) | Task::Undelegate(_) => { + Err(self) + } + } + } + + /// Nothing to prepare for [`ArgsTaskType`] type + pub fn preparation_state(&self) -> &PreparationState { + todo!() + } + + pub fn switch_preparation_state( + &mut self, + new_state: PreparationState, + ) -> TaskResult<()> { + if !matches!(new_state, PreparationState::NotNeeded) { + Err(TaskError::PreparationStateTransitionError) + } else { + // Do nothing + Ok(()) + } + } + + pub fn compute_units(&self) -> u32 { + match &self { + Task::Commit(_) => 70_000, + Task::BaseAction(task) => task.action.compute_units, + Task::Undelegate(_) => 70_000, + Task::Finalize(_) => 70_000, + } + } + + pub fn strategy(&self) -> TaskStrategy { + match &self { + Task::Commit(commit) => commit.task_strategy(), + Task::BaseAction(_) => TaskStrategy::Args, + Task::Undelegate(_) => TaskStrategy::Args, + Task::Finalize(_) => TaskStrategy::Args, + } + } + + pub fn lifecycle(&self) -> Option<&BufferLifecycle> { + match &self { + Task::Commit(commit) => commit.lifecycle(), + Task::BaseAction(_) => None, + Task::Undelegate(_) => None, + Task::Finalize(_) => None, + } + } + + pub fn task_type(&self) -> TaskType { + match &self { + Task::Commit(_) => TaskType::Commit, + Task::BaseAction(_) => TaskType::Action, + Task::Undelegate(_) => TaskType::Undelegate, + Task::Finalize(_) => TaskType::Finalize, + } + } + + /// For tasks using Args strategy call corresponding `Visitor` method + pub fn visit(&self, visitor: &mut dyn Visitor) { + visitor.visit_task(self); + } + + pub fn reset_commit_id(&mut self, commit_id: u64) { + let Task::Commit(commit_task) = self else { + return; + }; + commit_task.reset_commit_id(commit_id); + } +} diff --git a/magicblock-committor-service/src/tasks/task_builder.rs b/magicblock-committor-service/src/tasks/task_builder.rs index 95b11b030..3029b4441 100644 --- a/magicblock-committor-service/src/tasks/task_builder.rs +++ b/magicblock-committor-service/src/tasks/task_builder.rs @@ -15,26 +15,24 @@ use crate::{ }, persist::IntentPersister, tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - BaseActionTask, BaseTask, CommitTaskBuilder, FinalizeTask, - UndelegateTask, + BaseActionTask, CommitTaskBuilder, FinalizeTask, Task, UndelegateTask, }, }; #[async_trait] pub trait TasksBuilder { // Creates tasks for commit stage - async fn commit_tasks( + async fn create_commit_tasks( task_info_fetcher: &Arc, base_intent: &ScheduledBaseIntent, persister: &Option

, - ) -> TaskBuilderResult>>; + ) -> TaskBuilderResult>; // Create tasks for finalize stage async fn finalize_tasks( task_info_fetcher: &Arc, base_intent: &ScheduledBaseIntent, - ) -> TaskBuilderResult>>; + ) -> TaskBuilderResult>; } /// V1 Task builder @@ -44,20 +42,17 @@ pub struct TaskBuilderImpl; #[async_trait] impl TasksBuilder for TaskBuilderImpl { /// Returns [`Task`]s for Commit stage - async fn commit_tasks( + async fn create_commit_tasks( task_info_fetcher: &Arc, base_intent: &ScheduledBaseIntent, persister: &Option

, - ) -> TaskBuilderResult>> { + ) -> TaskBuilderResult> { let (accounts, allow_undelegation) = match &base_intent.base_intent { MagicBaseIntent::BaseActions(actions) => { let tasks = actions .iter() .map(|el| { - let task = BaseActionTask { action: el.clone() }; - let task = - ArgsTask::new(ArgsTaskType::BaseAction(task)); - Box::new(task) as Box + Task::BaseAction(BaseActionTask { action: el.clone() }) }) .collect(); @@ -91,14 +86,12 @@ impl TasksBuilder for TaskBuilderImpl { .iter() .map(|account| async { let commit_id = *commit_ids.get(&account.pubkey).expect("CommitIdFetcher provide commit ids for all listed pubkeys, or errors!"); - let task = ArgsTaskType::Commit(CommitTaskBuilder::create_commit_task( + Task::Commit(CommitTaskBuilder::create_commit_task( commit_id, allow_undelegation, account.clone(), task_info_fetcher, - ).await); - - Box::new(ArgsTask::new(task)) as Box + ).await) })).await; Ok(tasks) @@ -108,30 +101,28 @@ impl TasksBuilder for TaskBuilderImpl { async fn finalize_tasks( task_info_fetcher: &Arc, base_intent: &ScheduledBaseIntent, - ) -> TaskBuilderResult>> { + ) -> TaskBuilderResult> { // Helper to create a finalize task - fn finalize_task(account: &CommittedAccount) -> Box { - let task_type = ArgsTaskType::Finalize(FinalizeTask { + fn finalize_task(account: &CommittedAccount) -> Task { + Task::Finalize(FinalizeTask { delegated_account: account.pubkey, - }); - Box::new(ArgsTask::new(task_type)) + }) } // Helper to create an undelegate task fn undelegate_task( account: &CommittedAccount, rent_reimbursement: &Pubkey, - ) -> Box { - let task_type = ArgsTaskType::Undelegate(UndelegateTask { + ) -> Task { + Task::Undelegate(UndelegateTask { delegated_account: account.pubkey, owner_program: account.account.owner, rent_reimbursement: *rent_reimbursement, - }); - Box::new(ArgsTask::new(task_type)) + }) } // Helper to process commit types - fn process_commit(commit: &CommitType) -> Vec> { + fn process_commit(commit: &CommitType) -> Vec { match commit { CommitType::Standalone(accounts) => { accounts.iter().map(finalize_task).collect() @@ -145,12 +136,9 @@ impl TasksBuilder for TaskBuilderImpl { .map(finalize_task) .collect::>(); tasks.extend(base_actions.iter().map(|action| { - let task = BaseActionTask { + Task::BaseAction(BaseActionTask { action: action.clone(), - }; - let task = - ArgsTask::new(ArgsTaskType::BaseAction(task)); - Box::new(task) as Box + }) })); tasks } @@ -184,12 +172,9 @@ impl TasksBuilder for TaskBuilderImpl { UndelegateType::Standalone => Ok(tasks), UndelegateType::WithBaseActions(actions) => { tasks.extend(actions.iter().map(|action| { - let task = BaseActionTask { + Task::BaseAction(BaseActionTask { action: action.clone(), - }; - let task = - ArgsTask::new(ArgsTaskType::BaseAction(task)); - Box::new(task) as Box + }) })); Ok(tasks) diff --git a/magicblock-committor-service/src/tasks/task_strategist.rs b/magicblock-committor-service/src/tasks/task_strategist.rs index 7767e7683..010f24bfa 100644 --- a/magicblock-committor-service/src/tasks/task_strategist.rs +++ b/magicblock-committor-service/src/tasks/task_strategist.rs @@ -9,18 +9,17 @@ use solana_sdk::{ use crate::{ persist::IntentPersister, tasks::{ - args_task::{ArgsTask, ArgsTaskType}, task_visitors::persistor_visitor::{ PersistorContext, PersistorVisitor, }, utils::TransactionUtils, - BaseTask, FinalizeTask, + FinalizeTask, Task, }, transactions::{serialize_and_encode_base64, MAX_ENCODED_TRANSACTION_SIZE}, }; pub struct TransactionStrategy { - pub optimized_tasks: Vec>, + pub optimized_tasks: Vec, pub lookup_tables_keys: Vec, } @@ -48,7 +47,7 @@ impl TaskStrategist { /// Returns [`TaskDeliveryStrategy`] for every [`Task`] /// Returns Error if all optimizations weren't enough pub fn build_strategy( - mut tasks: Vec>, + mut tasks: Vec, validator: &Pubkey, persistor: &Option

, ) -> TaskStrategistResult { @@ -103,7 +102,7 @@ impl TaskStrategist { /// Attempt to use ALTs for ALL keys in tx /// Returns `true` if ALTs make tx fit, otherwise `false` /// TODO(edwin): optimize to use only necessary amount of pubkeys - pub fn attempt_lookup_tables(tasks: &[Box]) -> bool { + pub fn attempt_lookup_tables(tasks: &[Task]) -> bool { let placeholder = Keypair::new(); // Gather all involved keys in tx let budgets = TransactionUtils::tasks_compute_units(tasks); @@ -138,7 +137,7 @@ impl TaskStrategist { pub fn collect_lookup_table_keys( authority: &Pubkey, - tasks: &[Box], + tasks: &[Task], ) -> Vec { let budgets = TransactionUtils::tasks_compute_units(tasks); let budget_instructions = @@ -156,10 +155,10 @@ impl TaskStrategist { /// Note that the returned size, though possibly optimized one, may still not be under /// the limit MAX_ENCODED_TRANSACTION_SIZE. The caller needs to check and make decision accordingly. fn try_optimize_tx_size_if_needed( - tasks: &mut [Box], + tasks: &mut [Task], ) -> Result { // Get initial transaction size - let calculate_tx_length = |tasks: &[Box]| { + let calculate_tx_length = |tasks: &[Task]| { // TODO (snawaz): we seem to discard lots of heavy computations here match TransactionUtils::assemble_tasks_tx( &Keypair::new(), // placeholder @@ -203,11 +202,9 @@ impl TaskStrategist { let task = { // This is tmp task that will be replaced by old or optimized one - let tmp_task = - ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { - delegated_account: Pubkey::new_unique(), - })); - let tmp_task = Box::new(tmp_task) as Box; + let tmp_task = Task::Finalize(FinalizeTask { + delegated_account: Pubkey::new_unique(), + }); std::mem::replace(&mut tasks[index], tmp_task) }; match task.try_optimize_tx_size() { @@ -266,18 +263,13 @@ mod tests { use crate::{ intent_executor::NullTaskInfoFetcher, persist::IntentPersisterImpl, - tasks::{ - BaseActionTask, CommitTaskBuilder, TaskStrategy, UndelegateTask, - }, + tasks::{BaseActionTask, CommitTask, TaskStrategy, UndelegateTask}, }; // Helper to create a simple commit task - async fn create_test_commit_task( - commit_id: u64, - data_size: usize, - ) -> ArgsTask { - ArgsTask::new(ArgsTaskType::Commit( - CommitTaskBuilder::create_commit_task( + async fn create_test_commit_task(commit_id: u64, data_size: usize) -> Task { + Task::Commit( + CommitTask::new( commit_id, false, CommittedAccount { @@ -293,12 +285,12 @@ mod tests { &Arc::new(NullTaskInfoFetcher), ) .await, - )) + ) } // Helper to create a Base action task - fn create_test_base_action_task(len: usize) -> ArgsTask { - ArgsTask::new(ArgsTaskType::BaseAction(BaseActionTask { + fn create_test_base_action_task(len: usize) -> Task { + Task::BaseAction(BaseActionTask { action: BaseAction { destination_program: Pubkey::new_unique(), escrow_authority: Pubkey::new_unique(), @@ -309,30 +301,29 @@ mod tests { }, compute_units: 30_000, }, - })) + }) } // Helper to create a finalize task - fn create_test_finalize_task() -> ArgsTask { - ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { + fn create_test_finalize_task() -> Task { + Task::Finalize(FinalizeTask { delegated_account: Pubkey::new_unique(), - })) + }) } // Helper to create an undelegate task - fn create_test_undelegate_task() -> ArgsTask { - ArgsTask::new(ArgsTaskType::Undelegate(UndelegateTask { + fn create_test_undelegate_task() -> Task { + Task::Undelegate(UndelegateTask { delegated_account: Pubkey::new_unique(), owner_program: system_program::id(), rent_reimbursement: Pubkey::new_unique(), - })) + }) } #[tokio::test] async fn test_build_strategy_with_single_small_task() { let validator = Pubkey::new_unique(); - let task = create_test_commit_task(1, 100).await; - let tasks = vec![Box::new(task) as Box]; + let tasks = vec![create_test_commit_task(1, 100).await]; let strategy = TaskStrategist::build_strategy( tasks, @@ -349,8 +340,7 @@ mod tests { async fn test_build_strategy_optimizes_to_buffer_when_needed() { let validator = Pubkey::new_unique(); - let task = create_test_commit_task(1, 1000).await; // Large task - let tasks = vec![Box::new(task) as Box]; + let tasks = vec![create_test_commit_task(1, 1000).await]; // Large task let strategy = TaskStrategist::build_strategy( tasks, @@ -370,8 +360,7 @@ mod tests { async fn test_build_strategy_optimizes_to_buffer_u16_exceeded() { let validator = Pubkey::new_unique(); - let task = create_test_commit_task(1, 66_000).await; // Large task - let tasks = vec![Box::new(task) as Box]; + let tasks = vec![create_test_commit_task(1, 66_000).await]; // Large task let strategy = TaskStrategist::build_strategy( tasks, @@ -395,8 +384,7 @@ mod tests { let validator = Pubkey::new_unique(); let tasks = join_all((0..NUM_COMMITS).map(|i| async move { - let task = create_test_commit_task(i, 500).await; // Large task - Box::new(task) as Box + create_test_commit_task(i, 500).await // Large task })) .await; @@ -422,8 +410,7 @@ mod tests { let tasks = join_all((0..NUM_COMMITS).map(|i| async move { // Large task - let task = create_test_commit_task(i, 10000).await; - Box::new(task) as Box + create_test_commit_task(i, 10000).await })) .await; @@ -448,8 +435,7 @@ mod tests { let tasks = join_all((0..NUM_COMMITS).map(|i| async move { // Large task - let task = create_test_commit_task(i, 1000).await; - Box::new(task) as Box + create_test_commit_task(i, 1000).await })) .await; @@ -464,12 +450,9 @@ mod tests { #[tokio::test] async fn test_optimize_strategy_prioritizes_largest_tasks() { let mut tasks = [ - Box::new(create_test_commit_task(1, 100).await) - as Box, - Box::new(create_test_commit_task(2, 1000).await) - as Box, // Larger task - Box::new(create_test_commit_task(3, 1000).await) - as Box, // Larger task + create_test_commit_task(1, 100).await, + create_test_commit_task(2, 1000).await, // Larger task + create_test_commit_task(3, 1000).await, // Larger task ]; let _ = TaskStrategist::try_optimize_tx_size_if_needed(&mut tasks); @@ -482,11 +465,10 @@ mod tests { async fn test_mixed_task_types_with_optimization() { let validator = Pubkey::new_unique(); let tasks = vec![ - Box::new(create_test_commit_task(1, 1000).await) - as Box, - Box::new(create_test_finalize_task()) as Box, - Box::new(create_test_base_action_task(500)) as Box, - Box::new(create_test_undelegate_task()) as Box, + create_test_commit_task(1, 1000).await, + create_test_finalize_task(), + create_test_base_action_task(500), + create_test_undelegate_task(), ]; let strategy = TaskStrategist::build_strategy( diff --git a/magicblock-committor-service/src/tasks/task_visitors/persistor_visitor.rs b/magicblock-committor-service/src/tasks/task_visitors/persistor_visitor.rs index 5aac14fe7..6840834cb 100644 --- a/magicblock-committor-service/src/tasks/task_visitors/persistor_visitor.rs +++ b/magicblock-committor-service/src/tasks/task_visitors/persistor_visitor.rs @@ -2,11 +2,7 @@ use log::error; use crate::{ persist::{CommitStrategy, IntentPersister}, - tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - buffer_task::{BufferTask, BufferTaskType}, - visitor::Visitor, - }, + tasks::{visitor::Visitor, Task}, }; pub enum PersistorContext { @@ -23,11 +19,10 @@ impl

Visitor for PersistorVisitor<'_, P> where P: IntentPersister, { - fn visit_args_task(&mut self, task: &ArgsTask) { + fn visit_task(&mut self, task: &Task) { match self.context { PersistorContext::PersistStrategy { uses_lookup_tables } => { - let ArgsTaskType::Commit(ref commit_task) = task.task_type - else { + let Task::Commit(ref commit_task) = task else { return; }; @@ -51,29 +46,4 @@ where } } } - - fn visit_buffer_task(&mut self, task: &BufferTask) { - match self.context { - PersistorContext::PersistStrategy { uses_lookup_tables } => { - let BufferTaskType::Commit(ref commit_task) = task.task_type; - let commit_strategy = if uses_lookup_tables { - CommitStrategy::StateBufferWithLookupTable - } else { - CommitStrategy::StateBuffer - }; - - if let Err(err) = self.persistor.set_commit_strategy( - commit_task.commit_id, - &commit_task.committed_account.pubkey, - commit_strategy, - ) { - error!( - "Failed to persist commit strategy {}: {}", - commit_strategy.as_str(), - err - ); - } - } - } - } } diff --git a/magicblock-committor-service/src/tasks/task_visitors/utility_visitor.rs b/magicblock-committor-service/src/tasks/task_visitors/utility_visitor.rs index fef7cade1..277888c4b 100644 --- a/magicblock-committor-service/src/tasks/task_visitors/utility_visitor.rs +++ b/magicblock-committor-service/src/tasks/task_visitors/utility_visitor.rs @@ -1,10 +1,6 @@ use solana_pubkey::Pubkey; -use crate::tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - buffer_task::{BufferTask, BufferTaskType}, - visitor::Visitor, -}; +use crate::tasks::{visitor::Visitor, Task}; pub struct CommitMeta { pub committed_pubkey: Pubkey, @@ -16,10 +12,10 @@ pub enum TaskVisitorUtils { } impl Visitor for TaskVisitorUtils { - fn visit_args_task(&mut self, task: &ArgsTask) { + fn visit_task(&mut self, task: &Task) { let Self::GetCommitMeta(commit_meta) = self; - if let ArgsTaskType::Commit(ref commit_task) = task.task_type { + if let Task::Commit(ref commit_task) = task { *commit_meta = Some(CommitMeta { committed_pubkey: commit_task.committed_account.pubkey, commit_id: commit_task.commit_id, @@ -28,14 +24,4 @@ impl Visitor for TaskVisitorUtils { *commit_meta = None } } - - fn visit_buffer_task(&mut self, task: &BufferTask) { - let Self::GetCommitMeta(commit_meta) = self; - - let BufferTaskType::Commit(ref commit_task) = task.task_type; - *commit_meta = Some(CommitMeta { - committed_pubkey: commit_task.committed_account.pubkey, - commit_id: commit_task.commit_id, - }) - } } diff --git a/magicblock-committor-service/src/tasks/utils.rs b/magicblock-committor-service/src/tasks/utils.rs index b7f380c8e..259cf3cba 100644 --- a/magicblock-committor-service/src/tasks/utils.rs +++ b/magicblock-committor-service/src/tasks/utils.rs @@ -13,7 +13,9 @@ use solana_sdk::{ transaction::VersionedTransaction, }; -use crate::tasks::{task_strategist::TaskStrategistResult, BaseTask}; +use crate::tasks::task_strategist::TaskStrategistResult; + +use super::Task; pub struct TransactionUtils; impl TransactionUtils { @@ -30,7 +32,7 @@ impl TransactionUtils { } pub fn unique_involved_pubkeys( - tasks: &[Box], + tasks: &[Task], validator: &Pubkey, budget_instructions: &[Instruction], ) -> Vec { @@ -51,7 +53,7 @@ impl TransactionUtils { pub fn tasks_instructions( validator: &Pubkey, - tasks: &[Box], + tasks: &[Task], ) -> Vec { tasks .iter() @@ -61,7 +63,7 @@ impl TransactionUtils { pub fn assemble_tasks_tx( authority: &Keypair, - tasks: &[Box], + tasks: &[Task], compute_unit_price: u64, lookup_tables: &[AddressLookupTableAccount], ) -> TaskStrategistResult { @@ -125,8 +127,8 @@ impl TransactionUtils { Ok(tx) } - pub fn tasks_compute_units(tasks: &[impl AsRef]) -> u32 { - tasks.iter().map(|task| task.as_ref().compute_units()).sum() + pub fn tasks_compute_units(tasks: &[Task]) -> u32 { + tasks.iter().map(|task| task.compute_units()).sum() } pub fn budget_instructions( diff --git a/magicblock-committor-service/src/tasks/visitor.rs b/magicblock-committor-service/src/tasks/visitor.rs index 1b9940a09..171b984f0 100644 --- a/magicblock-committor-service/src/tasks/visitor.rs +++ b/magicblock-committor-service/src/tasks/visitor.rs @@ -1,6 +1,5 @@ -use crate::tasks::{args_task::ArgsTask, buffer_task::BufferTask}; +use crate::tasks::Task; pub trait Visitor { - fn visit_args_task(&mut self, task: &ArgsTask); - fn visit_buffer_task(&mut self, task: &BufferTask); + fn visit_task(&mut self, task: &Task); } diff --git a/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs b/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs index 13001df0e..c2e4e9e4c 100644 --- a/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs +++ b/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs @@ -31,8 +31,8 @@ use solana_sdk::{ use crate::{ persist::{CommitStatus, IntentPersister}, tasks::{ - task_strategist::TransactionStrategy, BaseTask, BaseTaskError, - CleanupTask, PreparationState, PreparationTask, + task_strategist::TransactionStrategy, CleanupTask, PreparationState, + PreparationTask, Task, TaskError, }, utils::persist_status_update, ComputeBudgetConfig, @@ -87,7 +87,7 @@ impl DeliveryPreparator { pub async fn prepare_task( &self, authority: &Keypair, - task: &mut dyn BaseTask, + task: &mut Task, persister: &Option

, ) -> DeliveryPreparatorResult<(), InternalError> { let PreparationState::Required(preparation_task) = @@ -140,10 +140,10 @@ impl DeliveryPreparator { pub async fn prepare_task_handling_errors( &self, authority: &Keypair, - task: &mut Box, + task: &mut Task, persister: &Option

, ) -> Result<(), InternalError> { - let res = self.prepare_task(authority, task.as_mut(), persister).await; + let res = self.prepare_task(authority, task, persister).await; match res { Err(InternalError::BufferExecutionError( BufferExecutionError::AccountAlreadyInitializedError( @@ -175,7 +175,7 @@ impl DeliveryPreparator { preparation_task, ))?; - self.prepare_task(authority, task.as_mut(), persister).await + self.prepare_task(authority, task, persister).await } /// Initializes buffer account for future writes @@ -414,7 +414,7 @@ impl DeliveryPreparator { pub async fn cleanup( &self, authority: &Keypair, - tasks: &[Box], + tasks: &[Task], lookup_table_keys: &[Pubkey], ) -> DeliveryPreparatorResult<(), InternalError> { self.table_mania @@ -520,8 +520,8 @@ pub enum InternalError { MagicBlockRpcClientError(#[from] MagicBlockRpcClientError), #[error("BufferExecutionError: {0}")] BufferExecutionError(#[from] BufferExecutionError), - #[error("BaseTaskError: {0}")] - BaseTaskError(#[from] BaseTaskError), + #[error("TaskError: {0}")] + TaskError(#[from] TaskError), } #[derive(thiserror::Error, Debug)] diff --git a/magicblock-committor-service/src/transaction_preparator/mod.rs b/magicblock-committor-service/src/transaction_preparator/mod.rs index 47c795797..ed64cb3b6 100644 --- a/magicblock-committor-service/src/transaction_preparator/mod.rs +++ b/magicblock-committor-service/src/transaction_preparator/mod.rs @@ -7,7 +7,7 @@ use solana_sdk::{message::VersionedMessage, signature::Keypair}; use crate::{ persist::IntentPersister, tasks::{ - task_strategist::TransactionStrategy, utils::TransactionUtils, BaseTask, + task_strategist::TransactionStrategy, utils::TransactionUtils, Task, }, transaction_preparator::{ delivery_preparator::{ @@ -24,7 +24,7 @@ pub mod error; #[async_trait] pub trait TransactionPreparator: Send + Sync + 'static { /// Return [`VersionedMessage`] corresponding to [`TransactionStrategy`] - /// Handles all necessary preparation needed for successful [`BaseTask`] execution + /// Handles all necessary preparation needed for successful [`Task`] execution async fn prepare_for_strategy( &self, authority: &Keypair, @@ -36,7 +36,7 @@ pub trait TransactionPreparator: Send + Sync + 'static { async fn cleanup_for_strategy( &self, authority: &Keypair, - tasks: &[Box], + tasks: &[Task], lookup_table_keys: &[Pubkey], ) -> DeliveryPreparatorResult<(), InternalError>; } @@ -112,7 +112,7 @@ impl TransactionPreparator for TransactionPreparatorImpl { async fn cleanup_for_strategy( &self, authority: &Keypair, - tasks: &[Box], + tasks: &[Task], lookup_table_keys: &[Pubkey], ) -> DeliveryPreparatorResult<(), InternalError> { self.delivery_preparator