diff --git a/mithril-aggregator/src/services/cardano_transactions_importer.rs b/mithril-aggregator/src/services/cardano_transactions_importer.rs index 9e1ce0857b..1768347bb6 100644 --- a/mithril-aggregator/src/services/cardano_transactions_importer.rs +++ b/mithril-aggregator/src/services/cardano_transactions_importer.rs @@ -79,7 +79,13 @@ impl CardanoTransactionsImporter { return Ok(()); } - let parsed_transactions = self.block_scanner.parse(&self.dirpath, from, until).await?; + // todo: temp algorithm, should be optimized to avoid loading all blocks & transactions + // at once in memory (probably using iterators) + let scanned_blocks = self.block_scanner.scan(&self.dirpath, from, until).await?; + let parsed_transactions: Vec = scanned_blocks + .into_iter() + .flat_map(|b| b.into_transactions()) + .collect(); debug!( self.logger, "TransactionsImporter retrieved '{}' Cardano transactions between immutables '{}' and '{until}'", @@ -114,6 +120,8 @@ mod tests { use mockall::mock; use mockall::predicate::eq; + use mithril_common::cardano_block_scanner::ScannedBlock; + use crate::database::repository::CardanoTransactionRepository; use crate::database::test_helper::cardano_tx_db_connection; @@ -124,19 +132,23 @@ mod tests { #[async_trait] impl BlockScanner for BlockScannerImpl { - async fn parse( + async fn scan( &self, dirpath: &Path, from_immutable: Option, until_immutable: ImmutableFileNumber, - ) -> StdResult>; + ) -> StdResult>; } } - fn build_importer( - scanner_mock_config: &dyn Fn(&mut MockBlockScannerImpl), - store_mock_config: &dyn Fn(&mut MockTransactionStore), - ) -> CardanoTransactionsImporter { + fn build_importer( + scanner_mock_config: TParser, + store_mock_config: TStore, + ) -> CardanoTransactionsImporter + where + TParser: FnOnce(&mut MockBlockScannerImpl), + TStore: FnOnce(&mut MockTransactionStore), + { let db_path = Path::new(""); let mut scanner = MockBlockScannerImpl::new(); scanner_mock_config(&mut scanner); @@ -156,23 +168,24 @@ mod tests { 
#[tokio::test] async fn if_nothing_stored_parse_and_store_all_transactions() { - let transactions = vec![ - CardanoTransaction::new("tx_hash-1", 10, 15, "block_hash-1", 11), - CardanoTransaction::new("tx_hash-2", 10, 20, "block_hash-1", 11), - CardanoTransaction::new("tx_hash-3", 20, 25, "block_hash-2", 12), - CardanoTransaction::new("tx_hash-4", 20, 30, "block_hash-2", 12), + let blocks = vec![ + ScannedBlock::new("block_hash-1", 10, 15, 11, vec!["tx_hash-1", "tx_hash-2"]), + ScannedBlock::new("block_hash-2", 20, 25, 12, vec!["tx_hash-3", "tx_hash-4"]), ]; + let transactions: Vec = blocks + .iter() + .flat_map(|b| b.clone().into_transactions()) + .collect(); let up_to_beacon = 12; let importer = build_importer( - &|scanner_mock| { - let parsed_transactions = transactions.clone(); + |scanner_mock| { scanner_mock - .expect_parse() + .expect_scan() .withf(move |_, from, until| from.is_none() && until == &up_to_beacon) - .return_once(move |_, _, _| Ok(parsed_transactions)); + .return_once(move |_, _, _| Ok(blocks)); }, - &|store_mock| { + |store_mock| { let expected_stored_transactions = transactions.clone(); store_mock .expect_get_highest_beacon() @@ -196,10 +209,10 @@ mod tests { let up_to_beacon = 12; let importer = build_importer( - &|scanner_mock| { - scanner_mock.expect_parse().never(); + |scanner_mock| { + scanner_mock.expect_scan().never(); }, - &|store_mock| { + |store_mock| { store_mock .expect_get_highest_beacon() .returning(|| Ok(Some(12))); @@ -215,23 +228,25 @@ mod tests { #[tokio::test] async fn if_all_half_are_stored_the_other_half_is_parsed_and_stored() { - let transactions = vec![ - CardanoTransaction::new("tx_hash-1", 10, 15, "block_hash-10", 11), - CardanoTransaction::new("tx_hash-2", 20, 20, "block_hash-20", 12), - CardanoTransaction::new("tx_hash-3", 30, 25, "block_hash-30", 13), - CardanoTransaction::new("tx_hash-4", 40, 30, "block_hash-40", 14), + let blocks = [ + ScannedBlock::new("block_hash-1", 10, 15, 11, vec!["tx_hash-1", "tx_hash-2"]), + 
ScannedBlock::new("block_hash-2", 20, 25, 12, vec!["tx_hash-3", "tx_hash-4"]), ]; + let transactions: Vec = blocks + .iter() + .flat_map(|b| b.clone().into_transactions()) + .collect(); let up_to_beacon = 14; let importer = build_importer( - &|scanner_mock| { - let parsed_transactions = transactions[2..=3].to_vec(); + |scanner_mock| { + let scanned_blocks = vec![blocks[1].clone()]; scanner_mock - .expect_parse() + .expect_scan() .withf(move |_, from, until| from == &Some(13) && until == &up_to_beacon) - .return_once(move |_, _, _| Ok(parsed_transactions)); + .return_once(move |_, _, _| Ok(scanned_blocks)); }, - &|store_mock| { + |store_mock| { store_mock .expect_get_highest_beacon() .returning(|| Ok(Some(12))); @@ -253,19 +268,18 @@ mod tests { #[tokio::test] async fn importing_twice_starting_with_nothing_in_a_real_db_should_yield_the_transactions_in_same_order( ) { - let transactions = vec![ - CardanoTransaction::new("tx_hash-1", 10, 15, "block_hash-1", 11), - CardanoTransaction::new("tx_hash-2", 10, 20, "block_hash-1", 11), - CardanoTransaction::new("tx_hash-3", 20, 25, "block_hash-2", 12), - CardanoTransaction::new("tx_hash-4", 20, 30, "block_hash-2", 12), + let blocks = vec![ + ScannedBlock::new("block_hash-1", 10, 15, 11, vec!["tx_hash-1", "tx_hash-2"]), + ScannedBlock::new("block_hash-2", 20, 25, 12, vec!["tx_hash-3", "tx_hash-4"]), ]; + let transactions: Vec = blocks + .iter() + .flat_map(|b| b.clone().into_transactions()) + .collect(); let importer = { let connection = cardano_tx_db_connection().unwrap(); - let parsed_transactions = transactions.clone(); let mut scanner = MockBlockScannerImpl::new(); - scanner - .expect_parse() - .return_once(move |_, _, _| Ok(parsed_transactions)); + scanner.expect_scan().return_once(move |_, _, _| Ok(blocks)); CardanoTransactionsImporter::new( Arc::new(scanner), diff --git a/mithril-common/src/cardano_block_scanner.rs b/mithril-common/src/cardano_block_scanner.rs index bd8faf2ea4..77718d4797 100644 --- 
a/mithril-common/src/cardano_block_scanner.rs +++ b/mithril-common/src/cardano_block_scanner.rs @@ -33,19 +33,19 @@ use tokio::sync::RwLock; /// /// #[async_trait] /// impl BlockScanner for BlockScannerImpl { -/// async fn parse( +/// async fn scan( /// &self, /// dirpath: &Path, /// from_immutable: Option, /// until_immutable: ImmutableFileNumber, -/// ) -> StdResult>; +/// ) -> StdResult>; /// } /// } /// /// #[test] /// fn test_mock() { /// let mut mock = MockBlockScannerImpl::new(); -/// mock.expect_parse().return_once(|_, _| { +/// mock.expect_scan().return_once(|_, _, _| { /// Err(anyhow!("parse error")) /// }); /// } @@ -54,68 +54,110 @@ use tokio::sync::RwLock; #[async_trait] pub trait BlockScanner: Sync + Send { /// Parse the transactions - async fn parse( + async fn scan( &self, dirpath: &Path, from_immutable: Option, until_immutable: ImmutableFileNumber, - ) -> StdResult>; + ) -> StdResult>; } /// Dumb transaction parser pub struct DumbBlockScanner { - transactions: RwLock>, + blocks: RwLock>, } impl DumbBlockScanner { /// Factory - pub fn new(transactions: Vec) -> Self { + pub fn new(blocks: Vec) -> Self { Self { - transactions: RwLock::new(transactions), + blocks: RwLock::new(blocks), } } /// Update transactions returned by `parse` - pub async fn update_transactions(&self, new_transactions: Vec) { - let mut transactions = self.transactions.write().await; - *transactions = new_transactions; + pub async fn update_transactions(&self, new_blocks: Vec) { + let mut blocks = self.blocks.write().await; + *blocks = new_blocks; } } #[async_trait] impl BlockScanner for DumbBlockScanner { - async fn parse( + async fn scan( &self, _dirpath: &Path, _from_immutable: Option, _until_immutable: ImmutableFileNumber, - ) -> StdResult> { - Ok(self.transactions.read().await.clone()) + ) -> StdResult> { + Ok(self.blocks.read().await.clone()) } } -#[derive(Debug)] -struct Block { +/// A block scanned from a Cardano database +#[derive(Debug, Clone)] +pub struct ScannedBlock { +
/// Block hash + pub block_hash: BlockHash, + /// Block number pub block_number: BlockNumber, + /// Slot number of the block + pub slot_number: SlotNumber, + /// Number of the immutable that owns the block pub immutable_file_number: ImmutableFileNumber, + /// Hashes of the transactions in the block pub transactions: Vec, - pub slot_number: SlotNumber, - pub block_hash: BlockHash, } -impl Block { +impl ScannedBlock { + /// Scanned block factory + pub fn new, U: Into>( + block_hash: U, + block_number: BlockNumber, + slot_number: SlotNumber, + immutable_file_number: ImmutableFileNumber, + transaction_hashes: Vec, + ) -> Self { + Self { + block_hash: block_hash.into(), + block_number, + slot_number, + immutable_file_number, + transactions: transaction_hashes.into_iter().map(|h| h.into()).collect(), + } + } + fn convert(multi_era_block: MultiEraBlock, immutable_file_number: ImmutableFileNumber) -> Self { let mut transactions = Vec::new(); for tx in &multi_era_block.txs() { transactions.push(tx.hash().to_string()); } - Block { - block_number: multi_era_block.number(), + + Self::new( + multi_era_block.hash().to_string(), + multi_era_block.number(), + multi_era_block.slot(), immutable_file_number, transactions, - slot_number: multi_era_block.slot(), - block_hash: multi_era_block.hash().to_string(), - } + ) + } + + /// Convert the scanned block into a list of Cardano transactions. + /// + /// Consume the block.
+ pub fn into_transactions(self) -> Vec { + self.transactions + .into_iter() + .map(|transaction_hash| { + CardanoTransaction::new( + transaction_hash, + self.block_number, + self.slot_number, + self.block_hash.clone(), + self.immutable_file_number, + ) + }) + .collect::>() } } @@ -141,7 +183,7 @@ impl CardanoBlockScanner { fn read_blocks_from_immutable_file( &self, immutable_file: &ImmutableFile, - ) -> StdResult> { + ) -> StdResult> { let cardano_blocks_reader = CardanoBlockScanner::cardano_blocks_reader(immutable_file)?; let mut blocks = Vec::new(); @@ -169,7 +211,7 @@ impl CardanoBlockScanner { Ok(blocks) } - fn convert_to_block(block: &[u8], immutable_file: &ImmutableFile) -> StdResult { + fn convert_to_block(block: &[u8], immutable_file: &ImmutableFile) -> StdResult { let multi_era_block = MultiEraBlock::decode(block).with_context(|| { format!( "Error while decoding block in immutable file: '{:?}'", @@ -177,7 +219,10 @@ impl CardanoBlockScanner { ) })?; - Ok(Block::convert(multi_era_block, immutable_file.number)) + Ok(ScannedBlock::convert( + multi_era_block, + immutable_file.number, + )) } fn cardano_blocks_reader(immutable_file: &ImmutableFile) -> StdResult { @@ -200,12 +245,12 @@ impl CardanoBlockScanner { #[async_trait] impl BlockScanner for CardanoBlockScanner { - async fn parse( + async fn scan( &self, dirpath: &Path, from_immutable: Option, until_immutable: ImmutableFileNumber, - ) -> StdResult> { + ) -> StdResult> { let is_in_bounds = |number: ImmutableFileNumber| match from_immutable { Some(from) => (from..=until_immutable).contains(&number), None => number <= until_immutable, @@ -214,10 +259,10 @@ impl BlockScanner for CardanoBlockScanner { .into_iter() .filter(|f| is_in_bounds(f.number) && f.filename.contains("chunk")) .collect::>(); - let mut transactions: Vec = vec![]; + let mut scanned_blocks: Vec = vec![]; for immutable_file in &immutable_chunks { - let blocks = self + let mut blocks = self .read_blocks_from_immutable_file(immutable_file) 
.with_context(|| { format!( @@ -225,25 +270,10 @@ impl BlockScanner for CardanoBlockScanner { immutable_file.path.display() ) })?; - let mut block_transactions = blocks - .into_iter() - .flat_map(|block| { - block.transactions.into_iter().map(move |transaction_hash| { - CardanoTransaction::new( - transaction_hash, - block.block_number, - block.slot_number, - block.block_hash.clone(), - block.immutable_file_number, - ) - }) - }) - .collect::>(); - - transactions.append(&mut block_transactions); + scanned_blocks.append(&mut blocks); } - Ok(transactions) + Ok(scanned_blocks) } } @@ -282,16 +312,17 @@ mod tests { assert!(get_number_of_immutable_chunk_in_dir(db_path) >= 3); let until_immutable_file = 2; - let tx_count: usize = immutable_files.iter().map(|(_, count)| *count).sum(); + let expected_tx_count: usize = immutable_files.iter().map(|(_, count)| *count).sum(); let cardano_transaction_parser = CardanoBlockScanner::new(Logger::root(slog::Discard, slog::o!()), false); - let transactions = cardano_transaction_parser - .parse(db_path, None, until_immutable_file) + let blocks = cardano_transaction_parser + .scan(db_path, None, until_immutable_file) .await .unwrap(); + let tx_count: usize = blocks.iter().map(|b| b.transactions.len()).sum(); - assert_eq!(transactions.len(), tx_count); + assert_eq!(tx_count, expected_tx_count); } #[tokio::test] @@ -302,16 +333,17 @@ mod tests { assert!(get_number_of_immutable_chunk_in_dir(db_path) >= 3); let until_immutable_file = 2; - let tx_count: usize = immutable_files.iter().map(|(_, count)| *count).sum(); + let expected_tx_count: usize = immutable_files.iter().map(|(_, count)| *count).sum(); let cardano_transaction_parser = CardanoBlockScanner::new(Logger::root(slog::Discard, slog::o!()), false); - let transactions = cardano_transaction_parser - .parse(db_path, Some(2), until_immutable_file) + let blocks = cardano_transaction_parser + .scan(db_path, Some(2), until_immutable_file) .await .unwrap(); + let tx_count: usize = 
blocks.iter().map(|b| b.transactions.len()).sum(); - assert_eq!(transactions.len(), tx_count); + assert_eq!(tx_count, expected_tx_count); } #[tokio::test] @@ -322,7 +354,7 @@ mod tests { CardanoBlockScanner::new(Logger::root(slog::Discard, slog::o!()), false); let result = cardano_transaction_parser - .parse(db_path, None, until_immutable_file) + .scan(db_path, None, until_immutable_file) .await; assert!(result.is_err()); @@ -343,7 +375,7 @@ mod tests { CardanoBlockScanner::new(create_file_logger(&filepath), true); cardano_transaction_parser - .parse(db_path, None, until_immutable_file) + .scan(db_path, None, until_immutable_file) .await .expect_err("parse should have failed"); } @@ -360,15 +392,16 @@ mod tests { assert!(get_number_of_immutable_chunk_in_dir(db_path) >= 2); let until_immutable_file = 1; - let tx_count: usize = immutable_files.iter().map(|(_, count)| *count).sum(); + let expected_tx_count: usize = immutable_files.iter().map(|(_, count)| *count).sum(); let cardano_transaction_parser = CardanoBlockScanner::new(Logger::root(slog::Discard, slog::o!()), false); - let transactions = cardano_transaction_parser - .parse(db_path, None, until_immutable_file) + let blocks = cardano_transaction_parser + .scan(db_path, None, until_immutable_file) .await .unwrap(); + let tx_count: usize = blocks.iter().map(|b| b.transactions.len()).sum(); - assert_eq!(transactions.len(), tx_count); + assert_eq!(tx_count, expected_tx_count); } } diff --git a/mithril-signer/src/cardano_transactions_importer.rs b/mithril-signer/src/cardano_transactions_importer.rs index 8fdf53be42..a5ad11f866 100644 --- a/mithril-signer/src/cardano_transactions_importer.rs +++ b/mithril-signer/src/cardano_transactions_importer.rs @@ -79,7 +79,13 @@ impl CardanoTransactionsImporter { return Ok(()); } - let parsed_transactions = self.block_scanner.parse(&self.dirpath, from, until).await?; + // todo: temp algorithm, should be optimized to avoid loading all blocks & transactions + // at once in memory 
(probably using iterators) + let scanned_blocks = self.block_scanner.scan(&self.dirpath, from, until).await?; + let parsed_transactions: Vec = scanned_blocks + .into_iter() + .flat_map(|b| b.into_transactions()) + .collect(); debug!( self.logger, "TransactionsImporter retrieved '{}' Cardano transactions between immutables '{}' and '{until}'", @@ -114,6 +120,8 @@ mod tests { use mockall::mock; use mockall::predicate::eq; + use mithril_common::cardano_block_scanner::ScannedBlock; + use crate::database::repository::CardanoTransactionRepository; use crate::database::test_utils::cardano_tx_db_connection; @@ -124,12 +132,12 @@ mod tests { #[async_trait] impl BlockScanner for BlockScannerImpl { - async fn parse( + async fn scan( &self, dirpath: &Path, from_immutable: Option, until_immutable: ImmutableFileNumber, - ) -> StdResult>; + ) -> StdResult>; } } @@ -142,15 +150,15 @@ mod tests { TStore: FnOnce(&mut MockTransactionStore), { let db_path = Path::new(""); - let mut parser = MockBlockScannerImpl::new(); - scanner_mock_config(&mut parser); + let mut scanner = MockBlockScannerImpl::new(); + scanner_mock_config(&mut scanner); let mut store = MockTransactionStore::new(); store.expect_get_up_to().returning(|_| Ok(vec![])); store_mock_config(&mut store); CardanoTransactionsImporter::new( - Arc::new(parser), + Arc::new(scanner), Arc::new(store), db_path, None, @@ -160,21 +168,22 @@ mod tests { #[tokio::test] async fn if_nothing_stored_parse_and_store_all_transactions() { - let transactions = vec![ - CardanoTransaction::new("tx_hash-1", 10, 15, "block_hash-1", 11), - CardanoTransaction::new("tx_hash-2", 10, 20, "block_hash-1", 11), - CardanoTransaction::new("tx_hash-3", 20, 25, "block_hash-2", 12), - CardanoTransaction::new("tx_hash-4", 20, 30, "block_hash-2", 12), + let blocks = vec![ + ScannedBlock::new("block_hash-1", 10, 15, 11, vec!["tx_hash-1", "tx_hash-2"]), + ScannedBlock::new("block_hash-2", 20, 25, 12, vec!["tx_hash-3", "tx_hash-4"]), ]; + let transactions: Vec = 
blocks + .iter() + .flat_map(|b| b.clone().into_transactions()) + .collect(); let up_to_beacon = 12; let importer = build_importer( |scanner_mock| { - let parsed_transactions = transactions.clone(); scanner_mock - .expect_parse() + .expect_scan() .withf(move |_, from, until| from.is_none() && until == &up_to_beacon) - .return_once(move |_, _, _| Ok(parsed_transactions)); + .return_once(move |_, _, _| Ok(blocks)); }, |store_mock| { let expected_stored_transactions = transactions.clone(); @@ -201,7 +210,7 @@ mod tests { let importer = build_importer( |scanner_mock| { - scanner_mock.expect_parse().never(); + scanner_mock.expect_scan().never(); }, |store_mock| { store_mock @@ -219,21 +228,23 @@ mod tests { #[tokio::test] async fn if_all_half_are_stored_the_other_half_is_parsed_and_stored() { - let transactions = vec![ - CardanoTransaction::new("tx_hash-1", 10, 15, "block_hash-10", 11), - CardanoTransaction::new("tx_hash-2", 20, 20, "block_hash-20", 12), - CardanoTransaction::new("tx_hash-3", 30, 25, "block_hash-30", 13), - CardanoTransaction::new("tx_hash-4", 40, 30, "block_hash-40", 14), + let blocks = [ + ScannedBlock::new("block_hash-1", 10, 15, 11, vec!["tx_hash-1", "tx_hash-2"]), + ScannedBlock::new("block_hash-2", 20, 25, 12, vec!["tx_hash-3", "tx_hash-4"]), ]; + let transactions: Vec = blocks + .iter() + .flat_map(|b| b.clone().into_transactions()) + .collect(); let up_to_beacon = 14; let importer = build_importer( |scanner_mock| { - let parsed_transactions = transactions[2..=3].to_vec(); + let scanned_blocks = vec![blocks[1].clone()]; scanner_mock - .expect_parse() + .expect_scan() .withf(move |_, from, until| from == &Some(13) && until == &up_to_beacon) - .return_once(move |_, _, _| Ok(parsed_transactions)); + .return_once(move |_, _, _| Ok(scanned_blocks)); }, |store_mock| { store_mock @@ -257,19 +268,18 @@ mod tests { #[tokio::test] async fn importing_twice_starting_with_nothing_in_a_real_db_should_yield_the_transactions_in_same_order( ) { - let transactions 
= vec![ - CardanoTransaction::new("tx_hash-1", 10, 15, "block_hash-1", 11), - CardanoTransaction::new("tx_hash-2", 10, 20, "block_hash-1", 11), - CardanoTransaction::new("tx_hash-3", 20, 25, "block_hash-2", 12), - CardanoTransaction::new("tx_hash-4", 20, 30, "block_hash-2", 12), + let blocks = vec![ + ScannedBlock::new("block_hash-1", 10, 15, 11, vec!["tx_hash-1", "tx_hash-2"]), + ScannedBlock::new("block_hash-2", 20, 25, 12, vec!["tx_hash-3", "tx_hash-4"]), ]; + let transactions: Vec = blocks + .iter() + .flat_map(|b| b.clone().into_transactions()) + .collect(); let importer = { let connection = cardano_tx_db_connection().unwrap(); - let parsed_transactions = transactions.clone(); let mut scanner = MockBlockScannerImpl::new(); - scanner - .expect_parse() - .return_once(move |_, _, _| Ok(parsed_transactions)); + scanner.expect_scan().return_once(move |_, _, _| Ok(blocks)); CardanoTransactionsImporter::new( Arc::new(scanner),