diff --git a/common/src/queries/blocks.rs b/common/src/queries/blocks.rs index 0bda2960..14631546 100644 --- a/common/src/queries/blocks.rs +++ b/common/src/queries/blocks.rs @@ -66,6 +66,10 @@ pub enum BlocksStateQuery { GetBlockHashes { block_numbers: Vec, }, + GetBlockHashesByNumberRange { + min_number: u64, + max_number: u64, + }, GetTransactionHashes { tx_ids: Vec, }, @@ -97,6 +101,7 @@ pub enum BlocksStateQueryResponse { BlockTransactionsCBOR(BlockTransactionsCBOR), BlockInvolvedAddresses(BlockInvolvedAddresses), BlockHashes(BlockHashes), + BlockHashesByNumberRange(Vec), TransactionHashes(TransactionHashes), UTxOHashes(UTxOHashes), TransactionHashesAndTimestamps(TransactionHashesAndTimeStamps), diff --git a/modules/block_kes_validator/src/block_kes_validator.rs b/modules/block_kes_validator/src/block_kes_validator.rs index 25eafcb1..3f88f45f 100644 --- a/modules/block_kes_validator/src/block_kes_validator.rs +++ b/modules/block_kes_validator/src/block_kes_validator.rs @@ -26,8 +26,8 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = ( "bootstrapped-subscribe-topic", "cardano.sequence.bootstrapped", ); -const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) = - ("blocks-subscribe-topic", "cardano.block.proposed"); +const DEFAULT_BLOCK_SUBSCRIBE_TOPIC: (&str, &str) = + ("block-subscribe-topic", "cardano.block.proposed"); const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ( "protocol-parameters-subscribe-topic", "cardano.protocol.parameters", @@ -50,7 +50,7 @@ impl BlockKesValidator { history: Arc>>, kes_validation_publisher: KesValidationPublisher, mut bootstrapped_subscription: Box>, - mut blocks_subscription: Box>, + mut block_subscription: Box>, mut protocol_parameters_subscription: Box>, mut spo_state_subscription: Box>, ) -> Result<()> { @@ -70,7 +70,7 @@ impl BlockKesValidator { let mut state = history.lock().await.get_or_init_with(State::new); let mut current_block: Option = None; - let (_, message) = blocks_subscription.read().await?; + let (_, message) = block_subscription.read().await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => { // handle rollback here @@ -160,10 +160,10 @@ impl BlockKesValidator { .unwrap_or(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.1.to_string()); info!("Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'"); - let blocks_subscribe_topic = config - .get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0) - .unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string()); - info!("Creating blocks subscription on '{blocks_subscribe_topic}'"); + let block_subscribe_topic = config + .get_string(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating block subscription on '{block_subscribe_topic}'"); let protocol_parameters_subscribe_topic = config .get_string(DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC.0) @@ -181,7 +181,7 @@ impl BlockKesValidator { // Subscribers let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?; - let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?; + let block_subscription = context.subscribe(&block_subscribe_topic).await?; let protocol_parameters_subscription = context.subscribe(&protocol_parameters_subscribe_topic).await?; let spo_state_subscription = context.subscribe(&spo_state_subscribe_topic).await?; @@ -198,7 +198,7 @@ impl BlockKesValidator { history, kes_validation_publisher, bootstrapped_subscription, - blocks_subscription, + block_subscription, protocol_parameters_subscription, spo_state_subscription, ) diff --git a/modules/block_vrf_validator/src/block_vrf_validator.rs b/modules/block_vrf_validator/src/block_vrf_validator.rs index 2f202293..8519b936 100644 --- a/modules/block_vrf_validator/src/block_vrf_validator.rs +++ b/modules/block_vrf_validator/src/block_vrf_validator.rs @@ -27,8 +27,8 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = ( "bootstrapped-subscribe-topic", "cardano.sequence.bootstrapped", ); -const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) = - ("blocks-subscribe-topic", "cardano.block.proposed"); +const DEFAULT_BLOCK_SUBSCRIBE_TOPIC: (&str, &str) = + ("block-subscribe-topic", "cardano.block.proposed"); const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ( "protocol-parameters-subscribe-topic", "cardano.protocol.parameters", @@ -55,7 +55,7 @@ impl BlockVrfValidator { history: Arc>>, mut vrf_validation_publisher: VrfValidationPublisher, mut bootstrapped_subscription: Box>, - mut blocks_subscription: Box>, + mut block_subscription: Box>, mut protocol_parameters_subscription: Box>, mut epoch_activity_subscription: Box>, mut spo_state_subscription: Box>, @@ -77,7 +77,7 @@ impl BlockVrfValidator { let mut state = history.lock().await.get_or_init_with(State::new); let mut current_block: Option = None; - let (_, message) = blocks_subscription.read().await?; + let (_, message) = block_subscription.read().await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => { // handle rollback here @@ -190,10 +190,10 @@ impl BlockVrfValidator { .unwrap_or(DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC.1.to_string()); info!("Creating subscriber for protocol parameters on '{protocol_parameters_subscribe_topic}'"); - let blocks_subscribe_topic = config - .get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0) - .unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string()); - info!("Creating blocks subscription on '{blocks_subscribe_topic}'"); + let block_subscribe_topic = config + .get_string(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating block subscription on '{block_subscribe_topic}'"); let epoch_activity_subscribe_topic = config .get_string(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.0) @@ -218,7 +218,7 @@ impl BlockVrfValidator { let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?; let protocol_parameters_subscription = context.subscribe(&protocol_parameters_subscribe_topic).await?; - let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?; + let block_subscription = context.subscribe(&block_subscribe_topic).await?; let epoch_activity_subscription = context.subscribe(&epoch_activity_subscribe_topic).await?; let spo_state_subscription = context.subscribe(&spo_state_subscribe_topic).await?; @@ -236,7 +236,7 @@ impl BlockVrfValidator { history, vrf_validation_publisher, bootstrapped_subscription, - blocks_subscription, + block_subscription, protocol_parameters_subscription, epoch_activity_subscription, spo_state_subscription, diff --git a/modules/chain_store/src/chain_store.rs b/modules/chain_store/src/chain_store.rs index 9dfe7518..e7a38a7f 100644 --- a/modules/chain_store/src/chain_store.rs +++ b/modules/chain_store/src/chain_store.rs @@ -345,6 +345,26 @@ impl ChainStore { block_hashes, })) } + BlocksStateQuery::GetBlockHashesByNumberRange { + min_number, + max_number, + } => { + if *max_number < *min_number { + return Ok(BlocksStateQueryResponse::Error( + QueryError::invalid_request("Invalid number range"), + )); + } + let mut block_hashes = Vec::new(); + let blocks = store.get_blocks_by_number_range(*min_number, *max_number)?; + for block in blocks { + if let Ok(hash) = Self::get_block_hash(&block) { + block_hashes.push(hash); + } + } + Ok(BlocksStateQueryResponse::BlockHashesByNumberRange( + block_hashes, + )) + } BlocksStateQuery::GetTransactionHashes { tx_ids } => { let mut block_ids: HashMap<_, Vec<_>> = HashMap::new(); for tx_id in tx_ids { diff --git a/modules/chain_store/src/stores/fjall.rs b/modules/chain_store/src/stores/fjall.rs index 9cd70bcd..3c6adecb 100644 --- a/modules/chain_store/src/stores/fjall.rs +++ b/modules/chain_store/src/stores/fjall.rs @@ -193,6 +193,13 @@ impl FjallBlockStore { } fn get_by_number_range(&self, min_number: u64, max_number: u64) -> Result> { + if max_number < min_number { + return Err(anyhow::anyhow!( + "Invalid number range min={min_number}, max={max_number}" + )); + } + let expected_count = max_number - min_number + 1; + let min_number_bytes = min_number.to_be_bytes(); let max_number_bytes = max_number.to_be_bytes(); let mut blocks = vec![]; @@ -202,6 +209,12 @@ impl FjallBlockStore { blocks.push(block); } } + if blocks.len() as u64 != expected_count { + return Err(anyhow::anyhow!( + "Expected {expected_count} blocks, got {}", + blocks.len() + )); + } Ok(blocks) } diff --git a/modules/epochs_state/src/epochs_state.rs b/modules/epochs_state/src/epochs_state.rs index e0f3ce4e..23267fb2 100644 --- a/modules/epochs_state/src/epochs_state.rs +++ b/modules/epochs_state/src/epochs_state.rs @@ -32,8 +32,8 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = ( "bootstrapped-subscribe-topic", "cardano.sequence.bootstrapped", ); -const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) = - ("blocks-subscribe-topic", "cardano.block.proposed"); +const DEFAULT_BLOCK_SUBSCRIBE_TOPIC: (&str, &str) = + ("block-subscribe-topic", "cardano.block.proposed"); const DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC: (&str, &str) = ("block-txs-subscribe-topic", "cardano.block.txs"); const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ( @@ -58,7 +58,7 @@ impl EpochsState { history: Arc>>, epochs_history: EpochsHistoryState, mut bootstrapped_subscription: Box>, - mut blocks_subscription: Box>, + mut block_subscription: Box>, mut block_txs_subscription: Box>, mut protocol_parameters_subscription: Box>, mut epoch_activity_publisher: EpochActivityPublisher, @@ -80,11 +80,11 @@ impl EpochsState { let mut current_block: Option = None; // Read both topics in parallel - let blocks_message_f = blocks_subscription.read(); + let block_message_f = block_subscription.read(); let block_txs_message_f = block_txs_subscription.read(); // Handle blocks first - let (_, message) = blocks_message_f.await?; + let (_, message) = block_message_f.await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => { // handle rollback here @@ -188,10 +188,10 @@ impl EpochsState { .unwrap_or(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.1.to_string()); info!("Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'"); - let blocks_subscribe_topic = config - .get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0) - .unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string()); - info!("Creating subscriber for blocks on '{blocks_subscribe_topic}'"); + let block_subscribe_topic = config + .get_string(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating subscriber for blocks on '{block_subscribe_topic}'"); let block_txs_subscribe_topic = config .get_string(DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC.0) @@ -231,7 +231,7 @@ impl EpochsState { // Subscribe let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?; - let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?; + let block_subscription = context.subscribe(&block_subscribe_topic).await?; let protocol_parameters_subscription = context.subscribe(&protocol_parameters_subscribe_topic).await?; let block_txs_subscription = context.subscribe(&block_txs_subscribe_topic).await?; @@ -342,7 +342,7 @@ impl EpochsState { history, epochs_history, bootstrapped_subscription, - blocks_subscription, + block_subscription, block_txs_subscription, protocol_parameters_subscription, epoch_activity_publisher, diff --git a/modules/epochs_state/src/state.rs b/modules/epochs_state/src/state.rs index 802b4179..3f379e0e 100644 --- a/modules/epochs_state/src/state.rs +++ b/modules/epochs_state/src/state.rs @@ -69,9 +69,13 @@ impl State { epoch: 0, epoch_start_time: genesis.byron_timestamp, first_block_time: genesis.byron_timestamp, - first_block_height: 0, - last_block_time: 0, - last_block_height: 0, + // NOTE: + // First block height is 1 + // only because we don't handle EBB for now + // so by default, we counter epoch 0's EBB + first_block_height: 1, + last_block_time: genesis.byron_timestamp, + last_block_height: 1, blocks_minted: HashMap::new(), epoch_blocks: 0, epoch_txs: 0, @@ -342,6 +346,22 @@ mod tests { ); } + #[test] + fn handle_mint_without_issuer() { + let mut state = State::new(&GenesisValues::mainnet()); + let mut block = make_block(100); + state.handle_mint(&block, None); + block.number += 1; + state.handle_mint(&block, None); + + assert_eq!(state.epoch_blocks, 2); + assert_eq!(state.blocks_minted.len(), 0); + assert_eq!( + state.blocks_minted.get(&keyhash_224(b"issuer").into()), + None + ); + } + #[test] fn handle_mint_multiple_issuer_records_counts() { let mut state = State::new(&GenesisValues::mainnet()); diff --git a/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs b/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs index 2f24647e..8e623365 100644 --- a/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs +++ b/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs @@ -30,9 +30,13 @@ use tracing::{debug, error, info, info_span, Instrument}; mod pause; use pause::PauseType; -const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.bootstrapped"; -const DEFAULT_BLOCK_TOPIC: &str = "cardano.block.available"; -const DEFAULT_COMPLETION_TOPIC: &str = "cardano.snapshot.complete"; +const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = ( + "bootstrapped-subscribe-topic", + "cardano.sequence.bootstrapped", +); +const DEFAULT_BLOCK_PUBLISH_TOPIC: (&str, &str) = + ("block-publish-topic", "cardano.block.available"); +const DEFAULT_COMPLETION_TOPIC: (&str, &str) = ("completion-topic", "cardano.snapshot.complete"); const DEFAULT_AGGREGATOR_URL: &str = "https://aggregator.release-mainnet.api.mithril.network/aggregator"; @@ -237,12 +241,14 @@ impl MithrilSnapshotFetcher { config: Arc, genesis: GenesisValues, ) -> Result<()> { - let block_topic = - config.get_string("block-topic").unwrap_or(DEFAULT_BLOCK_TOPIC.to_string()); - info!("Publishing blocks on '{block_topic}'"); - - let completion_topic = - config.get_string("completion-topic").unwrap_or(DEFAULT_COMPLETION_TOPIC.to_string()); + let block_publish_topic = config + .get_string(DEFAULT_BLOCK_PUBLISH_TOPIC.0) + .unwrap_or(DEFAULT_BLOCK_PUBLISH_TOPIC.1.to_string()); + info!("Publishing blocks on '{block_publish_topic}'"); + + let completion_topic = config + .get_string(DEFAULT_COMPLETION_TOPIC.0) + .unwrap_or(DEFAULT_COMPLETION_TOPIC.1.to_string()); info!("Publishing completion on '{completion_topic}'"); let directory = config.get_string("directory").unwrap_or(DEFAULT_DIRECTORY.to_string()); @@ -356,7 +362,7 @@ impl MithrilSnapshotFetcher { context .message_bus - .publish(&block_topic, Arc::new(message_enum)) + .publish(&block_publish_topic, Arc::new(message_enum)) .await .unwrap_or_else(|e| error!("Failed to publish block message: {e}")); @@ -389,21 +395,23 @@ impl MithrilSnapshotFetcher { /// Main init function pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { - let startup_topic = - config.get_string("startup-topic").unwrap_or(DEFAULT_STARTUP_TOPIC.to_string()); - info!("Creating startup subscriber on '{startup_topic}'"); + let bootstrapped_subscribe_topic = config + .get_string(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'"); - let mut subscription = context.subscribe(&startup_topic).await?; + let mut bootstrapped_subscription = + context.subscribe(&bootstrapped_subscribe_topic).await?; context.clone().run(async move { - let Ok((_, startup_message)) = subscription.read().await else { + let Ok((_, bootstrapped_message)) = bootstrapped_subscription.read().await else { return; }; - info!("Received startup message"); - let genesis = match startup_message.as_ref() { + info!("Received bootstrapped message"); + let genesis = match bootstrapped_message.as_ref() { Message::Cardano((_, CardanoMessage::GenesisComplete(complete))) => { complete.values.clone() } - x => panic!("unexpected startup message: {x:?}"), + x => panic!("unexpected bootstrapped message: {x:?}"), }; let mut delay = 1; diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs index b62b6791..57e229c9 100644 --- a/modules/rest_blockfrost/src/handlers/epochs.rs +++ b/modules/rest_blockfrost/src/handlers/epochs.rs @@ -4,7 +4,10 @@ use crate::{ EpochActivityRest, ProtocolParamsRest, SPDDByEpochAndPoolItemRest, SPDDByEpochItemRest, }, }; -use acropolis_common::queries::errors::QueryError; +use acropolis_common::queries::{ + blocks::{BlocksStateQuery, BlocksStateQueryResponse}, + errors::QueryError, +}; use acropolis_common::rest_error::RESTError; use acropolis_common::serialization::Bech32Conversion; use acropolis_common::{ @@ -501,11 +504,102 @@ pub async fn handle_epoch_pool_stakes_blockfrost( } pub async fn handle_epoch_total_blocks_blockfrost( - _context: Arc>, - _params: Vec, - _handlers_config: Arc, + context: Arc>, + params: Vec, + handlers_config: Arc, ) -> Result { - Err(RESTError::not_implemented("Epoch total blocks endpoint")) + if params.len() != 1 { + return Err(RESTError::BadRequest( + "Expected one parameter: an epoch number".to_string(), + )); + } + let param = ¶ms[0]; + + let epoch_number = param + .parse::() + .map_err(|_| RESTError::invalid_param("epoch", "invalid epoch number"))?; + + let latest_epoch_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + EpochsStateQuery::GetLatestEpoch, + ))); + let latest_epoch = query_state( + &context, + &handlers_config.epochs_query_topic, + latest_epoch_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::LatestEpoch(res), + )) => Ok(res.epoch), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving latest epoch", + )), + }, + ) + .await?; + + if epoch_number > latest_epoch.epoch { + return Err(RESTError::not_found("Epoch not found")); + } + + let (first_block_height, last_block_height) = if epoch_number == latest_epoch.epoch { + ( + latest_epoch.first_block_height, + latest_epoch.last_block_height, + ) + } else { + // Query from historical epochs state + let epoch_info_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + EpochsStateQuery::GetEpochInfo { epoch_number }, + ))); + let epoch_info = query_state( + &context, + &handlers_config.epochs_query_topic, + epoch_info_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::EpochInfo(res), + )) => Ok(res.epoch), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving epoch info", + )), + }, + ) + .await?; + (epoch_info.first_block_height, epoch_info.last_block_height) + }; + + // Query all blocks hashes from chain_store + // using first_block_height and last_block_height + let block_hashes_msg = Arc::new(Message::StateQuery(StateQuery::Blocks( + BlocksStateQuery::GetBlockHashesByNumberRange { + min_number: first_block_height, + max_number: last_block_height, + }, + ))); + let block_hashes = query_state( + &context, + &handlers_config.blocks_query_topic, + block_hashes_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Blocks( + BlocksStateQueryResponse::BlockHashesByNumberRange(block_hashes), + )) => Ok(block_hashes), + Message::StateQueryResponse(StateQueryResponse::Blocks( + BlocksStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error("Unexpected message type")), + }, + ) + .await?; + + let json = serde_json::to_string_pretty(&block_hashes)?; + Ok(RESTResponse::with_json(200, &json)) } pub async fn handle_epoch_pool_blocks_blockfrost( diff --git a/modules/spo_state/src/spo_state.rs b/modules/spo_state/src/spo_state.rs index 4c7f1ec8..7bc56188 100644 --- a/modules/spo_state/src/spo_state.rs +++ b/modules/spo_state/src/spo_state.rs @@ -47,8 +47,8 @@ const DEFAULT_WITHDRAWALS_SUBSCRIBE_TOPIC: (&str, &str) = ("withdrawals-subscribe-topic", "cardano.withdrawals"); const DEFAULT_GOVERNANCE_SUBSCRIBE_TOPIC: (&str, &str) = ("governance-subscribe-topic", "cardano.governance"); -const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) = - ("blocks-subscribe-topic", "cardano.block.proposed"); +const DEFAULT_BLOCK_SUBSCRIBE_TOPIC: (&str, &str) = + ("block-subscribe-topic", "cardano.block.proposed"); const DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC: (&str, &str) = ("epoch-activity-subscribe-topic", "cardano.epoch.activity"); const DEFAULT_SPDD_SUBSCRIBE_TOPIC: (&str, &str) = @@ -87,7 +87,7 @@ impl SPOState { store_config: &StoreConfig, // subscribers mut certificates_subscription: Box>, - mut blocks_subscription: Box>, + mut block_subscription: Box>, mut withdrawals_subscription: Option>>, mut governance_subscription: Option>>, mut epoch_activity_subscription: Box>, @@ -113,7 +113,7 @@ impl SPOState { // read per-block topics in parallel let certs_message_f = certificates_subscription.read(); - let blocks_message_f = blocks_subscription.read(); + let block_message_f = block_subscription.read(); let withdrawals_message_f = withdrawals_subscription.as_mut().map(|s| s.read()); let governance_message_f = governance_subscription.as_mut().map(|s| s.read()); let stake_deltas_message_f = stake_deltas_subscription.as_mut().map(|s| s.read()); @@ -140,7 +140,7 @@ impl SPOState { // handle blocks (handle_mint) before handle_tx_certs // in case of epoch boundary - let (_, block_message) = blocks_message_f.await?; + let (_, block_message) = block_message_f.await?; match block_message.as_ref() { Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => { let span = @@ -438,10 +438,10 @@ impl SPOState { .unwrap_or(DEFAULT_SPO_REWARDS_SUBSCRIBE_TOPIC.1.to_string()); info!("Creating SPO rewards subscriber on '{spo_rewards_subscribe_topic}'"); - let blocks_subscribe_topic = config - .get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0) - .unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string()); - info!("Creating blocks subscriber on '{blocks_subscribe_topic}'"); + let block_subscribe_topic = config + .get_string(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating block subscriber on '{block_subscribe_topic}'"); let stake_reward_deltas_subscribe_topic = config .get_string(DEFAULT_STAKE_REWARD_DELTAS_SUBSCRIBE_TOPIC.0) @@ -765,7 +765,7 @@ impl SPOState { // Subscriptions let certificates_subscription = context.subscribe(&certificates_subscribe_topic).await?; - let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?; + let block_subscription = context.subscribe(&block_subscribe_topic).await?; let epoch_activity_subscription = context.subscribe(&epoch_activity_subscribe_topic).await?; let spdd_subscription = context.subscribe(&spdd_subscribe_topic).await?; @@ -811,7 +811,7 @@ impl SPOState { retired_pools_history, &store_config, certificates_subscription, - blocks_subscription, + block_subscription, withdrawals_subscription, governance_subscription, epoch_activity_subscription,