chore: refactor event-db queries to do batch inserts and updates
FelipeRosa committed May 3, 2024
1 parent 424975e commit 46a8d20
Showing 5 changed files with 750 additions and 132 deletions.
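
The heart of this refactor is a buffer-and-flush loop: `process_blocks` now accumulates raw blocks and writes them out in batches, either when the buffer reaches 1024 blocks or when a one-second ticker fires. The sketch below distills that pattern into a self-contained form. It is an illustration, not code from this commit: the `Block` struct, the `mpsc` channel standing in for the chain follower, and the `flush` helper (playing the role of `index_block_buffer`) are hypothetical stand-ins.

use std::time::Duration;

use tokio::sync::mpsc;

/// Hypothetical stand-in for the follower's raw block data type.
struct Block;

/// Hypothetical stand-in for `index_block_buffer`: decode the batch and
/// write it to the database with batched statements.
async fn flush(batch: Vec<Block>) {
    println!("flushing {} blocks", batch.len());
}

async fn buffered_indexing_loop(mut blocks: mpsc::Receiver<Block>) {
    let mut buffer = Vec::new();

    let mut ticker = tokio::time::interval(Duration::from_secs(1));
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    loop {
        tokio::select! {
            Some(block) = blocks.recv() => {
                buffer.push(block);

                // Flush early once the batch is big enough...
                if buffer.len() >= 1024 {
                    flush(std::mem::take(&mut buffer)).await;
                    ticker.reset();
                }
            }
            // ...otherwise flush whatever accumulated after one quiet interval.
            _ = ticker.tick() => {
                if !buffer.is_empty() {
                    flush(std::mem::take(&mut buffer)).await;
                    ticker.reset();
                }
            }
        }
    }
}

Resetting the ticker after every flush keeps database writes at least one interval apart, so a size-triggered flush is never immediately followed by a timer-triggered one.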
344 changes: 215 additions & 129 deletions catalyst-gateway/bin/src/cardano/mod.rs
@@ -1,20 +1,32 @@
//! Logic for orchestrating followers
use std::{path::PathBuf, sync::Arc, time::Duration};

/// Handler for follower tasks, allows for control over spawned follower threads
pub type ManageTasks = JoinHandle<()>;

use cardano_chain_follower::{
    network_genesis_values, ChainUpdate, Follower, FollowerConfigBuilder, Network, Point,
};
use pallas::ledger::{
    addresses::Address,
    traverse::{wellknown::GenesisValues, MultiEraBlock},
};
use tokio::{task::JoinHandle, time};
use tracing::{debug, error, info};

use self::util::parse_policy_assets;
use crate::{
    cardano::{
        cip36_registration::{Cip36Metadata, VotingInfo},
        util::valid_era,
    },
    event_db::{
        cardano::{
            chain_state::{IndexedFollowerDataParams, MachineId},
            cip36_registration::IndexedVoterRegistrationParams,
            config::FollowerConfig,
            utxo::{IndexedTxnInputParams, IndexedTxnOutputParams, IndexedTxnParams},
        },
        error::NotFoundError,
        EventDB,
    },
@@ -130,159 +142,233 @@ async fn process_blocks(
    follower: &mut Follower, db: Arc<EventDB>, network: Network, machine_id: MachineId,
    genesis_values: &GenesisValues,
) {
    info!("Follower started processing blocks");

    let mut blocks_buffer = Vec::new();

    let mut ticker = tokio::time::interval(Duration::from_secs(1));
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    loop {
        tokio::select! {
            result = follower.next() => match result {
                Ok(chain_update) => match chain_update {
                    ChainUpdate::Block(data) => {
                        blocks_buffer.push(data);

                        // We have enough blocks to index
                        if blocks_buffer.len() >= 1024 {
                            let current_buffer = std::mem::take(&mut blocks_buffer);
                            index_block_buffer(db.clone(), genesis_values, network, &machine_id, current_buffer).await;

                            // Since we just wrote stuff to the database,
                            // reset the ticker so the next write is at least 1 interval from now.
                            ticker.reset();
                        }
                    },
                    ChainUpdate::Rollback(data) => {
                        let block = match data.decode() {
                            Ok(block) => block,
                            Err(err) => {
                                error!("Unable to decode {network:?} block {err} - skip..");
                                continue;
                            },
                        };

                        info!(
                            "Rollback block NUMBER={} SLOT={} HASH={}",
                            block.number(),
                            block.slot(),
                            hex::encode(block.hash()),
                        );
                    },
                },
                Err(err) => {
                    error!(
                        "Unable to receive next update from the {network:?} follower err: {err} - skip..",
                    );
                    continue;
                },
            },
            _ = ticker.tick() => {
                // This executes when we have not indexed blocks for more than the configured
                // tick interval. This means that if any errors occur in that time we lose the
                // buffered block data (e.g. cat-gateway is shut down ungracefully). This is not
                // a problem since cat-gateway checkpoints the latest database writes, so it
                // simply restarts from the last written block.
                //
                // This is most likely to happen when following from the tip or receiving blocks
                // from the network (since updates will come at larger intervals).
                if blocks_buffer.is_empty() {
                    continue;
                }

                let current_buffer = std::mem::take(&mut blocks_buffer);
                index_block_buffer(db.clone(), genesis_values, network, &machine_id, current_buffer).await;

                // Reset the ticker so it counts the interval as starting after we wrote everything
                // to the database.
                ticker.reset();
            }
        }
    }
}

/// Decodes each block in the buffer and indexes the decoded blocks in the event-db.
async fn index_block_buffer(
    db: Arc<EventDB>, genesis_values: &GenesisValues, network: Network, machine_id: &MachineId,
    buffer: Vec<cardano_chain_follower::MultiEraBlockData>,
) {
    info!("Starting data indexing");

    let mut blocks = Vec::new();

    for block_data in &buffer {
        match block_data.decode() {
            Ok(block) => blocks.push(block),
            Err(e) => {
                error!(error = ?e, "Failed to decode block");
            },
        }
    }

    index_many_blocks(db.clone(), genesis_values, network, machine_id, &blocks).await;
}

/// Indexes a batch of decoded blocks in the event-db using batched inserts and updates.
#[allow(clippy::too_many_lines)]
async fn index_many_blocks(
    db: Arc<EventDB>, genesis_values: &GenesisValues, network: Network, machine_id: &MachineId,
    blocks: &[MultiEraBlock<'_>],
) {
    let Some(last_block) = blocks.last() else {
        return;
    };

    let network_str = network.to_string();

    // Index blocks data
    {
        let values: Vec<_> = blocks
            .iter()
            .filter_map(|block| {
                IndexedFollowerDataParams::from_block_data(genesis_values, &network_str, block)
            })
            .collect();

        match db.index_many_follower_data(&values).await {
            Ok(()) => {
                info!(count = values.len(), "Finished indexing block data");
            },
            Err(e) => {
                error!(error = ?e, "Failed to write DB entries");
                return;
            },
        }
    }

    let blocks_txs: Vec<_> = blocks
        .iter()
        .flat_map(|b| b.txs().into_iter().map(|tx| (b.slot(), tx)))
        .collect();

    // Index transaction data
    {
        let values: Vec<_> = blocks_txs
            .iter()
            .map(|(slot, tx)| {
                // SAFETY: This is safe to ignore because we would not reach this point if
                // the try_into inside the block indexing loop had failed.
                #[allow(clippy::cast_possible_wrap)]
                IndexedTxnParams {
                    id: tx.hash().to_vec(),
                    slot_no: *slot as i64,
                    network: &network_str,
                }
            })
            .collect();

        match db.index_many_txn_data(&values).await {
            Ok(()) => info!(count = values.len(), "Finished indexing transactions"),
            Err(e) => {
                error!(error = ?e, "Failed to index transactions");
                return;
            },
        }
    }

    // Index transaction output data
    {
        let values: Vec<_> = blocks_txs
            .iter()
            .flat_map(|(_, tx)| IndexedTxnOutputParams::from_txn_data(tx))
            .collect();

        match db.index_many_txn_output_data(&values).await {
            Ok(()) => {
                info!(
                    count = values.len(),
                    "Finished indexing transaction outputs data"
                );
            },
            Err(e) => {
                error!(error = ?e, "Failed to index transaction outputs data");
                return;
            },
        }
    }

    // Index transaction input data
    {
        let values: Vec<_> = blocks_txs
            .iter()
            .flat_map(|(_, tx)| IndexedTxnInputParams::from_txn_data(tx))
            .collect();

        match db.index_many_txn_input_data(&values).await {
            Ok(()) => {
                info!(
                    count = values.len(),
                    "Finished indexing transaction inputs data"
                );
            },
            Err(e) => {
                error!(error = ?e, "Failed to index transaction inputs data");
                return;
            },
        }
    }

    // Index voter registrations
    {
        let values: Vec<_> = blocks
            .iter()
            .filter_map(|block| IndexedVoterRegistrationParams::from_block_data(block, network))
            .flatten()
            .collect();

        match db.index_many_voter_registration_data(&values).await {
            Ok(()) => {
                info!(
                    count = values.len(),
                    "Finished indexing voter registrations data"
                );
            },
            Err(e) => {
                error!(error = ?e, "Failed to index voter registrations data");
                return;
            },
        }
    }

    // Refresh update metadata for future followers
    // SAFETY: This is safe to ignore because we would not reach this point if
    // the try_into inside the block indexing loop had failed.
    #[allow(clippy::cast_possible_wrap)]
    match db
        .refresh_last_updated(
            chrono::offset::Utc::now(),
            last_block.slot() as i64,
            last_block.hash().to_vec(),
            network,
            machine_id,
        )
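
The `index_many_*` methods called above are defined in the event-db modules of this changeset, which are collapsed in this view. As a rough illustration of what a batched insert can look like with `tokio-postgres`, the hypothetical sketch below collapses a whole batch into one `UNNEST`-based statement; the table name, column names, and conflict handling are invented for the example, and the actual event-db queries may be implemented differently.

use tokio_postgres::Client;

/// Hypothetical row type mirroring `IndexedTxnParams` from the diff above.
struct IndexedTxnParams<'a> {
    id: Vec<u8>,
    slot_no: i64,
    network: &'a str,
}

/// One round trip for the whole batch: Postgres UNNESTs the parallel arrays
/// back into rows server-side. Table and column names here are assumptions.
async fn index_many_txn_data(
    client: &Client, values: &[IndexedTxnParams<'_>],
) -> Result<(), tokio_postgres::Error> {
    // Split the row structs into one parameter array per column.
    let ids: Vec<&[u8]> = values.iter().map(|v| v.id.as_slice()).collect();
    let slots: Vec<i64> = values.iter().map(|v| v.slot_no).collect();
    let networks: Vec<&str> = values.iter().map(|v| v.network).collect();

    client
        .execute(
            "INSERT INTO cardano_txn_index (id, slot_no, network)
             SELECT * FROM UNNEST($1::bytea[], $2::bigint[], $3::text[])
             ON CONFLICT DO NOTHING",
            &[&ids, &slots, &networks],
        )
        .await?;

    Ok(())
}

Shipping parallel arrays and expanding them server-side turns N per-row round trips into a single statement, which is the point of batching these inserts.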