Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jpraynaud committed Apr 29, 2024
1 parent b914203 commit b40281f
Show file tree
Hide file tree
Showing 16 changed files with 353 additions and 241 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use mithril_persistence::sqlite::{Provider, SourceAlias, SqLiteEntity, SqliteConnection};
use mithril_common::entities::BlockNumber;
use mithril_persistence::sqlite::{
Provider, SourceAlias, SqLiteEntity, SqliteConnection, WhereCondition,
};
use sqlite::Value;

use crate::database::record::BlockRangeRootRecord;

Expand All @@ -8,11 +12,14 @@ pub struct GetBlockRangeRootProvider<'client> {
}

impl<'client> GetBlockRangeRootProvider<'client> {
#[cfg(test)]
/// Create a new instance
pub fn new(connection: &'client SqliteConnection) -> Self {
Self { connection }
}

pub fn get_up_to_block_number_condition(&self, block_number: BlockNumber) -> WhereCondition {
WhereCondition::new("end < ?*", vec![Value::Integer(block_number as i64)])
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ mod get_block_range_root;
mod get_interval_without_block_range_provider;
mod insert_block_range;

#[cfg(test)]
pub use get_block_range_root::*;
pub use get_interval_without_block_range_provider::*;
pub use insert_block_range::*;
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::ops::Range;

use sqlite::Value;

use mithril_common::entities::{BlockNumber, ImmutableFileNumber, TransactionHash};
use mithril_common::entities::{BlockNumber, TransactionHash};
use mithril_persistence::sqlite::{
Provider, SourceAlias, SqLiteEntity, SqliteConnection, WhereCondition,
};
Expand Down Expand Up @@ -40,16 +40,6 @@ impl<'client> GetCardanoTransactionProvider<'client> {
WhereCondition::where_in("transaction_hash", hashes_values)
}

pub fn get_transaction_up_to_beacon_condition(
&self,
beacon: ImmutableFileNumber,
) -> WhereCondition {
WhereCondition::new(
"immutable_file_number <= ?*",
vec![Value::Integer(beacon as i64)],
)
}

pub fn get_transaction_between_blocks_condition(
&self,
range: Range<BlockNumber>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ use mithril_common::entities::{
BlockHash, BlockNumber, BlockRange, CardanoDbBeacon, CardanoTransaction, ImmutableFileNumber,
SlotNumber, TransactionHash,
};
use mithril_common::signable_builder::BlockRangeRootRetriever;
use mithril_common::StdResult;
use mithril_persistence::sqlite::{Provider, SqliteConnection, WhereCondition};
use sqlite::Value;

use crate::database::provider::{
GetCardanoTransactionProvider, GetIntervalWithoutBlockRangeRootProvider,
InsertBlockRangeRootProvider, InsertCardanoTransactionProvider,
GetBlockRangeRootProvider, GetCardanoTransactionProvider,
GetIntervalWithoutBlockRangeRootProvider, InsertBlockRangeRootProvider,
InsertCardanoTransactionProvider,
};
use crate::database::record::{BlockRangeRootRecord, CardanoTransactionRecord};
use crate::services::{TransactionStore, TransactionsRetriever};
Expand Down Expand Up @@ -60,8 +63,14 @@ impl CardanoTransactionRepository {
&self,
beacon: ImmutableFileNumber,
) -> StdResult<Vec<CardanoTransactionRecord>> {
// Get the highest block number for the given immutable number.
// This is a temporary fix that will be removed when the retrieval is based on block number instead of immutable number.
let block_number = self
.get_highest_block_number_for_immutable_number(beacon)
.await?
.unwrap_or(0);
let provider = GetCardanoTransactionProvider::new(&self.connection);
let filters = provider.get_transaction_up_to_beacon_condition(beacon);
let filters = provider.get_transaction_between_blocks_condition(0..block_number);
let transactions = provider.find(filters)?;

Ok(transactions.collect())
Expand Down Expand Up @@ -130,6 +139,48 @@ impl CardanoTransactionRepository {

Ok(cursor.collect())
}

// TODO: remove this function when the Cardano transaction signature is based on block number instead of immutable number
async fn get_highest_block_number_for_immutable_number(
&self,
immutable_file_number: ImmutableFileNumber,
) -> StdResult<Option<BlockNumber>> {
let sql =
"select max(block_number) as highest from cardano_tx where immutable_file_number <= $1;";
match self
.connection
.prepare(sql)
.with_context(|| {
format!(
"Prepare query error: SQL=`{}`",
&sql.replace('\n', " ").trim()
)
})?
.iter()
.bind::<&[(_, Value)]>(&[(1, Value::Integer(immutable_file_number as i64))])?
.next()
{
None => Ok(None),
Some(row) => {
let highest = row?.read::<Option<i64>, _>(0);
highest
.map(u64::try_from)
.transpose()
.with_context(||
format!("Integer field max(block_number) (value={highest:?}) is incompatible with u64 representation.")
)
}
}
}

#[cfg(test)]
pub(crate) async fn get_all(&self) -> StdResult<Vec<CardanoTransaction>> {
let provider = GetCardanoTransactionProvider::new(&self.connection);
let filters = WhereCondition::default();
let transactions = provider.find(filters)?;

Ok(transactions.map(|record| record.into()).collect::<Vec<_>>())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -261,6 +312,31 @@ impl TransactionsRetriever for CardanoTransactionRepository {
}
}

#[async_trait]
impl BlockRangeRootRetriever for CardanoTransactionRepository {
async fn retrieve_block_range_roots(
&self,
up_to_beacon: ImmutableFileNumber,
) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)>>> {
// Get the highest block number for the given immutable number.
// This is a temporary fix that will be removed when the retrieval is based on block number instead of immutable number.
let block_number = self
.get_highest_block_number_for_immutable_number(up_to_beacon)
.await?
.unwrap_or(0);
let provider = GetBlockRangeRootProvider::new(&self.connection);
let filters = provider.get_up_to_block_number_condition(block_number);
let block_range_roots = provider.find(filters)?;
let block_range_roots = block_range_roots
.into_iter()
.map(|record| -> (BlockRange, MKTreeNode) { record.into() })
.collect::<Vec<_>>() // TODO: remove this collect when we should ba able return the iterator directly
.into_iter();

Ok(Box::new(block_range_roots))
}
}

#[cfg(test)]
mod tests {
use mithril_persistence::sqlite::GetAllProvider;
Expand Down Expand Up @@ -413,7 +489,7 @@ mod tests {
let cardano_transactions: Vec<CardanoTransactionRecord> = (20..=40)
.map(|i| CardanoTransactionRecord {
transaction_hash: format!("tx-hash-{i}"),
block_number: i % 10,
block_number: i / 10,
slot_number: i * 100,
block_hash: format!("block-hash-{i}"),
immutable_file_number: i,
Expand All @@ -425,10 +501,10 @@ mod tests {
.unwrap();

let transaction_result = repository.get_transactions_up_to(34).await.unwrap();
assert_eq!(cardano_transactions[0..=14].to_vec(), transaction_result);
assert_eq!(cardano_transactions[0..10].to_vec(), transaction_result);

let transaction_result = repository.get_transactions_up_to(300).await.unwrap();
assert_eq!(cardano_transactions.clone(), transaction_result);
assert_eq!(cardano_transactions[0..20].to_vec(), transaction_result);

let transaction_result = repository.get_transactions_up_to(19).await.unwrap();
assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
Expand Down
2 changes: 2 additions & 0 deletions mithril-aggregator/src/dependency_injection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1062,8 +1062,10 @@ impl DependenciesBuilder {
Some(1),
self.get_logger().await?,
));
let block_range_root_retriever = self.get_transaction_repository().await?;
let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::new(
transactions_importer,
block_range_root_retriever,
self.get_logger().await?,
));
let signable_builder_service = Arc::new(MithrilSignableBuilderService::new(
Expand Down
37 changes: 17 additions & 20 deletions mithril-aggregator/src/services/cardano_transactions_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,9 @@ impl CardanoTransactionsImporter {

#[async_trait]
impl TransactionsImporter for CardanoTransactionsImporter {
async fn import(
&self,
up_to_beacon: ImmutableFileNumber,
) -> StdResult<Vec<CardanoTransaction>> {
async fn import(&self, up_to_beacon: ImmutableFileNumber) -> StdResult<()> {
self.import_transactions(up_to_beacon).await?;
self.import_block_ranges().await?;

let transactions = self.transaction_store.get_up_to(up_to_beacon).await?;
Ok(transactions)
self.import_block_ranges().await
}
}

Expand Down Expand Up @@ -288,7 +282,7 @@ mod tests {
.await
.expect("Transactions Importer should succeed");

let stored_transactions = repository.get_up_to(10000).await.unwrap();
let stored_transactions = repository.get_all().await.unwrap();
assert_eq!(expected_transactions, stored_transactions);
}

Expand Down Expand Up @@ -410,7 +404,7 @@ mod tests {
.await
.expect("Transactions Importer should succeed");

let transactions = repository.get_up_to(10000).await.unwrap();
let transactions = repository.get_all().await.unwrap();
assert_eq!(vec![last_tx], transactions);
}

Expand Down Expand Up @@ -448,15 +442,15 @@ mod tests {
CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
};

let stored_transactions = repository.get_up_to(10000).await.unwrap();
let stored_transactions = repository.get_all().await.unwrap();
assert_eq!(stored_block.into_transactions(), stored_transactions);

importer
.import_transactions(up_to_beacon)
.await
.expect("Transactions Importer should succeed");

let stored_transactions = repository.get_up_to(10000).await.unwrap();
let stored_transactions = repository.get_all().await.unwrap();
assert_eq!(expected_transactions, stored_transactions);
}

Expand Down Expand Up @@ -619,24 +613,27 @@ mod tests {
ScannedBlock::new("block_hash-2", 20, 25, 12, vec!["tx_hash-3", "tx_hash-4"]),
];
let transactions = into_transactions(&blocks);
let importer = {
let connection = cardano_tx_db_connection().unwrap();

CardanoTransactionsImporter::new_for_test(
let (importer, repository) = {
let connection = Arc::new(cardano_tx_db_connection().unwrap());
let repository = Arc::new(CardanoTransactionRepository::new(connection.clone()));
let importer = CardanoTransactionsImporter::new_for_test(
Arc::new(DumbBlockScanner::new(blocks.clone())),
Arc::new(CardanoTransactionRepository::new(Arc::new(connection))),
)
Arc::new(CardanoTransactionRepository::new(connection.clone())),
);
(importer, repository)
};

let cold_imported_transactions = importer
importer
.import(12)
.await
.expect("Transactions Importer should succeed");
let cold_imported_transactions = repository.get_all().await.unwrap();

let warm_imported_transactions = importer
importer
.import(12)
.await
.expect("Transactions Importer should succeed");
let warm_imported_transactions = repository.get_all().await.unwrap();

assert_eq!(transactions, cold_imported_transactions);
assert_eq!(cold_imported_transactions, warm_imported_transactions);
Expand Down
41 changes: 27 additions & 14 deletions mithril-aggregator/src/services/prover.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc};
use std::{collections::BTreeMap, sync::Arc};

use anyhow::Context;
use async_trait::async_trait;
Expand Down Expand Up @@ -70,25 +70,37 @@ impl MithrilProverService {
&self,
transactions: Vec<CardanoTransaction>,
) -> StdResult<MKMap<BlockRange, MKMapNode<BlockRange>>> {
let mut transactions_by_block_ranges: HashMap<BlockRange, Vec<TransactionHash>> =
HashMap::new();
let mut transactions_by_block_ranges: BTreeMap<BlockRange, Vec<TransactionHash>> =
BTreeMap::new();
let mut last_transaction: Option<CardanoTransaction> = None;
for transaction in transactions {
let block_range = BlockRange::from_block_number(transaction.block_number);
last_transaction = Some(transaction.clone());
transactions_by_block_ranges
.entry(block_range)
.or_default()
.push(transaction.transaction_hash);
}
let mut block_ranges = transactions_by_block_ranges.into_iter().try_fold(
vec![],
|mut acc, (block_range, transactions)| -> StdResult<Vec<(_, MKMapNode<_>)>> {
acc.push((block_range, MKTree::new(&transactions)?.into()));
Ok(acc)
},
)?;

// This is a temporary fix to avoid including an incomplete block ranges in the computation of the prover.
// This will be swiftly replaced by the new way of computing proof relying on the block range roots stored in database.
if let Some(transaction) = last_transaction {
if let Some((last_block_range, _)) = block_ranges.last() {
if transaction.block_number < last_block_range.end - 1 {
block_ranges.pop();
}
}
}

let mk_hash_map = MKMap::new_from_iter(
transactions_by_block_ranges
.into_iter()
.try_fold(
vec![],
|mut acc, (block_range, transactions)| -> StdResult<Vec<(_, MKMapNode<_>)>> {
acc.push((block_range, MKTree::new(&transactions)?.into()));
Ok(acc)
},
)?,
block_ranges
)
.with_context(|| "ProverService failed to compute the merkelized structure that proves ownership of the transaction")?;

Expand Down Expand Up @@ -143,6 +155,8 @@ impl ProverService for MithrilProverService {

#[cfg(test)]
mod tests {
use std::cmp::max;

use anyhow::anyhow;
use mithril_common::entities::CardanoTransaction;
use mithril_common::test_utils::fake_data;
Expand All @@ -160,7 +174,7 @@ mod tests {
let hash = format!("tx-{i}");
transactions.push(CardanoTransaction::new(
&hash,
10 * i as u64,
max(0, 10 * i - 1) as u64,
100 * i as u64,
format!("block_hash-{i}"),
i as u64,
Expand Down Expand Up @@ -200,7 +214,6 @@ mod tests {
.compute_transactions_proofs(&fake_data::beacon(), &transaction_hashes)
.await
.unwrap();

assert_eq!(transactions_set_proof.len(), 1);
assert_eq!(
transactions_set_proof[0].transactions_hashes(),
Expand Down

0 comments on commit b40281f

Please sign in to comment.