Skip to content

Commit

Permalink
Sync TransactionImporter changes to the mithril-signer
Browse files Browse the repository at this point in the history
  • Loading branch information
Alenar committed Jul 15, 2024
1 parent 0056172 commit 684fde6
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 48 deletions.
149 changes: 107 additions & 42 deletions mithril-signer/src/cardano_transactions_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@ pub trait TransactionStore: Send + Sync {
/// Get the highest known transaction beacon
async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>>;

/// Get the highest stored block range root bounds
async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>>;

/// Store list of transactions
async fn store_transactions(&self, transactions: Vec<CardanoTransaction>) -> StdResult<()>;

/// Get the interval of blocks whose merkle root has yet to be computed
async fn get_block_interval_without_block_range_root(
&self,
) -> StdResult<Option<Range<BlockNumber>>>;

/// Get transactions in an interval of blocks
async fn get_transactions_in_range(
&self,
Expand Down Expand Up @@ -129,15 +127,16 @@ impl CardanoTransactionsImporter {
Ok(())
}

async fn import_block_ranges(&self) -> StdResult<()> {
let block_ranges = match self
.transaction_store
.get_block_interval_without_block_range_root()
.await?
.map(|range| BlockRange::all_block_ranges_in(BlockRange::start(range.start)..range.end))
{
// Everything is already computed
None => return Ok(()),
async fn import_block_ranges(&self, until: BlockNumber) -> StdResult<()> {
let block_ranges = match self.transaction_store.get_highest_block_range().await?.map(
|highest_stored_block_range| {
BlockRange::all_block_ranges_in(
BlockRange::start(highest_stored_block_range.end)..=(until),
)
},
) {
// No block range root stored yet, start from the beginning
None => BlockRange::all_block_ranges_in(0..=(until)),
// Not enough block to form at least one block range
Some(ranges) if ranges.is_empty() => return Ok(()),
Some(ranges) => ranges,
Expand Down Expand Up @@ -181,7 +180,7 @@ impl CardanoTransactionsImporter {
up_to_beacon: BlockNumber,
) -> StdResult<()> {
self.import_transactions(up_to_beacon).await?;
self.import_block_ranges().await
self.import_block_ranges(up_to_beacon).await
}
}

Expand Down Expand Up @@ -315,7 +314,8 @@ mod tests {
SqliteConnectionPool::build_from_connection(connection),
)));

let blocks = build_blocks(0, BlockRange::LENGTH * 5 + 1);
let up_to_block_number = BlockRange::LENGTH * 5;
let blocks = build_blocks(0, up_to_block_number + 1);
let transactions = into_transactions(&blocks);
repository.store_transactions(transactions).await.unwrap();

Expand All @@ -325,7 +325,7 @@ mod tests {
);

importer
.import_block_ranges()
.import_block_ranges(up_to_block_number)
.await
.expect("Transactions Importer should succeed");

Expand All @@ -352,6 +352,7 @@ mod tests {
SqliteConnectionPool::build_from_connection(connection),
)));

let up_to_block_number = BlockRange::LENGTH * 4;
// Two block ranges with a gap
let blocks: Vec<ScannedBlock> = [
build_blocks(0, BlockRange::LENGTH),
Expand All @@ -367,7 +368,7 @@ mod tests {
);

importer
.import_block_ranges()
.import_block_ranges(up_to_block_number)
.await
.expect("Transactions Importer should succeed");

Expand Down Expand Up @@ -397,7 +398,7 @@ mod tests {
);

importer
.import_block_ranges()
.import_block_ranges(10_000)
.await
.expect("Transactions Importer should succeed");

Expand Down Expand Up @@ -504,7 +505,8 @@ mod tests {
SqliteConnectionPool::build_from_connection(connection),
)));

let blocks = build_blocks(0, BlockRange::LENGTH * 4 + 1);
let up_to_block_number = BlockRange::LENGTH * 4;
let blocks = build_blocks(0, up_to_block_number + 1);
let transactions = into_transactions(&blocks);
repository.store_transactions(transactions).await.unwrap();
repository
Expand Down Expand Up @@ -540,7 +542,7 @@ mod tests {
);

importer
.import_block_ranges()
.import_block_ranges(up_to_block_number)
.await
.expect("Transactions Importer should succeed");

Expand All @@ -559,35 +561,97 @@ mod tests {
);
}

#[tokio::test]
async fn can_compute_block_ranges_up_to_the_strict_end_of_a_block_range() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
SqliteConnectionPool::build_from_connection(connection),
)));

// Transactions for all blocks in the (15..=29) interval
let blocks = build_blocks(BlockRange::LENGTH, BlockRange::LENGTH - 1);
let transactions = into_transactions(&blocks);
repository.store_transactions(transactions).await.unwrap();

let importer = CardanoTransactionsImporter::new_for_test(
Arc::new(MockBlockScannerImpl::new()),
repository.clone(),
);

importer
.import_block_ranges(BlockRange::LENGTH * 2 - 1)
.await
.expect("Transactions Importer should succeed");

let block_range_roots = repository.get_all_block_range_root().unwrap();
assert_eq!(
vec![BlockRange::from_block_number(BlockRange::LENGTH)],
block_range_roots
.into_iter()
.map(|r| r.range)
.collect::<Vec<_>>()
);
}

#[tokio::test]
async fn can_compute_block_ranges_even_if_last_blocks_in_range_dont_have_transactions() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
SqliteConnectionPool::build_from_connection(connection),
)));

// For the block range (15..=29) we only have transactions in the 10 first blocks (15..=24)
let blocks = build_blocks(BlockRange::LENGTH, 10);
let transactions = into_transactions(&blocks);
repository.store_transactions(transactions).await.unwrap();

let importer = CardanoTransactionsImporter::new_for_test(
Arc::new(MockBlockScannerImpl::new()),
repository.clone(),
);

importer
.import_block_ranges(BlockRange::LENGTH * 2)
.await
.expect("Transactions Importer should succeed");

let block_range_roots = repository.get_all_block_range_root().unwrap();
assert_eq!(
vec![BlockRange::from_block_number(BlockRange::LENGTH)],
block_range_roots
.into_iter()
.map(|r| r.range)
.collect::<Vec<_>>()
);
}

#[tokio::test]
async fn block_range_root_retrieves_only_strictly_required_transactions() {
fn transactions_for_block(range: Range<BlockNumber>) -> StdResult<Vec<CardanoTransaction>> {
Ok(build_blocks(range.start, range.count() as BlockNumber)
.iter()
.flat_map(|b| b.clone().into_transactions())
.into_iter()
.flat_map(|b| b.into_transactions())
.collect())
}
const HIGHEST_BLOCK_RANGE_START: BlockNumber = BlockRange::LENGTH;
const UP_TO_BLOCK_NUMBER: BlockNumber = BlockRange::LENGTH * 5;

let importer = {
let mut store_mock = MockTransactionStore::new();
store_mock
.expect_get_block_interval_without_block_range_root()
// Specification of the interval without block range root
// Note: in reality the lower bound will always be a multiple of BlockRange::LENGTH
// since it's computed from the `block_range_root` table
.returning(|| Ok(Some((BlockRange::LENGTH + 2)..(BlockRange::LENGTH * 5))))
.expect_get_highest_block_range()
.returning(|| {
Ok(Some(BlockRange::from_block_number(
HIGHEST_BLOCK_RANGE_START,
)))
})
.once();
store_mock
.expect_get_transactions_in_range()
// Lower bound should be the block number that start after the last known block range end
//
// if it's not a multiple of BlockRange::LENGTH, it should be the start block number
// of the block range that contains the end of the last known block range.
//
// Upper bound should be the block number of the highest transaction in a db that can be
// included in a block range
// Lower bound should be the end block number of the last known block range
// Upper bound should be the block number provided to `import_block_ranges`
.withf(|range| {
BlockRangesSequence::new(BlockRange::LENGTH..(BlockRange::LENGTH * 5))
BlockRangesSequence::new(HIGHEST_BLOCK_RANGE_START..=UP_TO_BLOCK_NUMBER)
.contains(range)
})
.returning(transactions_for_block);
Expand All @@ -602,7 +666,7 @@ mod tests {
};

importer
.import_block_ranges()
.import_block_ranges(UP_TO_BLOCK_NUMBER)
.await
.expect("Transactions Importer should succeed");
}
Expand All @@ -615,7 +679,8 @@ mod tests {
)));

// 2 block ranges worth of blocks with one more block that should be ignored for merkle root computation
let blocks = build_blocks(0, BlockRange::LENGTH * 2 + 1);
let up_to_block_number = BlockRange::LENGTH * 2;
let blocks = build_blocks(0, up_to_block_number + 1);
let transactions = into_transactions(&blocks);
let expected_block_range_roots = vec![
(
Expand All @@ -638,7 +703,7 @@ mod tests {
);

importer
.import_block_ranges()
.import_block_ranges(up_to_block_number)
.await
.expect("Transactions Importer should succeed");

Expand Down Expand Up @@ -717,14 +782,14 @@ mod tests {
.await
.unwrap();

let chain_point = ChainPoint::new(5, 130, "block_hash-131");
let chain_point = ChainPoint::new(5, 130, "block_hash-130");
let scanner = DumbBlockScanner::new().backward(chain_point);

let importer =
CardanoTransactionsImporter::new_for_test(Arc::new(scanner), repository.clone());

importer
.import(3000)
.import_transactions(3000)
.await
.expect("Transactions Importer should succeed");

Expand Down Expand Up @@ -791,7 +856,7 @@ mod tests {
CardanoTransactionsImporter::new_for_test(Arc::new(scanner), repository.clone());

importer
.import(3000)
.import_transactions(3000)
.await
.expect("Transactions Importer should succeed");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ impl TransactionStore for CardanoTransactionRepository {
self.get_transaction_highest_chain_point().await
}

async fn store_transactions(&self, transactions: Vec<CardanoTransaction>) -> StdResult<()> {
self.store_transactions(transactions).await
async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>> {
let record = self.retrieve_highest_block_range_root().await?;
Ok(record.map(|record| record.range))
}

async fn get_block_interval_without_block_range_root(
&self,
) -> StdResult<Option<Range<BlockNumber>>> {
self.get_block_interval_without_block_range_root().await
async fn store_transactions(&self, transactions: Vec<CardanoTransaction>) -> StdResult<()> {
self.store_transactions(transactions).await
}

async fn get_transactions_in_range(
Expand Down

0 comments on commit 684fde6

Please sign in to comment.