Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/src/queries/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ pub enum BlocksStateQuery {
GetBlockHashes {
block_numbers: Vec<u64>,
},
GetBlockHashesByNumberRange {
min_number: u64,
max_number: u64,
},
GetTransactionHashes {
tx_ids: Vec<TxIdentifier>,
},
Expand Down Expand Up @@ -97,6 +101,7 @@ pub enum BlocksStateQueryResponse {
BlockTransactionsCBOR(BlockTransactionsCBOR),
BlockInvolvedAddresses(BlockInvolvedAddresses),
BlockHashes(BlockHashes),
BlockHashesByNumberRange(Vec<BlockHash>),
TransactionHashes(TransactionHashes),
UTxOHashes(UTxOHashes),
TransactionHashesAndTimestamps(TransactionHashesAndTimeStamps),
Expand Down
20 changes: 10 additions & 10 deletions modules/block_kes_validator/src/block_kes_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = (
"bootstrapped-subscribe-topic",
"cardano.sequence.bootstrapped",
);
const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) =
("blocks-subscribe-topic", "cardano.block.proposed");
const DEFAULT_BLOCK_SUBSCRIBE_TOPIC: (&str, &str) =
("block-subscribe-topic", "cardano.block.proposed");
const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = (
"protocol-parameters-subscribe-topic",
"cardano.protocol.parameters",
Expand All @@ -50,7 +50,7 @@ impl BlockKesValidator {
history: Arc<Mutex<StateHistory<State>>>,
kes_validation_publisher: KesValidationPublisher,
mut bootstrapped_subscription: Box<dyn Subscription<Message>>,
mut blocks_subscription: Box<dyn Subscription<Message>>,
mut block_subscription: Box<dyn Subscription<Message>>,
mut protocol_parameters_subscription: Box<dyn Subscription<Message>>,
mut spo_state_subscription: Box<dyn Subscription<Message>>,
) -> Result<()> {
Expand All @@ -70,7 +70,7 @@ impl BlockKesValidator {
let mut state = history.lock().await.get_or_init_with(State::new);
let mut current_block: Option<BlockInfo> = None;

let (_, message) = blocks_subscription.read().await?;
let (_, message) = block_subscription.read().await?;
match message.as_ref() {
Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => {
// handle rollback here
Expand Down Expand Up @@ -160,10 +160,10 @@ impl BlockKesValidator {
.unwrap_or(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.1.to_string());
info!("Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'");

let blocks_subscribe_topic = config
.get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0)
.unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string());
info!("Creating blocks subscription on '{blocks_subscribe_topic}'");
let block_subscribe_topic = config
.get_string(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.0)
.unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string());
info!("Creating block subscription on '{block_subscribe_topic}'");

let protocol_parameters_subscribe_topic = config
.get_string(DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC.0)
Expand All @@ -181,7 +181,7 @@ impl BlockKesValidator {

// Subscribers
let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?;
let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?;
let block_subscription = context.subscribe(&block_subscribe_topic).await?;
let protocol_parameters_subscription =
context.subscribe(&protocol_parameters_subscribe_topic).await?;
let spo_state_subscription = context.subscribe(&spo_state_subscribe_topic).await?;
Expand All @@ -198,7 +198,7 @@ impl BlockKesValidator {
history,
kes_validation_publisher,
bootstrapped_subscription,
blocks_subscription,
block_subscription,
protocol_parameters_subscription,
spo_state_subscription,
)
Expand Down
20 changes: 10 additions & 10 deletions modules/block_vrf_validator/src/block_vrf_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = (
"bootstrapped-subscribe-topic",
"cardano.sequence.bootstrapped",
);
const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) =
("blocks-subscribe-topic", "cardano.block.proposed");
const DEFAULT_BLOCK_SUBSCRIBE_TOPIC: (&str, &str) =
("block-subscribe-topic", "cardano.block.proposed");
const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = (
"protocol-parameters-subscribe-topic",
"cardano.protocol.parameters",
Expand All @@ -55,7 +55,7 @@ impl BlockVrfValidator {
history: Arc<Mutex<StateHistory<State>>>,
mut vrf_validation_publisher: VrfValidationPublisher,
mut bootstrapped_subscription: Box<dyn Subscription<Message>>,
mut blocks_subscription: Box<dyn Subscription<Message>>,
mut block_subscription: Box<dyn Subscription<Message>>,
mut protocol_parameters_subscription: Box<dyn Subscription<Message>>,
mut epoch_activity_subscription: Box<dyn Subscription<Message>>,
mut spo_state_subscription: Box<dyn Subscription<Message>>,
Expand All @@ -77,7 +77,7 @@ impl BlockVrfValidator {
let mut state = history.lock().await.get_or_init_with(State::new);
let mut current_block: Option<BlockInfo> = None;

let (_, message) = blocks_subscription.read().await?;
let (_, message) = block_subscription.read().await?;
match message.as_ref() {
Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => {
// handle rollback here
Expand Down Expand Up @@ -190,10 +190,10 @@ impl BlockVrfValidator {
.unwrap_or(DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC.1.to_string());
info!("Creating subscriber for protocol parameters on '{protocol_parameters_subscribe_topic}'");

let blocks_subscribe_topic = config
.get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0)
.unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string());
info!("Creating blocks subscription on '{blocks_subscribe_topic}'");
let block_subscribe_topic = config
.get_string(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.0)
.unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string());
info!("Creating block subscription on '{block_subscribe_topic}'");

let epoch_activity_subscribe_topic = config
.get_string(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.0)
Expand All @@ -218,7 +218,7 @@ impl BlockVrfValidator {
let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?;
let protocol_parameters_subscription =
context.subscribe(&protocol_parameters_subscribe_topic).await?;
let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?;
let block_subscription = context.subscribe(&block_subscribe_topic).await?;
let epoch_activity_subscription =
context.subscribe(&epoch_activity_subscribe_topic).await?;
let spo_state_subscription = context.subscribe(&spo_state_subscribe_topic).await?;
Expand All @@ -236,7 +236,7 @@ impl BlockVrfValidator {
history,
vrf_validation_publisher,
bootstrapped_subscription,
blocks_subscription,
block_subscription,
protocol_parameters_subscription,
epoch_activity_subscription,
spo_state_subscription,
Expand Down
20 changes: 20 additions & 0 deletions modules/chain_store/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,26 @@ impl ChainStore {
block_hashes,
}))
}
BlocksStateQuery::GetBlockHashesByNumberRange {
min_number,
max_number,
} => {
if *max_number < *min_number {
return Ok(BlocksStateQueryResponse::Error(
QueryError::invalid_request("Invalid number range"),
));
}
let mut block_hashes = Vec::new();
let blocks = store.get_blocks_by_number_range(*min_number, *max_number)?;
for block in blocks {
if let Ok(hash) = Self::get_block_hash(&block) {
block_hashes.push(hash);
}
}
Ok(BlocksStateQueryResponse::BlockHashesByNumberRange(
block_hashes,
))
}
BlocksStateQuery::GetTransactionHashes { tx_ids } => {
let mut block_ids: HashMap<_, Vec<_>> = HashMap::new();
for tx_id in tx_ids {
Expand Down
13 changes: 13 additions & 0 deletions modules/chain_store/src/stores/fjall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ impl FjallBlockStore {
}

fn get_by_number_range(&self, min_number: u64, max_number: u64) -> Result<Vec<Block>> {
if max_number < min_number {
return Err(anyhow::anyhow!(
"Invalid number range min={min_number}, max={max_number}"
));
}
let expected_count = max_number - min_number + 1;

let min_number_bytes = min_number.to_be_bytes();
let max_number_bytes = max_number.to_be_bytes();
let mut blocks = vec![];
Expand All @@ -202,6 +209,12 @@ impl FjallBlockStore {
blocks.push(block);
}
}
if blocks.len() as u64 != expected_count {
return Err(anyhow::anyhow!(
"Expected {expected_count} blocks, got {}",
blocks.len()
));
}
Ok(blocks)
}

Expand Down
22 changes: 11 additions & 11 deletions modules/epochs_state/src/epochs_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = (
"bootstrapped-subscribe-topic",
"cardano.sequence.bootstrapped",
);
const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) =
("blocks-subscribe-topic", "cardano.block.proposed");
const DEFAULT_BLOCK_SUBSCRIBE_TOPIC: (&str, &str) =
("block-subscribe-topic", "cardano.block.proposed");
const DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC: (&str, &str) =
("block-txs-subscribe-topic", "cardano.block.txs");
const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = (
Expand All @@ -58,7 +58,7 @@ impl EpochsState {
history: Arc<Mutex<StateHistory<State>>>,
epochs_history: EpochsHistoryState,
mut bootstrapped_subscription: Box<dyn Subscription<Message>>,
mut blocks_subscription: Box<dyn Subscription<Message>>,
mut block_subscription: Box<dyn Subscription<Message>>,
mut block_txs_subscription: Box<dyn Subscription<Message>>,
mut protocol_parameters_subscription: Box<dyn Subscription<Message>>,
mut epoch_activity_publisher: EpochActivityPublisher,
Expand All @@ -80,11 +80,11 @@ impl EpochsState {
let mut current_block: Option<BlockInfo> = None;

// Read both topics in parallel
let blocks_message_f = blocks_subscription.read();
let block_message_f = block_subscription.read();
let block_txs_message_f = block_txs_subscription.read();

// Handle blocks first
let (_, message) = blocks_message_f.await?;
let (_, message) = block_message_f.await?;
match message.as_ref() {
Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => {
// handle rollback here
Expand Down Expand Up @@ -188,10 +188,10 @@ impl EpochsState {
.unwrap_or(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.1.to_string());
info!("Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'");

let blocks_subscribe_topic = config
.get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0)
.unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string());
info!("Creating subscriber for blocks on '{blocks_subscribe_topic}'");
let block_subscribe_topic = config
.get_string(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.0)
.unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string());
info!("Creating subscriber for blocks on '{block_subscribe_topic}'");

let block_txs_subscribe_topic = config
.get_string(DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC.0)
Expand Down Expand Up @@ -231,7 +231,7 @@ impl EpochsState {

// Subscribe
let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?;
let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?;
let block_subscription = context.subscribe(&block_subscribe_topic).await?;
let protocol_parameters_subscription =
context.subscribe(&protocol_parameters_subscribe_topic).await?;
let block_txs_subscription = context.subscribe(&block_txs_subscribe_topic).await?;
Expand Down Expand Up @@ -342,7 +342,7 @@ impl EpochsState {
history,
epochs_history,
bootstrapped_subscription,
blocks_subscription,
block_subscription,
block_txs_subscription,
protocol_parameters_subscription,
epoch_activity_publisher,
Expand Down
26 changes: 23 additions & 3 deletions modules/epochs_state/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,13 @@ impl State {
epoch: 0,
epoch_start_time: genesis.byron_timestamp,
first_block_time: genesis.byron_timestamp,
first_block_height: 0,
last_block_time: 0,
last_block_height: 0,
// NOTE:
// First block height is 1
// only because we don't handle EBB for now
// so by default, we counter epoch 0's EBB
first_block_height: 1,
last_block_time: genesis.byron_timestamp,
last_block_height: 1,
blocks_minted: HashMap::new(),
epoch_blocks: 0,
epoch_txs: 0,
Expand Down Expand Up @@ -342,6 +346,22 @@ mod tests {
);
}

#[test]
fn handle_mint_without_issuer() {
let mut state = State::new(&GenesisValues::mainnet());
let mut block = make_block(100);
state.handle_mint(&block, None);
block.number += 1;
state.handle_mint(&block, None);

assert_eq!(state.epoch_blocks, 2);
assert_eq!(state.blocks_minted.len(), 0);
assert_eq!(
state.blocks_minted.get(&keyhash_224(b"issuer").into()),
None
);
}

#[test]
fn handle_mint_multiple_issuer_records_counts() {
let mut state = State::new(&GenesisValues::mainnet());
Expand Down
44 changes: 26 additions & 18 deletions modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,13 @@ use tracing::{debug, error, info, info_span, Instrument};
mod pause;
use pause::PauseType;

const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.bootstrapped";
const DEFAULT_BLOCK_TOPIC: &str = "cardano.block.available";
const DEFAULT_COMPLETION_TOPIC: &str = "cardano.snapshot.complete";
const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = (
"bootstrapped-subscribe-topic",
"cardano.sequence.bootstrapped",
);
const DEFAULT_BLOCK_PUBLISH_TOPIC: (&str, &str) =
("block-publish-topic", "cardano.block.available");
const DEFAULT_COMPLETION_TOPIC: (&str, &str) = ("completion-topic", "cardano.snapshot.complete");

const DEFAULT_AGGREGATOR_URL: &str =
"https://aggregator.release-mainnet.api.mithril.network/aggregator";
Expand Down Expand Up @@ -237,12 +241,14 @@ impl MithrilSnapshotFetcher {
config: Arc<Config>,
genesis: GenesisValues,
) -> Result<()> {
let block_topic =
config.get_string("block-topic").unwrap_or(DEFAULT_BLOCK_TOPIC.to_string());
info!("Publishing blocks on '{block_topic}'");

let completion_topic =
config.get_string("completion-topic").unwrap_or(DEFAULT_COMPLETION_TOPIC.to_string());
let block_publish_topic = config
.get_string(DEFAULT_BLOCK_PUBLISH_TOPIC.0)
.unwrap_or(DEFAULT_BLOCK_PUBLISH_TOPIC.1.to_string());
info!("Publishing blocks on '{block_publish_topic}'");

let completion_topic = config
.get_string(DEFAULT_COMPLETION_TOPIC.0)
.unwrap_or(DEFAULT_COMPLETION_TOPIC.1.to_string());
info!("Publishing completion on '{completion_topic}'");

let directory = config.get_string("directory").unwrap_or(DEFAULT_DIRECTORY.to_string());
Expand Down Expand Up @@ -356,7 +362,7 @@ impl MithrilSnapshotFetcher {

context
.message_bus
.publish(&block_topic, Arc::new(message_enum))
.publish(&block_publish_topic, Arc::new(message_enum))
.await
.unwrap_or_else(|e| error!("Failed to publish block message: {e}"));

Expand Down Expand Up @@ -389,21 +395,23 @@ impl MithrilSnapshotFetcher {

/// Main init function
pub async fn init(&self, context: Arc<Context<Message>>, config: Arc<Config>) -> Result<()> {
let startup_topic =
config.get_string("startup-topic").unwrap_or(DEFAULT_STARTUP_TOPIC.to_string());
info!("Creating startup subscriber on '{startup_topic}'");
let bootstrapped_subscribe_topic = config
.get_string(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.0)
.unwrap_or(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.1.to_string());
info!("Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'");

let mut subscription = context.subscribe(&startup_topic).await?;
let mut bootstrapped_subscription =
context.subscribe(&bootstrapped_subscribe_topic).await?;
context.clone().run(async move {
let Ok((_, startup_message)) = subscription.read().await else {
let Ok((_, bootstrapped_message)) = bootstrapped_subscription.read().await else {
return;
};
info!("Received startup message");
let genesis = match startup_message.as_ref() {
info!("Received bootstrapped message");
let genesis = match bootstrapped_message.as_ref() {
Message::Cardano((_, CardanoMessage::GenesisComplete(complete))) => {
complete.values.clone()
}
x => panic!("unexpected startup message: {x:?}"),
x => panic!("unexpected bootstrapped message: {x:?}"),
};

let mut delay = 1;
Expand Down
Loading