Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
target
README.html
restore.sh
startup.sh
configuration/
docker-compose.yaml

# Nix
result
Expand Down
41 changes: 41 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ snap-test-streaming: $(SNAPSHOT)
@echo "=================================="
@echo "Snapshot: $(SNAPSHOT)"
@echo "Size: $$(du -h $(SNAPSHOT) | cut -f1)"
@echo "Log Level: $(LOG_LEVEL)"
@echo ""
@test -f "$(SNAPSHOT)" || (echo "Error: Snapshot file not found: $(SNAPSHOT)"; exit 1)
@echo "This will parse the entire snapshot and collect all data with callbacks..."
@echo "Expected time: ~1-3 minutes for 2.4GB snapshot with 11M UTXOs"
@echo ""
@$(CARGO) run --release --example test_streaming_parser -- "$(SNAPSHOT)"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add rust logging (default is "info")

RUST_LOG=$(LOG_LEVEL) $(CARGO) run --release --example test_streaming_parser -- "$(SNAPSHOT)"

# Pattern rule: generate .json manifest from .cbor snapshot
# Usage: make tests/fixtures/my-snapshot.json
Expand Down
3 changes: 2 additions & 1 deletion common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ bs58 = "0.5"
chrono = { workspace = true }
crc = "3"
hex = { workspace = true }
log = "0.4"
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log dependency is added but appears to be unused in the codebase. The code uses tracing for logging instead (as seen in streaming_snapshot.rs and other modules).

Consider removing this unused dependency to keep the dependency list clean and avoid confusion.

Suggested change
log = "0.4"

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I couldn't get it working with tracing. Needed log.

memmap2 = "0.9"
num-rational = { version = "0.4.2", features = ["serde"] }
regex = "1"
Expand All @@ -45,7 +46,7 @@ sha2 = "0.10.8"
tempfile = "3.23"
config = { workspace = true }
caryatid_process = { workspace = true }

env_logger = "0.10"

[lib]
crate-type = ["rlib"]
Expand Down
129 changes: 100 additions & 29 deletions common/examples/test_streaming_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
//
// Usage: cargo run --example test_streaming_parser --release -- <snapshot_path>

use acropolis_common::snapshot::streaming_snapshot::{
use acropolis_common::snapshot::{
AccountState, DRepCallback, DRepInfo, GovernanceProposal, PoolCallback, PoolInfo,
ProposalCallback, SnapshotCallbacks, SnapshotMetadata, StakeCallback, StreamingSnapshotParser,
UtxoCallback, UtxoEntry,
ProposalCallback, RawSnapshotsContainer, SnapshotCallbacks, SnapshotMetadata,
SnapshotsCallback, StakeCallback, StreamingSnapshotParser, UtxoCallback, UtxoEntry,
};
use anyhow::Result;
use std::env;
use std::time::Instant;

use env_logger::Env;

// Simple counter callback that doesn't store data in memory
#[derive(Default)]
struct CountingCallbacks {
Expand Down Expand Up @@ -55,7 +57,7 @@ impl UtxoCallback for CountingCallbacks {
impl PoolCallback for CountingCallbacks {
fn on_pools(&mut self, pools: Vec<PoolInfo>) -> Result<()> {
self.pool_count = pools.len();
eprintln!("Parsed {} stake pools", pools.len());
eprintln!("Parsed {} stake pools", pools.len());

// Show first 10 pools
for (i, pool) in pools.iter().take(10).enumerate() {
Expand All @@ -79,7 +81,7 @@ impl StakeCallback for CountingCallbacks {
fn on_accounts(&mut self, accounts: Vec<AccountState>) -> Result<()> {
self.account_count = accounts.len();
if !accounts.is_empty() {
eprintln!("Parsed {} stake accounts", accounts.len());
eprintln!("Parsed {} stake accounts", accounts.len());

// Show first 10 accounts
for (i, account) in accounts.iter().take(10).enumerate() {
Expand All @@ -104,7 +106,7 @@ impl StakeCallback for CountingCallbacks {
impl DRepCallback for CountingCallbacks {
fn on_dreps(&mut self, dreps: Vec<DRepInfo>) -> Result<()> {
self.drep_count = dreps.len();
eprintln!("Parsed {} DReps", self.drep_count);
eprintln!("Parsed {} DReps", self.drep_count);

// Show first 10 DReps
for (i, drep) in dreps.iter().take(10).enumerate() {
Expand Down Expand Up @@ -136,7 +138,7 @@ impl ProposalCallback for CountingCallbacks {
fn on_proposals(&mut self, proposals: Vec<GovernanceProposal>) -> Result<()> {
self.proposal_count = proposals.len();
if !proposals.is_empty() {
eprintln!("Parsed {} governance proposals", proposals.len());
eprintln!("Parsed {} governance proposals", proposals.len());

// Show first 10 proposals
for (i, proposal) in proposals.iter().take(10).enumerate() {
Expand All @@ -159,22 +161,22 @@ impl ProposalCallback for CountingCallbacks {

impl SnapshotCallbacks for CountingCallbacks {
fn on_metadata(&mut self, metadata: SnapshotMetadata) -> Result<()> {
eprintln!("📊 Snapshot Metadata:");
eprintln!(" Epoch: {}", metadata.epoch);
eprintln!("Snapshot Metadata:");
eprintln!(" Epoch: {}", metadata.epoch);
eprintln!(
" Treasury: {} ADA",
" Treasury: {} ADA",
metadata.pot_balances.treasury as f64 / 1_000_000.0
);
eprintln!(
" Reserves: {} ADA",
" Reserves: {} ADA",
metadata.pot_balances.reserves as f64 / 1_000_000.0
);
eprintln!(
" Deposits: {} ADA",
" Deposits: {} ADA",
metadata.pot_balances.deposits as f64 / 1_000_000.0
);
if let Some(count) = metadata.utxo_count {
eprintln!(" UTXO count: {}", count);
eprintln!(" UTXO count: {count}");
}
// Calculate total blocks produced
let total_blocks_previous: u32 =
Expand All @@ -183,19 +185,43 @@ impl SnapshotCallbacks for CountingCallbacks {
metadata.blocks_current_epoch.iter().map(|p| p.block_count as u32).sum();

eprintln!(
" Block production previous epoch: {} pools produced {} blocks total",
" Block production previous epoch: {} pools produced {} blocks total",
metadata.blocks_previous_epoch.len(),
total_blocks_previous
);
eprintln!(
" Block production current epoch: {} pools produced {} blocks total",
" Block production current epoch: {} pools produced {} blocks total",
metadata.blocks_current_epoch.len(),
total_blocks_current
);

// Show snapshots info if available
if let Some(snapshots_info) = &metadata.snapshots {
eprintln!(" Snapshots Info:");
eprintln!(
" Mark snapshot: {} sections",
snapshots_info.mark.sections_count
);
eprintln!(
" Set snapshot: {} sections",
snapshots_info.set.sections_count
);
eprintln!(
" Go snapshot: {} sections",
snapshots_info.go.sections_count
);
eprintln!(
" Fee value: {} lovelace ({} ADA)",
snapshots_info.fee,
snapshots_info.fee as f64 / 1_000_000.0
);
} else {
eprintln!(" No snapshots data available");
}

// Show top block producers if any
if !metadata.blocks_previous_epoch.is_empty() {
eprintln!(" 📦 Previous epoch top producers (first 3):");
eprintln!(" Previous epoch top producers (first 3):");
let mut sorted_previous = metadata.blocks_previous_epoch.clone();
sorted_previous.sort_by(|a, b| b.block_count.cmp(&a.block_count));
for (i, production) in sorted_previous.iter().take(3).enumerate() {
Expand All @@ -216,7 +242,7 @@ impl SnapshotCallbacks for CountingCallbacks {
}

if !metadata.blocks_current_epoch.is_empty() {
eprintln!(" 📦 Current epoch top producers (first 3):");
eprintln!(" Current epoch top producers (first 3):");
let mut sorted_current = metadata.blocks_current_epoch.clone();
sorted_current.sort_by(|a, b| b.block_count.cmp(&a.block_count));
for (i, production) in sorted_current.iter().take(3).enumerate() {
Expand Down Expand Up @@ -246,6 +272,35 @@ impl SnapshotCallbacks for CountingCallbacks {
}
}

impl SnapshotsCallback for CountingCallbacks {
fn on_snapshots(&mut self, snapshots: RawSnapshotsContainer) -> Result<()> {
eprintln!("Raw Snapshots Data:");

// Calculate total stakes and delegator counts from VMap data
let mark_total: i64 = snapshots.mark.0.iter().map(|(_, amount)| amount).sum();
let set_total: i64 = snapshots.set.0.iter().map(|(_, amount)| amount).sum();
let go_total: i64 = snapshots.go.0.iter().map(|(_, amount)| amount).sum();
Comment on lines +280 to +282
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential overflow when summing stakes: The code sums i64 values which could overflow if there are many large stake amounts. While Cardano's total supply is limited, summing many individual stakes without overflow checking could still be problematic.

Consider using checked arithmetic or a larger type (e.g., i128) for the accumulation to prevent potential overflow issues.

Suggested change
let mark_total: i64 = snapshots.mark.0.iter().map(|(_, amount)| amount).sum();
let set_total: i64 = snapshots.set.0.iter().map(|(_, amount)| amount).sum();
let go_total: i64 = snapshots.go.0.iter().map(|(_, amount)| amount).sum();
let mark_total: i128 = snapshots.mark.0.iter().map(|(_, amount)| *amount as i128).sum();
let set_total: i128 = snapshots.set.0.iter().map(|(_, amount)| *amount as i128).sum();
let go_total: i128 = snapshots.go.0.iter().map(|(_, amount)| *amount as i128).sum();

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems a bit overkill to me.


eprintln!(
" Mark snapshot: {} delegators, {} total stake (ADA)",
snapshots.mark.0.len(),
mark_total as f64 / 1_000_000.0
);
eprintln!(
" Set snapshot: {} delegators, {} total stake (ADA)",
snapshots.set.0.len(),
set_total as f64 / 1_000_000.0
);
eprintln!(
" Go snapshot: {} delegators, {} total stake (ADA)",
snapshots.go.0.len(),
go_total as f64 / 1_000_000.0
);
eprintln!(" Fee: {} ADA", snapshots.fee as f64 / 1_000_000.0);
Ok(())
}
}

fn main() {
// Get snapshot path from command line
let args: Vec<String> = env::args().collect();
Expand All @@ -255,11 +310,14 @@ fn main() {
std::process::exit(1);
}

// Initialize env_logger to read RUST_LOG environment variable
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();

let snapshot_path = &args[1];
println!("🚀 Streaming Snapshot Parser Test with Block Parsing");
println!("Streaming Snapshot Parser Test with Block Parsing");
println!("====================================================");
println!("Snapshot: {}", snapshot_path);
println!("Features: UTXOs, Pools, Accounts, DReps, Proposals, and 📦 BLOCKS!");
println!("Snapshot: {snapshot_path}");
println!("Features: UTXOs, Pools, Accounts, DReps, Proposals, and BLOCKS!");
println!();

// Create parser and callbacks
Expand All @@ -273,37 +331,50 @@ fn main() {
match parser.parse(&mut callbacks) {
Ok(()) => {
let duration = start.elapsed();
println!("Parse completed successfully in {:.2?}", duration);
println!("Parse completed successfully in {duration:.2?}");
println!();

// Display results
if let Some(metadata) = &callbacks.metadata {
println!("📊 Final Metadata Summary:");
println!("Final Metadata Summary:");
println!(" Epoch: {}", metadata.epoch);
println!(" Treasury: {} lovelace", metadata.pot_balances.treasury);
println!(" Reserves: {} lovelace", metadata.pot_balances.reserves);
println!(" Deposits: {} lovelace", metadata.pot_balances.deposits);
if let Some(count) = metadata.utxo_count {
println!(" UTXO Count (metadata): {}", count);
println!(" UTXO Count (metadata): {count}");
}
let total_blocks_previous: u32 =
metadata.blocks_previous_epoch.iter().map(|p| p.block_count as u32).sum();
let total_blocks_current: u32 =
metadata.blocks_current_epoch.iter().map(|p| p.block_count as u32).sum();
println!(
" 📦 Block production previous epoch: {} pools, {} blocks total",
" Block production previous epoch: {} pools, {} blocks total",
metadata.blocks_previous_epoch.len(),
total_blocks_previous
);
println!(
" 📦 Block production current epoch: {} pools, {} blocks total",
" Block production current epoch: {} pools, {} blocks total",
metadata.blocks_current_epoch.len(),
total_blocks_current
);

// Show snapshots info summary
if let Some(snapshots_info) = &metadata.snapshots {
println!(" Snapshots Summary:");
println!(
" Mark: {} sections, Set: {} sections, Go: {} sections, Fee: {} ADA",
snapshots_info.mark.sections_count,
snapshots_info.set.sections_count,
snapshots_info.go.sections_count,
snapshots_info.fee as f64 / 1_000_000.0
);
}

println!();
}

println!("📈 Parsed Data Summary:");
println!("Parsed Data Summary:");
println!(" UTXOs: {}", callbacks.utxo_count);
println!(" Stake Pools: {}", callbacks.pool_count);
println!(" Stake Accounts: {}", callbacks.account_count);
Expand Down Expand Up @@ -395,14 +466,14 @@ fn main() {
// Performance stats
let utxos_per_sec = callbacks.utxo_count as f64 / duration.as_secs_f64();
println!("Performance:");
println!(" Total time: {:.2?}", duration);
println!(" UTXOs/second: {:.0}", utxos_per_sec);
println!(" Total time: {duration:.2?}");
println!(" UTXOs/second: {utxos_per_sec:.0}");
println!();

std::process::exit(0);
}
Err(e) => {
eprintln!("Parse failed: {:?}", e);
eprintln!("Parse failed: {e:?}");
eprintln!();
std::process::exit(1);
}
Expand Down
Loading