Skip to content

Commit

Permalink
Change signature of BlockScanner::scan to return an Iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
Alenar committed Apr 23, 2024
1 parent 9ca6619 commit 988110f
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 28 deletions.
16 changes: 8 additions & 8 deletions mithril-aggregator/src/services/cardano_transactions_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,8 @@ impl CardanoTransactionsImporter {
// todo: temp algorithm, should be optimized to avoid loading all blocks & transactions
// at once in memory (probably using iterators)
let scanned_blocks = self.block_scanner.scan(&self.dirpath, from, until).await?;
let parsed_transactions: Vec<CardanoTransaction> = scanned_blocks
.into_iter()
.flat_map(|b| b.into_transactions())
.collect();
let parsed_transactions: Vec<CardanoTransaction> =
scanned_blocks.flat_map(|b| b.into_transactions()).collect();
debug!(
self.logger,
"TransactionsImporter retrieved '{}' Cardano transactions between immutables '{}' and '{until}'",
Expand Down Expand Up @@ -201,7 +199,7 @@ mod tests {
dirpath: &Path,
from_immutable: Option<ImmutableFileNumber>,
until_immutable: ImmutableFileNumber,
) -> StdResult<Vec<ScannedBlock>>;
) -> StdResult<Box<dyn Iterator<Item = ScannedBlock>>>;
}
}

Expand Down Expand Up @@ -269,7 +267,7 @@ mod tests {
scanner_mock
.expect_scan()
.withf(move |_, from, until| from.is_none() && until == &up_to_beacon)
.return_once(move |_, _, _| Ok(blocks));
.return_once(move |_, _, _| Ok(Box::new(blocks.into_iter())));
CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
};

Expand Down Expand Up @@ -431,7 +429,7 @@ mod tests {
scanner_mock
.expect_scan()
.withf(move |_, from, until| from == &Some(12) && until == &up_to_beacon)
.return_once(move |_, _, _| Ok(scanned_blocks))
.return_once(move |_, _, _| Ok(Box::new(scanned_blocks.into_iter())))
.once();
CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
};
Expand Down Expand Up @@ -609,7 +607,9 @@ mod tests {
let importer = {
let connection = cardano_tx_db_connection().unwrap();
let mut scanner = MockBlockScannerImpl::new();
scanner.expect_scan().return_once(move |_, _, _| Ok(blocks));
scanner
.expect_scan()
.return_once(move |_, _, _| Ok(Box::new(blocks.into_iter())));

CardanoTransactionsImporter::new_for_test(
Arc::new(scanner),
Expand Down
25 changes: 13 additions & 12 deletions mithril-common/src/cardano_block_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use tokio::sync::RwLock;
/// dirpath: &Path,
/// from_immutable: Option<ImmutableFileNumber>,
/// until_immutable: ImmutableFileNumber,
/// ) -> StdResult<Vec<ScannedBlock>>;
/// ) -> StdResult<Box<dyn Iterator<Item = ScannedBlock>>>;
/// }
/// }
///
Expand All @@ -59,7 +59,7 @@ pub trait BlockScanner: Sync + Send {
dirpath: &Path,
from_immutable: Option<ImmutableFileNumber>,
until_immutable: ImmutableFileNumber,
) -> StdResult<Vec<ScannedBlock>>;
) -> StdResult<Box<dyn Iterator<Item = ScannedBlock>>>;
}

/// Dumb transaction parser
Expand Down Expand Up @@ -89,8 +89,9 @@ impl BlockScanner for DumbBlockScanner {
_dirpath: &Path,
_from_immutable: Option<ImmutableFileNumber>,
_until_immutable: ImmutableFileNumber,
) -> StdResult<Vec<ScannedBlock>> {
Ok(self.blocks.read().await.clone())
) -> StdResult<Box<dyn Iterator<Item = ScannedBlock>>> {
let iter = self.blocks.read().await.clone().into_iter();
Ok(Box::new(iter))
}
}

Expand Down Expand Up @@ -256,7 +257,7 @@ impl BlockScanner for CardanoBlockScanner {
dirpath: &Path,
from_immutable: Option<ImmutableFileNumber>,
until_immutable: ImmutableFileNumber,
) -> StdResult<Vec<ScannedBlock>> {
) -> StdResult<Box<dyn Iterator<Item = ScannedBlock>>> {
let is_in_bounds = |number: ImmutableFileNumber| match from_immutable {
Some(from) => (from..=until_immutable).contains(&number),
None => number <= until_immutable,
Expand All @@ -279,7 +280,7 @@ impl BlockScanner for CardanoBlockScanner {
scanned_blocks.append(&mut blocks);
}

Ok(scanned_blocks)
Ok(Box::new(scanned_blocks.into_iter()))
}
}

Expand Down Expand Up @@ -325,7 +326,7 @@ mod tests {
.scan(db_path, None, until_immutable_file)
.await
.unwrap();
let tx_count: usize = blocks.iter().map(|b| b.transactions.len()).sum();
let tx_count: usize = blocks.map(|b| b.transactions.len()).sum();

assert_eq!(tx_count, expected_tx_count);
}
Expand All @@ -346,7 +347,7 @@ mod tests {
.scan(db_path, Some(2), until_immutable_file)
.await
.unwrap();
let tx_count: usize = blocks.iter().map(|b| b.transactions.len()).sum();
let tx_count: usize = blocks.map(|b| b.transactions.len()).sum();

assert_eq!(tx_count, expected_tx_count);
}
Expand Down Expand Up @@ -379,10 +380,10 @@ mod tests {
let cardano_transaction_parser =
CardanoBlockScanner::new(create_file_logger(&filepath), true);

cardano_transaction_parser
let res = cardano_transaction_parser
.scan(db_path, None, until_immutable_file)
.await
.expect_err("parse should have failed");
.await;
assert!(res.is_err(), "parse should have failed");
}

let log_file = std::fs::read_to_string(&filepath).unwrap();
Expand All @@ -405,7 +406,7 @@ mod tests {
.scan(db_path, None, until_immutable_file)
.await
.unwrap();
let tx_count: usize = blocks.iter().map(|b| b.transactions.len()).sum();
let tx_count: usize = blocks.map(|b| b.transactions.len()).sum();

assert_eq!(tx_count, expected_tx_count);
}
Expand Down
16 changes: 8 additions & 8 deletions mithril-signer/src/cardano_transactions_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,8 @@ impl CardanoTransactionsImporter {
// todo: temp algorithm, should be optimized to avoid loading all blocks & transactions
// at once in memory (probably using iterators)
let scanned_blocks = self.block_scanner.scan(&self.dirpath, from, until).await?;
let parsed_transactions: Vec<CardanoTransaction> = scanned_blocks
.into_iter()
.flat_map(|b| b.into_transactions())
.collect();
let parsed_transactions: Vec<CardanoTransaction> =
scanned_blocks.flat_map(|b| b.into_transactions()).collect();
debug!(
self.logger,
"TransactionsImporter retrieved '{}' Cardano transactions between immutables '{}' and '{until}'",
Expand Down Expand Up @@ -201,7 +199,7 @@ mod tests {
dirpath: &Path,
from_immutable: Option<ImmutableFileNumber>,
until_immutable: ImmutableFileNumber,
) -> StdResult<Vec<ScannedBlock>>;
) -> StdResult<Box<dyn Iterator<Item = ScannedBlock>>>;
}
}

Expand Down Expand Up @@ -269,7 +267,7 @@ mod tests {
scanner_mock
.expect_scan()
.withf(move |_, from, until| from.is_none() && until == &up_to_beacon)
.return_once(move |_, _, _| Ok(blocks));
.return_once(move |_, _, _| Ok(Box::new(blocks.into_iter())));
CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
};

Expand Down Expand Up @@ -431,7 +429,7 @@ mod tests {
scanner_mock
.expect_scan()
.withf(move |_, from, until| from == &Some(12) && until == &up_to_beacon)
.return_once(move |_, _, _| Ok(scanned_blocks))
.return_once(move |_, _, _| Ok(Box::new(scanned_blocks.into_iter())))
.once();
CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
};
Expand Down Expand Up @@ -609,7 +607,9 @@ mod tests {
let importer = {
let connection = cardano_tx_db_connection().unwrap();
let mut scanner = MockBlockScannerImpl::new();
scanner.expect_scan().return_once(move |_, _, _| Ok(blocks));
scanner
.expect_scan()
.return_once(move |_, _, _| Ok(Box::new(blocks.into_iter())));

CardanoTransactionsImporter::new_for_test(
Arc::new(scanner),
Expand Down

0 comments on commit 988110f

Please sign in to comment.