From a71fbb9beeb8327b098af7632802a034499b2f3f Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Tue, 16 Apr 2024 09:30:19 -0300 Subject: [PATCH] refactor: use Redb for ledger data (#199) --- Cargo.lock | 46 +- Cargo.toml | 5 +- src/bin/dolos/bootstrap.rs | 10 +- src/bin/dolos/common.rs | 8 +- src/bin/dolos/data.rs | 37 +- src/bin/dolos/eval.rs | 57 ++- src/bin/dolos/main.rs | 2 +- src/ledger/mod.rs | 289 ++++++++++++ src/ledger/pparams/mod.rs | 258 +++++++++++ src/{ => ledger}/pparams/test_data.rs | 60 +-- src/ledger/store.rs | 177 +++++++ src/lib.rs | 3 +- src/pparams/mod.rs | 160 ------- src/prelude.rs | 4 +- src/querydb/mod.rs | 1 - src/querydb/prelude.rs | 71 --- src/querydb/store.rs | 341 +++++++++----- src/storage/applydb/genesis.rs | 129 ------ src/storage/applydb/mod.rs | 636 -------------------------- src/storage/kvtable.rs | 461 ------------------- src/storage/mod.rs | 2 - src/sync/ledger.rs | 142 +----- src/sync/mod.rs | 22 +- 23 files changed, 1106 insertions(+), 1815 deletions(-) create mode 100644 src/ledger/mod.rs create mode 100644 src/ledger/pparams/mod.rs rename src/{ => ledger}/pparams/test_data.rs (78%) create mode 100644 src/ledger/store.rs delete mode 100644 src/pparams/mod.rs delete mode 100644 src/querydb/prelude.rs delete mode 100644 src/storage/applydb/genesis.rs delete mode 100644 src/storage/applydb/mod.rs delete mode 100644 src/storage/kvtable.rs delete mode 100644 src/storage/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 11e8902..4495d8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -786,6 +786,7 @@ dependencies = [ "futures-util", "gasket", "hex", + "itertools 0.12.1", "lazy_static", "log", "miette", @@ -1916,8 +1917,7 @@ checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" [[package]] name = "pallas" version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edf158720dbba347c142c83e3cc91aeb667609c748d34147902baa5a90e7fc7b" +source = "git+https://github.com/txpipe/pallas.git#2d6b6fda1e1b3782e62f583388015cc601c7a564" dependencies = [ "pallas-addresses", "pallas-applying", @@ -1937,8 +1937,7 @@ dependencies = [ [[package]] name = "pallas-addresses" version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9148574f9e1b25d67d11c9680bd267e927128d287d6d4de394695ed57388f28f" +source = "git+https://github.com/txpipe/pallas.git#2d6b6fda1e1b3782e62f583388015cc601c7a564" dependencies = [ "base58", "bech32 0.9.1", @@ -1953,8 +1952,7 @@ dependencies = [ [[package]] name = "pallas-applying" version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b44a8296435a1d47548ac51802445481d51dc7e4ad7839b253d7fd98887ce767" +source = "git+https://github.com/txpipe/pallas.git#2d6b6fda1e1b3782e62f583388015cc601c7a564" dependencies = [ "hex", "pallas-addresses", @@ -1980,8 +1978,7 @@ dependencies = [ [[package]] name = "pallas-codec" version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "357301a71c685b3c586defca1aaea59da46fb397c1b960c8487277adf0c346a1" +source = "git+https://github.com/txpipe/pallas.git#2d6b6fda1e1b3782e62f583388015cc601c7a564" dependencies = [ "hex", "minicbor", @@ -1992,16 +1989,18 @@ dependencies = [ [[package]] name = "pallas-configs" version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a3e785700317713aaee3efe4b0f86fdbc66fc9e199ffe7c43f3b6fc09c24164" +source = "git+https://github.com/txpipe/pallas.git#2d6b6fda1e1b3782e62f583388015cc601c7a564" dependencies = [ "base64 0.22.0", "hex", + "num-rational", "pallas-addresses", "pallas-codec 0.25.0", "pallas-crypto 0.25.0", + "pallas-primitives", "serde", "serde_json", + "serde_with 3.7.0", ] [[package]] @@ -2021,8 +2020,7 @@ dependencies = [ [[package]] name = "pallas-crypto" version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4a1adbeb46418fd6b1ca00db9d81253f131c7555864f4e6712c7f562063c316" +source = "git+https://github.com/txpipe/pallas.git#2d6b6fda1e1b3782e62f583388015cc601c7a564" dependencies = [ "cryptoxide", "hex", @@ -2035,8 +2033,7 @@ dependencies = [ [[package]] name = "pallas-hardano" version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92836a18dd8411d5a6cfeb472b9d454b00e61dbcb466a41f0cc3a666b94726ea" +source = "git+https://github.com/txpipe/pallas.git#2d6b6fda1e1b3782e62f583388015cc601c7a564" dependencies = [ "binary-layout", "pallas-network 0.25.0", @@ -2067,8 +2064,7 @@ dependencies = [ [[package]] name = "pallas-network" version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8397eb2e3c89cf17aa998725fcd6f8acb4bd3fa3802a1d96618dbe7d8145c1b" +source = "git+https://github.com/txpipe/pallas.git#2d6b6fda1e1b3782e62f583388015cc601c7a564" dependencies = [ "byteorder", "hex", @@ -2085,8 +2081,7 @@ dependencies = [ [[package]] name = "pallas-primitives" version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95666399b6ae09fdf93cf405e032d1c950909daf0fe8954b43c895653359a194" +source = "git+https://github.com/txpipe/pallas.git#2d6b6fda1e1b3782e62f583388015cc601c7a564" dependencies = [ "base58", "bech32 0.9.1", @@ -2101,8 +2096,7 @@ dependencies = [ [[package]] name = "pallas-rolldb" version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daaf870b1a8c93429613a5f6e024874d6d12a0fc23ca1f494d344018ca5a88b1" +source = "git+https://github.com/txpipe/pallas.git#2d6b6fda1e1b3782e62f583388015cc601c7a564" dependencies = [ "async-stream", "bincode", @@ -2119,8 +2113,7 @@ dependencies = [ [[package]] name = "pallas-traverse" version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfd9057640b61de89bf3176aeee20145192df644d09dd6672dcc85824901f11e" +source = "git+https://github.com/txpipe/pallas.git#2d6b6fda1e1b3782e62f583388015cc601c7a564" dependencies = [ "hex", "pallas-addresses", @@ -2135,8 +2128,7 @@ dependencies = [ [[package]] name = "pallas-txbuilder" version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f652b6f7fa8de36337c371b166da9c8f313e8fb701b925194e3bdf803f79ec" +source = "git+https://github.com/txpipe/pallas.git#2d6b6fda1e1b3782e62f583388015cc601c7a564" dependencies = [ "hex", "pallas-addresses", @@ -2153,8 +2145,7 @@ dependencies = [ [[package]] name = "pallas-utxorpc" version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f79b0ee058aa9e339bd6a993940cd75265cc8568f8c3d70a745adbdcc8dfb9e" +source = "git+https://github.com/txpipe/pallas.git#2d6b6fda1e1b3782e62f583388015cc601c7a564" dependencies = [ "pallas-codec 0.25.0", "pallas-primitives", @@ -2165,8 +2156,7 @@ dependencies = [ [[package]] name = "pallas-wallet" version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dedad9ff8b3f5095c92a7df228a4d607853bd899b348e811e7ed8d1b57a9c07b" +source = "git+https://github.com/txpipe/pallas.git#2d6b6fda1e1b3782e62f583388015cc601c7a564" dependencies = [ "bech32 0.9.1", "bip39", diff --git a/Cargo.toml b/Cargo.toml index 9c510f5..dfd27a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,8 @@ authors = ["Santiago Carmuega "] [dependencies] -# pallas = { git = "https://github.com/txpipe/pallas.git", features = ["unstable"] } -pallas = { version = "^0.25", features = ["unstable"] } +pallas = { git = "https://github.com/txpipe/pallas.git", features = ["unstable"] } +# pallas = { version = "^0.25", features = ["unstable"] } # pallas = { path = "../pallas/pallas", features = ["unstable"] } gasket = { version = "^0.5", features = ["derive"] } @@ -49,6 +49,7 @@ async-stream = "0.3.5" serde_with = "3.4.0" mithril-client = { version = "0.5.17", optional = true, features = ["fs"] } protoc-wkt = "1.0.0" +itertools = "0.12.1" [dev-dependencies] tempfile = "3.3.0" diff --git a/src/bin/dolos/bootstrap.rs b/src/bin/dolos/bootstrap.rs index c681917..9228085 100644 --- a/src/bin/dolos/bootstrap.rs +++ b/src/bin/dolos/bootstrap.rs @@ -165,18 +165,22 @@ pub fn run(config: &super::Config, args: &Args) -> miette::Result<()> { let (mut wal, mut chain, mut ledger) = empty_stores.unwrap(); ledger - .apply_origin(&byron_genesis) + .apply(&[dolos::ledger::compute_origin_delta(&byron_genesis)]) .into_diagnostic() .context("applying origin utxos")?; for block in iter { let block = match block { - Ok(x) => x, + Ok(x) if x.is_empty() => { + warn!("can't continue reading from immutable db"); + break; + } Err(err) => { dbg!(err); warn!("can't continue reading from immutable db"); break; } + Ok(x) => x, }; let blockd = MultiEraBlock::decode(&block) @@ -191,7 +195,7 @@ pub fn run(config: &super::Config, args: &Args) -> miette::Result<()> { .context("adding chain entry")?; ledger - .apply_block(&blockd) + .apply(&[dolos::ledger::compute_delta(&blockd)]) .into_diagnostic() .context("applyting ledger block")?; } diff --git a/src/bin/dolos/common.rs b/src/bin/dolos/common.rs index 38647d5..b9d2fcd 100644 --- a/src/bin/dolos/common.rs +++ b/src/bin/dolos/common.rs @@ -3,7 +3,7 @@ use std::path::Path; use tracing::Level; use tracing_subscriber::{filter::Targets, prelude::*}; -use dolos::{prelude::*, storage::applydb::ApplyDB}; +use dolos::{ledger::store::LedgerStore, prelude::*}; use crate::LoggingConfig; @@ -15,7 +15,7 @@ fn define_rolldb_path(config: &crate::Config) -> &Path { .unwrap_or_else(|| Path::new("/rolldb")) } -pub type Stores = (wal::Store, chain::Store, ApplyDB); +pub type Stores = (wal::Store, chain::Store, LedgerStore); pub fn open_data_stores(config: &crate::Config) -> Result { let rolldb_path = define_rolldb_path(config); @@ -29,7 +29,7 @@ pub fn open_data_stores(config: &crate::Config) -> Result { let chain = chain::Store::open(rolldb_path.join("chain")).map_err(Error::storage)?; - let ledger = ApplyDB::open(rolldb_path.join("ledger")).map_err(Error::storage)?; + let ledger = LedgerStore::open(rolldb_path.join("ledger")).map_err(Error::storage)?; Ok((wal, chain, ledger)) } @@ -40,7 +40,7 @@ pub fn destroy_data_stores(config: &crate::Config) -> Result<(), Error> { wal::Store::destroy(rolldb_path.join("wal")).map_err(Error::storage)?; chain::Store::destroy(rolldb_path.join("chain")).map_err(Error::storage)?; - ApplyDB::destroy(rolldb_path.join("ledger")).map_err(Error::storage)?; + //LedgerStore::destroy(rolldb_path.join("ledger")).map_err(Error::storage)?; Ok(()) } diff --git a/src/bin/dolos/data.rs b/src/bin/dolos/data.rs index d32ab8c..dcd82f0 100644 --- a/src/bin/dolos/data.rs +++ b/src/bin/dolos/data.rs @@ -1,8 +1,13 @@ use std::path::Path; +use dolos::ledger::{pparams::Genesis, ChainPoint, PParamsBody}; +use itertools::Itertools; use miette::IntoDiagnostic; -use pallas::{ledger::traverse::MultiEraBlock, storage::rolldb::chain}; +use pallas::{ + ledger::traverse::{Era, MultiEraBlock, MultiEraUpdate}, + storage::rolldb::chain, +}; #[allow(dead_code)] fn dump_txs(chain: &chain::Store) -> miette::Result<()> { @@ -49,13 +54,41 @@ pub fn run(config: &super::Config, _args: &Args) -> miette::Result<()> { println!("---"); - if let Some((slot, hash)) = ledger.cursor().unwrap() { + if let Some(ChainPoint(slot, hash)) = ledger.cursor().unwrap() { println!("found ledger tip"); println!("slot: {slot}, hash: {hash}"); } else { println!("chain is empty"); } + println!("---"); + + let byron = pallas::ledger::configs::byron::from_file(&config.byron.path).unwrap(); + let shelley = pallas::ledger::configs::shelley::from_file(&config.shelley.path).unwrap(); + let alonzo = pallas::ledger::configs::alonzo::from_file(&config.alonzo.path).unwrap(); + + let data: Vec<_> = ledger.get_pparams(46153027).into_diagnostic()?; + + let updates = data + .iter() + .map(|PParamsBody(era, cbor)| -> miette::Result { + let era = Era::try_from(*era).into_diagnostic()?; + MultiEraUpdate::decode_for_era(era, cbor).into_diagnostic() + }) + .try_collect()?; + + let merged = dolos::ledger::pparams::fold_pparams( + Genesis { + byron: &byron, + shelley: &shelley, + alonzo: &alonzo, + }, + updates, + 500, + ); + + dbg!(merged); + // WIP utility to dump tx data for debugging purposes. Should be implemented as // a subcommand. diff --git a/src/bin/dolos/eval.rs b/src/bin/dolos/eval.rs index 1207be4..7830a04 100644 --- a/src/bin/dolos/eval.rs +++ b/src/bin/dolos/eval.rs @@ -1,9 +1,11 @@ +use dolos::ledger::{PParamsBody, TxoRef}; +use itertools::*; use miette::{Context, IntoDiagnostic}; use pallas::{ applying::{validate, Environment as ValidationContext, UTxOs}, - ledger::traverse::{Era, MultiEraInput, MultiEraOutput}, + ledger::traverse::{Era, MultiEraInput, MultiEraOutput, MultiEraUpdate}, }; -use std::{borrow::Cow, collections::HashMap, path::PathBuf}; +use std::{borrow::Cow, path::PathBuf}; #[derive(Debug, clap::Args)] pub struct Args { @@ -42,11 +44,16 @@ pub fn run(config: &super::Config, args: &Args) -> miette::Result<()> { .into_diagnostic() .context("decoding tx cbor")?; - let mut utxos = HashMap::new(); - ledger - .resolve_inputs_for_tx(&tx, &mut utxos) + let refs = tx + .consumes() + .iter() + .map(|utxo| TxoRef(*utxo.hash(), utxo.index() as u32)) + .collect_vec(); + + let resolved = ledger + .get_utxos(refs) .into_diagnostic() - .context("resolving tx inputs")?; + .context("resolving utxo")?; let byron_genesis = pallas::ledger::configs::byron::from_file(&config.byron.path) .into_diagnostic() @@ -56,43 +63,59 @@ pub fn run(config: &super::Config, args: &Args) -> miette::Result<()> { .into_diagnostic() .context("loading shelley genesis")?; + let alonzo_genesis = pallas::ledger::configs::alonzo::from_file(&config.alonzo.path) + .into_diagnostic() + .context("loading alonzo genesis")?; + let mut utxos2 = UTxOs::new(); - for (ref_, output) in utxos.iter() { + for (ref_, body) in resolved.iter() { let txin = pallas::ledger::primitives::byron::TxIn::Variant0( - pallas::codec::utils::CborWrap((ref_.0, ref_.1 as u32)), + pallas::codec::utils::CborWrap((ref_.0, ref_.1)), ); let key = MultiEraInput::Byron( >>::from(Cow::Owned(txin)), ); - let era = Era::try_from(output.0) + let era = Era::try_from(body.0) .into_diagnostic() .context("parsing era")?; - let value = MultiEraOutput::decode(era, &output.1) + let value = MultiEraOutput::decode(era, &body.1) .into_diagnostic() .context("decoding utxo")?; utxos2.insert(key, value); } - let pparams = dolos::pparams::compute_pparams( - dolos::pparams::Genesis { + let updates = ledger + .get_pparams(args.epoch) + .into_diagnostic() + .context("retrieving pparams")?; + + let updates = updates + .iter() + .map(|PParamsBody(era, cbor)| -> miette::Result { + let era = Era::try_from(*era).into_diagnostic()?; + MultiEraUpdate::decode_for_era(era, cbor).into_diagnostic() + }) + .try_collect()?; + + let pparams = dolos::ledger::pparams::fold_pparams( + dolos::ledger::pparams::Genesis { byron: &byron_genesis, shelley: &shelley_genesis, + alonzo: &alonzo_genesis, }, - &ledger, + updates, args.epoch, - ) - .into_diagnostic() - .context("computing protocol params")?; + ); let context = ValidationContext { block_slot: args.block_slot, prot_magic: config.upstream.network_magic as u32, - network_id: config.upstream.network_id, + network_id: args.network_id, prot_params: pparams, }; diff --git a/src/bin/dolos/main.rs b/src/bin/dolos/main.rs index f0201b1..c519ff1 100644 --- a/src/bin/dolos/main.rs +++ b/src/bin/dolos/main.rs @@ -72,6 +72,7 @@ pub struct Config { pub retries: Option, pub byron: GenesisFileRef, pub shelley: GenesisFileRef, + pub alonzo: GenesisFileRef, #[serde(default)] pub logging: LoggingConfig, } @@ -109,7 +110,6 @@ fn main() -> Result<()> { Command::Sync(x) => sync::run(&config, &x)?, Command::Data(x) => data::run(&config, &x)?, Command::Eval(x) => eval::run(&config, &x)?, - #[cfg(feature = "mithril")] Command::Bootstrap(x) => bootstrap::run(&config, &x)?, }; diff --git a/src/ledger/mod.rs b/src/ledger/mod.rs new file mode 100644 index 0000000..55b6496 --- /dev/null +++ b/src/ledger/mod.rs @@ -0,0 +1,289 @@ +use pallas::crypto::hash::Hash; +use pallas::ledger::traverse::MultiEraBlock; +use std::collections::{HashMap, HashSet}; + +pub mod pparams; +pub mod store; +//pub mod validate; + +pub type Era = u16; +pub type TxHash = Hash<32>; +pub type TxoIdx = u32; +pub type BlockSlot = u64; +pub type BlockHash = Hash<32>; +pub type TxOrder = usize; + +#[derive(Debug)] +pub struct UtxoBody(pub Era, pub Vec); + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct TxoRef(pub TxHash, pub TxoIdx); + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct ChainPoint(pub BlockSlot, pub BlockHash); + +#[derive(Debug)] +pub struct PParamsBody(pub Era, pub Vec); + +pub enum BrokenInvariant { + MissingUtxo(TxoRef), +} + +pub trait LedgerSlice<'a> { + fn get_tip(&'a self) -> ChainPoint; + fn get_utxo(&'a self, txo_ref: &TxoRef) -> Option<&'a UtxoBody>; + fn pparams(&'a self, until: BlockSlot) -> Vec; +} + +#[derive(Default, Debug)] +pub struct LedgerDelta { + pub new_position: Option, + pub undone_position: Option, + pub produced_utxo: HashMap, + pub consumed_utxo: HashSet, + pub recovered_stxi: HashSet, + pub undone_utxo: HashSet, + pub new_pparams: Vec, +} + +/// Computes the ledger delta of applying a particular block. +/// +/// The output represent a self-contained description of the changes that need +/// to occur at the data layer to advance the ledger to the new position (new +/// slot). +/// +/// The function is pure (stateless and without side-effects) with the goal of +/// allowing the logic to execute as an idem-potent, atomic operation, allowing +/// higher-layers to retry the logic if required. +/// +/// This method assumes that the block has already been validated, it will +/// return an error if any of the assumed invariant have been broken in the +/// process of computing the delta, but it own't provide a comprehensive +/// validation of the ledger rules. +pub fn compute_delta(block: &MultiEraBlock) -> LedgerDelta { + let mut delta = LedgerDelta { + new_position: Some(ChainPoint(block.slot(), block.hash())), + ..Default::default() + }; + + let txs = block.txs(); + + for tx in txs.iter() { + for (idx, produced) in tx.produces() { + let uxto_ref = TxoRef(tx.hash(), idx as u32); + let utxo_body = UtxoBody(tx.era().into(), produced.encode()); + + delta.produced_utxo.insert(uxto_ref, utxo_body); + } + + for consumed in tx.consumes() { + let stxi_ref = TxoRef(*consumed.hash(), consumed.index() as u32); + delta.consumed_utxo.insert(stxi_ref); + } + + if let Some(update) = tx.update() { + delta + .new_pparams + .push(PParamsBody(tx.era().into(), update.encode())); + } + } + + // check block-level updates (because of f#!@#@ byron) + if let Some(update) = block.update() { + delta + .new_pparams + .push(PParamsBody(block.era().into(), update.encode())); + } + + delta +} + +pub fn compute_undo_delta(block: &MultiEraBlock) -> LedgerDelta { + let mut delta = LedgerDelta { + undone_position: Some(ChainPoint(block.slot(), block.hash())), + ..Default::default() + }; + + for tx in block.txs() { + for (idx, _) in tx.produces() { + let utxo_ref = TxoRef(tx.hash(), idx as u32); + delta.undone_utxo.insert(utxo_ref); + } + } + + for tx in block.txs() { + for consumed in tx.consumes() { + let stxi_ref = TxoRef(*consumed.hash(), consumed.index() as u32); + delta.recovered_stxi.insert(stxi_ref); + } + } + + delta +} + +pub fn compute_origin_delta(byron: &pallas::ledger::configs::byron::GenesisFile) -> LedgerDelta { + let mut delta = LedgerDelta::default(); + + let utxos = pallas::ledger::configs::byron::genesis_utxos(byron); + + for (tx, addr, amount) in utxos { + let utxo_ref = TxoRef(tx, 0); + let utxo_body = pallas::ledger::primitives::byron::TxOut { + address: pallas::ledger::primitives::byron::Address { + payload: addr.payload, + crc: addr.crc, + }, + amount, + }; + + let utxo_body = pallas::codec::minicbor::to_vec(utxo_body).unwrap(); + let utxo_body = UtxoBody(0, utxo_body); + + delta.produced_utxo.insert(utxo_ref, utxo_body); + } + + delta +} + +#[cfg(test)] +mod tests { + use pallas::crypto::hash::Hash; + use std::str::FromStr; + + use super::*; + + fn assert_genesis_utxo_exists(db: &LedgerDelta, tx_hex: &str, addr_base58: &str, amount: u64) { + let tx = Hash::<32>::from_str(tx_hex).unwrap(); + + let utxo_body = db.produced_utxo.get(&TxoRef(tx, 0)); + + assert!(utxo_body.is_some(), "utxo not found"); + let UtxoBody(era, cbor) = utxo_body.unwrap(); + + assert_eq!(*era, 0); + + let txout: Result = + pallas::codec::minicbor::decode(&cbor); + + assert!(txout.is_ok(), "can't parse utxo cbor"); + let txout = txout.unwrap(); + + assert_eq!(txout.amount, amount, "utxo amount doesn't match"); + + let addr = pallas::ledger::addresses::ByronAddress::new( + txout.address.payload.as_ref(), + txout.address.crc, + ); + + assert_eq!(addr.to_base58(), addr_base58); + } + + #[test] + fn test_mainnet_genesis_utxos() { + let path = std::path::PathBuf::from(std::env::var("CARGO_MANIFEST_DIR").unwrap()) + .join("examples") + .join("sync-mainnet") + .join("byron.json"); + + let byron = pallas::ledger::configs::byron::from_file(&path).unwrap(); + let delta = compute_origin_delta(&byron); + + assert_genesis_utxo_exists( + &delta, + "0ae3da29711600e94a33fb7441d2e76876a9a1e98b5ebdefbf2e3bc535617616", + "Ae2tdPwUPEZKQuZh2UndEoTKEakMYHGNjJVYmNZgJk2qqgHouxDsA5oT83n", + 2_463_071_701_000_000, + ) + } + + #[test] + fn test_preview_genesis_utxos() { + let path = std::path::PathBuf::from(std::env::var("CARGO_MANIFEST_DIR").unwrap()) + .join("examples") + .join("sync-preview") + .join("byron.json"); + + let byron = pallas::ledger::configs::byron::from_file(&path).unwrap(); + let delta = compute_origin_delta(&byron); + + assert_genesis_utxo_exists( + &delta, + "4843cf2e582b2f9ce37600e5ab4cc678991f988f8780fed05407f9537f7712bd", + "FHnt4NL7yPXvDWHa8bVs73UEUdJd64VxWXSFNqetECtYfTd9TtJguJ14Lu3feth", + 30_000_000_000_000_000, + ); + } + + fn load_test_block(name: &str) -> Vec { + let path = std::path::PathBuf::from(std::env::var("CARGO_MANIFEST_DIR").unwrap()) + .join("test_data") + .join(name); + + let content = std::fs::read_to_string(path).unwrap(); + hex::decode(content).unwrap() + } + + #[test] + fn test_apply_delta() { + // nice block with several txs, it includes chaining edge case + let cbor = load_test_block("alonzo27.block"); + + let block = MultiEraBlock::decode(&cbor).unwrap(); + + let delta = super::compute_delta(&block); + + for tx in block.txs() { + for input in tx.consumes() { + // assert that consumed utxos are no longer in the unspent set + let consumed = delta + .consumed_utxo + .contains(&TxoRef(*input.hash(), input.index() as u32)); + + assert!(consumed); + } + + for (idx, expected) in tx.produces() { + let utxo = delta.produced_utxo.get(&TxoRef(tx.hash(), idx as u32)); + + match utxo { + Some(UtxoBody(era, cbor)) => { + assert_eq!( + tx.era(), + pallas::ledger::traverse::Era::try_from(*era).unwrap(), + "expected produced utxo era doesn't match" + ); + + let expected_cbor = expected.encode(); + + assert_eq!( + &expected_cbor, cbor, + "expected produced utxo cbor doesn't match" + ); + } + None => panic!("expected produced utxo is not in not in delta"), + } + } + } + } + + #[test] + fn test_undo_block() { + // nice block with several txs, it includes chaining edge case + let cbor = load_test_block("alonzo27.block"); + + let block = MultiEraBlock::decode(&cbor).unwrap(); + + let apply = super::compute_delta(&block); + let undo = super::compute_undo_delta(&block); + + for (produced, _) in apply.produced_utxo.iter() { + assert!(undo.undone_utxo.contains(produced)); + } + + for consumed in apply.consumed_utxo.iter() { + assert!(undo.recovered_stxi.contains(consumed)); + } + + assert_eq!(apply.new_position, undo.undone_position); + } +} diff --git a/src/ledger/pparams/mod.rs b/src/ledger/pparams/mod.rs new file mode 100644 index 0000000..6a2ed16 --- /dev/null +++ b/src/ledger/pparams/mod.rs @@ -0,0 +1,258 @@ +use pallas::{ + applying::utils::{ + AlonzoProtParams, BabbageProtParams, ByronProtParams, MultiEraProtocolParameters, + ShelleyProtParams, + }, + ledger::{ + configs::{alonzo, byron, shelley}, + primitives::alonzo::Language, + traverse::MultiEraUpdate, + }, +}; +use tracing::warn; + +//mod test_data; + +pub struct Genesis<'a> { + pub byron: &'a byron::GenesisFile, + pub shelley: &'a shelley::GenesisFile, + pub alonzo: &'a alonzo::GenesisFile, +} + +fn bootstrap_byron_pparams(byron: &byron::GenesisFile) -> ByronProtParams { + ByronProtParams { + block_version: (1, 0, 0), + summand: byron.block_version_data.tx_fee_policy.summand, + multiplier: byron.block_version_data.tx_fee_policy.multiplier, + max_tx_size: byron.block_version_data.max_tx_size, + script_version: byron.block_version_data.script_version, + slot_duration: byron.block_version_data.slot_duration, + max_block_size: byron.block_version_data.max_block_size, + max_header_size: byron.block_version_data.max_header_size, + max_proposal_size: byron.block_version_data.max_proposal_size, + mpc_thd: byron.block_version_data.mpc_thd, + heavy_del_thd: byron.block_version_data.heavy_del_thd, + update_vote_thd: byron.block_version_data.update_vote_thd, + update_proposal_thd: byron.block_version_data.update_proposal_thd, + update_implicit: byron.block_version_data.update_implicit, + soft_fork_rule: byron.block_version_data.softfork_rule.clone().into(), + unlock_stake_epoch: byron.block_version_data.unlock_stake_epoch, + } +} + +fn bootstrap_shelley_pparams( + _previous: ByronProtParams, + shelley: &shelley::GenesisFile, +) -> ShelleyProtParams { + ShelleyProtParams { + protocol_version: shelley.protocol_params.protocol_version.clone().into(), + max_block_body_size: shelley.protocol_params.max_block_body_size, + max_transaction_size: shelley.protocol_params.max_tx_size, + max_block_header_size: shelley.protocol_params.max_block_header_size, + key_deposit: shelley.protocol_params.key_deposit, + min_utxo_value: shelley.protocol_params.min_utxo_value, + minfee_a: shelley.protocol_params.min_fee_a, + minfee_b: shelley.protocol_params.min_fee_b, + pool_deposit: shelley.protocol_params.pool_deposit, + desired_number_of_stake_pools: shelley.protocol_params.n_opt, + min_pool_cost: shelley.protocol_params.min_pool_cost, + expansion_rate: shelley.protocol_params.rho.clone(), + treasury_growth_rate: shelley.protocol_params.tau.clone(), + maximum_epoch: shelley.protocol_params.e_max, + pool_pledge_influence: shelley.protocol_params.a0.clone(), + decentralization_constant: shelley.protocol_params.decentralisation_param.clone(), + extra_entropy: shelley.protocol_params.extra_entropy.clone().into(), + } +} + +fn bootstrap_alonzo_pparams( + previous: ShelleyProtParams, + genesis: &alonzo::GenesisFile, +) -> AlonzoProtParams { + AlonzoProtParams { + minfee_a: previous.minfee_a, + minfee_b: previous.minfee_b, + max_block_body_size: previous.max_block_body_size, + max_transaction_size: previous.max_transaction_size, + max_block_header_size: previous.max_block_header_size, + key_deposit: previous.key_deposit, + pool_deposit: previous.pool_deposit, + protocol_version: previous.protocol_version, + min_pool_cost: previous.min_pool_cost, + desired_number_of_stake_pools: previous.desired_number_of_stake_pools, + expansion_rate: previous.expansion_rate.clone(), + treasury_growth_rate: previous.treasury_growth_rate.clone(), + maximum_epoch: previous.maximum_epoch, + pool_pledge_influence: previous.pool_pledge_influence, + decentralization_constant: previous.decentralization_constant, + extra_entropy: previous.extra_entropy, + // new from genesis + ada_per_utxo_byte: genesis.lovelace_per_utxo_word, + cost_models_for_script_languages: genesis.cost_models.clone().into(), + execution_costs: genesis.execution_prices.clone().into(), + max_tx_ex_units: genesis.max_tx_ex_units.clone().into(), + max_block_ex_units: genesis.max_block_ex_units.clone().into(), + max_value_size: genesis.max_value_size, + collateral_percentage: genesis.collateral_percentage, + max_collateral_inputs: genesis.max_collateral_inputs, + } +} + +fn bootstrap_babbage_pparams(previous: AlonzoProtParams) -> BabbageProtParams { + BabbageProtParams { + minfee_a: previous.minfee_a, + minfee_b: previous.minfee_b, + max_block_body_size: previous.max_block_body_size, + max_transaction_size: previous.max_transaction_size, + max_block_header_size: previous.max_block_header_size, + key_deposit: previous.key_deposit, + pool_deposit: previous.pool_deposit, + protocol_version: previous.protocol_version, + min_pool_cost: previous.min_pool_cost, + desired_number_of_stake_pools: previous.desired_number_of_stake_pools, + ada_per_utxo_byte: previous.ada_per_utxo_byte, + execution_costs: previous.execution_costs, + max_tx_ex_units: previous.max_tx_ex_units, + max_block_ex_units: previous.max_block_ex_units, + max_value_size: previous.max_value_size, + collateral_percentage: previous.collateral_percentage, + max_collateral_inputs: previous.max_collateral_inputs, + expansion_rate: previous.expansion_rate, + treasury_growth_rate: previous.treasury_growth_rate, + maximum_epoch: previous.maximum_epoch, + pool_pledge_influence: previous.pool_pledge_influence, + decentralization_constant: previous.decentralization_constant, + extra_entropy: previous.extra_entropy, + cost_models_for_script_languages: pallas::ledger::primitives::babbage::CostMdls { + plutus_v1: previous + .cost_models_for_script_languages + .iter() + .filter(|(k, _)| k == &Language::PlutusV1) + .map(|(_, v)| v.clone()) + .next(), + plutus_v2: None, + }, + } +} + +fn apply_param_update( + current: MultiEraProtocolParameters, + update: &MultiEraUpdate, +) -> MultiEraProtocolParameters { + match current { + MultiEraProtocolParameters::Byron(mut pparams) => { + if let Some(new) = update.byron_proposed_block_version() { + warn!(?new, "found new block version"); + pparams.block_version = new; + } + + if let Some(pallas::ledger::primitives::byron::TxFeePol::Variant0(new)) = + update.byron_proposed_fee_policy() + { + warn!("found new byron fee policy update proposal"); + let (summand, multiplier) = new.unwrap(); + pparams.summand = summand as u64; + pparams.multiplier = multiplier as u64; + } + + if let Some(new) = update.byron_proposed_max_tx_size() { + warn!("found new byron max tx size update proposal"); + pparams.max_tx_size = new; + } + + MultiEraProtocolParameters::Byron(pparams) + } + MultiEraProtocolParameters::Shelley(mut pparams) => { + if let Some(new) = update.first_proposed_protocol_version() { + warn!(?new, "found new protocol version"); + pparams.protocol_version = new; + } + + if let Some(x) = update.first_proposed_minfee_a() { + warn!(x, "found new minfee a update proposal"); + pparams.minfee_a = x; + } + + if let Some(x) = update.first_proposed_minfee_b() { + warn!(x, "found new minfee b update proposal"); + pparams.minfee_b = x; + } + + if let Some(x) = update.first_proposed_max_transaction_size() { + warn!(x, "found new max tx size update proposal"); + pparams.max_transaction_size = x; + } + + // TODO: where's the min utxo value in the network primitives for shelley? do we + // have them wrong in Pallas? + + MultiEraProtocolParameters::Shelley(pparams) + } + MultiEraProtocolParameters::Alonzo(mut pparams) => { + if let Some(new) = update.first_proposed_protocol_version() { + warn!(?new, "found new protocol version"); + pparams.protocol_version = new; + } + + MultiEraProtocolParameters::Alonzo(pparams) + } + MultiEraProtocolParameters::Babbage(mut pparams) => { + if let Some(new) = update.first_proposed_protocol_version() { + warn!(?new, "found new protocol version"); + pparams.protocol_version = new; + } + + MultiEraProtocolParameters::Babbage(pparams) + } + _ => unimplemented!(), + } +} + +fn advance_hardfork( + current: MultiEraProtocolParameters, + genesis: &Genesis, + next_protocol: usize, +) -> MultiEraProtocolParameters { + match current { + MultiEraProtocolParameters::Byron(current) if next_protocol == 2 => { + MultiEraProtocolParameters::Shelley(bootstrap_shelley_pparams(current, genesis.shelley)) + } + MultiEraProtocolParameters::Shelley(current) if next_protocol == 3 => { + MultiEraProtocolParameters::Shelley(current) + } + MultiEraProtocolParameters::Shelley(current) if next_protocol == 4 => { + MultiEraProtocolParameters::Shelley(current) + } + MultiEraProtocolParameters::Shelley(current) if next_protocol == 5 => { + MultiEraProtocolParameters::Alonzo(bootstrap_alonzo_pparams(current, genesis.alonzo)) + } + MultiEraProtocolParameters::Alonzo(current) if next_protocol == 6 => { + MultiEraProtocolParameters::Babbage(bootstrap_babbage_pparams(current)) + } + MultiEraProtocolParameters::Babbage(_) => todo!("conway pparams handling pending"), + _ => unimplemented!("don't know how to handle hardfork"), + } +} + +pub fn fold_pparams( + genesis: Genesis, + updates: Vec, + for_epoch: u64, +) -> MultiEraProtocolParameters { + let mut pparams = MultiEraProtocolParameters::Byron(bootstrap_byron_pparams(genesis.byron)); + let mut last_protocol = 1; + + for epoch in 0..for_epoch { + for next_protocol in last_protocol + 1..=pparams.protocol_version() { + warn!(next_protocol, "advancing hardfork"); + pparams = advance_hardfork(pparams, &genesis, next_protocol); + last_protocol = next_protocol; + } + + for update in updates.iter().filter(|e| e.epoch() == epoch) { + pparams = apply_param_update(pparams, update); + } + } + + pparams +} diff --git a/src/pparams/test_data.rs b/src/ledger/pparams/test_data.rs similarity index 78% rename from src/pparams/test_data.rs rename to src/ledger/pparams/test_data.rs index dedb955..b095e4f 100644 --- a/src/pparams/test_data.rs +++ b/src/ledger/pparams/test_data.rs @@ -1,10 +1,10 @@ use pallas::applying::{ - utils::{AlonzoProtParams, BabbageProtParams, FeePolicy}, - MultiEraProtParams, + utils::{AlonzoProtParams, BabbageProtParams}, + MultiEraProtocolParameters, }; -fn preprod_values_epoch_7() -> MultiEraProtParams { - MultiEraProtParams::Alonzo(AlonzoProtParams { +fn preprod_values_epoch_7() -> MultiEraProtocolParameters { + MultiEraProtocolParameters::Alonzo(AlonzoProtParams { fee_policy: FeePolicy { summand: 155381, multiplier: 44, @@ -21,8 +21,8 @@ fn preprod_values_epoch_7() -> MultiEraProtParams { }) } -fn preprod_values_epoch_12() -> MultiEraProtParams { - MultiEraProtParams::Babbage(BabbageProtParams { +fn preprod_values_epoch_12() -> MultiEraProtocolParameters { + MultiEraProtocolParameters::Babbage(BabbageProtParams { fee_policy: FeePolicy { summand: 155381, multiplier: 44, @@ -39,8 +39,8 @@ fn preprod_values_epoch_12() -> MultiEraProtParams { }) } -fn preprod_values_epoch_28() -> MultiEraProtParams { - MultiEraProtParams::Babbage(BabbageProtParams { +fn preprod_values_epoch_28() -> MultiEraProtocolParameters { + MultiEraProtocolParameters::Babbage(BabbageProtParams { fee_policy: FeePolicy { summand: 155381, multiplier: 44, @@ -57,8 +57,8 @@ fn preprod_values_epoch_28() -> MultiEraProtParams { }) } -fn preprod_values_epoch_51() -> MultiEraProtParams { - MultiEraProtParams::Babbage(BabbageProtParams { +fn preprod_values_epoch_51() -> MultiEraProtocolParameters { + MultiEraProtocolParameters::Babbage(BabbageProtParams { fee_policy: FeePolicy { summand: 155381, multiplier: 44, @@ -75,8 +75,8 @@ fn preprod_values_epoch_51() -> MultiEraProtParams { }) } -fn preview_values_epoch_1() -> MultiEraProtParams { - MultiEraProtParams::Alonzo(AlonzoProtParams { +fn preview_values_epoch_1() -> MultiEraProtocolParameters { + MultiEraProtocolParameters::Alonzo(AlonzoProtParams { fee_policy: FeePolicy { summand: 155381, multiplier: 44, @@ -93,8 +93,8 @@ fn preview_values_epoch_1() -> MultiEraProtParams { }) } -fn preview_values_epoch_3() -> MultiEraProtParams { - MultiEraProtParams::Babbage(BabbageProtParams { +fn preview_values_epoch_3() -> MultiEraProtocolParameters { + MultiEraProtocolParameters::Babbage(BabbageProtParams { fee_policy: FeePolicy { summand: 155381, multiplier: 44, @@ -111,8 +111,8 @@ fn preview_values_epoch_3() -> MultiEraProtParams { }) } -fn preview_values_epoch_9() -> MultiEraProtParams { - MultiEraProtParams::Babbage(BabbageProtParams { +fn preview_values_epoch_9() -> MultiEraProtocolParameters { + MultiEraProtocolParameters::Babbage(BabbageProtParams { fee_policy: FeePolicy { summand: 155381, multiplier: 44, @@ -129,8 +129,8 @@ fn preview_values_epoch_9() -> MultiEraProtParams { }) } -fn preview_values_epoch_107() -> MultiEraProtParams { - MultiEraProtParams::Babbage(BabbageProtParams { +fn preview_values_epoch_107() -> MultiEraProtocolParameters { + MultiEraProtocolParameters::Babbage(BabbageProtParams { fee_policy: FeePolicy { summand: 155381, multiplier: 44, @@ -147,8 +147,8 @@ fn preview_values_epoch_107() -> MultiEraProtParams { }) } -fn mainnet_values_epoch_290() -> MultiEraProtParams { - MultiEraProtParams::Alonzo(AlonzoProtParams { +fn mainnet_values_epoch_290() -> MultiEraProtocolParameters { + MultiEraProtocolParameters::Alonzo(AlonzoProtParams { fee_policy: FeePolicy { summand: 155381, multiplier: 44, @@ -165,8 +165,8 @@ fn mainnet_values_epoch_290() -> MultiEraProtParams { }) } -fn mainnet_values_epoch_306() -> MultiEraProtParams { - MultiEraProtParams::Alonzo(AlonzoProtParams { +fn mainnet_values_epoch_306() -> MultiEraProtocolParameters { + MultiEraProtocolParameters::Alonzo(AlonzoProtParams { fee_policy: FeePolicy { summand: 155381, multiplier: 44, @@ -183,8 +183,8 @@ fn mainnet_values_epoch_306() -> MultiEraProtParams { }) } -fn mainnet_values_epoch_319() -> MultiEraProtParams { - MultiEraProtParams::Alonzo(AlonzoProtParams { +fn mainnet_values_epoch_319() -> MultiEraProtocolParameters { + MultiEraProtocolParameters::Alonzo(AlonzoProtParams { fee_policy: FeePolicy { summand: 155381, multiplier: 44, @@ -201,8 +201,8 @@ fn mainnet_values_epoch_319() -> MultiEraProtParams { }) } -fn mainnet_values_epoch_322() -> MultiEraProtParams { - MultiEraProtParams::Alonzo(AlonzoProtParams { +fn mainnet_values_epoch_322() -> MultiEraProtocolParameters { + MultiEraProtocolParameters::Alonzo(AlonzoProtParams { fee_policy: FeePolicy { summand: 155381, multiplier: 44, @@ -219,8 +219,8 @@ fn mainnet_values_epoch_322() -> MultiEraProtParams { }) } -fn mainnet_values_epoch_328() -> MultiEraProtParams { - MultiEraProtParams::Alonzo(AlonzoProtParams { +fn mainnet_values_epoch_328() -> MultiEraProtocolParameters { + MultiEraProtocolParameters::Alonzo(AlonzoProtParams { fee_policy: FeePolicy { summand: 155381, multiplier: 44, @@ -237,8 +237,8 @@ fn mainnet_values_epoch_328() -> MultiEraProtParams { }) } -fn mainnet_values_epoch_365() -> MultiEraProtParams { - MultiEraProtParams::Babbage(BabbageProtParams { +fn mainnet_values_epoch_365() -> MultiEraProtocolParameters { + MultiEraProtocolParameters::Babbage(BabbageProtParams { fee_policy: FeePolicy { summand: 155381, multiplier: 44, diff --git a/src/ledger/store.rs b/src/ledger/store.rs new file mode 100644 index 0000000..0f2f894 --- /dev/null +++ b/src/ledger/store.rs @@ -0,0 +1,177 @@ +use redb::{MultimapTableDefinition, ReadableTable, TableDefinition, WriteTransaction}; +use std::path::Path; + +use super::*; + +trait LedgerTable { + fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), redb::Error>; +} + +const BLOCKS: TableDefinition = TableDefinition::new("blocks"); +struct BlocksTable; + +impl LedgerTable for BlocksTable { + fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), redb::Error> { + let mut table = wx.open_table(BLOCKS)?; + + if let Some(ChainPoint(slot, hash)) = delta.new_position.as_ref() { + let v: &[u8; 32] = hash; + table.insert(slot, v)?; + } + + if let Some(ChainPoint(slot, _)) = delta.undone_position.as_ref() { + table.remove(slot)?; + } + + Ok(()) + } +} + +type UtxosKey<'a> = (&'a [u8; 32], u32); +type UtxosValue<'a> = (Era, &'a [u8]); + +const UTXOS: TableDefinition = TableDefinition::new("utxos"); + +struct UtxosTable; + +impl LedgerTable for UtxosTable { + fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), redb::Error> { + let mut table = wx.open_table(UTXOS)?; + + for (k, v) in delta.produced_utxo.iter() { + let k: (&[u8; 32], u32) = (&k.0, k.1); + let v: (u16, &[u8]) = (v.0, &v.1); + table.insert(k, v)?; + } + + for k in delta.undone_utxo.iter() { + let k: (&[u8; 32], u32) = (&k.0, k.1); + table.remove(k)?; + } + + Ok(()) + } +} + +const PPARAMS: TableDefinition = TableDefinition::new("pparams"); +struct PParamsTable; + +impl LedgerTable for PParamsTable { + fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), redb::Error> { + let mut table = wx.open_table(PPARAMS)?; + + if let Some(ChainPoint(slot, _)) = delta.new_position { + for PParamsBody(era, body) in delta.new_pparams.iter() { + let v: (u16, &[u8]) = (*era, &body); + table.insert(slot, v)?; + } + } + + if let Some(ChainPoint(slot, _)) = delta.undone_position { + table.remove(slot)?; + } + + Ok(()) + } +} + +pub const TOMBSTONES: MultimapTableDefinition = + MultimapTableDefinition::new("tombstones"); +struct TombstonesTable; + +impl LedgerTable for TombstonesTable { + fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), redb::Error> { + let mut table = wx.open_multimap_table(TOMBSTONES)?; + + if let Some(ChainPoint(slot, _)) = delta.new_position.as_ref() { + for stxi in delta.consumed_utxo.iter() { + let stxi: (&[u8; 32], u32) = (&stxi.0, stxi.1); + table.insert(slot, stxi)?; + } + } + + if let Some(ChainPoint(slot, _)) = delta.undone_position.as_ref() { + table.remove_all(slot)?; + } + + Ok(()) + } +} + +pub struct LedgerStore(redb::Database); + +impl LedgerStore { + pub fn open(path: impl AsRef) -> Result { + let inner = redb::Database::create(path)?; + + Ok(Self(inner)) + } + + pub fn is_empty(&self) -> bool { + match self.cursor() { + Ok(x) => x.is_some(), + Err(_) => false, + } + } + + pub fn cursor(&self) -> Result, redb::Error> { + let rx = self.0.begin_read()?; + let table = rx.open_table(BLOCKS)?; + let last = table.last()?; + let last = last.map(|(k, v)| ChainPoint(k.value(), Hash::new(*v.value()))); + + Ok(last) + } + + pub fn apply(&mut self, deltas: &[LedgerDelta]) -> Result<(), redb::Error> { + let mut wx = self.0.begin_write()?; + wx.set_durability(redb::Durability::Eventual); + + for delta in deltas { + UtxosTable::apply(&wx, delta)?; + PParamsTable::apply(&wx, delta)?; + TombstonesTable::apply(&wx, delta)?; + + // indexes? + BlocksTable::apply(&wx, delta)?; + } + + wx.commit()?; + + Ok(()) + } + + pub fn get_utxos( + &self, + refs: impl IntoIterator, + ) -> Result, redb::Error> { + let rx = self.0.begin_read()?; + let table = rx.open_table(UTXOS)?; + let mut out = HashMap::new(); + + for key in refs { + let body = table.get(&(&key.0 as &[u8; 32], key.1))?; + let body = body.unwrap(); + // TODO: return invariant broken error + let (era, cbor) = body.value(); + out.insert(key, UtxoBody(era, Vec::from(cbor))); + } + + Ok(out) + } + + pub fn get_pparams(&self, until: BlockSlot) -> Result, redb::Error> { + let rx = self.0.begin_read()?; + let table = rx.open_table(PPARAMS)?; + + let mut out = vec![]; + + for item in table.range(..until)? { + let (_, body) = item?; + let (era, cbor) = body.value(); + out.push(PParamsBody(era, Vec::from(cbor))); + } + + Ok(out) + } +} diff --git a/src/lib.rs b/src/lib.rs index a9e9275..1c2ec0e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,8 @@ +pub mod ledger; pub mod model; -pub mod pparams; pub mod prelude; pub mod querydb; pub mod serve; -pub mod storage; pub mod submit; pub mod sync; diff --git a/src/pparams/mod.rs b/src/pparams/mod.rs deleted file mode 100644 index 631c4ea..0000000 --- a/src/pparams/mod.rs +++ /dev/null @@ -1,160 +0,0 @@ -use gasket::framework::{AsWorkError, WorkerError}; -use pallas::{ - applying::{ - utils::{ByronProtParams, FeePolicy, ShelleyProtParams}, - MultiEraProtParams, - }, - ledger::{ - configs::{byron, shelley}, - traverse::{Era, MultiEraUpdate}, - }, -}; -use tracing::{info, warn}; - -use crate::storage::applydb::ApplyDB; - -mod test_data; - -pub struct Genesis<'a> { - pub byron: &'a byron::GenesisFile, - pub shelley: &'a shelley::GenesisFile, -} - -fn pparams_from_byron_genesis( - byron: &byron::GenesisFile, -) -> Result { - let out = pallas::applying::MultiEraProtParams::Byron(ByronProtParams { - fee_policy: FeePolicy { - summand: byron - .block_version_data - .tx_fee_policy - .summand - .parse() - .or_panic()?, - multiplier: byron - .block_version_data - .tx_fee_policy - .multiplier - .parse() - .or_panic()?, - }, - max_tx_size: byron.block_version_data.max_tx_size.parse().or_panic()?, - }); - - Ok(out) -} - -fn pparams_from_shelley_genesis( - shelley: &shelley::GenesisFile, -) -> Result { - let out = pallas::applying::MultiEraProtParams::Shelley(ShelleyProtParams { - fee_policy: FeePolicy { - summand: shelley.protocol_params.min_fee_a, - multiplier: shelley.protocol_params.min_fee_b, - }, - max_tx_size: shelley.protocol_params.max_tx_size, - min_lovelace: shelley.protocol_params.min_u_tx_o_value, - }); - - Ok(out) -} - -fn apply_era_hardfork( - genesis: &Genesis, - new_protocol: u64, -) -> Result { - match new_protocol { - 1 => pparams_from_byron_genesis(genesis.byron), - 2..=4 => pparams_from_shelley_genesis(genesis.shelley), - x => { - unimplemented!("don't know how to handle hardfork for protocol {x}"); - } - } -} - -fn apply_param_update( - genesis: &Genesis, - era: Era, - current: MultiEraProtParams, - update: MultiEraUpdate, -) -> Result { - match current { - MultiEraProtParams::Byron(mut pparams) => { - assert_eq!(u16::from(era), 1, "pparam update doesn't match era"); - - if let Some((major, _, _)) = update.byron_proposed_block_version() { - warn!(major, "found new byron protocol update proposal"); - return apply_era_hardfork(genesis, major as u64); - } - - if let Some(pallas::ledger::primitives::byron::TxFeePol::Variant0(new)) = - update.byron_proposed_fee_policy() - { - warn!("found new byron fee policy update proposal"); - - let new = new.unwrap(); - pparams.fee_policy = FeePolicy { - summand: new.0 as u64, - multiplier: new.1 as u64, - }; - } - - if let Some(new) = update.byron_proposed_max_tx_size() { - warn!("found new byron max tx size update proposal"); - pparams.max_tx_size = new; - } - - Ok(MultiEraProtParams::Byron(pparams)) - } - MultiEraProtParams::Shelley(mut pparams) => { - assert_eq!(u16::from(era), 2, "pparam update doesn't match era"); - - if let Some((major, _)) = update.first_proposed_protocol_version() { - warn!(major, "found new shelley protocol update proposal"); - return apply_era_hardfork(genesis, major); - } - - if let Some(x) = update.first_proposed_minfee_a() { - warn!(x, "found new minfee a update proposal"); - pparams.fee_policy.summand = x as u64; - } - - if let Some(x) = update.first_proposed_minfee_b() { - warn!(x, "found new minfee b update proposal"); - pparams.fee_policy.multiplier = x as u64; - } - - if let Some(x) = update.first_proposed_max_transaction_size() { - warn!(x, "found new max tx size update proposal"); - pparams.max_tx_size = x as u64; - } - - // TODO: where's the min utxo value in the network primitives for shelley? do we - // have them wrong in Pallas? - - Ok(MultiEraProtParams::Shelley(pparams)) - } - _ => unimplemented!(), - } -} - -// TODO: perform proper protocol parameters update for the Alonzo era. -pub fn compute_pparams( - genesis: Genesis, - ledger: &ApplyDB, - epoch: u64, -) -> Result { - let mut prot_params = apply_era_hardfork(&genesis, 1)?; - - let updates = ledger.get_pparams_updates(epoch).or_panic()?; - - info!(epoch, updates = updates.len(), "computing pparams"); - - for (era, _, cbor) in updates { - let era = Era::try_from(era).or_panic()?; - let update = MultiEraUpdate::decode_for_era(era, &cbor).or_panic()?; - prot_params = apply_param_update(&genesis, era, prot_params, update)?; - } - - Ok(prot_params) -} diff --git a/src/prelude.rs b/src/prelude.rs index f41bac8..01c22c5 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -53,8 +53,8 @@ impl Error { Error::Message(text.into()) } - pub fn custom(error: Box) -> Error { - Error::Custom(format!("{error}")) + pub fn custom(error: impl Display) -> Error { + Error::Custom(error.to_string()) } } diff --git a/src/querydb/mod.rs b/src/querydb/mod.rs index 0f9b697..55c88cb 100644 --- a/src/querydb/mod.rs +++ b/src/querydb/mod.rs @@ -1,2 +1 @@ -pub mod prelude; pub mod store; diff --git a/src/querydb/prelude.rs b/src/querydb/prelude.rs deleted file mode 100644 index eeed0ab..0000000 --- a/src/querydb/prelude.rs +++ /dev/null @@ -1,71 +0,0 @@ -use pallas; -use redb::{MultimapTableDefinition, TableDefinition}; -// use std::error::Error; - -// Given a block, table "block" maps its slot to its CBOR representation -pub type BlockKeyType<'a> = u64; -pub type BlockValueType<'a> = &'a [u8]; -pub type BlockResultType = Vec; -pub const BLOCK_TABLE: TableDefinition = - TableDefinition::new("block"); - -// Given a block, table "block_by_hash" maps its hash to its slot. -pub type BlockByHashKeyType<'a> = &'a [u8; 32]; -pub type BlockByHashValueType<'a> = u64; -pub const BLOCK_BY_HASH_TABLE: TableDefinition = - TableDefinition::new("block_by_hash"); - -// Given a transaction, table "tx" maps its hash to an encoding representation -// of it -// NOTE: transactions don't have a precise CBOR representation, so we use -// a library encoded representation instead -pub type TxKeyType<'a> = &'a [u8; 32]; -pub type TxValueType<'a> = &'a [u8]; -pub type TxResultType = Vec; -pub const TX_TABLE: TableDefinition = TableDefinition::new("tx"); - -// Given a UTxO, table "utxo" maps its output reference (a pair composed of the -// hash of the transaction that produced the UTxO and the index in the list of -// transaction outputs corresponding to it) to the result of encoding said UTxO -// NOTE: Just like transactions, UTxO's don't have a precise CBOR -// representation. -pub type UTxOKeyType<'a> = (&'a [u8], u8); -pub type UTxOValueType<'a> = &'a [u8]; -pub type UTxOResultType = Vec; -pub const UTXO_TABLE: TableDefinition = TableDefinition::new("utxo"); - -// Given an address, table "utxo_by_addr" maps it to a list of pairs of a tx -// hash and an (output) index (each one representing a UTxO sitting at that -// address) -pub type UTxOByAddrKeyType<'a> = &'a [u8]; -pub type UTxOByAddrValueType<'a> = (&'a [u8], u8); -pub type UTxOByAddrResultType = (Vec, u8); -pub const UTXO_BY_ADDR_TABLE: MultimapTableDefinition = - MultimapTableDefinition::new("utxo_by_addr"); - -// Given a minting policy, table "utxo_by_beacon" maps it to a list of pairs of -// a tx hash and an (output) index (each one representing a UTxO containing a -// token of that policy) -pub type UTxOByBeaconKeyType<'a> = &'a [u8; 28]; -pub type UTxOByBeaconValueType<'a> = (&'a [u8], u8); -pub type UTxOByBeaconResultType = (Vec, u8); -pub const UTXO_BY_BEACON_TABLE: MultimapTableDefinition< - UTxOByBeaconKeyType, - UTxOByBeaconValueType, -> = MultimapTableDefinition::new("utxo_by_beacon"); - -// Table "prot_params" stores only the latest protocol parameters. -pub type ProtParamsKeyType = (); -pub type ProtParamsValueType<'a> = &'a [u8]; -pub type ProtParamsResultType = Vec; -pub const PROT_PARAMS_TABLE: TableDefinition = - TableDefinition::new("prot_params"); - -pub enum Error { - AddressDecoding(pallas::ledger::addresses::Error), - BlockDecoding(pallas::ledger::traverse::Error), - KeyNotFound, - OutputDecoding(pallas::codec::minicbor::decode::Error), - UTxOTableInvariantBroken, - ReDBError(Box), -} diff --git a/src/querydb/store.rs b/src/querydb/store.rs index 27ed56c..fe86dc4 100644 --- a/src/querydb/store.rs +++ b/src/querydb/store.rs @@ -1,4 +1,3 @@ -use crate::querydb::prelude::{Error::*, *}; use pallas::{ codec::minicbor::decode::Error as DecodingError, ledger::{ @@ -7,29 +6,127 @@ use pallas::{ }, }; use redb::{ - Database, MultimapTable, ReadOnlyMultimapTable, ReadOnlyTable, ReadTransaction, - ReadableMultimapTable, ReadableTable, Table, WriteTransaction, + Database, ReadOnlyTable, ReadTransaction, ReadableMultimapTable, ReadableTable, + WriteTransaction, }; use std::{ops::Deref, path::Path}; +use pallas; +use redb::{MultimapTableDefinition, TableDefinition}; +use thiserror::Error; + +// use std::error::Error; + +// Given a block, table "block" maps its slot to its CBOR representation +pub type BlockKeyType<'a> = u64; +pub type BlockValueType<'a> = &'a [u8]; +pub type BlockResultType = Vec; +pub const BLOCK_TABLE: TableDefinition = + TableDefinition::new("block"); + +// Given a block, table "block_by_hash" maps its hash to its slot. +pub type BlockByHashKeyType<'a> = &'a [u8; 32]; +pub type BlockByHashValueType<'a> = u64; +pub const BLOCK_BY_HASH_INDEX: TableDefinition = + TableDefinition::new("block_by_hash"); + +// Given a transaction, table "tx" maps its hash to an encoding representation +// of it +// NOTE: transactions don't have a precise CBOR representation, so we use +// a library encoded representation instead +pub type TxKeyType<'a> = &'a [u8; 32]; +pub type TxValueType<'a> = &'a [u8]; +pub type TxResultType = Vec; +pub const TX_TABLE: TableDefinition = TableDefinition::new("tx"); + +// Given a UTxO, table "utxo" maps its output reference (a pair composed of the +// hash of the transaction that produced the UTxO and the index in the list of +// transaction outputs corresponding to it) to the result of encoding said UTxO +// NOTE: Just like transactions, UTxO's don't have a precise CBOR +// representation. +pub type UTxOKeyType<'a> = (&'a [u8], u8); +pub type UTxOValueType<'a> = &'a [u8]; +pub type UTxOResultType = Vec; +pub const UTXO_TABLE: TableDefinition = TableDefinition::new("utxo"); + +// Given an address, table "utxo_by_addr" maps it to a list of pairs of a tx +// hash and an (output) index (each one representing a UTxO sitting at that +// address) +pub type UTxOByAddrKeyType<'a> = &'a [u8]; +pub type UTxOByAddrValueType<'a> = (&'a [u8], u8); +pub type UTxOByAddrResultType = (Vec, u8); +pub const UTXO_BY_ADDR_INDEX: MultimapTableDefinition = + MultimapTableDefinition::new("utxo_by_addr"); + +// Given a minting policy, table "utxo_by_beacon" maps it to a list of pairs of +// a tx hash and an (output) index (each one representing a UTxO containing a +// token of that policy) +pub type UTxOByBeaconKeyType<'a> = &'a [u8; 28]; +pub type UTxOByBeaconValueType<'a> = (&'a [u8], u8); +pub type UTxOByBeaconResultType = (Vec, u8); +pub const UTXO_BY_POLICY_INDEX: MultimapTableDefinition< + UTxOByBeaconKeyType, + UTxOByBeaconValueType, +> = MultimapTableDefinition::new("utxo_by_beacon"); + +// Table "prot_params" stores only the latest protocol parameters. +pub type ProtParamsKeyType = (); +pub type ProtParamsValueType<'a> = &'a [u8]; +pub type ProtParamsResultType = Vec; +pub const PROT_PARAMS_TABLE: TableDefinition = + TableDefinition::new("prot_params"); + +#[derive(Error, Debug)] +pub enum Error { + #[error("error decoding address")] + AddressDecoding(#[from] pallas::ledger::addresses::Error), + + #[error("error decoding block")] + BlockDecoding(#[from] pallas::ledger::traverse::Error), + + #[error("key not found")] + KeyNotFound, + + #[error("error decoding output")] + OutputDecoding(#[from] pallas::codec::minicbor::decode::Error), + + #[error("utxo table invariant broken")] + UTxOTableInvariantBroken, + + #[error("Redb error")] + ReDBError(#[from] redb::Error), + + #[error("IO error")] + IOError(#[from] std::io::Error), +} + +impl Error { + pub fn redb(inner: impl Into) -> Self { + Self::ReDBError(inner.into()) + } +} + pub struct Store { inner_store: Database, } impl Store { - pub fn new(path: impl AsRef) -> Result { + pub fn open(path: impl AsRef) -> Result { Ok(Store { - inner_store: Database::create(path).map_err(|e| ReDBError(Box::new(e)))?, + inner_store: Database::create(path).map_err(Error::redb)?, }) } + + pub fn destroy(path: impl AsRef) -> Result<(), Error> { + std::fs::remove_file(path).map_err(Error::IOError) + } + pub fn apply_block(&self, block_cbor: BlockValueType) -> Result<(), Error> { - let write_tx: WriteTransaction = self - .inner_store - .begin_write() - .map_err(|e| ReDBError(Box::new(e)))?; + let write_tx: WriteTransaction = self.inner_store.begin_write().map_err(Error::redb)?; let block: MultiEraBlock = self.store_block(&write_tx, block_cbor)?; self.store_txs(&write_tx, &block)?; - write_tx.commit().map_err(|e| ReDBError(Box::new(e)))?; + write_tx.commit().map_err(Error::redb)?; + Ok(()) } @@ -38,86 +135,95 @@ impl Store { write_tx: &'a WriteTransaction, block_cbor: &'a [u8], ) -> Result, Error> { - let block: MultiEraBlock = MultiEraBlock::decode(block_cbor).map_err(BlockDecoding)?; - let mut block_table: Table = write_tx + let block: MultiEraBlock = + MultiEraBlock::decode(block_cbor).map_err(Error::BlockDecoding)?; + + write_tx .open_table(BLOCK_TABLE) - .map_err(|e| ReDBError(Box::new(e)))?; - let _ = block_table.insert(block.slot(), block_cbor); - let mut block_by_hash_table: Table = write_tx - .open_table(BLOCK_BY_HASH_TABLE) - .map_err(|e| ReDBError(Box::new(e)))?; - let _ = block_by_hash_table.insert(block.hash().deref(), block.slot()); + .map_err(Error::redb)? + .insert(block.slot(), block_cbor) + .map_err(Error::redb)?; + + write_tx + .open_table(BLOCK_BY_HASH_INDEX) + .map_err(Error::redb)? + .insert(block.hash().deref(), block.slot()) + .map_err(Error::redb)?; + Ok(block) } fn store_txs(&self, write_tx: &WriteTransaction, block: &MultiEraBlock) -> Result<(), Error> { - let mut tx_table: Table = write_tx - .open_table(TX_TABLE) - .map_err(|e| ReDBError(Box::new(e)))?; + let mut tx_table = write_tx.open_table(TX_TABLE).map_err(Error::redb)?; + for tx in block.txs() { tx_table .insert(tx.hash().deref(), tx.encode().as_slice()) - .map_err(|e| ReDBError(Box::new(e)))?; + .map_err(Error::redb)?; + self.update_tx_outputs(write_tx, &tx)?; } + Ok(()) } fn update_tx_outputs(&self, write_tx: &WriteTransaction, tx: &MultiEraTx) -> Result<(), Error> { - let mut utxo_table: Table = write_tx - .open_table(UTXO_TABLE) - .map_err(|e| ReDBError(Box::new(e)))?; - let mut utxo_by_addr_table: MultimapTable = - write_tx - .open_multimap_table(UTXO_BY_ADDR_TABLE) - .map_err(|e| ReDBError(Box::new(e)))?; - let mut utxo_by_beacon_table: MultimapTable = - write_tx - .open_multimap_table(UTXO_BY_BEACON_TABLE) - .map_err(|e| ReDBError(Box::new(e)))?; + let mut utxo_table = write_tx.open_table(UTXO_TABLE).map_err(Error::redb)?; + + let mut utxo_by_addr_table = write_tx + .open_multimap_table(UTXO_BY_ADDR_INDEX) + .map_err(Error::redb)?; + + let mut utxo_by_policy_table = write_tx + .open_multimap_table(UTXO_BY_POLICY_INDEX) + .map_err(Error::redb)?; + for (index, tx_out) in tx.produces() { let tx_hash = tx.hash(); - utxo_table - .insert( - (tx_hash.as_slice(), index as u8), - tx_out.encode().as_slice(), - ) - .map_err(|e| ReDBError(Box::new(e)))?; + utxo_by_addr_table .insert( output_address(&tx_out)?.to_vec().as_slice(), (tx_hash.as_slice(), index as u8), ) - .map_err(|e| ReDBError(Box::new(e)))?; + .map_err(Error::redb)?; + for policy in output_policy_ids(&tx_out) { - utxo_by_beacon_table + utxo_by_policy_table .insert(&policy, (tx_hash.as_slice(), index as u8)) - .map_err(|e| ReDBError(Box::new(e)))?; + .map_err(Error::redb)?; } } + for multi_era_input in tx.consumes() { let tx_in: UTxOKeyType = ( multi_era_input.hash().as_slice(), multi_era_input.index() as u8, ); - match utxo_table.get(tx_in).map_err(|e| ReDBError(Box::new(e)))? { - Some(encoded_tx_out) => { - let tx_out: MultiEraOutput = - decode_output(encoded_tx_out.value()).map_err(OutputDecoding)?; - utxo_by_addr_table - .remove(output_address(&tx_out)?.to_vec().as_slice(), tx_in) - .map_err(|e| ReDBError(Box::new(e)))?; - for policy in output_policy_ids(&tx_out) { - utxo_by_beacon_table - .remove(&policy, tx_in) - .map_err(|e| ReDBError(Box::new(e)))?; - } - } - None => return Err(UTxOTableInvariantBroken), /* This means the input is not - * available for spending! */ + + let encoded_tx_out = utxo_table + .get(tx_in) + .map_err(Error::redb)? + .ok_or(Error::UTxOTableInvariantBroken)? + .value() + .to_vec(); + + let tx_out: MultiEraOutput = + decode_output(&encoded_tx_out).map_err(Error::OutputDecoding)?; + + utxo_by_addr_table + .remove(output_address(&tx_out)?.to_vec().as_slice(), tx_in) + .map_err(Error::redb)?; + + for policy in output_policy_ids(&tx_out) { + utxo_by_policy_table + .remove(&policy, tx_in) + .map_err(Error::redb)?; } - utxo_table.remove(tx_in).map_err(|_| unreachable!())?; + + utxo_table.remove(tx_in).map_err(Error::redb)?; } + Ok(()) } @@ -125,75 +231,64 @@ impl Store { &self, prot_params: ProtParamsValueType, ) -> Result<(), Error> { - let write_tx: WriteTransaction = self - .inner_store - .begin_write() - .map_err(|e| ReDBError(Box::new(e)))?; - let mut prot_params_table: Table = write_tx + let write_tx: WriteTransaction = self.inner_store.begin_write().map_err(Error::redb)?; + + let mut prot_params_table = write_tx .open_table(PROT_PARAMS_TABLE) - .map_err(|e| ReDBError(Box::new(e)))?; + .map_err(Error::redb)?; + let _ = prot_params_table.insert((), prot_params); + drop(prot_params_table); - write_tx.commit().map_err(|e| ReDBError(Box::new(e)))?; + + write_tx.commit().map_err(Error::redb)?; + Ok(()) } pub fn get_blockchain_tip(&self) -> Result { - let read_tx: ReadTransaction = self - .inner_store + self.inner_store .begin_read() - .map_err(|e| ReDBError(Box::new(e)))?; - let blockchain_table: ReadOnlyTable = read_tx + .map_err(Error::redb)? .open_table(BLOCK_TABLE) - .map_err(|e| ReDBError(Box::new(e)))?; - let res = blockchain_table + .map_err(Error::redb)? .last() - .map_err(|e| ReDBError(Box::new(e)))? - .ok_or(KeyNotFound) - .map(|entry| Vec::from(entry.1.value())); - res + .map_err(Error::redb)? + .ok_or(Error::KeyNotFound) + .map(|entry| Vec::from(entry.1.value())) } pub fn get_protocol_parameters(&self) -> Result { - let read_tx: ReadTransaction = self - .inner_store + self.inner_store .begin_read() - .map_err(|e| ReDBError(Box::new(e)))?; - let prot_params_table: ReadOnlyTable = read_tx + .map_err(Error::redb)? .open_table(PROT_PARAMS_TABLE) - .map_err(|e| ReDBError(Box::new(e)))?; - let res = prot_params_table + .map_err(Error::redb)? .get(()) - .map_err(|e| ReDBError(Box::new(e)))? - .ok_or(KeyNotFound) - .map(|entry| Vec::from(entry.value())); - res + .map_err(Error::redb)? + .ok_or(Error::KeyNotFound) + .map(|entry| Vec::from(entry.value())) } - pub fn get_utxos_from_address( + pub fn get_utxos_for_address( &self, addr: &UTxOByAddrKeyType, - ) -> Result>, Error> { - let read_tx: ReadTransaction = self - .inner_store - .begin_read() - .map_err(|e| ReDBError(Box::new(e)))?; - let utxo_by_addr_table: ReadOnlyMultimapTable = - read_tx - .open_multimap_table(UTXO_BY_ADDR_TABLE) - .map_err(|e| ReDBError(Box::new(e)))?; - let res = match utxo_by_addr_table.get(addr) { - Ok(database_results) => { - let mut res = vec![]; - for val in database_results.flatten() { - let (tx_hash, tx_index) = val.value(); - res.push((Vec::from(tx_hash), tx_index)) - } - Ok(Box::new(res.into_iter())) - } - Err(err) => Err(ReDBError(Box::new(err))), - }; - res + ) -> Result, Error> { + let read_tx = self.inner_store.begin_read().map_err(Error::redb)?; + + let values = read_tx + .open_multimap_table(UTXO_BY_ADDR_INDEX) + .map_err(Error::redb)? + .get(addr) + .map_err(Error::redb)? + .flatten() + .map(|x| { + let (a, b) = x.value(); + (Vec::from(a), b) + }) + .collect(); + + Ok(values) } pub fn get_utxo_from_reference(&self, utxo_ref: &UTxOKeyType) -> Option { @@ -218,30 +313,26 @@ impl Store { } pub fn get_block_from_hash(&self, block_hash: &BlockKeyType) -> Option { - let read_tx: ReadTransaction = self.inner_store.begin_read().ok()?; - let tx_table: ReadOnlyTable = - read_tx.open_table(BLOCK_TABLE).ok()?; - let res = tx_table + self.inner_store + .begin_read() + .ok()? + .open_table(BLOCK_TABLE) + .ok()? .get(block_hash) .ok()? - .map(|val| Vec::from(val.value())); - res + .map(|val| Vec::from(val.value())) } pub fn get_utxos_from_beacon( &self, beacon_policy_id: &UTxOByBeaconKeyType, ) -> Result>, Error> { - let read_tx: ReadTransaction = self - .inner_store - .begin_read() - .map_err(|e| ReDBError(Box::new(e)))?; - let utxo_by_beacon_table: ReadOnlyMultimapTable< - UTxOByBeaconKeyType, - UTxOByBeaconValueType, - > = read_tx - .open_multimap_table(UTXO_BY_BEACON_TABLE) - .map_err(|e| ReDBError(Box::new(e)))?; + let read_tx = self.inner_store.begin_read().map_err(Error::redb)?; + + let utxo_by_beacon_table = read_tx + .open_multimap_table(UTXO_BY_POLICY_INDEX) + .map_err(Error::redb)?; + let res = match utxo_by_beacon_table.get(beacon_policy_id) { Ok(database_results) => { let mut res = vec![]; @@ -251,7 +342,7 @@ impl Store { } Ok(Box::new(res.into_iter())) } - Err(err) => Err(ReDBError(Box::new(err))), + Err(err) => Err(Error::redb(err)), }; res } @@ -268,7 +359,7 @@ fn output_policy_ids(output: &MultiEraOutput) -> Vec<[u8; 28]> { } fn output_address(output: &MultiEraOutput) -> Result { - output.address().map_err(AddressDecoding) + output.address().map_err(Error::AddressDecoding) } fn decode_output(encoded_output: &[u8]) -> Result { diff --git a/src/storage/applydb/genesis.rs b/src/storage/applydb/genesis.rs deleted file mode 100644 index 9e22d8e..0000000 --- a/src/storage/applydb/genesis.rs +++ /dev/null @@ -1,129 +0,0 @@ -use pallas::ledger::addresses::ByronAddress; -use pallas::ledger::configs::byron::GenesisUtxo; -use pallas::ledger::primitives::byron::TxOut; -use rocksdb::WriteBatch; -use tracing::info; - -use crate::{ - prelude::*, - storage::kvtable::{DBSerde, KVTable}, -}; - -use super::{ApplyDB, UtxoKV, UtxoRef}; - -fn build_byron_txout(addr: ByronAddress, amount: u64) -> TxOut { - TxOut { - address: pallas::ledger::primitives::byron::Address { - payload: addr.payload, - crc: addr.crc, - }, - amount, - } -} - -fn genesis_utxo_to_kv(utxo: GenesisUtxo) -> Result<(DBSerde, DBSerde), Error> { - let (tx, addr, amount) = utxo; - - let key = DBSerde(UtxoRef(tx, 0)); - - let txout = build_byron_txout(addr, amount); - let txout = pallas::codec::minicbor::to_vec(txout).map_err(Error::config)?; - let value = DBSerde((0u16, txout)); - - Ok((key, value)) -} - -impl ApplyDB { - pub fn apply_origin( - &self, - byron: &pallas::ledger::configs::byron::GenesisFile, - ) -> Result<(), Error> { - let batch = pallas::ledger::configs::byron::genesis_utxos(byron) - .into_iter() - .map(genesis_utxo_to_kv) - .collect::, _>>()? - .into_iter() - .fold(WriteBatch::default(), |mut batch, (k, v)| { - info!(tx = %k.0 .0, "inserting genesis utxo"); - UtxoKV::stage_upsert(&self.db, k, v, &mut batch); - batch - }); - - self.db.write(batch).map_err(Error::storage) - } -} - -#[cfg(test)] -mod tests { - use pallas::crypto::hash::Hash; - use std::str::FromStr; - - use super::*; - use crate::storage::applydb::tests::with_tmp_db; - - fn assert_genesis_utxo_exists(db: &ApplyDB, tx_hex: &str, addr_base58: &str, amount: u64) { - let tx = Hash::<32>::from_str(tx_hex).unwrap(); - - let utxo_body = db.get_utxo(tx, 0).unwrap(); - - assert!(utxo_body.is_some(), "utxo not found"); - let (era, cbor) = utxo_body.unwrap(); - - assert_eq!(era, 0); - - let txout: Result = - pallas::codec::minicbor::decode(&cbor); - - assert!(txout.is_ok(), "can't parse utxo cbor"); - let txout = txout.unwrap(); - - assert_eq!(txout.amount, amount, "utxo amount doesn't match"); - - let addr = pallas::ledger::addresses::ByronAddress::new( - txout.address.payload.as_ref(), - txout.address.crc, - ); - - assert_eq!(addr.to_base58(), addr_base58); - } - - #[test] - fn test_mainnet_genesis_utxos() { - with_tmp_db(|db| { - let path = std::path::PathBuf::from(std::env::var("CARGO_MANIFEST_DIR").unwrap()) - .join("examples") - .join("sync-mainnet") - .join("byron.json"); - - let byron = pallas::ledger::configs::byron::from_file(&path).unwrap(); - db.apply_origin(&byron).unwrap(); - - assert_genesis_utxo_exists( - &db, - "0ae3da29711600e94a33fb7441d2e76876a9a1e98b5ebdefbf2e3bc535617616", - "Ae2tdPwUPEZKQuZh2UndEoTKEakMYHGNjJVYmNZgJk2qqgHouxDsA5oT83n", - 2_463_071_701_000_000, - ) - }); - } - - #[test] - fn test_preview_genesis_utxos() { - with_tmp_db(|db| { - let path = std::path::PathBuf::from(std::env::var("CARGO_MANIFEST_DIR").unwrap()) - .join("examples") - .join("sync-preview") - .join("byron.json"); - - let byron = pallas::ledger::configs::byron::from_file(&path).unwrap(); - db.apply_origin(&byron).unwrap(); - - assert_genesis_utxo_exists( - &db, - "4843cf2e582b2f9ce37600e5ab4cc678991f988f8780fed05407f9537f7712bd", - "FHnt4NL7yPXvDWHa8bVs73UEUdJd64VxWXSFNqetECtYfTd9TtJguJ14Lu3feth", - 30_000_000_000_000_000, - ); - }); - } -} diff --git a/src/storage/applydb/mod.rs b/src/storage/applydb/mod.rs deleted file mode 100644 index 242cca8..0000000 --- a/src/storage/applydb/mod.rs +++ /dev/null @@ -1,636 +0,0 @@ -use pallas::{ - crypto::hash::Hash, - ledger::traverse::{MultiEraBlock, MultiEraTx}, -}; -use serde::{Deserialize, Serialize}; -use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, - path::Path, - sync::Arc, -}; -use thiserror::Error; -use tracing::{error, info}; - -use rocksdb::{Options, WriteBatch, DB}; - -use crate::prelude::BlockHash; - -use super::kvtable::*; - -pub mod genesis; - -type Epoch = u64; -type Era = u16; -type TxHash = Hash<32>; -type OutputIndex = u64; -type UtxoBody = (Era, Vec); -type BlockSlot = u64; -type UpdateBody = (Era, BlockHash, Vec); - -#[derive(Error, Debug)] -pub enum Error { - #[error("data error")] - Data(super::kvtable::Error), - - #[error("missing utxo {0}#{1}")] - MissingUtxo(TxHash, OutputIndex), - - #[error("missing stxi {0}#{1}")] - MissingStxi(TxHash, OutputIndex), - - #[error("missing pparams")] - MissingPParams, - - #[error("cbor decoding")] - Cbor, - - #[error("unimplemented validation for this era")] - UnimplementedEra, -} - -impl From for Error { - fn from(value: super::kvtable::Error) -> Self { - Error::Data(value) - } -} - -#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Hash)] -pub struct UtxoRef(pub TxHash, pub OutputIndex); - -pub struct UtxoKV; - -impl KVTable, DBSerde> for UtxoKV { - const CF_NAME: &'static str = "UtxoKV"; -} - -// Spent transaction inputs -pub struct StxiKV; - -impl KVTable, DBSerde> for StxiKV { - const CF_NAME: &'static str = "StxiKV"; -} - -pub struct SlotKV; - -#[derive(Serialize, Deserialize)] -pub struct SlotData { - hash: BlockHash, - #[deprecated] - tombstones: Vec, -} - -impl KVTable> for SlotKV { - const CF_NAME: &'static str = "SlotKV"; -} - -// Spent transaction inputs -pub struct PParamsKV; - -impl KVTable> for PParamsKV { - const CF_NAME: &'static str = "PParamsKV"; -} - -pub struct ApplyBatch<'a> { - db: &'a rocksdb::DB, - block_slot: BlockSlot, - block_hash: BlockHash, - utxo_inserts: HashMap, - stxi_inserts: HashMap, - utxo_deletes: HashMap, - pparams_update: Option<(Epoch, UpdateBody)>, -} - -impl<'a> ApplyBatch<'a> { - pub fn new(db: &'a rocksdb::DB, block_slot: BlockSlot, block_hash: BlockHash) -> Self { - Self { - db, - block_slot, - block_hash, - utxo_inserts: HashMap::new(), - stxi_inserts: HashMap::new(), - utxo_deletes: HashMap::new(), - pparams_update: None, - } - } - - pub fn contains_utxo(&self, tx: TxHash, output: OutputIndex) -> bool { - self.utxo_inserts.contains_key(&UtxoRef(tx, output)) - } - - // Meant to be used to get the UTxO associated with a transaction input, - // assuming the current block has already been traversed, appropriately - // filling utxo_inserts and utxo_deletes. - pub fn get_same_block_utxo(&self, tx_hash: TxHash, ind: OutputIndex) -> Option { - // utxo_inserts contains the UTxOs produced in the current block which haven't - // been spent. - self.utxo_inserts - .get(&UtxoRef(tx_hash, ind)) - // utxo_deletes contains UTxOs previously stored in the DB, which we don't care - // about, and UTxOs produced (and spent) by transactions in the current block, - // which we care about. - .or(self.utxo_deletes.get(&UtxoRef(tx_hash, ind))) - .cloned() - } - - pub fn insert_utxo(&mut self, tx: TxHash, output: OutputIndex, body: UtxoBody) { - self.utxo_inserts.insert(UtxoRef(tx, output), body); - } - - pub fn spend_utxo(&mut self, tx: TxHash, idx: OutputIndex, body: UtxoBody) { - info!(%tx, idx, "spending utxo"); - - let k = UtxoRef(tx, idx); - - self.stxi_inserts.insert(k.clone(), body.clone()); - self.utxo_deletes.insert(k.clone(), body); - } - - pub fn spend_utxo_same_block(&mut self, tx: TxHash, idx: OutputIndex) { - info!(%tx, idx, "spending utxo same block"); - - let k = UtxoRef(tx, idx); - - let body = self.utxo_inserts.remove(&k).unwrap(); - - self.stxi_inserts.insert(k.clone(), body.clone()); - self.utxo_deletes.insert(k.clone(), body); - } - - pub fn update_pparams(&mut self, epoch: Epoch, era: Era, block: BlockHash, content: Vec) { - self.pparams_update = Some((epoch, (era, block, content))); - } -} - -impl<'a> From> for WriteBatch { - fn from(from: ApplyBatch<'a>) -> Self { - let mut batch = WriteBatch::default(); - - for (key, value) in from.utxo_inserts { - UtxoKV::stage_upsert(from.db, DBSerde(key), DBSerde(value), &mut batch); - } - - for (key, _) in from.utxo_deletes { - UtxoKV::stage_delete(from.db, DBSerde(key), &mut batch); - } - - for (key, value) in from.stxi_inserts { - StxiKV::stage_upsert(from.db, DBSerde(key), DBSerde(value), &mut batch); - } - - let k = DBInt(from.block_slot); - - #[allow(deprecated)] - let v = DBSerde(SlotData { - hash: from.block_hash, - tombstones: vec![], - }); - - SlotKV::stage_upsert(from.db, k, v, &mut batch); - - if let Some((key, value)) = from.pparams_update { - PParamsKV::stage_upsert(from.db, DBInt(key), DBSerde(value), &mut batch); - } - - batch - } -} - -pub struct UndoBatch<'a> { - db: &'a rocksdb::DB, - block_slot: BlockSlot, - utxo_recovery: HashMap, - stxi_deletes: Vec, - utxo_deletes: HashSet, -} - -impl<'a> UndoBatch<'a> { - pub fn new(db: &'a rocksdb::DB, block_slot: BlockSlot) -> Self { - Self { - db, - block_slot, - utxo_recovery: HashMap::new(), - stxi_deletes: Vec::new(), - utxo_deletes: HashSet::new(), - } - } - - pub fn would_delete_utxo(&self, tx: TxHash, output: OutputIndex) -> bool { - self.utxo_deletes.contains(&UtxoRef(tx, output)) - } - - pub fn unspend_stxi( - &mut self, - tx: TxHash, - output: OutputIndex, - body: UtxoBody, - ) -> Result<(), Error> { - let k = UtxoRef(tx, output); - - self.utxo_recovery.insert(k.clone(), body); - self.stxi_deletes.push(k); - - Ok(()) - } - - pub fn unspend_stxi_same_block( - &mut self, - tx: TxHash, - output: OutputIndex, - ) -> Result<(), Error> { - let k = UtxoRef(tx, output); - - self.utxo_deletes.remove(&k); - self.stxi_deletes.push(k); - - Ok(()) - } - - pub fn delete_utxo(&mut self, tx: TxHash, output: OutputIndex) { - let k = UtxoRef(tx, output); - self.utxo_deletes.insert(k); - } -} - -impl<'a> From> for WriteBatch { - fn from(from: UndoBatch<'a>) -> Self { - let mut batch = WriteBatch::default(); - - for (key, value) in from.utxo_recovery { - UtxoKV::stage_upsert(from.db, DBSerde(key), DBSerde(value), &mut batch); - } - - for key in from.utxo_deletes { - UtxoKV::stage_delete(from.db, DBSerde(key), &mut batch); - } - - for key in from.stxi_deletes { - StxiKV::stage_delete(from.db, DBSerde(key), &mut batch); - } - - let k = DBInt(from.block_slot); - SlotKV::stage_delete(from.db, k, &mut batch); - - batch - } -} - -#[derive(Clone)] -pub struct ApplyDB { - db: Arc, -} - -impl ApplyDB { - pub fn open(path: impl AsRef) -> Result { - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.create_missing_column_families(true); - - let db = DB::open_cf( - &opts, - path, - [ - UtxoKV::CF_NAME, - StxiKV::CF_NAME, - SlotKV::CF_NAME, - PParamsKV::CF_NAME, - ], - ) - .map_err(|_| super::kvtable::Error::IO)?; - - Ok(Self { db: Arc::new(db) }) - } - - pub fn is_empty(&self) -> bool { - SlotKV::is_empty(&self.db) - } - - pub fn cursor(&self) -> Result, Error> { - let v = SlotKV::last_entry(&self.db)?; - let out = v.map(|(s, d)| (s.0, d.0.hash)); - - Ok(out) - } - - pub fn get_utxo(&self, tx: TxHash, output: OutputIndex) -> Result, Error> { - let dbval = UtxoKV::get_by_key(&self.db, DBSerde(UtxoRef(tx, output)))?; - Ok(dbval.map(|x| x.0)) - } - - pub fn get_stxi(&self, tx: TxHash, output: OutputIndex) -> Result, Error> { - let dbval = StxiKV::get_by_key(&self.db, DBSerde(UtxoRef(tx, output)))?; - Ok(dbval.map(|x| x.0)) - } - - pub fn resolve_inputs_for_tx( - &self, - tx: &MultiEraTx<'_>, - utxos: &mut HashMap, - ) -> Result<(), Error> { - for consumed in tx.consumes() { - let hash = *consumed.hash(); - let idx = consumed.index(); - - let utxo_ref = UtxoRef(hash, idx); - - if let Entry::Vacant(e) = utxos.entry(utxo_ref) { - let utxo = self - .get_utxo(hash, idx)? - .ok_or(Error::MissingUtxo(hash, idx))?; - - e.insert(utxo); - }; - } - - Ok(()) - } - - pub fn resolve_inputs_for_block( - &self, - block: &MultiEraBlock<'_>, - utxos: &mut HashMap, - ) -> Result<(), Error> { - let txs = block.txs(); - - for tx in txs.iter() { - for (idx, produced) in tx.produces() { - let body = produced.encode(); - let era = tx.era().into(); - utxos.insert(UtxoRef(tx.hash(), idx as u64), (era, body)); - } - } - - for tx in txs.iter() { - self.resolve_inputs_for_tx(tx, utxos)?; - } - - Ok(()) - } - - pub fn apply_block(&mut self, block: &MultiEraBlock<'_>) -> Result<(), Error> { - let slot = block.slot(); - let hash = block.hash(); - - let mut batch = ApplyBatch::new(&self.db, slot, hash); - - let txs = block.txs(); - - for tx in txs.iter() { - for (idx, produced) in tx.produces() { - let body = produced.encode(); - let era = tx.era().into(); - batch.insert_utxo(tx.hash(), idx as u64, (era, body)); - } - - for consumed in tx.consumes() { - let hash = *consumed.hash(); - let idx = consumed.index(); - - if batch.contains_utxo(hash, idx) { - batch.spend_utxo_same_block(hash, idx); - } else { - let utxo = self - .get_utxo(hash, idx)? - .ok_or(Error::MissingUtxo(hash, idx))?; - - batch.spend_utxo(hash, idx, utxo); - }; - } - - if let Some(update) = tx.update() { - batch.update_pparams( - update.epoch(), - block.era().into(), - block.hash(), - update.encode(), - ); - } - } - - // check block-level updates (because of f#!@#@ byron) - if let Some(update) = block.update() { - batch.update_pparams( - update.epoch(), - block.era().into(), - block.hash(), - update.encode(), - ); - } - - let batch = WriteBatch::from(batch); - - self.db - .write(batch) - .map_err(|_| super::kvtable::Error::IO)?; - - Ok(()) - } - - pub fn get_pparams_updates(&self, until: Epoch) -> Result, Error> { - let matches = PParamsKV::iter_entries_start(&self.db) - .map(|x| x.map(|(k, v)| (k.0, v.unwrap()))) - .collect::, _>>() - .map_err(Error::Data)? - .into_iter() - .filter(|(x, _)| *x <= until) - .map(|(_, v)| v) - .collect(); - - Ok(matches) - } - - pub fn undo_block(&mut self, cbor: &[u8]) -> Result<(), Error> { - let block = MultiEraBlock::decode(cbor).map_err(|_| Error::Cbor)?; - let slot = block.slot(); - - let mut batch = UndoBatch::new(&self.db, slot); - - for tx in block.txs() { - for (idx, _) in tx.produces() { - batch.delete_utxo(tx.hash(), idx as u64); - } - } - - for tx in block.txs() { - for consumed in tx.consumes() { - let hash = consumed.hash(); - let idx = consumed.index(); - - if batch.would_delete_utxo(*hash, idx) { - batch.unspend_stxi_same_block(*hash, idx)?; - } else { - let body = self - .get_stxi(*hash, idx)? - .ok_or(Error::MissingStxi(*hash, idx))?; - - batch.unspend_stxi(*hash, idx, body)?; - } - } - } - - let batch = WriteBatch::from(batch); - - self.db - .write(batch) - .map_err(|_| super::kvtable::Error::IO)?; - - info!(slot, "deleted block"); - - Ok(()) - } - - pub fn compact(&self, _max_slot: u64) -> Result<(), Error> { - // TODO: iterate by slot from start until max slot and delete utxos + tombstone - todo!() - } - - pub fn destroy(path: impl AsRef) -> Result<(), Error> { - DB::destroy(&Options::default(), path).map_err(|_| super::kvtable::Error::IO)?; - - Ok(()) - } - - #[cfg(test)] - pub fn insert_dummy_utxo(&mut self, hash: TxHash, index: OutputIndex) { - let mut batch = WriteBatch::default(); - - UtxoKV::stage_upsert( - &self.db, - DBSerde(UtxoRef(hash, index)), - DBSerde((1, vec![])), - &mut batch, - ); - - self.db.write(batch).unwrap(); - } - - #[cfg(test)] - pub fn insert_dummy_stxi(&mut self, hash: TxHash, index: OutputIndex) { - let mut batch = WriteBatch::default(); - - StxiKV::stage_upsert( - &self.db, - DBSerde(UtxoRef(hash, index)), - DBSerde((1, vec![])), - &mut batch, - ); - - self.db.write(batch).unwrap(); - } -} - -#[cfg(test)] -mod tests { - use super::*; - - pub fn with_tmp_db(op: fn(db: ApplyDB) -> ()) { - let path = tempfile::tempdir().unwrap().into_path(); - let db = ApplyDB::open(path.clone()).unwrap(); - - op(db); - - ApplyDB::destroy(path).unwrap(); - } - - fn load_test_block(name: &str) -> Vec { - let path = std::path::PathBuf::from(std::env::var("CARGO_MANIFEST_DIR").unwrap()) - .join("test_data") - .join(name); - - let content = std::fs::read_to_string(path).unwrap(); - hex::decode(content).unwrap() - } - - #[test] - fn test_apply_block() { - with_tmp_db(|mut db| { - // nice block with several txs, it includes chaining edge case - let cbor = load_test_block("alonzo27.block"); - - let block = MultiEraBlock::decode(&cbor).unwrap(); - - let block_txs: Vec<_> = block.txs().iter().map(|tx| tx.hash()).collect(); - - for tx in block.txs() { - for input in tx.consumes() { - // skip inserting dummy utxo if it's part of the current block - if block_txs.contains(input.hash()) { - continue; - } - - db.insert_dummy_utxo(*input.hash(), input.index()); - } - } - - db.apply_block(&block).unwrap(); - - for tx in block.txs() { - for input in tx.consumes() { - // assert that consumed utxos are no longer in the unspent set - let utxo = db.get_utxo(*input.hash(), input.index()).unwrap(); - assert!(utxo.is_none()); - - // assert that consumed utxos moved to the spent set - let stxi = db.get_stxi(*input.hash(), input.index()).unwrap(); - assert!(stxi.is_some()); - } - - for (idx, _) in tx.produces() { - let utxo = db.get_utxo(tx.hash(), idx as u64).unwrap(); - let stxi = db.get_stxi(tx.hash(), idx as u64).unwrap(); - - // assert that produced utxos were added to either unspent or spent set - assert_ne!(utxo.is_some(), stxi.is_some()); - } - } - }); - } - - #[test] - fn test_undo_block() { - with_tmp_db(|mut db| { - // nice block with several txs, it includes chaining edge case - let cbor = load_test_block("alonzo27.block"); - - let block = MultiEraBlock::decode(&cbor).unwrap(); - - let block_txs: Vec<_> = block.txs().iter().map(|tx| tx.hash()).collect(); - - for tx in block.txs() { - for input in tx.consumes() { - // skip inserting dummy stxi if it's part of the current block - if block_txs.contains(input.hash()) { - continue; - } - - db.insert_dummy_stxi(*input.hash(), input.index()); - } - } - - db.undo_block(&cbor).unwrap(); - - for tx in block.txs() { - for input in tx.consumes() { - // assert that consumed utxos go back to the unspent set, unless they are from - // the same block - let utxo = db.get_utxo(*input.hash(), input.index()).unwrap(); - - if block_txs.contains(input.hash()) { - assert!(utxo.is_none()); - } else { - assert!(utxo.is_some()); - } - - // assert that consumed utxos are no longer in the spent set - let stxi = db.get_stxi(*input.hash(), input.index()).unwrap(); - assert!(stxi.is_none()); - } - - for (idx, _) in tx.produces() { - // assert that produced utxos are no longer in the unspent set - let utxo = db.get_utxo(tx.hash(), idx as u64).unwrap(); - assert!(utxo.is_none()); - } - } - }); - } -} diff --git a/src/storage/kvtable.rs b/src/storage/kvtable.rs deleted file mode 100644 index e7ee80a..0000000 --- a/src/storage/kvtable.rs +++ /dev/null @@ -1,461 +0,0 @@ -use pallas::crypto::hash::Hash; -use serde::{de::DeserializeOwned, Serialize}; -use std::marker::PhantomData; -use thiserror::Error; - -#[derive(Error, Debug)] -pub enum Error { - #[error("IO error")] - IO, - - #[error("serde error")] - Serde, - - #[error("not found")] - NotFound, - - #[error("cbor decoding")] - Cbor, -} - -#[derive(PartialEq, Eq)] -pub struct DBHash(pub Hash<32>); - -impl From> for DBHash { - fn from(value: Box<[u8]>) -> Self { - let inner: [u8; 32] = value[0..32].try_into().unwrap(); - let inner = Hash::<32>::from(inner); - Self(inner) - } -} - -impl From for Box<[u8]> { - fn from(value: DBHash) -> Self { - let b = value.0.to_vec(); - b.into() - } -} - -impl From> for DBHash { - fn from(value: Hash<32>) -> Self { - DBHash(value) - } -} - -impl From for Hash<32> { - fn from(value: DBHash) -> Self { - value.0 - } -} - -#[derive(PartialEq, Eq)] -pub struct DBInt(pub u64); - -impl From for Box<[u8]> { - fn from(value: DBInt) -> Self { - let b = value.0.to_be_bytes(); - Box::new(b) - } -} - -impl From> for DBInt { - fn from(value: Box<[u8]>) -> Self { - let inner: [u8; 8] = value[0..8].try_into().unwrap(); - let inner = u64::from_be_bytes(inner); - Self(inner) - } -} - -impl From for DBInt { - fn from(value: u64) -> Self { - DBInt(value) - } -} - -impl From for u64 { - fn from(value: DBInt) -> Self { - value.0 - } -} - -#[derive(PartialEq, Eq)] -pub struct DBBytes(pub Vec); - -impl From for Box<[u8]> { - fn from(value: DBBytes) -> Self { - value.0.into() - } -} - -impl From> for DBBytes { - fn from(value: Box<[u8]>) -> Self { - Self(value.into()) - } -} - -impl From> for DBBytes -where - V: Serialize, -{ - fn from(value: DBSerde) -> Self { - let inner = bincode::serialize(&value.0).unwrap(); - DBBytes(inner) - } -} - -pub struct DBSerde(pub V); - -impl std::ops::Deref for DBSerde { - type Target = V; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl From> for Box<[u8]> -where - V: Serialize, -{ - fn from(v: DBSerde) -> Self { - bincode::serialize(&v.0) - .map(|x| x.into_boxed_slice()) - .unwrap() - } -} - -impl From> for DBSerde -where - V: DeserializeOwned, -{ - fn from(value: Box<[u8]>) -> Self { - let inner = bincode::deserialize(&value).unwrap(); - DBSerde(inner) - } -} - -impl From for DBSerde -where - V: DeserializeOwned, -{ - fn from(value: DBBytes) -> Self { - let inner = bincode::deserialize(&value.0).unwrap(); - DBSerde(inner) - } -} - -impl Clone for DBSerde -where - V: Clone, -{ - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -impl DBSerde { - pub fn unwrap(self) -> T { - self.0 - } -} - -pub struct WithDBIntPrefix(pub u64, pub T); - -impl From> for Box<[u8]> -where - Box<[u8]>: From, -{ - fn from(value: WithDBIntPrefix) -> Self { - let prefix: Box<[u8]> = DBInt(value.0).into(); - let after: Box<[u8]> = value.1.into(); - - [prefix, after].concat().into() - } -} - -impl From> for WithDBIntPrefix { - fn from(_value: Box<[u8]>) -> Self { - todo!() - } -} - -type RocksIterator<'a> = rocksdb::DBIteratorWithThreadMode<'a, rocksdb::DB>; - -pub struct ValueIterator<'a, V>(RocksIterator<'a>, PhantomData); - -impl<'a, V> ValueIterator<'a, V> { - pub fn new(inner: RocksIterator<'a>) -> Self { - Self(inner, Default::default()) - } -} - -impl<'a, V> Iterator for ValueIterator<'a, V> -where - V: From>, -{ - type Item = Result; - - fn next(&mut self) -> Option> { - match self.0.next() { - Some(Ok((_, value))) => Some(Ok(V::from(value))), - Some(Err(err)) => { - tracing::error!(?err); - Some(Err(Error::IO)) - } - None => None, - } - } -} - -pub struct KeyIterator<'a, K>(RocksIterator<'a>, PhantomData); - -impl<'a, K> KeyIterator<'a, K> { - pub fn new(inner: RocksIterator<'a>) -> Self { - Self(inner, Default::default()) - } -} - -impl<'a, K> Iterator for KeyIterator<'a, K> -where - K: From>, -{ - type Item = Result; - - fn next(&mut self) -> Option> { - match self.0.next() { - Some(Ok((key, _))) => Some(Ok(K::from(key))), - Some(Err(err)) => { - tracing::error!(?err); - Some(Err(Error::IO)) - } - None => None, - } - } -} - -pub struct EntryIterator<'a, K, V>(RocksIterator<'a>, PhantomData<(K, V)>); - -impl<'a, K, V> EntryIterator<'a, K, V> { - pub fn new(inner: RocksIterator<'a>) -> Self { - Self(inner, Default::default()) - } -} - -impl<'a, K, V> Iterator for EntryIterator<'a, K, V> -where - K: From>, - V: From>, -{ - type Item = Result<(K, V), Error>; - - fn next(&mut self) -> Option> { - match self.0.next() { - Some(Ok((key, value))) => { - let key_out = K::from(key); - let value_out = V::from(value); - - Some(Ok((key_out, value_out))) - } - Some(Err(err)) => { - tracing::error!(?err); - Some(Err(Error::IO)) - } - None => None, - } - } -} - -pub trait KVTable -where - Box<[u8]>: From, - Box<[u8]>: From, - K: From>, - V: From>, -{ - const CF_NAME: &'static str; - - fn cf(db: &rocksdb::DB) -> rocksdb::ColumnFamilyRef { - db.cf_handle(Self::CF_NAME).unwrap() - } - - fn reset(db: &rocksdb::DB) -> Result<(), Error> { - db.drop_cf(Self::CF_NAME).map_err(|_| Error::IO)?; - - db.create_cf(Self::CF_NAME, &rocksdb::Options::default()) - .map_err(|_| Error::IO)?; - - Ok(()) - } - - fn get_by_key(db: &rocksdb::DB, k: K) -> Result, Error> { - let cf = Self::cf(db); - let raw_key = Box::<[u8]>::from(k); - let raw_value = db - .get_cf(&cf, raw_key) - .map_err(|_| Error::IO)? - .map(|x| Box::from(x.as_slice())); - - match raw_value { - Some(x) => { - let out = ::from(x); - Ok(Some(out)) - } - None => Ok(None), - } - } - - fn stage_upsert(db: &rocksdb::DB, k: K, v: V, batch: &mut rocksdb::WriteBatch) { - let cf = Self::cf(db); - - let k_raw = Box::<[u8]>::from(k); - let v_raw = Box::<[u8]>::from(v); - - batch.put_cf(&cf, k_raw, v_raw); - } - - fn is_empty(db: &rocksdb::DB) -> bool { - // HACK: can't find an easy way to size the num of keys, so we'll start an - // iterator and see if we have at least one value. If someone know a better way - // to accomplish this, please refactor. - let mut iter = Self::iter_keys(db, rocksdb::IteratorMode::Start); - iter.next().is_none() - } - - fn iter_keys<'a>(db: &'a rocksdb::DB, mode: rocksdb::IteratorMode) -> KeyIterator<'a, K> { - let cf = Self::cf(db); - let inner = db.iterator_cf(&cf, mode); - KeyIterator::new(inner) - } - - fn iter_keys_start(db: &rocksdb::DB) -> KeyIterator<'_, K> { - Self::iter_keys(db, rocksdb::IteratorMode::Start) - } - - fn iter_keys_from(db: &rocksdb::DB, from: K) -> KeyIterator<'_, K> { - let from_raw = Box::<[u8]>::from(from); - let mode = rocksdb::IteratorMode::From(&from_raw, rocksdb::Direction::Forward); - - Self::iter_keys(db, mode) - } - - fn iter_values<'a>(db: &'a rocksdb::DB, mode: rocksdb::IteratorMode) -> ValueIterator<'a, V> { - let cf = Self::cf(db); - let inner = db.iterator_cf(&cf, mode); - ValueIterator::new(inner) - } - - fn iter_values_start(db: &rocksdb::DB) -> ValueIterator<'_, V> { - Self::iter_values(db, rocksdb::IteratorMode::Start) - } - - fn iter_values_from(db: &rocksdb::DB, from: K) -> ValueIterator<'_, V> { - let from_raw = Box::<[u8]>::from(from); - let mode = rocksdb::IteratorMode::From(&from_raw, rocksdb::Direction::Forward); - - Self::iter_values(db, mode) - } - - fn iter_values_from_reverse(db: &rocksdb::DB, from: K) -> ValueIterator<'_, V> { - let from_raw = Box::<[u8]>::from(from); - let mode = rocksdb::IteratorMode::From(&from_raw, rocksdb::Direction::Reverse); - - Self::iter_values(db, mode) - } - - fn iter_entries<'a>( - db: &'a rocksdb::DB, - mode: rocksdb::IteratorMode, - ) -> EntryIterator<'a, K, V> { - let cf = Self::cf(db); - let inner = db.iterator_cf(&cf, mode); - EntryIterator::new(inner) - } - - fn iter_entries_start(db: &rocksdb::DB) -> EntryIterator<'_, K, V> { - Self::iter_entries(db, rocksdb::IteratorMode::Start) - } - - fn iter_entries_from(db: &rocksdb::DB, from: K) -> EntryIterator<'_, K, V> { - let from_raw = Box::<[u8]>::from(from); - let mode = rocksdb::IteratorMode::From(&from_raw, rocksdb::Direction::Forward); - - Self::iter_entries(db, mode) - } - - fn last_key(db: &rocksdb::DB) -> Result, Error> { - let mut iter = Self::iter_keys(db, rocksdb::IteratorMode::End); - - match iter.next() { - None => Ok(None), - Some(x) => Ok(Some(x?)), - } - } - - fn last_value(db: &rocksdb::DB) -> Result, Error> { - let mut iter = Self::iter_values(db, rocksdb::IteratorMode::End); - - match iter.next() { - None => Ok(None), - Some(x) => Ok(Some(x?)), - } - } - - fn last_entry(db: &rocksdb::DB) -> Result, Error> { - let mut iter = Self::iter_entries(db, rocksdb::IteratorMode::End); - - match iter.next() { - None => Ok(None), - Some(x) => Ok(Some(x?)), - } - } - - fn scan_until( - db: &rocksdb::DB, - mode: rocksdb::IteratorMode, - predicate: F, - ) -> Result, Error> - where - F: Fn(&V) -> bool, - { - for (k, v) in Self::iter_entries(db, mode).flatten() { - if predicate(&v) { - return Ok(Some(k)); - } - } - - Ok(None) - } - - fn stage_delete(db: &rocksdb::DB, key: K, batch: &mut rocksdb::WriteBatch) { - let cf = Self::cf(db); - let k_raw = Box::<[u8]>::from(key); - batch.delete_cf(&cf, k_raw); - } -} - -pub struct AnyTable; - -pub trait AnyValue: Serialize + DeserializeOwned { - fn type_key() -> DBInt; -} - -impl KVTable for AnyTable { - const CF_NAME: &'static str = "AnyKV"; -} - -impl AnyTable { - pub fn stage_upsert_any(db: &rocksdb::DB, v: T, batch: &mut rocksdb::WriteBatch) { - let k = T::type_key(); - let v = DBSerde(v).into(); - Self::stage_upsert(db, k, v, batch) - } - - pub fn get(db: &rocksdb::DB) -> Result, Error> { - let k = T::type_key(); - let v: Option = AnyTable::get_by_key(db, k)? - .map(DBSerde::::from) - .map(|x| x.0); - - Ok(v) - } -} diff --git a/src/storage/mod.rs b/src/storage/mod.rs deleted file mode 100644 index c56db30..0000000 --- a/src/storage/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod applydb; -pub mod kvtable; diff --git a/src/sync/ledger.rs b/src/sync/ledger.rs index 13be768..f1f2740 100644 --- a/src/sync/ledger.rs +++ b/src/sync/ledger.rs @@ -1,28 +1,17 @@ use gasket::framework::*; -use pallas::applying::{validate, Environment as ValidationContext, UTxOs}; -use pallas::ledger::configs::{byron, shelley}; -use pallas::ledger::traverse::{Era, MultiEraBlock, MultiEraInput, MultiEraOutput}; -use std::borrow::Cow; -use std::collections::HashMap; -use tracing::{debug, info, warn}; +use pallas::ledger::configs::byron; +use pallas::ledger::traverse::MultiEraBlock; +use tracing::info; use crate::prelude::*; -use crate::storage::applydb::ApplyDB; pub type UpstreamPort = gasket::messaging::tokio::InputPort; #[derive(Stage)] #[stage(name = "ledger", unit = "RollEvent", worker = "Worker")] pub struct Stage { - ledger: ApplyDB, + ledger: crate::ledger::store::LedgerStore, byron: byron::GenesisFile, - shelley: shelley::GenesisFile, - network_magic: u64, - network_id: u8, - - current_pparams: Option<(u64, ValidationContext)>, - - phase1_validation_enabled: bool, pub upstream: UpstreamPort, @@ -34,117 +23,15 @@ pub struct Stage { } impl Stage { - pub fn new( - ledger: ApplyDB, - byron: byron::GenesisFile, - shelley: shelley::GenesisFile, - phase1_validation_enabled: bool, - network_magic: u64, - network_id: u8, - ) -> Self { + pub fn new(ledger: crate::ledger::store::LedgerStore, byron: byron::GenesisFile) -> Self { Self { ledger, byron, - shelley, - network_magic, - network_id, - current_pparams: None, - phase1_validation_enabled, upstream: Default::default(), block_count: Default::default(), wal_count: Default::default(), } } - - // Temporal workaround while we fix the GenesisValues mess we have in Pallas. - fn compute_epoch(&mut self, block: &MultiEraBlock) -> u64 { - let slot_length = self - .shelley - .slot_length - .expect("shelley genesis didn't provide a slot length"); - - let epoch_length = self - .shelley - .epoch_length - .expect("shelley genesis didn't provide an epoch lenght"); - - (block.slot() * slot_length as u64) / epoch_length as u64 - } - - fn ensure_pparams(&mut self, block: &MultiEraBlock) -> Result<(), WorkerError> { - let epoch = self.compute_epoch(block); - - if self - .current_pparams - .as_ref() - .is_some_and(|(current, _)| *current == epoch) - { - return Ok(()); - } - - let pparams = crate::pparams::compute_pparams( - crate::pparams::Genesis { - byron: &self.byron, - shelley: &self.shelley, - }, - &self.ledger, - epoch, - )?; - - warn!(?pparams, "pparams for new epoch"); - - let context = ValidationContext { - block_slot: block.slot(), - prot_magic: self.network_magic as u32, - network_id: self.network_id, - prot_params: pparams, - }; - - self.current_pparams = Some((epoch, context)); - - Ok(()) - } - - pub fn execute_phase1_validation(&self, block: &MultiEraBlock<'_>) -> Result<(), WorkerError> { - let mut utxos = HashMap::new(); - self.ledger - .resolve_inputs_for_block(block, &mut utxos) - .or_panic()?; - - let mut utxos2 = UTxOs::new(); - - for (ref_, output) in utxos.iter() { - let txin = pallas::ledger::primitives::byron::TxIn::Variant0( - pallas::codec::utils::CborWrap((ref_.0, ref_.1 as u32)), - ); - - let key = MultiEraInput::Byron( - >>::from(Cow::Owned(txin)), - ); - - let era = Era::try_from(output.0).or_panic()?; - let value = MultiEraOutput::decode(era, &output.1).or_panic()?; - - utxos2.insert(key, value); - } - - let context = self - .current_pparams - .as_ref() - .map(|(_, x)| x) - .ok_or("no pparams available") - .or_panic()?; - - for tx in block.txs().iter() { - let res = validate(tx, &utxos2, &context); - - if let Err(err) = res { - warn!(?err, "validation error"); - } - } - - Ok(()) - } } pub struct Worker; @@ -171,21 +58,22 @@ impl gasket::framework::Worker for Worker { let block = MultiEraBlock::decode(cbor).or_panic()?; - if stage.phase1_validation_enabled { - debug!("performing phase-1 validations"); - stage.ensure_pparams(&block)?; - stage.execute_phase1_validation(&block)?; - } - - stage.ledger.apply_block(&block).or_panic()?; + let delta = crate::ledger::compute_delta(&block); + stage.ledger.apply(&[delta]).or_panic()?; } RollEvent::Undo(slot, _, cbor) => { info!(slot, "undoing block"); - stage.ledger.undo_block(cbor).or_panic()?; + + let block = MultiEraBlock::decode(cbor).or_panic()?; + + let delta = crate::ledger::compute_undo_delta(&block); + stage.ledger.apply(&[delta]).or_panic()?; } RollEvent::Origin => { info!("applying origin"); - stage.ledger.apply_origin(&stage.byron).or_panic()?; + + let delta = crate::ledger::compute_origin_delta(&stage.byron); + stage.ledger.apply(&[delta]).or_panic()?; } }; diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 547f9c3..1284d5e 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -7,8 +7,9 @@ use pallas::storage::rolldb::wal::Store as WalStore; use serde::Deserialize; use tracing::info; +use crate::ledger::store::LedgerStore; +use crate::ledger::ChainPoint; use crate::prelude::*; -use crate::storage::applydb::ApplyDB; pub mod chain; pub mod ledger; @@ -48,9 +49,9 @@ pub fn pipeline( upstream: &UpstreamConfig, wal: WalStore, chain: ChainStore, - ledger: ApplyDB, + ledger: LedgerStore, byron: byron::GenesisFile, - shelley: shelley::GenesisFile, + _shelley: shelley::GenesisFile, retries: &Option, ) -> Result, Error> { let pull_cursor = wal @@ -69,19 +70,16 @@ pub fn pipeline( let cursor_chain = chain.find_tip().map_err(Error::storage)?; info!(?cursor_chain, "chain cursor found"); - let cursor_ledger = ledger.cursor().map_err(Error::storage)?; + let cursor_ledger = ledger + .cursor() + .map_err(Error::storage)? + .map(|ChainPoint(a, b)| (a, b)); + info!(?cursor_ledger, "ledger cursor found"); let mut roll = roll::Stage::new(wal, cursor_chain, cursor_ledger); let mut chain = chain::Stage::new(chain); - let mut ledger = ledger::Stage::new( - ledger, - byron, - shelley, - config.phase1_validation_enabled, - config.network_magic, - config.network_id, - ); + let mut ledger = ledger::Stage::new(ledger, byron); let (to_roll, from_pull) = gasket::messaging::tokio::mpsc_channel(50); pull.downstream.connect(to_roll);