Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions modules/parameters_state/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ fn main() {
download(main, "alonzo", "mainnet-alonzo-genesis.json", &vec![]);
download(main, "conway", "mainnet-conway-genesis.json", &vec![]);

let preview = "https://book.world.dev.cardano.org/environments/preview";
download(preview, "byron", "preview-byron-genesis.json", &vec![]);
download(
preview,
"shelley",
"preview-shelley-genesis.json",
&shelley_fix,
);
download(preview, "alonzo", "preview-alonzo-genesis.json", &vec![]);
download(preview, "conway", "preview-conway-genesis.json", &vec![]);

let sancho =
"https://raw.githubusercontent.com/Hornan7/SanchoNet-Tutorials/refs/heads/main/genesis";
download(sancho, "byron", "sanchonet-byron-genesis.json", &vec![]);
Expand Down
22 changes: 21 additions & 1 deletion modules/parameters_state/src/genesis_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use pallas::ledger::configs::*;
use serde::Deserialize;
use std::collections::HashMap;

const PREDEFINED_GENESIS: [(&str, Era, &[u8]); 8] = [
const PREDEFINED_GENESIS: [(&str, Era, &[u8]); 12] = [
(
"sanchonet",
Era::Byron,
Expand All @@ -33,6 +33,26 @@ const PREDEFINED_GENESIS: [(&str, Era, &[u8]); 8] = [
Era::Conway,
include_bytes!("../downloads/sanchonet-conway-genesis.json"),
),
(
"preview",
Era::Byron,
include_bytes!("../downloads/preview-byron-genesis.json"),
),
(
"preview",
Era::Shelley,
include_bytes!("../downloads/preview-shelley-genesis.json"),
),
(
"preview",
Era::Alonzo,
include_bytes!("../downloads/preview-alonzo-genesis.json"),
),
(
"preview",
Era::Conway,
include_bytes!("../downloads/preview-conway-genesis.json"),
),
(
"mainnet",
Era::Byron,
Expand Down
61 changes: 45 additions & 16 deletions modules/peer_network_interface/src/peer_network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,27 @@ impl PeerNetworkInterface {
SyncPoint::Snapshot => Some(context.subscribe(&cfg.snapshot_completion_topic).await?),
_ => None,
};
let (events_sender, events) = mpsc::channel(1024);

context.clone().run(async move {
let genesis_values = if let Some(mut sub) = genesis_complete {
Self::wait_genesis_completion(&mut sub)
.await
.expect("could not fetch genesis values")
} else {
cfg.genesis_values.expect("genesis values not found")
cfg.genesis_values.clone().expect("genesis values not found")
};

let mut upstream_cache = None;
let mut last_epoch = None;
let mut cache_sync_point = Point::Origin;
if cfg.sync_point == SyncPoint::Cache {
match Self::init_cache(&cfg.cache_dir, &cfg.block_topic, &context).await {
Ok((cache, sync_point)) => {
upstream_cache = Some(cache);
if let Point::Specific(slot, _) = sync_point {
let (epoch, _) = genesis_values.slot_to_epoch(slot);
last_epoch = Some(epoch);
}
cache_sync_point = sync_point;
}
Err(e) => {
Expand All @@ -68,39 +72,53 @@ impl PeerNetworkInterface {
}
}

let sink = BlockSink {
let mut sink = BlockSink {
context,
topic: cfg.block_topic,
topic: cfg.block_topic.clone(),
genesis_values,
upstream_cache,
last_epoch,
};

let mut manager = NetworkManager::new(cfg.magic_number, events, events_sender, sink);
for address in cfg.node_addresses {
manager.handle_new_connection(address, Duration::ZERO);
}

match cfg.sync_point {
SyncPoint::Origin => manager.sync_to_point(Point::Origin),
let manager = match cfg.sync_point {
SyncPoint::Origin => {
let mut manager = Self::init_manager(cfg, sink);
manager.sync_to_point(Point::Origin);
manager
}
SyncPoint::Tip => {
let mut manager = Self::init_manager(cfg, sink);
if let Err(error) = manager.sync_to_tip().await {
warn!("could not sync to tip: {error:#}");
return;
}
manager
}
SyncPoint::Cache => {
let mut manager = Self::init_manager(cfg, sink);
manager.sync_to_point(cache_sync_point);
manager
}
SyncPoint::Cache => manager.sync_to_point(cache_sync_point),
SyncPoint::Snapshot => {
let mut subscription =
snapshot_complete.expect("Snapshot topic subscription missing");
match Self::wait_snapshot_completion(&mut subscription).await {
Ok(point) => manager.sync_to_point(point),
Ok(point) => {
if let Point::Specific(slot, _) = point {
let (epoch, _) = sink.genesis_values.slot_to_epoch(slot);
sink.last_epoch = Some(epoch);
}
let mut manager = Self::init_manager(cfg, sink);
manager.sync_to_point(point);
manager
}
Err(error) => {
warn!("snapshot restoration never completed: {error:#}");
return;
}
}
}
}
};

if let Err(err) = manager.run().await {
error!("chain sync failed: {err:#}");
Expand All @@ -110,6 +128,15 @@ impl PeerNetworkInterface {
Ok(())
}

fn init_manager(cfg: InterfaceConfig, sink: BlockSink) -> NetworkManager {
let (events_sender, events) = mpsc::channel(1024);
let mut manager = NetworkManager::new(cfg.magic_number, events, events_sender, sink);
for address in cfg.node_addresses {
manager.handle_new_connection(address, Duration::ZERO);
}
manager
}

async fn init_cache(
cache_dir: &Path,
block_topic: &str,
Expand Down Expand Up @@ -163,6 +190,7 @@ struct BlockSink {
topic: String,
genesis_values: GenesisValues,
upstream_cache: Option<UpstreamCache>,
last_epoch: Option<u64>,
}
impl BlockSink {
pub async fn announce(
Expand Down Expand Up @@ -190,10 +218,11 @@ impl BlockSink {
self.context.publish(&self.topic, message).await
}

fn make_block_info(&self, header: &Header, rolled_back: bool) -> BlockInfo {
fn make_block_info(&mut self, header: &Header, rolled_back: bool) -> BlockInfo {
let slot = header.slot;
let (epoch, epoch_slot) = self.genesis_values.slot_to_epoch(slot);
let new_epoch = slot == self.genesis_values.epoch_to_first_slot(epoch);
let new_epoch = self.last_epoch != Some(epoch);
self.last_epoch = Some(epoch);
let timestamp = self.genesis_values.slot_to_timestamp(slot);
BlockInfo {
status: if rolled_back {
Expand Down
36 changes: 17 additions & 19 deletions modules/utxo_state/src/volatile_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tracing::error;

pub struct VolatileIndex {
/// First block number represented in the index VecDeque
first_block: u64,
first_block: Option<u64>,

/// List of UTXOs for each block number
blocks: VecDeque<Vec<UTxOIdentifier>>,
Expand All @@ -16,7 +16,7 @@ impl VolatileIndex {
/// Create a new empty index
pub fn new() -> Self {
Self {
first_block: 0,
first_block: None,
blocks: VecDeque::new(),
}
}
Expand All @@ -29,11 +29,9 @@ impl VolatileIndex {
/// Add a new block entry
pub fn add_block(&mut self, number: u64) {
// Capture the first volatile block we get
if self.first_block == 0 {
self.first_block = number;
}
let first_block = *self.first_block.get_or_insert(number);

if number == self.first_block + self.blocks.len() as u64 {
if number == first_block + self.blocks.len() as u64 {
// Add empty UTXO set
self.blocks.push_back(Vec::new());
} else {
Expand All @@ -54,7 +52,7 @@ impl VolatileIndex {
let mut utxos = Vec::<UTxOIdentifier>::new();

// Remove blocks before boundary, calling back for all UTXOs in them
while self.first_block < boundary {
while let Some(first_block) = self.first_block.as_mut().filter(|b| **b < boundary) {
if let Some(block) = self.blocks.pop_front() {
for utxo in block {
utxos.push(utxo);
Expand All @@ -63,7 +61,7 @@ impl VolatileIndex {
break;
}

self.first_block += 1;
*first_block += 1;
}

utxos
Expand All @@ -74,10 +72,10 @@ impl VolatileIndex {
pub fn prune_on_or_after(&mut self, boundary: u64) -> Vec<UTxOIdentifier> {
let mut utxos = Vec::<UTxOIdentifier>::new();

if self.first_block == 0 {
let Some(first_block) = self.first_block else {
return utxos;
}
let mut last_block = self.first_block + self.blocks.len() as u64 - 1;
};
let mut last_block = first_block + self.blocks.len() as u64 - 1;

// Remove blocks before boundary, calling back for all UTXOs in them
while last_block >= boundary {
Expand All @@ -104,20 +102,20 @@ mod tests {
#[test]
fn new_index_is_empty() {
let index = VolatileIndex::new();
assert_eq!(0, index.first_block);
assert_eq!(None, index.first_block);
assert_eq!(0, index.blocks.len());
}

#[test]
fn add_block_sequential_captures_number_and_adds_block() {
let mut index = VolatileIndex::new();
index.add_block(42);
assert_eq!(42, index.first_block);
assert_eq!(Some(42), index.first_block);
assert_eq!(1, index.blocks.len());
assert!(index.blocks[0].is_empty());

index.add_block(43);
assert_eq!(42, index.first_block);
assert_eq!(Some(42), index.first_block);
assert_eq!(2, index.blocks.len());
assert!(index.blocks[1].is_empty());
}
Expand All @@ -126,11 +124,11 @@ mod tests {
fn add_block_non_sequential_ignores_it() {
let mut index = VolatileIndex::new();
index.add_block(42);
assert_eq!(42, index.first_block);
assert_eq!(Some(42), index.first_block);
assert_eq!(1, index.blocks.len());

index.add_block(99);
assert_eq!(42, index.first_block);
assert_eq!(Some(42), index.first_block);
assert_eq!(1, index.blocks.len());
}

Expand All @@ -139,7 +137,7 @@ mod tests {
let mut index = VolatileIndex::new();
index.add_block(1);
index.add_block(2);
assert_eq!(1, index.first_block);
assert_eq!(Some(1), index.first_block);
assert_eq!(2, index.blocks.len());

let utxo = UTxOIdentifier::new(42, 42, 42);
Expand All @@ -160,7 +158,7 @@ mod tests {
index.add_utxo(&UTxOIdentifier::new(3, 3, 3));

let pruned = index.prune_before(2);
assert_eq!(2, index.first_block);
assert_eq!(Some(2), index.first_block);
assert_eq!(1, index.blocks.len());
assert_eq!(2, pruned.len());
assert_eq!(1, pruned[0].output_index());
Expand All @@ -176,7 +174,7 @@ mod tests {
index.add_block(2);
index.add_utxo(&UTxOIdentifier::new(3, 3, 3));
let pruned = index.prune_on_or_after(1);
assert_eq!(1, index.first_block);
assert_eq!(Some(1), index.first_block);
assert_eq!(0, index.blocks.len());
assert_eq!(3, pruned.len());

Expand Down
2 changes: 2 additions & 0 deletions processes/omnibus/omnibus-local.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ publish-certificates-topic = "cardano.certificates"
publish-governance-topic = "cardano.governance"
publish-block-txs-topic = "cardano.block.txs"

[module.consensus]

[module.utxo-state]
store = "memory" # "memory", "dashmap", "fjall", "fjall-async", "sled", "sled-async", "fake"
address-delta-topic = "cardano.address.delta"
Expand Down
2 changes: 2 additions & 0 deletions processes/omnibus/omnibus-sancho.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ publish-certificates-topic = "cardano.certificates"
publish-governance-topic = "cardano.governance"
publish-block-txs-topic = "cardano.block.txs"

[module.consensus]

[module.utxo-state]
store = "memory" # "memory", "dashmap", "fjall", "fjall-async", "sled", "sled-async", "fake"
address-delta-topic = "cardano.address.delta"
Expand Down