diff --git a/.config/dictionaries/project.dic b/.config/dictionaries/project.dic
index b2e068f2cbe..7241af25173 100644
--- a/.config/dictionaries/project.dic
+++ b/.config/dictionaries/project.dic
@@ -1,4 +1,5 @@
aarch
+abcz
ABNF
abnf
addrr
@@ -43,12 +44,11 @@ crontagged
cstring
dalek
dashmap
-dashmap
-Datelike
Datelike
DBSTATUS
dbsync
dcbor
+decompressor
delegators
dockerhub
Dominik
@@ -72,6 +72,7 @@ filestat
filestorage
filesystems
fkey
+fmmap
fmtchk
fmtfix
fontawesome
@@ -86,9 +87,12 @@ gmtime
gossipsub
happ
hardano
+Hardlink
hasher
+hexdigit
highwater
hmod
+humantime
ideascale
idents
IFMT
@@ -96,7 +100,6 @@ Intellij
ioerr
iohk
ipfs
-ipfs
ipld
jetbrains
jorm
@@ -116,13 +119,16 @@ lintfix
lipsum
livedocs
localizable
+logcall
lookaside
maindbname
mapref
mdlint
mdns
+memx
Metadatum
metadatum
+mimalloc
minicbor
miniprotocol
miniprotocols
@@ -131,7 +137,10 @@ mitigations
mkcron
mkdelay
mkdirat
+Mmap
moderations
+moka
+MPMC
msvc
Multiaddr
multiera
@@ -177,6 +186,7 @@ redoc
REMOVEDIR
renameat
Replayability
+Repr
reqwest
retriggering
rlib
@@ -199,7 +209,9 @@ scanstatus
Sched
scrollability
seckey
+Signdata
sitedocs
+skiplist
slotno
smac
stevenj
@@ -221,12 +233,14 @@ tinyvec
toobig
toolsets
Traceback
+txmonitor
txns
typenum
unfinalized
unixfs
unlinkat
upnp
+ureq
userid
utimensat
UTXO
@@ -236,11 +250,10 @@ vkey
vkeywitness
voteplan
voteplans
+wallclock
WASI
wasi
wasip
-wasip
-wasip
wasmtime
webasm
webassembly
diff --git a/rust/cardano-chain-follower/testbed/Cargo.toml b/integration_tests/rust/Cargo.toml
similarity index 91%
rename from rust/cardano-chain-follower/testbed/Cargo.toml
rename to integration_tests/rust/Cargo.toml
index aaadca1e9ef..0d6087382ac 100644
--- a/rust/cardano-chain-follower/testbed/Cargo.toml
+++ b/integration_tests/rust/Cargo.toml
@@ -23,11 +23,9 @@ bare_urls = "deny"
unescaped_backticks = "deny"
[workspace.lints.clippy]
-pedantic = { level = "deny", priority = -1 }
+pedantic = "deny"
unwrap_used = "deny"
expect_used = "deny"
-todo = "deny"
-unimplemented = "deny"
exit = "deny"
get_unwrap = "deny"
index_refutable_slice = "deny"
diff --git a/integration_tests/rust/Earthfile b/integration_tests/rust/Earthfile
new file mode 100644
index 00000000000..586442946f8
--- /dev/null
+++ b/integration_tests/rust/Earthfile
@@ -0,0 +1,25 @@
+VERSION 0.8
+
+IMPORT github.com/input-output-hk/catalyst-ci/earthly/mithril_snapshot:v3.1.8 AS mithril-snapshot-ci
+IMPORT ../../rust AS rust-workspace
+
+local-build:
+ FROM rust-workspace+builder
+
+ WORKDIR crates/cardano-chain-follower/testbed
+ RUN cargo build -p overhead_benchmark --release
+
+ SAVE ARTIFACT target/release/overhead_benchmark overhead_benchmark
+
+local-run-preprod:
+ ARG --required BENCH_NAME
+
+ FROM +local-build
+
+ COPY --dir mithril-snapshot-ci+package-preprod-snapshot/snapshot/immutable mithril_snapshot
+ COPY +local-build/overhead_benchmark overhead_benchmark_bin
+ RUN ./overhead_benchmark_bin --bench-name $BENCH_NAME --mithril-snapshot-path ./mithril_snapshot
+
+local-save-preprod-snapshot:
+ FROM mithril-snapshot-ci+package-preprod-snapshot
+ SAVE ARTIFACT immutable AS LOCAL local_preprod_mithril_snapshot
diff --git a/rust/cardano-chain-follower/testbed/README.md b/integration_tests/rust/README.md
similarity index 100%
rename from rust/cardano-chain-follower/testbed/README.md
rename to integration_tests/rust/README.md
diff --git a/rust/cardano-chain-follower/testbed/overhead_benchmark/.gitignore b/integration_tests/rust/overhead_benchmark/.gitignore
similarity index 100%
rename from rust/cardano-chain-follower/testbed/overhead_benchmark/.gitignore
rename to integration_tests/rust/overhead_benchmark/.gitignore
diff --git a/rust/cardano-chain-follower/testbed/overhead_benchmark/Cargo.toml b/integration_tests/rust/overhead_benchmark/Cargo.toml
similarity index 87%
rename from rust/cardano-chain-follower/testbed/overhead_benchmark/Cargo.toml
rename to integration_tests/rust/overhead_benchmark/Cargo.toml
index 660a5767668..fbdb4d2b49c 100644
--- a/rust/cardano-chain-follower/testbed/overhead_benchmark/Cargo.toml
+++ b/integration_tests/rust/overhead_benchmark/Cargo.toml
@@ -11,6 +11,6 @@ cardano-chain-follower = { path = "../.." }
anyhow = "1.0.82"
clap = { version = "4.5.4", features = ["derive", "help", "usage", "std"], default-features = false }
-pallas-traverse = "0.24.0"
-pallas-hardano = "0.24.0"
+pallas-traverse = "0.30.1"
+pallas-hardano = "0.30.1"
tokio = { version = "1.37.0", features = ["macros", "sync", "rt-multi-thread", "rt", "net"] }
diff --git a/rust/cardano-chain-follower/testbed/overhead_benchmark/README.md b/integration_tests/rust/overhead_benchmark/README.md
similarity index 100%
rename from rust/cardano-chain-follower/testbed/overhead_benchmark/README.md
rename to integration_tests/rust/overhead_benchmark/README.md
diff --git a/rust/cardano-chain-follower/testbed/overhead_benchmark/src/benchs/cardano_chain_follower.rs b/integration_tests/rust/overhead_benchmark/src/benchs/cardano_chain_follower.rs
similarity index 90%
rename from rust/cardano-chain-follower/testbed/overhead_benchmark/src/benchs/cardano_chain_follower.rs
rename to integration_tests/rust/overhead_benchmark/src/benchs/cardano_chain_follower.rs
index 3515471390e..f0e850d33de 100644
--- a/rust/cardano-chain-follower/testbed/overhead_benchmark/src/benchs/cardano_chain_follower.rs
+++ b/integration_tests/rust/overhead_benchmark/src/benchs/cardano_chain_follower.rs
@@ -32,7 +32,10 @@ pub async fn run(params: BenchmarkParams) -> anyhow::Result<()> {
let update = follower.next().await?;
match update {
- ChainUpdate::Block(raw_block_data) => {
+ ChainUpdate::ImmutableBlockRollback(raw_block_data)
+ | ChainUpdate::BlockTip(raw_block_data)
+ | ChainUpdate::ImmutableBlock(raw_block_data)
+ | ChainUpdate::Block(raw_block_data) => {
let block_data = raw_block_data.decode()?;
monitor_task_handle.send_update(monitor::BenchmarkStats {
diff --git a/rust/cardano-chain-follower/testbed/overhead_benchmark/src/benchs/mod.rs b/integration_tests/rust/overhead_benchmark/src/benchs/mod.rs
similarity index 100%
rename from rust/cardano-chain-follower/testbed/overhead_benchmark/src/benchs/mod.rs
rename to integration_tests/rust/overhead_benchmark/src/benchs/mod.rs
diff --git a/rust/cardano-chain-follower/testbed/overhead_benchmark/src/benchs/monitor.rs b/integration_tests/rust/overhead_benchmark/src/benchs/monitor.rs
similarity index 100%
rename from rust/cardano-chain-follower/testbed/overhead_benchmark/src/benchs/monitor.rs
rename to integration_tests/rust/overhead_benchmark/src/benchs/monitor.rs
diff --git a/rust/cardano-chain-follower/testbed/overhead_benchmark/src/benchs/pallas.rs b/integration_tests/rust/overhead_benchmark/src/benchs/pallas.rs
similarity index 100%
rename from rust/cardano-chain-follower/testbed/overhead_benchmark/src/benchs/pallas.rs
rename to integration_tests/rust/overhead_benchmark/src/benchs/pallas.rs
diff --git a/rust/cardano-chain-follower/testbed/overhead_benchmark/src/main.rs b/integration_tests/rust/overhead_benchmark/src/main.rs
similarity index 100%
rename from rust/cardano-chain-follower/testbed/overhead_benchmark/src/main.rs
rename to integration_tests/rust/overhead_benchmark/src/main.rs
diff --git a/rust/cardano-chain-follower/testbed/rust-toolchain.toml b/integration_tests/rust/rust-toolchain.toml
similarity index 65%
rename from rust/cardano-chain-follower/testbed/rust-toolchain.toml
rename to integration_tests/rust/rust-toolchain.toml
index 2c1a03c1aea..21c73f28f46 100644
--- a/rust/cardano-chain-follower/testbed/rust-toolchain.toml
+++ b/integration_tests/rust/rust-toolchain.toml
@@ -1,3 +1,3 @@
[toolchain]
-channel = "1.78"
+channel = "1.80"
profile = "default"
diff --git a/rust/Earthfile b/rust/Earthfile
index 6e1d8f8d4d7..319dc2df31f 100644
--- a/rust/Earthfile
+++ b/rust/Earthfile
@@ -1,6 +1,7 @@
VERSION 0.8
-IMPORT github.com/input-output-hk/catalyst-ci/earthly/rust:v3.2.00 AS rust-ci
+IMPORT github.com/input-output-hk/catalyst-ci/earthly/rust:feat/cardano-chain-follower-changes AS rust-ci
+#IMPORT github.com/input-output-hk/catalyst-ci/earthly/rust:v3.2.00 AS rust-ci
# Use when debugging cat-ci locally.
# IMPORT ../../catalyst-ci/earthly/rust AS rust-ci
diff --git a/rust/cardano-chain-follower/Cargo.toml b/rust/cardano-chain-follower/Cargo.toml
index e8d6540fac4..12a68a66718 100644
--- a/rust/cardano-chain-follower/Cargo.toml
+++ b/rust/cardano-chain-follower/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "cardano-chain-follower"
-version = "0.0.1"
+version = "0.0.2"
edition.workspace = true
authors.workspace = true
homepage.workspace = true
@@ -11,13 +11,85 @@ license.workspace = true
workspace = true
[dependencies]
-pallas = { git = "https://github.com/input-output-hk/catalyst-pallas.git", rev = "9b5183c8b90b90fe2cc319d986e933e9518957b3", version = "0.30.1" }
-pallas-hardano = { git = "https://github.com/input-output-hk/catalyst-pallas.git", rev = "9b5183c8b90b90fe2cc319d986e933e9518957b3", version = "0.30.1" }
+pallas = { version = "0.30.1", git = "https://github.com/input-output-hk/catalyst-pallas.git", rev = "9b5183c8b90b90fe2cc319d986e933e9518957b3" }
+pallas-hardano = { version = "0.30.1", git = "https://github.com/input-output-hk/catalyst-pallas.git", rev = "9b5183c8b90b90fe2cc319d986e933e9518957b3" }
+pallas-crypto = { version = "0.30.1", git = "https://github.com/input-output-hk/catalyst-pallas.git", rev = "9b5183c8b90b90fe2cc319d986e933e9518957b3" }
+
+# cspell: words licence
+mithril-client = { version = "0.8.16", git = "https://github.com/input-output-hk/catalyst-mithril.git", branch = "fix/lgpl-licence", default-features = false, features = [
+ "full",
+ "num-integer-backend",
+] }
+
+c509-certificate = { version = "0.0.1", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "v0.0.1" }
thiserror = "1.0.56"
-tokio = { version = "1.36.0", features = ["macros", "rt", "net", "rt-multi-thread"] }
+tokio = { version = "1.40.0", features = [
+ "macros",
+ "rt",
+ "net",
+ "rt-multi-thread",
+] }
tracing = "0.1.40"
+tracing-log = "0.2.0"
+dashmap = "6.0.1"
+url = "2.5.0"
+anyhow = "1.0.86"
+chrono = "0.4.38"
+async-trait = "0.1.82"
+dirs = "5.0.1"
+futures = "0.3.30"
+humantime = "2.1.0"
+crossbeam-skiplist = "0.1.3"
+crossbeam-channel = "0.5.13"
+crossbeam-epoch = "0.9.18"
+strum = "0.26.3"
+ouroboros = "0.18.4"
+hex = "0.4.3"
+rayon = "1.10.0"
+serde = "1.0.209"
+serde_json = "1.0.128"
+mimalloc = { version = "0.1.43", optional = true }
+memx = "0.1.32"
+fmmap = { version = "0.3.3", features = ["sync", "tokio-async"] }
+minicbor = { version = "0.24.4", features = ["alloc", "derive", "half"] }
+brotli = "6.0.0"
+zstd = "0.13.2"
+x509-cert = "0.2.5"
+ed25519-dalek = "2.1.1"
+blake2b_simd = "1.0.2"
+num-traits = "0.2.19"
+logcall = "0.1.9"
+tar = "0.4.41"
+ureq = { version = "2.10.1", features = ["native-certs"] }
+http = "1.1.0"
+hickory-resolver = { version = "0.24.1", features = ["dns-over-rustls"] }
+moka = { version = "0.12.8", features = ["sync"] }
[dev-dependencies]
hex = "0.4.3"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+test-log = { version = "0.2.16", default-features = false, features = [
+ "trace",
+] }
+clap = "4.5.17"
+
+# Note, these features are for support of features exposed by dependencies.
+[features]
+default = ["rustls-tls-native-roots"]
+
+# Enable the MiMalloc global allocator
+# Only used for examples.
+mimalloc = ["dep:mimalloc"]
+
+# These features are for support of dependent crates only.
+# They do not change the operation of the main crate.
+# cspell: words alpn
+native-tls = ["mithril-client/native-tls"]
+native-tls-alpn = ["mithril-client/native-tls-alpn"]
+native-tls-vendored = ["mithril-client/native-tls-vendored"]
+
+rustls-tls = ["mithril-client/rustls-tls"]
+rustls-tls-manual-roots = ["mithril-client/rustls-tls-manual-roots"]
+rustls-tls-webpki-roots = ["mithril-client/rustls-tls-webpki-roots"]
+rustls-tls-native-roots = ["mithril-client/rustls-tls-native-roots"]
diff --git a/rust/cardano-chain-follower/Readme.md b/rust/cardano-chain-follower/Readme.md
new file mode 100644
index 00000000000..9e8a4744fb2
--- /dev/null
+++ b/rust/cardano-chain-follower/Readme.md
@@ -0,0 +1,59 @@
+# Things that need fixing
+
+The following fixes would be nice to have, in order to:
+
+1. Improve sync times.
+2. Decrease disk utilization.
+3. Eliminate external dependencies.
+
+## Parallel downloading requires external tool
+
+We currently require an external tool `aria2c` to download the Mithril snapshot.
+We should have a native version to remove this external tool dependency.
+
+See:
+For a simple version of such a downloader that we could adapt.
+
+The first version should just replace `Aria2c` and download to a file.
+
+Ideally, we would have an in-memory queue that downloads in parallel, rather than saving to disk.
+This would need to use something like a skip-map to re-order the blocks, and a pool of workers to download the next blocks.
+It's not trivial, but it would remove the necessity to store the actual snapshot archive on-disk.
+
+It's not possible to download the snapshot archive into RAM, because it is enormous.
+
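+A minimal sketch of the in-order reassembly idea, using standard-library threads and a `BTreeMap`
+in place of a skip-map; `download_chunk` is a hypothetical stand-in for the real HTTP range request,
+and the chunk and worker counts are arbitrary.
+
+```rust
+use std::collections::BTreeMap;
+use std::sync::mpsc;
+use std::thread;
+
+/// Hypothetical stand-in for an HTTP range request fetching one chunk of the archive.
+fn download_chunk(index: u64) -> Vec<u8> {
+    vec![index as u8; 4]
+}
+
+fn main() {
+    const CHUNKS: u64 = 16;
+    const WORKERS: u64 = 4;
+    let (tx, rx) = mpsc::channel();
+
+    // Pool of workers, each downloading an interleaved subset of the chunks.
+    for worker in 0..WORKERS {
+        let tx = tx.clone();
+        thread::spawn(move || {
+            for index in (worker..CHUNKS).step_by(WORKERS as usize) {
+                // Chunks complete out of order; tag each one with its index.
+                drop(tx.send((index, download_chunk(index))));
+            }
+        });
+    }
+    drop(tx);
+
+    // Re-order buffer: emit chunks strictly in order as soon as they are contiguous.
+    let mut pending = BTreeMap::new();
+    let mut next = 0;
+    for (index, data) in rx {
+        pending.insert(index, data);
+        while let Some(data) = pending.remove(&next) {
+            // In the real downloader this would be piped to the extractor instead of printed.
+            println!("chunk {next}: {} bytes", data.len());
+            next += 1;
+        }
+    }
+}
+```
+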
+## Zstd decompress and tar extraction optimization
+
+Currently, an async zstd decompress and tar extraction is used.
+This is known to be slow, and we are CPU bound doing it.
+
+Change this to run in a thread outside the async runtime, using the `zstd` crate (which links directly
+to the C zstd library) and the non-async `tar` extraction crate.
+
+This will speed up extracting files from the archive.
+
+This would also work better if we had the synchronous piped downloading mentioned above.
+
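+A minimal sketch of that synchronous decompress-and-extract step, run via `tokio::task::spawn_blocking`
+so it stays off the async worker threads. It uses the `zstd`, `tar`, `anyhow` and `tokio` crates already in
+`Cargo.toml`; the archive and output paths are placeholders.
+
+```rust
+use std::{fs::File, path::Path};
+
+/// Synchronously decompress a `.tar.zst` archive and unpack it into `out_dir`.
+fn extract_snapshot(archive: &Path, out_dir: &Path) -> anyhow::Result<()> {
+    // zstd decodes through the C library; tar extraction is plain blocking IO.
+    let decoder = zstd::stream::read::Decoder::new(File::open(archive)?)?;
+    let mut tar_archive = tar::Archive::new(decoder);
+    tar_archive.unpack(out_dir)?;
+    Ok(())
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    // Placeholder paths: substitute the downloaded snapshot and the target directory.
+    let archive = std::path::PathBuf::from("snapshot.tar.zst");
+    let out_dir = std::path::PathBuf::from("snapshot_data");
+
+    // Keep the CPU-bound work on a blocking thread, not an async worker.
+    tokio::task::spawn_blocking(move || extract_snapshot(&archive, &out_dir)).await??;
+    Ok(())
+}
+```
+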
+## Block Decode Optimization
+
+Currently, to enforce and validate chain integrity, we need to decode the blocks all over the place.
+Decoding blocks is expensive, and this is wasteful.
+As the application will almost certainly require the block to be decoded, it makes sense for it to happen once in a uniform way.
+We would then pass the decoded block to the application, saving it the effort of decoding it itself.
+
+We should decode LIVE blocks once when we receive them from the network,
+and then keep the decoded as well as the raw block data in memory.
+
+For Immutable blocks, we should decode them ONCE when we read them from disk.
+
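+A minimal sketch of the decode-once idea: keep the raw bytes next to a lazily-initialised decoded form,
+so every consumer shares a single decode. `DecodedBlock` and `decode` are hypothetical stand-ins for the
+real block types, which in practice need a self-referential wrapper (the crate already uses `ouroboros` for this).
+
+```rust
+use std::sync::OnceLock;
+
+/// Hypothetical owned summary of a decoded block (stand-in for the real decoded type).
+#[derive(Debug)]
+struct DecodedBlock {
+    slot: u64,
+    tx_count: usize,
+}
+
+/// Stand-in for the real CBOR decode of a multi-era block.
+fn decode(raw: &[u8]) -> DecodedBlock {
+    DecodedBlock { slot: raw.len() as u64, tx_count: 0 }
+}
+
+/// Raw block bytes plus a decode-once cache, shareable across threads.
+struct CachedBlock {
+    raw: Vec<u8>,
+    decoded: OnceLock<DecodedBlock>,
+}
+
+impl CachedBlock {
+    fn new(raw: Vec<u8>) -> Self {
+        Self { raw, decoded: OnceLock::new() }
+    }
+
+    /// Decode on first access, then reuse the cached result.
+    fn decoded(&self) -> &DecodedBlock {
+        self.decoded.get_or_init(|| decode(&self.raw))
+    }
+}
+
+fn main() {
+    let block = CachedBlock::new(vec![0u8; 4]);
+    println!("first access:  {:?}", block.decoded());
+    println!("second access: {:?}", block.decoded());
+}
+```
+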
+## Immutable Queue Optimization
+
+The Immutable follower reads from disk, inline.
+Disk IO is relatively expensive.
+Decoding blocks is also expensive; it's better to do that in parallel with the application processing a previous block.
+
+What we should do is have a read-ahead queue, where a second task runs ahead of the application's follower,
+reading the next blocks from disk and decoding them.
+
+The main follower used by the application then reads from this read-ahead queue.
+This would help us better utilize disk and CPU resources, which would result in improved sync times.
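+
+A minimal sketch of the read-ahead idea using a bounded `tokio` channel: a second task reads and decodes
+upcoming blocks while the consumer processes the current one. `read_and_decode` is a hypothetical
+stand-in for the real disk read and block decode, and the bound and block count are arbitrary.
+
+```rust
+use tokio::sync::mpsc;
+
+/// Hypothetical stand-in for reading block `height` from the immutable files and decoding it.
+async fn read_and_decode(height: u64) -> String {
+    format!("decoded block {height}")
+}
+
+#[tokio::main]
+async fn main() {
+    // The bound controls how far the reader may run ahead of the consumer.
+    let (tx, mut rx) = mpsc::channel(8);
+
+    // Read-ahead task: reads and decodes the next blocks while the consumer works.
+    tokio::spawn(async move {
+        for height in 0..32u64 {
+            let block = read_and_decode(height).await;
+            // Blocks here whenever the consumer falls more than 8 blocks behind.
+            if tx.send(block).await.is_err() {
+                break;
+            }
+        }
+    });
+
+    // The follower used by the application reads from the read-ahead queue.
+    while let Some(block) = rx.recv().await {
+        println!("processing {block}");
+    }
+}
+```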
diff --git a/rust/cardano-chain-follower/examples/concurrent_reads.rs b/rust/cardano-chain-follower/examples/concurrent_reads.rs
deleted file mode 100644
index 31daf87f5ed..00000000000
--- a/rust/cardano-chain-follower/examples/concurrent_reads.rs
+++ /dev/null
@@ -1,78 +0,0 @@
-//! This example shows how to use the chain follower to download arbitrary blocks
-//! from the chain concurrently.
-
-use std::error::Error;
-
-use cardano_chain_follower::{Follower, FollowerConfigBuilder, Network, Point};
-use tracing::level_filters::LevelFilter;
-use tracing_subscriber::EnvFilter;
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
- tracing_subscriber::fmt()
- .with_env_filter(
- EnvFilter::builder()
- .with_default_directive(LevelFilter::INFO.into())
- .from_env_lossy(),
- )
- .init();
-
- let config = FollowerConfigBuilder::default().build();
-
- let follower = Follower::connect(
- "relays-new.cardano-mainnet.iohk.io:3001",
- Network::Mainnet,
- config,
- )
- .await?;
-
- let points = vec![
- Point::Specific(
- 110_908_236,
- hex::decode("ad3798a1db2b6097c71f35609399e4b2ff834f0f45939803d563bf9d660df2f2")?,
- ),
- Point::Specific(
- 110_908_582,
- hex::decode("16e97a73e866280582ee1201a5e1815993978eede956af1869b0733bedc131f2")?,
- ),
- ];
- let mut point_count = points.len();
-
- let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
-
- for p in points {
- let slot_no = p.slot_or_default();
- let r = follower.read_block(p);
- let r_tx = tx.clone();
-
- tokio::spawn(async move {
- tracing::info!(slot_no, "Reading block");
- let result = r.await;
- drop(r_tx.send(result));
- });
- }
-
- while let Some(result) = rx.recv().await {
- let block_data = result?;
- let block = block_data.decode()?;
-
- let total_fee = block
- .txs()
- .iter()
- .map(|tx| tx.fee().unwrap_or_default())
- .sum::<u64>();
-
- println!(
- "Block {} (slot {}) => total fee: {total_fee}",
- block.number(),
- block.slot()
- );
-
- point_count -= 1;
- if point_count == 0 {
- break;
- }
- }
-
- Ok(())
-}
diff --git a/rust/cardano-chain-follower/examples/follow_chain_updates.rs b/rust/cardano-chain-follower/examples/follow_chain_updates.rs
deleted file mode 100644
index 3aa64f27a6b..00000000000
--- a/rust/cardano-chain-follower/examples/follow_chain_updates.rs
+++ /dev/null
@@ -1,62 +0,0 @@
-//! This example shows how to use the chain follower to follow chain updates on
-//! a Cardano network chain.
-
-use std::error::Error;
-
-use cardano_chain_follower::{ChainUpdate, Follower, FollowerConfigBuilder, Network};
-use tracing::level_filters::LevelFilter;
-use tracing_subscriber::EnvFilter;
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
- tracing_subscriber::fmt()
- .with_env_filter(
- EnvFilter::builder()
- .with_default_directive(LevelFilter::INFO.into())
- .from_env_lossy(),
- )
- .init();
-
- // Defaults to start following from the tip.
- let config = FollowerConfigBuilder::default().build();
-
- let mut follower = Follower::connect(
- "relays-new.cardano-mainnet.iohk.io:3001",
- Network::Mainnet,
- config,
- )
- .await?;
-
- // Wait for 3 chain updates and shutdown.
- for _ in 0..3 {
- let chain_update = follower.next().await?;
-
- match chain_update {
- ChainUpdate::Block(data) => {
- let block = data.decode()?;
-
- println!(
- "New block NUMBER={} SLOT={} HASH={}",
- block.number(),
- block.slot(),
- hex::encode(block.hash()),
- );
- },
- ChainUpdate::Rollback(data) => {
- let block = data.decode()?;
-
- println!(
- "Rollback block NUMBER={} SLOT={} HASH={}",
- block.number(),
- block.slot(),
- hex::encode(block.hash()),
- );
- },
- }
- }
-
- // Waits for the follower background task to exit.
- follower.close().await?;
-
- Ok(())
-}
diff --git a/rust/cardano-chain-follower/examples/follow_chain_updates_mithril.rs b/rust/cardano-chain-follower/examples/follow_chain_updates_mithril.rs
deleted file mode 100644
index ce67c86f459..00000000000
--- a/rust/cardano-chain-follower/examples/follow_chain_updates_mithril.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-//! This example shows how to use the chain follower to follow chain updates on
-//! a Cardano network chain.
-
-// Allowing since this is example code.
-#![allow(clippy::unwrap_used)]
-
-use std::{error::Error, path::PathBuf};
-
-use cardano_chain_follower::{ChainUpdate, Follower, FollowerConfigBuilder, Network, Point};
-use tracing::level_filters::LevelFilter;
-use tracing_subscriber::EnvFilter;
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
- tracing_subscriber::fmt()
- .with_env_filter(
- EnvFilter::builder()
- .with_default_directive(LevelFilter::INFO.into())
- .from_env_lossy(),
- )
- .init();
-
- // Create a follower config specifying the Mithril snapshot path and
- // to follow from block 1794552 (preprod).
- let config = FollowerConfigBuilder::default()
- .follow_from(Point::Specific(
- 49_075_262,
- hex::decode("e929cd1bf8ec78844ec9ea450111aaf55fbf17540db4b633f27d4503eebf2218")?,
- ))
- .mithril_snapshot_path(
- PathBuf::from(std::env::var("CARGO_MANIFEST_DIR").unwrap())
- .join("examples/snapshot_data"),
- )
- .build();
-
- let mut follower = Follower::connect(
- "preprod-node.play.dev.cardano.org:3001",
- Network::Preprod,
- config,
- )
- .await?;
-
- // Wait for some chain updates and shutdown.
- for _ in 0..10 {
- let chain_update = follower.next().await?;
-
- match chain_update {
- ChainUpdate::Block(data) => {
- let block = data.decode()?;
-
- println!(
- "New block NUMBER={} SLOT={} HASH={}",
- block.number(),
- block.slot(),
- hex::encode(block.hash()),
- );
- },
- ChainUpdate::Rollback(data) => {
- let block = data.decode()?;
-
- println!(
- "Rollback block NUMBER={} SLOT={} HASH={}",
- block.number(),
- block.slot(),
- hex::encode(block.hash()),
- );
- },
- }
- }
-
- // Waits for the follower background task to exit.
- follower.close().await?;
-
- Ok(())
-}
diff --git a/rust/cardano-chain-follower/examples/follow_chains.rs b/rust/cardano-chain-follower/examples/follow_chains.rs
new file mode 100644
index 00000000000..ad5ad3bafa1
--- /dev/null
+++ b/rust/cardano-chain-follower/examples/follow_chains.rs
@@ -0,0 +1,378 @@
+//! This example shows how to use the chain follower to follow all chains, until they have
+//! all reached tip. It will report on how many blocks for each chain exist between eras,
+//! and also how long each chain took to reach its tip.
+
+// Allowing since this is example code.
+//#![allow(clippy::unwrap_used)]
+
+#[cfg(feature = "mimalloc")]
+use mimalloc::MiMalloc;
+
+/// Use Mimalloc for the global allocator.
+#[cfg(feature = "mimalloc")]
+#[global_allocator]
+static GLOBAL: MiMalloc = MiMalloc;
+
+use std::{error::Error, time::Duration};
+
+use cardano_chain_follower::{
+ ChainFollower, ChainSyncConfig, ChainUpdate, Kind, Metadata, Network, Statistics, ORIGIN_POINT,
+ TIP_POINT,
+};
+use clap::{arg, ArgAction, ArgMatches, Command};
+use tokio::time::Instant;
+use tracing::{error, info, level_filters::LevelFilter};
+use tracing_subscriber::EnvFilter;
+
+/// Process our CLI Arguments
+fn process_argument() -> (Vec<Network>, ArgMatches) {
+ let matches = Command::new("follow_chains")
+ .args(&[
+ arg!(--preprod "Follow Preprod network").action(ArgAction::SetTrue),
+ arg!(--preview "Follow Preview network").action(ArgAction::SetTrue),
+ arg!(--mainnet "Follow Mainnet network").action(ArgAction::SetTrue),
+ arg!(--all "Follow All networks").action(ArgAction::SetTrue),
+ arg!(--"stop-at-tip" "Stop when the tip of the blockchain is reached.")
+ .action(ArgAction::SetTrue),
+ arg!(--"all-live-blocks" "Show all live blocks.").action(ArgAction::SetTrue),
+ arg!(--"all-tip-blocks" "Show all blocks read from the Peer as TIP.")
+ .action(ArgAction::SetTrue),
+ arg!(--"halt-on-error" "Stop the process when an error occurs without retrying.")
+ .action(ArgAction::SetTrue),
+ arg!(--"bad-cip36" "Dump Bad Cip36 registrations detected.")
+ .action(ArgAction::SetTrue),
+ arg!(--"largest-metadata" "Dump The largest transaction metadata we find (as we find it).")
+ .action(ArgAction::SetTrue),
+ arg!(--"mithril-sync-workers" "The number of workers to use when downloading the blockchain snapshot.")
+ .value_parser(clap::value_parser!(u16).range(1..))
+ .action(ArgAction::Set),
+ arg!(--"mithril-sync-chunk-size" "The size in MB of each chunk downloaded by a worker.")
+ .value_parser(clap::value_parser!(u16).range(1..))
+ .action(ArgAction::Set),
+ arg!(--"mithril-sync-queue-ahead" "The number of chunks pre-queued per worker.")
+ .value_parser(clap::value_parser!(u16).range(1..))
+ .action(ArgAction::Set),
+ arg!(--"mithril-sync-connect-timeout" "The HTTP Connection Timeout for mithril downloads, in seconds.")
+ .value_parser(clap::value_parser!(u64).range(1..))
+ .action(ArgAction::Set),
+ arg!(--"mithril-sync-data-read-timeout" "The HTTP Data Read Timeout for mithril downloads, in seconds.")
+ .value_parser(clap::value_parser!(u64).range(1..))
+ .action(ArgAction::Set),
+ ])
+ .get_matches();
+
+ let mut networks = vec![];
+ if matches.get_flag("preprod") || matches.get_flag("all") {
+ networks.push(Network::Preprod);
+ }
+ if matches.get_flag("preview") || matches.get_flag("all") {
+ networks.push(Network::Preview);
+ }
+ if matches.get_flag("mainnet") || matches.get_flag("all") {
+ networks.push(Network::Mainnet);
+ }
+
+ (networks, matches)
+}
+
+/// Start syncing a particular network
+async fn start_sync_for(network: &Network, matches: ArgMatches) -> Result<(), Box<dyn Error>> {
+ let mut cfg = ChainSyncConfig::default_for(*network);
+
+ let mut mithril_dl_connect_timeout = "Not Set".to_string();
+ let mut mithril_dl_data_timeout = "Not Set".to_string();
+
+ let mut dl_config = cfg.mithril_cfg.dl_config.clone().unwrap_or_default();
+
+ if let Some(workers) = matches.get_one::<u16>("mithril-sync-workers") {
+ dl_config = dl_config.with_workers(*workers as usize);
+ }
+ let mithril_dl_workers = format!("{}", dl_config.workers);
+
+ if let Some(chunk_size) = matches.get_one::<u16>("mithril-sync-chunk-size") {
+ dl_config = dl_config.with_chunk_size(*chunk_size as usize * 1024 * 1024);
+ }
+ let mithril_dl_chunk_size = format!("{} MBytes", dl_config.chunk_size / (1024 * 1024));
+
+ if let Some(queue_ahead) = matches.get_one::<u16>("mithril-sync-queue-ahead") {
+ dl_config = dl_config.with_queue_ahead(*queue_ahead as usize);
+ }
+ let mithril_dl_queue_ahead = format!("{}", dl_config.queue_ahead);
+
+ if let Some(connect_timeout) = matches.get_one::<u64>("mithril-sync-connect-timeout") {
+ dl_config = dl_config.with_connection_timeout(Duration::from_secs(*connect_timeout));
+ }
+ if let Some(connect_timeout) = dl_config.connection_timeout {
+ mithril_dl_connect_timeout = format!("{}", humantime::format_duration(connect_timeout));
+ }
+
+ if let Some(data_timeout) = matches.get_one::<u64>("mithril-sync-data-read-timeout") {
+ dl_config = dl_config.with_connection_timeout(Duration::from_secs(*data_timeout));
+ }
+ if let Some(data_timeout) = dl_config.data_read_timeout {
+ mithril_dl_data_timeout = format!("{}", humantime::format_duration(data_timeout));
+ }
+
+ cfg.mithril_cfg = cfg.mithril_cfg.with_dl_config(dl_config);
+
+ info!(
+ chain = cfg.chain.to_string(),
+ mithril_sync_dl_workers = mithril_dl_workers,
+ mithril_sync_dl_chunk_size = mithril_dl_chunk_size,
+ mithril_sync_dl_queue_ahead = mithril_dl_queue_ahead,
+ mithril_sync_dl_connect_timeout = mithril_dl_connect_timeout,
+ mithril_sync_dl_data_read_timeout = mithril_dl_data_timeout,
+ "Starting Sync"
+ );
+
+ if let Err(error) = cfg.run().await {
+ error!("Failed to start sync task for {} : {}", network, error);
+ Err(error)?;
+ }
+
+ Ok(())
+}
+
+/// The interval between showing a block, even if nothing else changed.
+const RUNNING_UPDATE_INTERVAL: u64 = 100_000;
+
+/// Try and follow a chain continuously, from Genesis until Tip.
+#[allow(clippy::too_many_lines)]
+async fn follow_for(network: Network, matches: ArgMatches) {
+ info!(chain = network.to_string(), "Following");
+ let mut follower = ChainFollower::new(network, ORIGIN_POINT, TIP_POINT).await;
+
+ let all_tip_blocks = matches.get_flag("all-tip-blocks");
+ let all_live_blocks = matches.get_flag("all-live-blocks");
+ let stop_at_tip = matches.get_flag("stop-at-tip");
+ let halt_on_error = matches.get_flag("halt-on-error");
+ let bad_cip36 = matches.get_flag("bad-cip36");
+ let largest_metadata = matches.get_flag("largest-metadata");
+
+ let mut current_era = String::new();
+ let mut last_update: Option<ChainUpdate> = None;
+ let mut last_update_shown = false;
+ let mut prev_hash: Option> = None;
+ let mut last_immutable: bool = false;
+ let mut reached_tip = false; // After we reach TIP we show all blocks we process.
+ let mut updates: u64 = 0;
+ let mut last_fork = 0;
+ let mut follow_all = false;
+
+ let mut last_metrics_time = Instant::now();
+
+ let mut biggest_aux_data: usize = 0;
+
+ while let Some(chain_update) = follower.next().await {
+ updates += 1;
+
+ if chain_update.tip {
+ reached_tip = true;
+ }
+
+ let block = chain_update.block_data().decode();
+ let this_era = block.era().to_string();
+
+ // When we transition between important points, show the last block as well.
+ if ((current_era != this_era)
+ || (chain_update.immutable() != last_immutable)
+ || (last_fork != chain_update.data.fork()))
+ && !last_update_shown
+ {
+ if let Some(last_update) = last_update.clone() {
+ info!(
+ chain = network.to_string(),
+ "Chain Update {}:{}",
+ updates - 1,
+ last_update
+ );
+ }
+ }
+
+ // If these become true, we will show all blocks from the follower.
+ follow_all = follow_all
+ || (!chain_update.immutable() && all_live_blocks)
+ || ((chain_update.data.fork() > 1) && all_tip_blocks);
+
+ // Don't know if this update will show or not, so say it didn't.
+ last_update_shown = false;
+
+ if (current_era != this_era)
+ || (chain_update.immutable() != last_immutable)
+ || reached_tip
+ || follow_all
+ || (updates % RUNNING_UPDATE_INTERVAL == 0)
+ || (last_fork != chain_update.data.fork())
+ {
+ current_era = this_era;
+ last_immutable = chain_update.immutable();
+ last_fork = chain_update.data.fork();
+ info!(
+ chain = network.to_string(),
+ "Chain Update {updates}:{}", chain_update
+ );
+ // We already showed the last update, no need to show it again.
+ last_update_shown = true;
+ }
+
+ let this_prev_hash = block.header().previous_hash();
+
+ // We have no state, so can only check consistency with block updates.
+ // But that's OK, the chain follower itself is also checking chain consistency.
+ // This is just an example.
+ if chain_update.kind == Kind::Block && last_update.is_some() && prev_hash != this_prev_hash
+ {
+ let display_last_update = if let Some(last_update) = last_update.clone() {
+ format!("{last_update}")
+ } else {
+ "This Can't Happen".to_string()
+ };
+ error!(
+ chain = network.to_string(),
+ "Chain is broken: {chain_update} Does not follow: {display_last_update}",
+ );
+ break;
+ }
+
+ // Inspect the transactions in the block.
+ let mut dump_raw_aux_data = false;
+ for (tx_idx, _tx) in block.txs().iter().enumerate() {
+ if let Some(decoded_metadata) = chain_update
+ .data
+ .txn_metadata(tx_idx, Metadata::cip36::LABEL)
+ {
+ let raw_size = match chain_update
+ .data
+ .txn_raw_metadata(tx_idx, Metadata::cip36::LABEL)
+ {
+ Some(raw) => raw.len(),
+ None => 0,
+ };
+
+ if largest_metadata && raw_size > biggest_aux_data {
+ biggest_aux_data = raw_size;
+ dump_raw_aux_data = true;
+ }
+
+ if bad_cip36 {
+ #[allow(irrefutable_let_patterns)] // Won't always be irrefutable.
+ if let Metadata::DecodedMetadataValues::Cip36(cip36) = &decoded_metadata.value {
+ if !cip36.signed {
+ dump_raw_aux_data = true;
+ }
+ if !decoded_metadata.report.is_empty() {
+ info!(
+ chain = network.to_string(),
+ "Cip36 {tx_idx}:{:?} - {raw_size}", decoded_metadata
+ );
+ dump_raw_aux_data = true;
+ }
+ }
+ }
+ }
+ }
+
+ if dump_raw_aux_data {
+ if let Some(x) = block.as_alonzo() {
+ info!(
+ chain = network.to_string(),
+ "Raw Aux Data: {:02x?}", x.auxiliary_data_set
+ );
+ } else if let Some(x) = block.as_babbage() {
+ info!(
+ chain = network.to_string(),
+ "Raw Aux Data: {:02x?}", x.auxiliary_data_set
+ );
+ } else if let Some(x) = block.as_conway() {
+ info!(
+ chain = network.to_string(),
+ "Raw Aux Data: {:02x?}", x.auxiliary_data_set
+ );
+ }
+ }
+
+ prev_hash = Some(block.hash());
+ last_update = Some(chain_update);
+
+ if reached_tip && stop_at_tip {
+ break;
+ }
+
+ let check_time = Instant::now();
+ if check_time.duration_since(last_metrics_time).as_secs() >= 60 {
+ last_metrics_time = check_time;
+
+ let stats = Statistics::new(network);
+
+ info!("Json Metrics: {}", stats.as_json(true));
+
+ if halt_on_error
+ && (stats.mithril.download_or_validation_failed > 0
+ || stats.mithril.failed_to_get_tip > 0
+ || stats.mithril.tip_did_not_advance > 0
+ || stats.mithril.tip_failed_to_send_to_updater > 0
+ || stats.mithril.failed_to_activate_new_snapshot > 0)
+ {
+ break;
+ }
+ }
+ }
+
+ if !last_update_shown {
+ if let Some(last_update) = last_update.clone() {
+ info!(chain = network.to_string(), "Last Update: {}", last_update);
+ }
+ }
+
+ let stats = Statistics::new(network);
+ info!("Json Metrics: {}", stats.as_json(true));
+
+ info!(chain = network.to_string(), "Following Completed.");
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+ tracing_subscriber::fmt()
+ .with_file(true)
+ .with_line_number(true)
+ .with_thread_names(true)
+ .with_thread_ids(true)
+ .pretty()
+ .with_env_filter(
+ EnvFilter::builder()
+ .with_default_directive(LevelFilter::INFO.into())
+ .from_env_lossy(),
+ )
+ .init();
+
+ let (networks, matches) = process_argument();
+ let parallelism = std::thread::available_parallelism()?;
+ info!(
+ Parallelism = parallelism,
+ "Cardano Chain Followers Starting."
+ );
+
+ #[cfg(feature = "mimalloc")]
+ info!("mimalloc global allocator: enabled");
+
+ // First we need to actually start the underlying sync tasks for each blockchain.
+ for network in &networks {
+ start_sync_for(network, matches.clone()).await?;
+ }
+
+ // Make a follower for the network.
+ let mut tasks = Vec::new();
+ for network in &networks {
+ tasks.push(tokio::spawn(follow_for(*network, matches.clone())));
+ }
+
+ // Wait for all followers to finish.
+ for task in tasks {
+ task.await?;
+ }
+
+ // Keep running for 1 minute after last follower reaches its tip.
+ tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
+
+ Ok(())
+}
diff --git a/rust/cardano-chain-follower/examples/read_block.rs b/rust/cardano-chain-follower/examples/read_block.rs
deleted file mode 100644
index b9120a0d0bf..00000000000
--- a/rust/cardano-chain-follower/examples/read_block.rs
+++ /dev/null
@@ -1,47 +0,0 @@
-//! This example shows how to use the chain follower to download arbitrary blocks
-//! from the chain.
-
-use std::error::Error;
-
-use cardano_chain_follower::{Follower, FollowerConfigBuilder, Network, Point};
-use tracing::level_filters::LevelFilter;
-use tracing_subscriber::EnvFilter;
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
- tracing_subscriber::fmt()
- .with_env_filter(
- EnvFilter::builder()
- .with_default_directive(LevelFilter::INFO.into())
- .from_env_lossy(),
- )
- .init();
-
- let config = FollowerConfigBuilder::default().build();
-
- let follower = Follower::connect(
- "relays-new.cardano-mainnet.iohk.io:3001",
- Network::Mainnet,
- config,
- )
- .await?;
-
- let data = follower
- .read_block(Point::Specific(
- 110_908_236,
- hex::decode("ad3798a1db2b6097c71f35609399e4b2ff834f0f45939803d563bf9d660df2f2")?,
- ))
- .await?;
-
- let block = data.decode()?;
-
- let total_fee = block
- .txs()
- .iter()
- .map(|tx| tx.fee().unwrap_or_default())
- .sum::<u64>();
-
- println!("Total fee: {total_fee}");
-
- Ok(())
-}
diff --git a/rust/cardano-chain-follower/examples/read_block_mithril.rs b/rust/cardano-chain-follower/examples/read_block_mithril.rs
deleted file mode 100644
index f71a1cc183d..00000000000
--- a/rust/cardano-chain-follower/examples/read_block_mithril.rs
+++ /dev/null
@@ -1,57 +0,0 @@
-//! This example shows how to use the chain follower to read arbitrary blocks
-//! from Mithril snapshot files.
-
-// Allowing since this is example code.
-#![allow(clippy::unwrap_used)]
-
-use std::{error::Error, path::PathBuf};
-
-use cardano_chain_follower::{Follower, FollowerConfigBuilder, Network, Point};
-use tracing::level_filters::LevelFilter;
-use tracing_subscriber::EnvFilter;
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
- tracing_subscriber::fmt()
- .with_env_filter(
- EnvFilter::builder()
- .with_default_directive(LevelFilter::INFO.into())
- .from_env_lossy(),
- )
- .init();
-
- // Defaults to start following from the tip.
- let config = FollowerConfigBuilder::default()
- .mithril_snapshot_path(
- PathBuf::from(std::env::var("CARGO_MANIFEST_DIR").unwrap())
- .join("examples/snapshot_data"),
- )
- .build();
-
- let follower = Follower::connect(
- "preprod-node.play.dev.cardano.org:3001",
- Network::Preprod,
- config,
- )
- .await?;
-
- let data = follower
- .read_block(Point::Specific(
- 49_075_418,
- hex::decode("bdb5ce7788850c30342794f252b1d955086862e8f7cb90a32a8f560b693ca78a")?,
- ))
- .await?;
-
- let block = data.decode()?;
-
- let total_fee = block
- .txs()
- .iter()
- .map(|tx| tx.fee().unwrap_or_default())
- .sum::<u64>();
-
- println!("Block number: {}", block.number());
- println!("Total fee: {total_fee}");
-
- Ok(())
-}
diff --git a/rust/cardano-chain-follower/examples/read_block_range.rs b/rust/cardano-chain-follower/examples/read_block_range.rs
deleted file mode 100644
index da41724bbcf..00000000000
--- a/rust/cardano-chain-follower/examples/read_block_range.rs
+++ /dev/null
@@ -1,51 +0,0 @@
-//! This example shows how to use the chain follower to download arbitrary blocks
-//! from the chain.
-
-use std::error::Error;
-
-use cardano_chain_follower::{Follower, FollowerConfigBuilder, Network, Point};
-use tracing::level_filters::LevelFilter;
-use tracing_subscriber::EnvFilter;
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
- tracing_subscriber::fmt()
- .with_env_filter(
- EnvFilter::builder()
- .with_default_directive(LevelFilter::INFO.into())
- .from_env_lossy(),
- )
- .init();
-
- let config = FollowerConfigBuilder::default().build();
-
- let follower = Follower::connect(
- "relays-new.cardano-mainnet.iohk.io:3001",
- Network::Mainnet,
- config,
- )
- .await?;
-
- let data_vec = follower
- .read_block_range(
- Point::Specific(
- 110_908_236,
- hex::decode("ad3798a1db2b6097c71f35609399e4b2ff834f0f45939803d563bf9d660df2f2")?,
- ),
- Point::Specific(
- 110_908_582,
- hex::decode("16e97a73e866280582ee1201a5e1815993978eede956af1869b0733bedc131f2")?,
- ),
- )
- .await?;
-
- let mut total_txs = 0;
- for data in data_vec {
- let block = data.decode()?;
- total_txs += block.tx_count();
- }
-
- println!("Total transactions: {total_txs}");
-
- Ok(())
-}
diff --git a/rust/cardano-chain-follower/examples/read_block_range_mithril.rs b/rust/cardano-chain-follower/examples/read_block_range_mithril.rs
deleted file mode 100644
index abeaf264af0..00000000000
--- a/rust/cardano-chain-follower/examples/read_block_range_mithril.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-//! This example shows how to use the chain follower to read arbitrary blocks
-//! from Mithril snapshot files.
-
-// Allowing since this is example code.
-#![allow(clippy::unwrap_used)]
-
-use std::{error::Error, path::PathBuf};
-
-use cardano_chain_follower::{Follower, FollowerConfigBuilder, Network, Point};
-use tracing::level_filters::LevelFilter;
-use tracing_subscriber::EnvFilter;
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
- tracing_subscriber::fmt()
- .with_env_filter(
- EnvFilter::builder()
- .with_default_directive(LevelFilter::INFO.into())
- .from_env_lossy(),
- )
- .init();
-
- // Defaults to start following from the tip.
- let config = FollowerConfigBuilder::default()
- .mithril_snapshot_path(
- PathBuf::from(std::env::var("CARGO_MANIFEST_DIR").unwrap())
- .join("examples/snapshot_data"),
- )
- .build();
-
- let follower = Follower::connect(
- "preprod-node.play.dev.cardano.org:3001",
- Network::Preprod,
- config,
- )
- .await?;
-
- let data_vec = follower
- .read_block_range(
- // Block: 1794556
- Point::Specific(
- 49_075_380,
- hex::decode("a5d7ffbc7e61bf19e90b2b07276026d5fdd43424cc3436547b9532ca4a9f19ad")?,
- ),
- // Block: 1794560
- Point::Specific(
- 49_075_522,
- hex::decode("b7639b523f320643236ab0fc04b7fd381dedd42c8d6b6433b5965a5062411396")?,
- ),
- )
- .await?;
-
- for data in data_vec {
- let block = data.decode()?;
-
- println!(
- "Block {} has {} transactions",
- block.number(),
- block.tx_count()
- );
- }
-
- Ok(())
-}
diff --git a/rust/cardano-chain-follower/examples/set_read_pointer.rs b/rust/cardano-chain-follower/examples/set_read_pointer.rs
deleted file mode 100644
index 8f44191ed4f..00000000000
--- a/rust/cardano-chain-follower/examples/set_read_pointer.rs
+++ /dev/null
@@ -1,74 +0,0 @@
-//! This example shows how to set the follower's read pointer without stopping it.
-
-use std::error::Error;
-
-use cardano_chain_follower::{ChainUpdate, Follower, FollowerConfigBuilder, Network, Point};
-use tracing::level_filters::LevelFilter;
-use tracing_subscriber::EnvFilter;
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
- tracing_subscriber::fmt()
- .with_env_filter(
- EnvFilter::builder()
- .with_default_directive(LevelFilter::INFO.into())
- .from_env_lossy(),
- )
- .init();
-
- // Defaults to start following from the tip.
- let config = FollowerConfigBuilder::default().build();
-
- let mut follower = Follower::connect(
- "relays-new.cardano-mainnet.iohk.io:3001",
- Network::Mainnet,
- config,
- )
- .await?;
-
- let (tx, mut rx) = tokio::sync::oneshot::channel::<()>();
- let mut pointer_set = false;
- tokio::spawn(async move {
- let _tx = tx;
- tokio::time::sleep(std::time::Duration::from_secs(2)).await;
- });
-
- loop {
- tokio::select! {
- _ = &mut rx, if !pointer_set => {
- follower.set_read_pointer(Point::Specific(
- 110_908_236,
- hex::decode("ad3798a1db2b6097c71f35609399e4b2ff834f0f45939803d563bf9d660df2f2")?,
- )).await?;
- println!("set read pointer");
-
- pointer_set = true;
- }
-
- chain_update = follower.next() => {
- match chain_update? {
- ChainUpdate::Block(data) => {
- let block = data.decode()?;
-
- println!(
- "New block NUMBER={} SLOT={} HASH={}",
- block.number(),
- block.slot(),
- hex::encode(block.hash()),
- );
- },
- ChainUpdate::Rollback(data) => {
- let block = data.decode()?;
-
- println!(
- "Rollback block NUMBER={} SLOT={} HASH={}",
- block.number(),
- block.slot(),
- hex::encode(block.hash()),
- );
- },
- }
- }
- }
- }
-}
diff --git a/rust/cardano-chain-follower/src/chain_sync.rs b/rust/cardano-chain-follower/src/chain_sync.rs
new file mode 100644
index 00000000000..779efcc0f6b
--- /dev/null
+++ b/rust/cardano-chain-follower/src/chain_sync.rs
@@ -0,0 +1,573 @@
+//! Sync from the chain to an in-memory buffer.
+//!
+//! All iteration of the chain is done through this buffer or a mithril snapshot.
+//! Consumers of this library do not talk to the node directly.
+
+use std::time::Duration;
+
+use anyhow::Context;
+use pallas::{
+ ledger::traverse::MultiEraHeader,
+ network::{
+ facades::PeerClient,
+ miniprotocols::chainsync::{self, HeaderContent, Tip},
+ },
+};
+use tokio::{
+ spawn,
+ sync::mpsc,
+ time::{sleep, timeout},
+};
+use tracing::{debug, error};
+
+use crate::{
+ chain_sync_live_chains::{
+ get_fill_to_point, get_intersect_points, get_live_block, get_live_head_point, get_peer_tip,
+ live_chain_add_block_to_tip, live_chain_backfill, live_chain_length, purge_live_chain,
+ },
+ chain_sync_ready::{
+ get_chain_update_tx_queue, notify_follower, wait_for_sync_ready, SyncReadyWaiter,
+ },
+ chain_update,
+ error::{Error, Result},
+ mithril_snapshot_config::MithrilUpdateMessage,
+ mithril_snapshot_data::latest_mithril_snapshot_id,
+ point::{TIP_POINT, UNKNOWN_POINT},
+ stats, ChainSyncConfig, MultiEraBlock, Network, Point, ORIGIN_POINT,
+};
+
+/// The maximum number of seconds we wait for a node to connect.
+const MAX_NODE_CONNECT_TIME_SECS: u64 = 2;
+
+/// The maximum number of times we retry connecting to a node.
+/// We give up after this many failed attempts.
+const MAX_NODE_CONNECT_RETRIES: u64 = 5;
+
+/// Try and connect to a node, in a robust and quick way.
+///
+/// If a connection attempt takes longer than `MAX_NODE_CONNECT_TIME_SECS` seconds, retry it.
+/// Retry up to `MAX_NODE_CONNECT_RETRIES` times before giving up.
+async fn retry_connect(
+ addr: &str, magic: u64,
+) -> std::result::Result<PeerClient, pallas::network::facades::Error> {
+ let mut retries = MAX_NODE_CONNECT_RETRIES;
+ loop {
+ match timeout(
+ Duration::from_secs(MAX_NODE_CONNECT_TIME_SECS),
+ PeerClient::connect(addr, magic),
+ )
+ .await
+ {
+ Ok(peer) => {
+ match peer {
+ Ok(peer) => return Ok(peer),
+ Err(err) => {
+ retries -= 1;
+ if retries == 0 {
+ return Err(err);
+ }
+ debug!("retrying {retries} connect to {addr} : {err:?}");
+ },
+ }
+ },
+ Err(error) => {
+ retries -= 1;
+ if retries == 0 {
+ return Err(pallas::network::facades::Error::ConnectFailure(
+ tokio::io::Error::new(
+ tokio::io::ErrorKind::Other,
+ format!("failed to connect to {addr} : {error}"),
+ ),
+ ));
+ }
+ debug!("retrying {retries} connect to {addr} : {error:?}");
+ },
+ }
+ }
+}
+
+/// Purge the live chain, and intersect with TIP.
+async fn purge_and_intersect_tip(client: &mut PeerClient, chain: Network) -> Result<Point> {
+ if let Err(error) = purge_live_chain(chain, &TIP_POINT) {
+ // Shouldn't happen.
+ error!("failed to purge live chain: {error}");
+ }
+
+ client
+ .chainsync()
+ .intersect_tip()
+ .await
+ .map_err(Error::Chainsync)
+ .map(std::convert::Into::into)
+}
+
+/// Resynchronize to the live tip in memory.
+async fn resync_live_tip(client: &mut PeerClient, chain: Network) -> Result<Point> {
+ let sync_points = get_intersect_points(chain);
+ if sync_points.is_empty() {
+ return purge_and_intersect_tip(client, chain).await;
+ }
+
+ let sync_to_point = match client.chainsync().find_intersect(sync_points).await {
+ Ok((Some(point), _)) => point.into(),
+ Ok((None, _)) => {
+ // No intersection found, so purge live chain and re-sync it.
+ return purge_and_intersect_tip(client, chain).await;
+ },
+ Err(error) => return Err(Error::Chainsync(error)),
+ };
+
+ Ok(sync_to_point)
+}
+
+/// Fetch a single block from the Peer, and Decode it.
+async fn fetch_block_from_peer(
+ peer: &mut PeerClient, chain: Network, point: Point, previous_point: Point, fork_count: u64,
+) -> anyhow::Result<MultiEraBlock> {
+ let block_data = peer
+ .blockfetch()
+ .fetch_single(point.clone().into())
+ .await
+ .with_context(|| "Fetching block data")?;
+
+ debug!("{chain}, {previous_point}, {fork_count}");
+ let live_block_data = MultiEraBlock::new(chain, block_data, &previous_point, fork_count)?;
+
+ Ok(live_block_data)
+}
+
+/// Process a rollback.
+///
+/// Fetch the rollback block, and try and insert it into the live-chain.
+/// If it's a real rollback, it will purge the chain ahead of the block automatically.
+async fn process_rollback_actual(
+ peer: &mut PeerClient, chain: Network, point: Point, tip: &Tip, fork_count: &mut u64,
+) -> anyhow::Result<Point> {
+ debug!("RollBackward: {:?} {:?}", point, tip);
+
+ // Check if the block is in the live chain, if it is, re-add it, which auto-purges the
+ // rest of live chain tip. And increments the fork count.
+ if let Some(mut block) = get_live_block(chain, &point, 0, true) {
+ // Even though we are re-adding the known block, increase the fork count.
+ block.set_fork(*fork_count);
+ live_chain_add_block_to_tip(chain, block, fork_count, tip.0.clone().into())?;
+ return Ok(point);
+ }
+
+ // If the block is NOT in the chain, fetch it, and insert it, which will automatically
+ // find the correct place to insert it, and purge the old tip blocks.
+
+ // We don't know what or if there is a previous block, so probe for it.
+ // Fizzy search for the block immediately preceding the block we will fetch.
+ // In case we don;t have a previous point on the live chain, it might be the tip of the
+ // mithril chain, so get that.
+ let previous_block = get_live_block(chain, &point, -1, false);
+ let previous_point = if let Some(previous_block) = previous_block {
+ let previous = previous_block.previous();
+ debug!("Previous block: {:?}", previous);
+ if previous == ORIGIN_POINT {
+ latest_mithril_snapshot_id(chain).tip()
+ } else {
+ previous
+ }
+ } else {
+ debug!("Using Mithril Tip as rollback previous point.");
+ latest_mithril_snapshot_id(chain).tip()
+ };
+ debug!("Previous point: {:?}", previous_point);
+ let block =
+ fetch_block_from_peer(peer, chain, point.clone(), previous_point, *fork_count).await?;
+ live_chain_add_block_to_tip(chain, block, fork_count, tip.0.clone().into())?;
+
+ // Next block we receive is a rollback.
+ Ok(point)
+}
+
+/// Process a rollback detected from the peer.
+async fn process_rollback(
+ peer: &mut PeerClient, chain: Network, point: Point, tip: &Tip, previous_point: &Point,
+ fork_count: &mut u64,
+) -> anyhow::Result<Point> {
+ let rollback_slot = point.slot_or_default();
+ let head_slot = previous_point.slot_or_default();
+ debug!("Head slot: {}", head_slot);
+ debug!("Rollback slot: {}", rollback_slot);
+ let slot_rollback_size = if head_slot > rollback_slot {
+ head_slot - rollback_slot
+ } else {
+ 0
+ };
+
+ // We actually do the work here...
+ let response = process_rollback_actual(peer, chain, point, tip, fork_count).await?;
+
+ // We never really know how many blocks are rolled back when advised by the peer, but we
+ // can work out how many slots. This function wraps the real work, so we can properly
+ // record the stats when the rollback is complete. Even if it errors.
+ stats::rollback(chain, stats::RollbackType::Peer, slot_rollback_size);
+
+ Ok(response)
+}
+
+/// Process a rollback detected from the peer.
+async fn process_next_block(
+ peer: &mut PeerClient, chain: Network, header: HeaderContent, tip: &Tip,
+ previous_point: &Point, fork_count: &mut u64,
+) -> anyhow::Result<Point> {
+ // Decode the Header of the block so we know what to fetch.
+ let decoded_header = MultiEraHeader::decode(
+ header.variant,
+ header.byron_prefix.map(|p| p.0),
+ &header.cbor,
+ )
+ .with_context(|| "Decoding Block Header")?;
+
+ let block_point = Point::new(decoded_header.slot(), decoded_header.hash().to_vec());
+
+ debug!("RollForward: {block_point:?} {tip:?}");
+
+ let block = fetch_block_from_peer(
+ peer,
+ chain,
+ block_point.clone(),
+ previous_point.clone(),
+ *fork_count,
+ )
+ .await?;
+
+ let block_point = block.point();
+
+ // We can't store this block because we don't know the previous one so the chain
+ // would break, so just use it for previous.
+ if *previous_point == UNKNOWN_POINT {
+ // Nothing else we can do with the first block when we don't know the previous
+ // one. Just return it's point.
+ debug!("Not storing the block, because we did not know the previous point.");
+ } else {
+ live_chain_add_block_to_tip(chain, block, fork_count, tip.0.clone().into())?;
+ }
+
+ Ok(block_point)
+}
+
+/// Follows the chain until there is an error.
+/// If this returns it can be assumed the client is disconnected.
+///
+/// We take ownership of the client because of that.
+async fn follow_chain(
+ peer: &mut PeerClient, chain: Network, fork_count: &mut u64,
+) -> anyhow::Result<()> {
+ let mut update_sender = get_chain_update_tx_queue(chain).await;
+ let mut previous_point = UNKNOWN_POINT;
+
+ loop {
+ // debug!("Waiting for data from Cardano Peer Node:");
+
+ // We can't get an update sender UNTIL we have released the sync lock.
+ if update_sender.is_none() {
+ update_sender = get_chain_update_tx_queue(chain).await;
+ }
+
+ // Check what response type we need to process.
+ let response = match peer.chainsync().state() {
+ chainsync::State::CanAwait => peer.chainsync().recv_while_can_await().await,
+ chainsync::State::MustReply => peer.chainsync().recv_while_must_reply().await,
+ _ => peer.chainsync().request_next().await,
+ }
+ .with_context(|| "Error while receiving block data from peer")?;
+
+ match response {
+ chainsync::NextResponse::RollForward(header, tip) => {
+ // Note: Tip is poorly documented.
+ // It is a tuple with the following structure:
+ // ((Slot#, BlockHash), Block# ).
+ // We can find if we are AT tip by comparing the current block Point with the tip
+ // Point. We can estimate how far behind we are (in blocks) by
+ // subtracting current block height and the tip block height.
+ // IF the TIP is <= the current block height THEN we are at tip.
+ previous_point =
+ process_next_block(peer, chain, header, &tip, &previous_point, fork_count)
+ .await?;
+
+ // This update is just for followers to know to look again at their live chains for
+ // new data.
+ notify_follower(chain, &update_sender, &chain_update::Kind::Block);
+ },
+ chainsync::NextResponse::RollBackward(point, tip) => {
+ previous_point =
+ process_rollback(peer, chain, point.into(), &tip, &previous_point, fork_count)
+ .await?;
+ // This update is just for followers to know to look again at their live chains for
+ // new data.
+ notify_follower(chain, &update_sender, &chain_update::Kind::Rollback);
+ },
+ chainsync::NextResponse::Await => {
+ // debug!("Peer Node says: Await");
+ },
+ }
+ }
+}
+
+/// How long we wait before trying to reconnect to a peer when it totally fails our
+/// attempts.
+const PEER_FAILURE_RECONNECT_DELAY: Duration = Duration::from_secs(10);
+
+/// Do not return until we have a connection to the peer.
+async fn persistent_reconnect(addr: &str, chain: Network) -> PeerClient {
+ // Not yet connected to the peer.
+ stats::peer_connected(chain, false, addr);
+
+ loop {
+ // We never have a connection if we end up around the loop, so make a new one.
+ match retry_connect(addr, chain.into()).await {
+ Ok(peer) => {
+ // Successfully connected to the peer.
+ stats::peer_connected(chain, true, addr);
+
+ return peer;
+ },
+ Err(error) => {
+ error!(
+ "Chain Sync for: {} from {} : Failed to connect to relay: {}",
+ chain, addr, error,
+ );
+
+ // Wait a bit before trying again.
+ tokio::time::sleep(PEER_FAILURE_RECONNECT_DELAY).await;
+ },
+ };
+ }
+}
+
+/// Backfill the live chain, based on the Mithril Sync updates.
+/// This does NOT return until the live chain has been backfilled from the end of mithril
+/// to the current synced tip blocks.
+///
+/// This only needs to be done once per chain connection.
+async fn live_sync_backfill(
+ cfg: &ChainSyncConfig, update: &MithrilUpdateMessage,
+) -> anyhow::Result<()> {
+ stats::backfill_started(cfg.chain);
+
+ let (fill_to, _oldest_fork) = get_fill_to_point(cfg.chain).await;
+ let range = (update.tip.clone().into(), fill_to.clone().into());
+ let mut previous_point = update.previous.clone();
+
+ let range_msg = format!("{range:?}");
+
+ let mut peer = persistent_reconnect(&cfg.relay_address, cfg.chain).await;
+
+ // Request the range of blocks from the Peer.
+ peer.blockfetch()
+ .request_range(range)
+ .await
+ .with_context(|| "Requesting Block Range")?;
+
+ let mut backfill_blocks = Vec::<MultiEraBlock>::new();
+
+ while let Some(block_data) = peer.blockfetch().recv_while_streaming().await? {
+ // Backfilled blocks get placed in the oldest fork currently on the live-chain.
+ let block =
+ MultiEraBlock::new(cfg.chain, block_data, &previous_point, 1).with_context(|| {
+ format!(
+ "Failed to decode block data. previous: {previous_point:?}, range: {range_msg}"
+ )
+ })?;
+
+ // Check we get the first block in the range properly.
+ if backfill_blocks.is_empty() && !block.point().strict_eq(&update.tip) {
+ return Err(Error::BackfillSync(format!(
+ "First Block is invalid: Block {:?} != Range Start {:?}.",
+ block.point(),
+ update.tip
+ ))
+ .into());
+ }
+
+ previous_point = block.point();
+
+ backfill_blocks.push(block);
+ }
+
+ // Check we get the last block in the range properly.
+ if backfill_blocks.is_empty() || !previous_point.strict_eq(&fill_to) {
+ return Err(Error::BackfillSync(format!(
+ "Last Block is invalid. Block {previous_point:?} != Range End {fill_to:?}"
+ ))
+ .into());
+ }
+
+ // Report how many backfill blocks we received.
+ let backfill_size = backfill_blocks.len() as u64;
+
+ // Try and backfill, if anything doesn't work, or the chain integrity would break, fail.
+ live_chain_backfill(cfg.chain, &backfill_blocks)?;
+
+ stats::backfill_ended(cfg.chain, backfill_size);
+
+ debug!("Backfilled Range OK: {}", range_msg);
+
+ Ok(())
+}
+
+/// Backfill and Purge the live chain, based on the Mithril Sync updates.
+async fn live_sync_backfill_and_purge(
+ cfg: ChainSyncConfig, mut rx: mpsc::Receiver<MithrilUpdateMessage>,
+ mut sync_ready: SyncReadyWaiter,
+) {
+ // Wait for first Mithril Update advice, which triggers a BACKFILL of the Live Data.
+ let Some(update) = rx.recv().await else {
+ error!("Mithril Sync Failed, can not continue chain sync either.");
+ return;
+ };
+
+ debug!(
+ "Before Backfill: Size of the Live Chain is: {} Blocks",
+ live_chain_length(cfg.chain)
+ );
+
+ let live_chain_head: Point;
+
+ loop {
+ // We will re-attempt backfill until it's successful.
+ // Backfill is atomic: it either fully works, or none of the live-chain is changed.
+ debug!("Mithril Tip has advanced to: {update:?} : BACKFILL");
+ while let Err(error) = live_sync_backfill(&cfg, &update).await {
+ error!("Mithril Backfill Sync Failed: {}", error);
+ sleep(Duration::from_secs(10)).await;
+ }
+
+ if let Some(head_point) = get_live_head_point(cfg.chain) {
+ live_chain_head = head_point;
+ break;
+ }
+ }
+
+ stats::new_mithril_update(
+ cfg.chain,
+ update.tip.slot_or_default(),
+ live_chain_length(cfg.chain) as u64,
+ live_chain_head.slot_or_default(),
+ );
+
+ debug!(
+ "After Backfill: Size of the Live Chain is: {} Blocks",
+ live_chain_length(cfg.chain)
+ );
+
+ // Once Backfill is completed OK we can use the Blockchain data for Syncing and Querying
+ sync_ready.signal();
+
+ let mut update_sender = get_chain_update_tx_queue(cfg.chain).await;
+
+ loop {
+ let Some(update) = rx.recv().await else {
+ error!("Mithril Sync Failed, can not continue chain sync either.");
+ return;
+ };
+
+ // We can't get an update sender until the sync is released.
+ if update_sender.is_none() {
+ update_sender = get_chain_update_tx_queue(cfg.chain).await;
+ }
+
+ debug!("Mithril Tip has advanced to: {update:?} : PURGE NEEDED");
+
+ let update_point: Point = update.tip.clone();
+
+ if let Err(error) = purge_live_chain(cfg.chain, &update_point) {
+ // This should actually never happen.
+ error!("Mithril Purge Failed: {}", error);
+ }
+
+ debug!(
+ "After Purge: Size of the Live Chain is: {} Blocks",
+ live_chain_length(cfg.chain)
+ );
+
+ notify_follower(
+ cfg.chain,
+ &update_sender,
+ &chain_update::Kind::ImmutableBlockRollForward,
+ );
+ }
+
+ // TODO: If the mithril sync dies, sleep for a bit and make sure the live chain
+ // doesn't grow indefinitely.
+ // We COULD move the spawn of mithril following into here, and if the rx dies, kill
+ // that task, and restart it.
+ // In reality, the mithril sync should never die and drop the queue.
+}
+
+/// Handle the background synchronization of the live chain for a given network.
+/// Note: There can ONLY be at most three of these running at any one time.
+/// This is because there can ONLY be one chain sync task for each of the three known
+/// Cardano networks.
+/// # Arguments
+///
+/// * `cfg` - The chain sync configuration, which names the network and relay to connect to.
+/// * `rx` - Receiver of Mithril snapshot updates, used to backfill and purge the live chain.
+///
+/// # Returns
+///
+/// This does not return, it is a background task.
+pub(crate) async fn chain_sync(cfg: ChainSyncConfig, rx: mpsc::Receiver<MithrilUpdateMessage>) {
+ debug!(
+ "Chain Sync for: {} from {} : Starting",
+ cfg.chain, cfg.relay_address,
+ );
+
+ // Start the SYNC_READY unlock task.
+ let sync_waiter = wait_for_sync_ready(cfg.chain);
+
+ let backfill_cfg = cfg.clone();
+
+ // Start the Live chain backfill task.
+ let _backfill_join_handle = spawn(async move {
+ live_sync_backfill_and_purge(backfill_cfg.clone(), rx, sync_waiter).await;
+ });
+
+ // Live Fill data starts at fork 1.
+ // Immutable data from a mithril snapshot is fork 0.
+ // Live backfill is always Fork 1.
+ let mut fork_count: u64 = 2;
+
+ loop {
+ // We never have a connection if we end up around the loop, so make a new one.
+ let mut peer = persistent_reconnect(&cfg.relay_address, cfg.chain).await;
+
+ match resync_live_tip(&mut peer, cfg.chain).await {
+ Ok(tip) => debug!("Tip Resynchronized to {tip}"),
+ Err(error) => {
+ error!(
+ "Cardano Client {} failed to resync Tip: {}",
+ cfg.relay_address, error
+ );
+ continue;
+ },
+ }
+
+ // Note: This can ONLY return with an error, otherwise it will sync indefinitely.
+ if let Err(error) = follow_chain(&mut peer, cfg.chain, &mut fork_count).await {
+ error!(
+ "Cardano Client {} failed to follow chain: {}: Reconnecting.",
+ cfg.relay_address, error
+ );
+ continue;
+ }
+
+ // If this returns, we are on a new fork (or assume we are)
+ fork_count += 1;
+ }
+}
+
+/// Is the current point aligned with what we know as tip.
+pub(crate) async fn point_at_tip(chain: Network, point: &Point) -> bool {
+ let tip = get_peer_tip(chain);
+
+ // We are said to be AT TIP, if the block point is greater than or equal to the tip.
+ tip <= *point
+}
diff --git a/rust/cardano-chain-follower/src/chain_sync_config.rs b/rust/cardano-chain-follower/src/chain_sync_config.rs
new file mode 100644
index 00000000000..e9d29d06c15
--- /dev/null
+++ b/rust/cardano-chain-follower/src/chain_sync_config.rs
@@ -0,0 +1,165 @@
+//! Cardano chain sync configuration.
+//!
+//! Independent of ANY followers, we allow a maximum of 3 Chains being updated, one for
+//! each network. Chain Followers use the data supplied by the Chain-Sync.
+//! This module configures the chain sync processes.
+
+use std::sync::LazyLock;
+
+use dashmap::DashMap;
+use strum::IntoEnumIterator;
+use tokio::{sync::Mutex, task::JoinHandle};
+use tracing::{debug, error};
+
+use crate::{
+ chain_sync::chain_sync,
+ error::{Error, Result},
+ mithril_snapshot_config::MithrilSnapshotConfig,
+ network::Network,
+ stats,
+};
+
+/// Default Follower block buffer size.
+const DEFAULT_CHAIN_UPDATE_BUFFER_SIZE: usize = 32;
+
+/// How many slots back from TIP is considered Immutable in the absence of a mithril
+/// snapshot.
+const DEFAULT_IMMUTABLE_SLOT_WINDOW: u64 = 12 * 60 * 60;
+
+/// Type we use to manage the Sync Task handle map.
+type SyncMap = DashMap<Network, Mutex<Option<JoinHandle<()>>>>;
+/// Handle to the chain sync task. One for each Network ONLY.
+static SYNC_JOIN_HANDLE_MAP: LazyLock<SyncMap> = LazyLock::new(|| {
+ let map = DashMap::new();
+ for network in Network::iter() {
+ map.insert(network, Mutex::new(None));
+ }
+ map
+});
+
+/// A Follower Connection to the Cardano Network.
+#[derive(Clone, Debug)]
+pub struct ChainSyncConfig {
+ /// Chain Network
+ pub chain: Network,
+ /// Relay Node Address
+ pub(crate) relay_address: String,
+ /// Block buffer size option.
+ chain_update_buffer_size: usize,
+ /// If we don't have immutable data, how far back from TIP is the data considered
+ /// Immutable (in slots).
+ immutable_slot_window: u64,
+ /// Configuration of Mithril Snapshots.
+ pub mithril_cfg: MithrilSnapshotConfig,
+}
+
+impl ChainSyncConfig {
+ /// Sets the defaults for a given cardano network.
+ /// Each network has a different set of defaults, so no single "default" can apply.
+ /// This function is preferred to the `default()` standard function.
+ #[must_use]
+ pub fn default_for(chain: Network) -> Self {
+ Self {
+ chain,
+ relay_address: chain.default_relay(),
+ chain_update_buffer_size: DEFAULT_CHAIN_UPDATE_BUFFER_SIZE,
+ immutable_slot_window: DEFAULT_IMMUTABLE_SLOT_WINDOW,
+ mithril_cfg: MithrilSnapshotConfig::default_for(chain),
+ }
+ }
+
+ /// Sets the relay to use for Chain Sync.
+ ///
+ /// # Arguments
+ ///
+ /// * `relay`: Address to use for the blockchain relay node.
+ #[must_use]
+ pub fn relay(mut self, address: String) -> Self {
+ self.relay_address = address;
+ self
+ }
+
+ /// Sets the size of the chain updates buffer used by the Follower.
+ ///
+ /// # Arguments
+ ///
+ /// * `chain_update_buffer_size`: Size of the chain updates buffer.
+ #[must_use]
+ pub fn chain_update_buffer_size(mut self, block_buffer_size: usize) -> Self {
+ self.chain_update_buffer_size = block_buffer_size;
+ self
+ }
+
+ /// Sets the size of the Immutable window used when Mithril is not available.
+ ///
+ /// # Arguments
+ ///
+ /// * `window`: Size of the Immutable window.
+ #[must_use]
+ pub fn immutable_slot_window(mut self, window: u64) -> Self {
+ self.immutable_slot_window = window;
+ self
+ }
+
+ /// Sets the Mithril snapshot Config the `ChainSync` will use.
+ ///
+ /// # Arguments
+ ///
+ /// * `cfg`: The Mithril snapshot configuration.
+ #[must_use]
+ pub fn mithril_cfg(mut self, cfg: MithrilSnapshotConfig) -> Self {
+ self.mithril_cfg = cfg;
+ self
+ }
+
+ /// Runs Chain Synchronization.
+ ///
+ /// Must be done BEFORE the chain can be followed.
+ ///
+ /// # Returns
+ ///
+ /// `Result<()>`: On success.
+ ///
+ /// # Errors
+ ///
+ /// `Error`: On error.
+ pub async fn run(self) -> Result<()> {
+ debug!(
+ chain = self.chain.to_string(),
+ "Chain Synchronization Starting"
+ );
+
+ stats::sync_started(self.chain);
+
+ // Start the Chain Sync - IFF it's not already running.
+ let lock_entry = match SYNC_JOIN_HANDLE_MAP.get(&self.chain) {
+ None => {
+ error!("Join Map improperly initialized: Missing {}!!", self.chain);
+ return Err(Error::Internal); // Should not get here.
+ },
+ Some(entry) => entry,
+ };
+ let mut locked_handle = lock_entry.value().lock().await;
+
+ if (*locked_handle).is_some() {
+ debug!("Chain Sync Already Running for {}", self.chain);
+ return Err(Error::ChainSyncAlreadyRunning(self.chain));
+ }
+
+ // Start the Mithril Snapshot Follower
+ let rx = self.mithril_cfg.run().await?;
+
+ // Start Chain Sync
+ *locked_handle = Some(tokio::spawn(chain_sync(self.clone(), rx)));
+
+ // sync_map.insert(chain, handle);
+ debug!("Chain Sync for {} : Started", self.chain);
+
+ Ok(())
+ }
+}
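// Editor's sketch (not part of the diff): intended use of the builder above. The
// `Network::Preprod` variant and the tokio runtime are assumptions here; everything
// else is the API defined in this file.
async fn start_preprod_sync() -> Result<()> {
    ChainSyncConfig::default_for(Network::Preprod)
        .chain_update_buffer_size(64)
        .immutable_slot_window(24 * 60 * 60)
        .run()
        .await?;
    // `run()` returns once the background sync task is spawned; followers should
    // still block on sync readiness before consuming blocks.
    Ok(())
}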
diff --git a/rust/cardano-chain-follower/src/chain_sync_live_chains.rs b/rust/cardano-chain-follower/src/chain_sync_live_chains.rs
new file mode 100644
index 00000000000..b5a159ebacf
--- /dev/null
+++ b/rust/cardano-chain-follower/src/chain_sync_live_chains.rs
@@ -0,0 +1,516 @@
+//! Storage of each Live Chain per Blockchain.
+
+use std::{
+ ops::Bound,
+ sync::{Arc, LazyLock, RwLock},
+ time::Duration,
+};
+
+use crossbeam_skiplist::SkipMap;
+use rayon::prelude::*;
+use strum::IntoEnumIterator;
+use tracing::{debug, error};
+
+use crate::{
+ error::{Error, Result},
+ mithril_snapshot_data::latest_mithril_snapshot_id,
+ point::UNKNOWN_POINT,
+ stats, MultiEraBlock, Network, Point, TIP_POINT,
+};
+
+/// Type we use to store the live chain blocks for a network, ordered by Point.
+type LiveChainBlockList = SkipMap<Point, MultiEraBlock>;
+
+/// Because we have multi-entry relationships in the live-chain protect it with a
+/// `read/write lock`. The underlying `SkipMap` is still capable of multiple simultaneous
+/// reads from multiple threads which is the most common access.
+#[derive(Clone)]
+struct ProtectedLiveChainBlockList(Arc<RwLock<LiveChainBlockList>>);
+
+/// The Live Chain for each Blockchain. One for each Network ONLY.
+static LIVE_CHAINS: LazyLock<SkipMap<Network, ProtectedLiveChainBlockList>> = LazyLock::new(|| {
+ let map = SkipMap::new();
+ for network in Network::iter() {
+ map.insert(network, ProtectedLiveChainBlockList::new());
+ }
+ map
+});
+
+/// Latest TIP received from the Peer Node.
+static PEER_TIP: LazyLock<SkipMap<Network, Point>> = LazyLock::new(|| {
+ let map = SkipMap::new();
+ for network in Network::iter() {
+ map.insert(network, UNKNOWN_POINT);
+ }
+ map
+});
+
+/// Set the last TIP received from the peer.
+fn update_peer_tip(chain: Network, tip: Point) {
+ PEER_TIP.insert(chain, tip);
+}
+
+/// Get the last TIP received from the peer.
+pub(crate) fn get_peer_tip(chain: Network) -> Point {
+ (*PEER_TIP.get_or_insert(chain, UNKNOWN_POINT).value()).clone()
+}
+
+/// Number of seconds to wait if we detect a `SyncReady` race condition.
+const DATA_RACE_BACKOFF_SECS: u64 = 2;
+
+impl ProtectedLiveChainBlockList {
+ /// Create a new instance of the protected Live Chain skip map.
+ fn new() -> Self {
+ ProtectedLiveChainBlockList(Arc::new(RwLock::new(LiveChainBlockList::new())))
+ }
+
+ /// Get the `nth` Live block relative to the specified block.
+ /// If the search is strict, the requested point itself must exist in the live chain.
+ /// If the search is NOT strict, the requested point itself is never returned.
+ /// 0 = The block at the requested point (strict), or immediately after it (not strict).
+ /// 1+ = The blocks that follow that block.
+ /// negative = The blocks before the requested point.
+ fn get_block(&self, point: &Point, mut advance: i64, strict: bool) -> Option<MultiEraBlock> {
+ let chain = self.0.read().ok()?;
+
+ let mut this = if strict {
+ chain.get(point)?
+ } else if advance < 0 {
+ // This is a fuzzy lookup backwards.
+ advance += 1;
+ chain.upper_bound(Bound::Excluded(point))?
+ } else {
+ // This is a fuzzy lookup forwards.
+ chain.lower_bound(Bound::Excluded(point))?
+ };
+
+ // If we are stepping backwards, look backwards.
+ while advance < 0 {
+ advance += 1;
+ this = this.prev()?;
+ }
+
+ // If we are stepping forwards, look forwards.
+ while advance > 0 {
+ advance -= 1;
+ this = this.next()?;
+ }
+
+ // Return the block we found.
+ Some(this.value().clone())
+ }
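// Editor's sketch (not part of the diff): how the strict/fuzzy lookup above behaves,
// shown on a plain `SkipMap<u64, &str>` keyed by slot. A strict lookup needs the exact
// key; a fuzzy forward lookup returns the first entry strictly after the key, and a
// fuzzy backward lookup returns the last entry strictly before it.
fn fuzzy_lookup_demo() {
    use std::ops::Bound;
    use crossbeam_skiplist::SkipMap;

    let chain: SkipMap<u64, &str> = SkipMap::new();
    chain.insert(10, "block@10");
    chain.insert(20, "block@20");
    chain.insert(30, "block@30");

    // Strict: only an exact slot matches.
    assert!(chain.get(&15_u64).is_none());

    // Fuzzy forward (advance >= 0): the first block after slot 15 is at slot 20.
    let fwd = chain.lower_bound(Bound::Excluded(&15_u64)).unwrap();
    assert_eq!(*fwd.value(), "block@20");

    // Fuzzy backward (advance < 0): the last block before slot 15 is at slot 10.
    let back = chain.upper_bound(Bound::Excluded(&15_u64)).unwrap();
    assert_eq!(*back.value(), "block@10");

    // Stepping forward from an entry mirrors `advance > 0` above.
    assert_eq!(*fwd.next().unwrap().value(), "block@30");
}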
+
+ /// Get the earliest block in the Live Chain
+ fn get_earliest_block(&self) -> Option<MultiEraBlock> {
+ let chain = self.0.read().ok()?;
+ let entry = chain.front()?;
+ Some(entry.value().clone())
+ }
+
+ /// Get the point of the first known block in the Live Chain.
+ fn get_first_live_point(live_chain: &LiveChainBlockList) -> Result<Point> {
+ let Some(check_first_live_entry) = live_chain.front() else {
+ return Err(Error::LiveSync(
+ "First Block not found in the Live Chain during Backfill".to_string(),
+ ));
+ };
+ let check_first_live_block = check_first_live_entry.value();
+ Ok(check_first_live_block.point())
+ }
+
+ /// Get the point of the last known block in the Live Chain.
+ fn get_last_live_point(live_chain: &LiveChainBlockList) -> Point {
+ let Some(check_last_live_entry) = live_chain.back() else {
+ // Its not an error if we can't get a latest block because the chain is empty,
+ // so report that we don't know...
+ return UNKNOWN_POINT;
+ };
+ let check_last_live_block = check_last_live_entry.value();
+ check_last_live_block.point()
+ }
+
+ /// Atomically backfill the chain with the given blocks.
+ /// Blocks must be sorted in order from earliest to latest.
+ /// Final block MUST seamlessly link to the oldest block currently in the live chain. (Enforced)
+ /// First block MUST seamlessly link to the Tip of the Immutable chain. (Enforced)
+ /// The blocks MUST be contiguous and properly self referential.
+ /// Note: This last condition is NOT enforced, but must be met or block chain
+ /// iteration will fail.
+ fn backfill(&self, chain: Network, blocks: &[MultiEraBlock]) -> Result<()> {
+ let live_chain = self.0.write().map_err(|_| Error::Internal)?;
+
+ // Make sure our first live block == the last mithril tip.
+ // Ensures we are properly connected to the Mithril Chain.
+ let first_block_point = blocks
+ .first()
+ .ok_or(Error::LiveSync("No first block for backfill.".to_string()))?
+ .point();
+ let latest_mithril_tip = latest_mithril_snapshot_id(chain).tip();
+ if !first_block_point.strict_eq(&latest_mithril_tip) {
+ return Err(Error::LiveSync(format!(
+ "First Block of Live BackFill {first_block_point} MUST be last block of Mithril Snapshot {latest_mithril_tip}."
+ )));
+ }
+
+ // Get the current Oldest block in the live chain.
+ let check_first_live_point = Self::get_first_live_point(&live_chain)?;
+
+ let last_backfill_block = blocks
+ .last()
+ .ok_or(Error::LiveSync("No last block for backfill.".to_string()))?
+ .clone();
+ let last_backfill_point = last_backfill_block.point();
+
+ // Make sure the backfill will properly connect the partial Live chain to the Mithril
+ // chain.
+ if !last_backfill_point.strict_eq(&check_first_live_point) {
+ return Err(Error::LiveSync(format!(
+ "Last Block of Live BackFill {last_backfill_point} MUST be First block of current Live Chain {check_first_live_point}."
+ )));
+ }
+
+ // SkipMap is thread-safe, so we can parallel iterate inserting the blocks.
+ blocks.par_iter().for_each(|block| {
+ let _unused = live_chain.insert(block.point(), block.clone());
+ });
+
+ // End of Successful backfill == Reaching TIP, because live sync is always at tip.
+ stats::tip_reached(chain);
+
+ Ok(())
+ }
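// Editor's sketch (not part of the diff): `SkipMap` is lock-free and safe to share
// across threads, which is why the backfill above can insert the whole validated block
// set with a rayon `par_iter()` while holding only the outer write lock. A stand-alone
// illustration with plain integers:
fn parallel_fill(slots: &[u64]) -> crossbeam_skiplist::SkipMap<u64, u64> {
    use rayon::prelude::*;

    let map = crossbeam_skiplist::SkipMap::new();
    // Every rayon worker inserts directly; no extra mutex is needed around the map.
    slots.par_iter().for_each(|slot| {
        let _unused = map.insert(*slot, slot * 2);
    });
    map
}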
+
+ /// Check if the given point is strictly in the live-chain. This means both the slot
+ /// and the hash MUST match.
+ fn strict_block_lookup(live_chain: &LiveChainBlockList, point: &Point) -> bool {
+ if let Some(found_block) = live_chain.get(point) {
+ return found_block.value().point().strict_eq(point);
+ }
+ false
+ }
+
+ /// Adds a block to the tip of the live chain, and automatically purges blocks that
+ /// would be lost due to rollback. Will REFUSE to add a block which does NOT have
+ /// a proper "previous" point defined.
+ fn add_block_to_tip(
+ &self, chain: Network, block: MultiEraBlock, fork_count: &mut u64, tip: Point,
+ ) -> Result<()> {
+ let live_chain = self.0.write().map_err(|_| Error::Internal)?;
+
+ // Check if the insert is the next logical block in the live chain.
+ // Most likely case, so check it first.
+ let previous_point = block.previous();
+ let last_live_point = Self::get_last_live_point(&live_chain);
+ if !previous_point.strict_eq(&last_live_point) {
+ // Detected a rollback, so increase the fork count.
+ *fork_count += 1;
+ let mut rollback_size: u64 = 0;
+
+ // We are NOT contiguous, so check if we can become contiguous with a rollback.
+ debug!("Detected non-contiguous block, rolling back. Fork: {fork_count}");
+
+ // First check if the previous is >= the earliest block in the live chain.
+ // This is because when we start syncing we could rollback earlier than our
+ // previously known earliest block.
+ // Also check the point we want to link to actually exists. If either are not true,
+ // Then we could be trying to roll back to an earlier block than our earliest known
+ // block.
+ let check_first_live_point = Self::get_first_live_point(&live_chain)?;
+ if (block.point() < check_first_live_point)
+ || !Self::strict_block_lookup(&live_chain, &previous_point)
+ {
+ debug!("Rollback before live chain, clear it.");
+ // We rolled back earlier than the current live chain.
+ // Purge the entire chain, and just add this one block as the new tip.
+ rollback_size = live_chain.len() as u64;
+ live_chain.clear();
+ } else {
+ // If we get here we know for a fact that the previous block exists.
+ // Remove the latest live block, and keep removing it until we re-establish
+ // connection with the chain sequence.
+ // We search backwards because a rollback is more likely in the newest blocks than
+ // the oldest.
+ while let Some(popped) = live_chain.pop_back() {
+ rollback_size += 1;
+ if previous_point.strict_eq(&popped.value().previous()) {
+ // We are now contiguous, so stop purging.
+ break;
+ }
+ }
+ }
+
+ // Record a rollback statistic (We record the ACTUAL size the rollback affected our
+ // internal live chain, not what the node thinks.)
+ stats::rollback(chain, stats::RollbackType::LiveChain, rollback_size);
+ }
+
+ let head_slot = block.point().slot_or_default();
+
+ // Add the block to the tip of the Live Chain.
+ let _unused = live_chain.insert(block.point(), block);
+
+ let tip_slot = tip.slot_or_default();
+ update_peer_tip(chain, tip);
+
+ // Record the new live chain stats after we add a new block.
+ stats::new_live_block(chain, live_chain.len() as u64, head_slot, tip_slot);
+
+ Ok(())
+ }
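// Editor's sketch (not part of the diff): the rollback path above pops blocks off the
// tip until the incoming block's `previous` pointer reconnects with the chain, or
// clears the chain entirely when the rollback reaches past the oldest live block. An
// equivalent reconnection loop on (slot, previous_slot) pairs, for illustration only:
fn rollback_to_reconnect(live: &mut Vec<(u64, u64)>, new_previous_slot: u64) -> u64 {
    let mut rolled_back: u64 = 0;
    while let Some(tip) = live.last().copied() {
        if tip.0 == new_previous_slot {
            // Contiguity re-established: the new block can now be appended.
            return rolled_back;
        }
        let _unused = live.pop();
        rolled_back += 1;
    }
    // Rolled back past everything we knew: the whole live chain was purged.
    rolled_back
}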
+
+ /// Checks if the point exists in the live chain.
+ /// If it does, removes all blocks preceding it (but not the point itself).
+ /// Will refuse to purge if the point is not the TIP of the mithril chain.
+ fn purge(&self, chain: Network, point: &Point) -> Result<()> {
+ // Make sure our first live block == the last mithril tip.
+ // Ensures we are properly connected to the Mithril Chain.
+ // But don't check this if we are about to purge the entire chain.
+ // We do this before we bother locking the chain for update.
+ if *point != TIP_POINT {
+ let latest_mithril_tip = latest_mithril_snapshot_id(chain).tip();
+ if !point.strict_eq(&latest_mithril_tip) {
+ return Err(Error::LiveSync(format!(
+ "First Block of Live Purge {point} MUST be last block of Mithril Snapshot {latest_mithril_tip}."
+ )));
+ }
+ }
+
+ let live_chain = self.0.write().map_err(|_| Error::Internal)?;
+
+ // Special Case.
+ // If the Purge Point == TIP_POINT, then we purge the entire chain.
+ if *point == TIP_POINT {
+ live_chain.clear();
+ } else {
+ // The block we want to purge up to must be in the chain.
+ let Some(purge_start_block_entry) = live_chain.get(point) else {
+ return Err(Error::LiveSync(format!(
+ "The block to purge to {point} is not in the Live chain."
+ )));
+ };
+
+ // Make sure the block that IS present, is the actual block, by strict equality.
+ if !purge_start_block_entry.value().point().strict_eq(point) {
+ return Err(Error::LiveSync(format!(
+ "The block to purge to {point} slot is in the live chain, but its hashes do not match."
+ )));
+ }
+
+ // Purge every block prior to the purge point.
+ while let Some(previous_block) = purge_start_block_entry.prev() {
+ let _unused = previous_block.remove();
+ }
+
+ // Try and FORCE the skip map to reclaim its memory
+ crossbeam_epoch::pin().flush();
+ crossbeam_epoch::pin().flush();
+ }
+
+ Ok(())
+ }
+
+ /// Get the current number of blocks in the live chain
+ fn len(&self) -> usize {
+ if let Ok(chain) = self.0.read() {
+ chain.len()
+ } else {
+ 0
+ }
+ }
+
+ /// Get chain sync intersection points for communicating with peer node.
+ fn get_intersect_points(&self) -> Vec<pallas::network::miniprotocols::Point> {
+ let mut intersect_points = Vec::new();
+
+ let Ok(chain) = self.0.read() else {
+ return intersect_points;
+ };
+
+ // Add the top 4 blocks as the first points to intersect.
+ let Some(entry) = chain.back() else {
+ return intersect_points;
+ };
+ intersect_points.push(entry.value().point().into());
+ for _ in 0..2 {
+ if let Some(entry) = entry.prev() {
+ intersect_points.push(entry.value().point().into());
+ } else {
+ return intersect_points;
+ };
+ }
+
+ // Now find points based on an ever-increasing Slot age.
+ let mut slot_age: u64 = 40;
+ let reference_slot = entry.value().point().slot_or_default();
+ let mut previous_point = entry.value().point();
+
+ // Loop until we exhaust probe slots, OR we would step past genesis.
+ while slot_age < reference_slot {
+ let ref_point = Point::fuzzy(reference_slot - slot_age);
+ let Some(entry) = chain.lower_bound(Bound::Included(&ref_point)) else {
+ break;
+ };
+ if entry.value().point() == previous_point {
+ break;
+ };
+ previous_point = entry.value().point();
+ intersect_points.push(previous_point.clone().into());
+ slot_age *= 2;
+ }
+
+ intersect_points
+ }
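// Editor's sketch (not part of the diff): the probe schedule used above, in isolation.
// Starting 40 slots behind the reference slot and doubling the age each step keeps the
// intersect list short (roughly log2 of the chain age in slots) while still covering
// deep rollbacks. For example, `probe_slots(100_000)` probes 40, 80, 160, ... slots
// behind the tip.
fn probe_slots(reference_slot: u64) -> Vec<u64> {
    let mut probes = Vec::new();
    let mut slot_age: u64 = 40;
    while slot_age < reference_slot {
        probes.push(reference_slot - slot_age);
        slot_age *= 2;
    }
    probes
}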
+
+ /// Given a known point on the live chain, and a fork count, find the best block we
+ /// have.
+ fn find_best_fork_block(
+ &self, point: &Point, previous_point: &Point, fork: u64,
+ ) -> Option<(MultiEraBlock, u64)> {
+ let mut rollback_depth: u64 = 0;
+ let Ok(chain) = self.0.read() else {
+ return None;
+ };
+
+ // Get the block <= the current slot.
+ let ref_point = Point::fuzzy(point.slot_or_default());
+ let mut entry = chain.upper_bound(Bound::Included(&ref_point))?;
+
+ let mut this_block = entry.value().clone();
+ // Check if the previous block is the one we previously knew, and if so, that's the
+ // best block.
+ if this_block.point().strict_eq(previous_point) {
+ return Some((this_block, rollback_depth));
+ }
+
+ // Search backwards for a fork smaller than or equal to the one we know.
+ while this_block.fork() > fork {
+ rollback_depth += 1;
+ entry = match entry.prev() {
+ Some(entry) => entry,
+ None => return None,
+ };
+
+ this_block = entry.value().clone();
+ }
+
+ Some((this_block, rollback_depth))
+ }
+
+ /// Get the point of the block at the head of the live chain.
+ fn get_live_head_point(&self) -> Option<Point> {
+ let live_chain = self.0.read().map_err(|_| Error::Internal).ok()?;
+
+ let head_point = Self::get_last_live_point(&live_chain);
+ if head_point == UNKNOWN_POINT {
+ return None;
+ }
+
+ Some(head_point)
+ }
+}
+
+/// Get the `LiveChainBlockList` for a particular `Network`.
+fn get_live_chain(chain: Network) -> ProtectedLiveChainBlockList {
+ // Get a reference to our live chain storage.
+// This SHOULD always exist, because it's initialized exhaustively.
+// If this FAILS, recreate a blank chain, but log an error as it's a serious UNRECOVERABLE
+ // BUG.
+ let entry = if let Some(entry) = LIVE_CHAINS.get(&chain) {
+ entry
+ } else {
+ error!(
+ chain = chain.to_string(),
+ "Internal Error: Chain Sync Failed to find chain in LIVE_CHAINS"
+ );
+
+ // Try and correct the error.
+ LIVE_CHAINS.insert(chain, ProtectedLiveChainBlockList::new());
+
+ // This should NOT fail, because we just inserted it; it's a catastrophic failure if it does.
+ #[allow(clippy::expect_used)]
+ LIVE_CHAINS
+ .get(&chain)
+ .expect("Internal Error: Chain Sync Failed to find chain in LIVE_CHAINS")
+ };
+
+ let value = entry.value();
+ value.clone()
+}
+
+/// Get the head `Point` currently in the live chain.
+pub(crate) fn get_live_head_point(chain: Network) -> Option<Point> {
+ let live_chain = get_live_chain(chain);
+ live_chain.get_live_head_point()
+}
+
+/// Get the Live block relative to the specified point.
+/// The starting block must exist if the search is strict.
+pub(crate) fn get_live_block(
+ chain: Network, point: &Point, advance: i64, strict: bool,
+) -> Option<MultiEraBlock> {
+ let live_chain = get_live_chain(chain);
+ live_chain.get_block(point, advance, strict)
+}
+
+/// Get the fill-to point for a chain.
+///
+/// Returns the Point of the block we are filling up-to, and its fork count.
+///
+/// Note: It MAY change between calling this function and actually backfilling.
+/// This is expected and normal behavior.
+pub(crate) async fn get_fill_to_point(chain: Network) -> (Point, u64) {
+ let live_chain = get_live_chain(chain);
+
+ loop {
+ if let Some(earliest_block) = live_chain.get_earliest_block() {
+ return (earliest_block.point(), earliest_block.fork());
+ }
+ // Nothing in the Live chain to sync to, so wait until there is.
+ tokio::time::sleep(Duration::from_secs(DATA_RACE_BACKOFF_SECS)).await;
+ }
+}
+
+/// Insert a block into the live chain (in-order).
+/// Can ONLY be used to add a new tip block to the live chain.
+/// `fork_count` should be set to 1 on the very first connection, after that,
+/// it is maintained by this function, and MUST not be modified elsewhere.
+pub(crate) fn live_chain_add_block_to_tip(
+ chain: Network, block: MultiEraBlock, fork_count: &mut u64, tip: Point,
+) -> Result<()> {
+ let live_chain = get_live_chain(chain);
+ live_chain.add_block_to_tip(chain, block, fork_count, tip)
+}
+
+/// Backfill the live chain with the block set provided.
+pub(crate) fn live_chain_backfill(chain: Network, blocks: &[MultiEraBlock]) -> Result<()> {
+ let live_chain = get_live_chain(chain);
+ live_chain.backfill(chain, blocks)
+}
+
+/// Get the length of the live chain.
+/// Probably used by debug code only, so it's OK if this is not used.
+pub(crate) fn live_chain_length(chain: Network) -> usize {
+ let live_chain = get_live_chain(chain);
+ live_chain.len()
+}
+
+/// On an immutable update, purge the live-chain up to the new immutable tip.
+/// Will error if the point is not in the Live chain.
+pub(crate) fn purge_live_chain(chain: Network, point: &Point) -> Result<()> {
+ let live_chain = get_live_chain(chain);
+ live_chain.purge(chain, point)
+}
+
+/// Get intersection points to try and find best point to connect to the node on
+/// reconnect.
+pub(crate) fn get_intersect_points(chain: Network) -> Vec<pallas::network::miniprotocols::Point> {
+ let live_chain = get_live_chain(chain);
+ live_chain.get_intersect_points()
+}
+
+/// Find best block from a fork relative to a point.
+pub(crate) fn find_best_fork_block(
+ chain: Network, point: &Point, previous_point: &Point, fork: u64,
+) -> Option<(MultiEraBlock, u64)> {
+ let live_chain = get_live_chain(chain);
+ live_chain.find_best_fork_block(point, previous_point, fork)
+}
diff --git a/rust/cardano-chain-follower/src/chain_sync_ready.rs b/rust/cardano-chain-follower/src/chain_sync_ready.rs
new file mode 100644
index 00000000000..4122a56d111
--- /dev/null
+++ b/rust/cardano-chain-follower/src/chain_sync_ready.rs
@@ -0,0 +1,170 @@
+//! Flag to control if chain sync for a blockchain is ready.
+//! Can not consume the blockchain data until it is.
+
+use std::{sync::LazyLock, time::Duration};
+
+use dashmap::DashMap;
+use strum::IntoEnumIterator;
+use tokio::{
+ sync::{broadcast, oneshot, RwLock},
+ time::sleep,
+};
+use tracing::error;
+
+use crate::{chain_update, Network};
+
+/// Data we hold related to sync being ready or not.
+struct SyncReady {
+ /// MPMC Receive queue for Blockchain Updates
+ rx: broadcast::Receiver<chain_update::Kind>,
+ /// MPMC Transmit queue for Blockchain Updates
+ tx: broadcast::Sender<chain_update::Kind>,
+ /// Sync is ready flag. (Prevents data race conditions)
+ ready: bool,
+}
+
+impl SyncReady {
+ /// Create a new `SyncReady` state.
+ fn new() -> Self {
+ // Can buffer up to 3 update messages before lagging.
+ let (tx, rx) = broadcast::channel::<chain_update::Kind>(3);
+ Self {
+ tx,
+ rx,
+ ready: false,
+ }
+ }
+}
+
+/// Send a chain update to any subscribers that are listening.
+pub(crate) fn notify_follower(
+ chain: Network, update_sender: &Option<broadcast::Sender<chain_update::Kind>>,
+ kind: &chain_update::Kind,
+) {
+ if let Some(update_sender) = update_sender {
+ if let Err(error) = update_sender.send(kind.clone()) {
+ error!(
+ chain = chain.to_string(),
+ "Failed to broadcast the Update {kind} : {error}"
+ );
+ }
+ }
+}
+
+/// Waiter for sync to become ready, use `signal` when it is.
+pub(crate) struct SyncReadyWaiter {
+ /// The oneshot queue we use to signal ready.
+ signal: Option<oneshot::Sender<()>>,
+}
+
+impl SyncReadyWaiter {
+ /// Signal that the chain sync is ready.
+ pub(crate) fn signal(&mut self) {
+ if let Some(signaler) = self.signal.take() {
+ if let Err(error) = signaler.send(()) {
+ error!("sync ready waiter signal should not fail: {error:?}");
+ }
+ } else {
+ error!("sync ready waiter signal should not be called more than once.");
+ }
+ }
+}
+
+/// Lock to prevent using any blockchain data for a network UNTIL it is synced to TIP.
+/// Pre-initialized for all possible blockchains, so it's safe to use `expect` to access a
+/// value.
+static SYNC_READY: LazyLock<DashMap<Network, RwLock<SyncReady>>> = LazyLock::new(|| {
+ let map = DashMap::new();
+ for network in Network::iter() {
+ map.insert(network, RwLock::new(SyncReady::new()));
+ }
+ map
+});
+
+/// Write Lock the `SYNC_READY` lock for a network.
+/// When we are signaled to be ready, set it to true and release the lock.
+pub(crate) fn wait_for_sync_ready(chain: Network) -> SyncReadyWaiter {
+ let (tx, rx) = oneshot::channel::<()>();
+
+ tokio::spawn(async move {
+ // We are safe to use `expect` here because the SYNC_READY list is exhaustively
+ // initialized. It's a serious BUG if that is not true, so panic is OK.
+ #[allow(clippy::expect_used)]
+ let lock_entry = SYNC_READY.get(&chain).expect("network should exist");
+
+ let lock = lock_entry.value();
+
+ let mut status = lock.write().await;
+
+ // If we successfully get told to unlock, we do.
+ if let Ok(()) = rx.await {
+ status.ready = true;
+ }
+
+ // If the channel closes early, we can NEVER use the Blockchain data.
+ });
+
+ SyncReadyWaiter { signal: Some(tx) }
+}
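// Editor's sketch (not part of the diff): the pattern above holds a tokio `RwLock`
// write guard across an await on a oneshot channel, so the ready flag cannot be
// observed as true before `signal()` fires. A stand-alone version (names are
// illustrative, not from the crate):
async fn readiness_gate_demo() {
    use std::{sync::Arc, time::Duration};
    use tokio::sync::{oneshot, RwLock};

    let ready = Arc::new(RwLock::new(false));
    let (tx, rx) = oneshot::channel::<()>();

    let gate = Arc::clone(&ready);
    tokio::spawn(async move {
        // Take the write lock first, then wait for the signal.
        let mut flag = gate.write().await;
        if rx.await.is_ok() {
            *flag = true;
        }
    });

    // ... whoever completes the initial sync eventually calls:
    let _unused = tx.send(());

    // Readers poll with a backoff, exactly because the writer task may not have taken
    // the lock yet when the first check happens (the race noted in this module).
    while !*ready.read().await {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}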
+
+/// Get a Read lock on the Sync State, and return if we are ready or not.
+async fn check_sync_ready(chain: Network) -> bool {
+ // We are safe to use `expect` here because the SYNC_READY list is exhaustively
+ // initialized. It's a serious BUG if that is not true, so panic is OK.
+ #[allow(clippy::expect_used)]
+ let lock_entry = SYNC_READY.get(&chain).expect("network should exist");
+ let lock = lock_entry.value();
+
+ let status = lock.read().await;
+
+ // If the transmitter has not been taken, we are not really ready.
+ status.ready
+}
+
+/// Number of seconds to wait if we detect a `SyncReady` race condition.
+const SYNC_READY_RACE_BACKOFF_SECS: u64 = 1;
+
+/// Block until the chain is synced to TIP.
+/// This is necessary to ensure the Blockchain data is fully intact before attempting to
+/// consume it.
+pub(crate) async fn block_until_sync_ready(chain: Network) {
+ // There is a potential race where we haven't yet write locked the SYNC_READY lock when we
+ // check it. So, IF the ready state returns as false, sleep a while and try again.
+ while !check_sync_ready(chain).await {
+ sleep(Duration::from_secs(SYNC_READY_RACE_BACKOFF_SECS)).await;
+ }
+}
+
+/// Get the Broadcast Receive queue for the given chain updates.
+pub(crate) async fn get_chain_update_rx_queue(
+ chain: Network,
+) -> broadcast::Receiver<chain_update::Kind> {
+ // We are safe to use `expect` here because the SYNC_READY list is exhaustively
+ // initialized. It's a serious BUG if that is not true, so panic is OK.
+ #[allow(clippy::expect_used)]
+ let lock_entry = SYNC_READY.get(&chain).expect("network should exist");
+
+ let lock = lock_entry.value();
+
+ let status = lock.read().await;
+
+ status.rx.resubscribe()
+}
+
+/// Get the Broadcast Transmit queue for the given chain updates.
+pub(crate) async fn get_chain_update_tx_queue(
+ chain: Network,
+) -> Option<broadcast::Sender<chain_update::Kind>> {
+ // We are safe to use `expect` here because the SYNC_READY list is exhaustively
+ // initialized. It's a serious BUG if that is not true, so panic is OK.
+ #[allow(clippy::expect_used)]
+ let lock_entry = SYNC_READY.get(&chain).expect("network should exist");
+
+ let lock = lock_entry.value();
+
+ if let Ok(status) = lock.try_read() {
+ return Some(status.tx.clone());
+ }
+
+ None
+}
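// Editor's sketch (not part of the diff): the update queue above is a tokio broadcast
// (MPMC) channel with a small capacity, so a follower that falls too far behind gets a
// `Lagged` error instead of stalling the chain sync task. Illustration with strings:
async fn broadcast_lag_demo() {
    use tokio::sync::broadcast;

    let (tx, _keep_alive) = broadcast::channel::<String>(3);
    let mut follower = tx.subscribe();

    for i in 0..10 {
        // `send` only errors when no receivers exist at all.
        let _unused = tx.send(format!("update {i}"));
    }

    // The oldest updates were overwritten; the receiver is told how many it missed
    // and can then resynchronize (e.g. by re-reading the live chain).
    match follower.recv().await {
        Err(broadcast::error::RecvError::Lagged(missed)) => {
            println!("follower lagged by {missed} updates");
        },
        Ok(update) => println!("got {update}"),
        Err(closed) => println!("queue closed: {closed}"),
    }
}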
diff --git a/rust/cardano-chain-follower/src/chain_update.rs b/rust/cardano-chain-follower/src/chain_update.rs
new file mode 100644
index 00000000000..044982b96f8
--- /dev/null
+++ b/rust/cardano-chain-follower/src/chain_update.rs
@@ -0,0 +1,63 @@
+//! An update of a blockchain
+
+use std::fmt::Display;
+
+use strum::Display;
+
+use crate::multi_era_block_data::MultiEraBlock;
+
+/// Enum of chain updates received by the follower.
+#[derive(Debug, Clone, Display, PartialEq)]
+pub enum Kind {
+ /// A new part of the chain has become immutable (Roll-forward).
+ ImmutableBlockRollForward,
+ /// New block inserted on chain.
+ Block,
+ /// Chain rollback to the given block.
+ Rollback,
+}
+
+/// Actual Chain Update itself.
+#[derive(Clone, Debug)]
+pub struct ChainUpdate {
+ /// What kind of update is this?
+ pub kind: Kind,
+ /// Is this the tip of the chain?
+ pub tip: bool,
+ /// What is the new data?
+ pub data: MultiEraBlock,
+}
+
+impl ChainUpdate {
+ /// Creates a new chain update.
+ #[must_use]
+ pub fn new(kind: Kind, tip: bool, data: MultiEraBlock) -> Self {
+ Self { kind, tip, data }
+ }
+
+ /// Gets the chain update's block data.
+ #[must_use]
+ pub fn block_data(&self) -> &MultiEraBlock {
+ &self.data
+ }
+
+ /// Gets whether the chain update is for an immutable block.
+ #[must_use]
+ pub fn immutable(&self) -> bool {
+ self.data.immutable()
+ }
+}
+
+impl Display for ChainUpdate {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let block_type = self.kind.to_string();
+ let mut tip: String = String::new();
+ if self.tip {
+ tip = " @ Tip".to_string();
+ }
+
+ write!(f, "{block_type}{tip} : {}", self.data)?;
+
+ Ok(())
+ }
+}
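// Editor's sketch (not part of the diff): how a consumer is expected to branch on the
// update kinds defined above. `update` is any `ChainUpdate` delivered to a follower;
// the types are the ones in this file.
fn handle_update(update: &ChainUpdate) {
    match update.kind {
        Kind::ImmutableBlockRollForward => {
            // More of the chain became immutable; durable processing can advance.
        },
        Kind::Block => {
            // A new live block arrived; `update.tip` says whether we are at the tip.
        },
        Kind::Rollback => {
            // The live chain switched forks; reprocess from `update.block_data()`.
        },
    }
    println!("{update}");
}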
diff --git a/rust/cardano-chain-follower/src/data/Readme.md b/rust/cardano-chain-follower/src/data/Readme.md
new file mode 100644
index 00000000000..f2012dfaadc
--- /dev/null
+++ b/rust/cardano-chain-follower/src/data/Readme.md
@@ -0,0 +1,15 @@
+# Data files used by the Follower Crate
+
+## Mithril signature genesis keys
+
+These keys are required to validate mithril signatures for each respective Cardano network.
+
+| File | Network | Source |
+| --- | --- | --- |
+| `mainnet-genesis.vkey` | Main network. | [mainnet-genesis.vkey] |
+| `preprod-genesis.vkey` | Pre-production network. | [preprod-genesis.vkey] |
+| `preview-genesis.vkey` | Preview network. | [preview-genesis.vkey] |
+
+[mainnet-genesis.vkey]: https://raw.githubusercontent.com/input-output-hk/mithril/main/mithril-infra/configuration/release-mainnet/genesis.vkey
+[preprod-genesis.vkey]: https://raw.githubusercontent.com/input-output-hk/mithril/main/mithril-infra/configuration/release-preprod/genesis.vkey
+[preview-genesis.vkey]: https://raw.githubusercontent.com/input-output-hk/mithril/main/mithril-infra/configuration/pre-release-preview/genesis.vkey
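// Editor's note (illustrative, not part of the diff): one plausible way for the crate
// to use these files is to embed them at compile time with `include_str!`; the exact
// path and whether the crate does this are assumptions. The keys are plain hex-encoded
// ASCII, which matches the `MithrilGenesisVKeyNotHex` check described in `error.rs`
// (hex characters only, even length).
const PREPROD_GENESIS_VKEY: &str = include_str!("preprod-genesis.vkey");

fn vkey_looks_valid(vkey: &str) -> bool {
    let vkey = vkey.trim();
    !vkey.is_empty() && vkey.len() % 2 == 0 && vkey.chars().all(|c| c.is_ascii_hexdigit())
}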
diff --git a/rust/cardano-chain-follower/src/data/mainnet-genesis.vkey b/rust/cardano-chain-follower/src/data/mainnet-genesis.vkey
new file mode 100644
index 00000000000..4bbe653ffe7
--- /dev/null
+++ b/rust/cardano-chain-follower/src/data/mainnet-genesis.vkey
@@ -0,0 +1 @@
+5b3139312c36362c3134302c3138352c3133382c31312c3233372c3230372c3235302c3134342c32372c322c3138382c33302c31322c38312c3135352c3230342c31302c3137392c37352c32332c3133382c3139362c3231372c352c31342c32302c35372c37392c33392c3137365d
\ No newline at end of file
diff --git a/rust/cardano-chain-follower/src/data/preprod-genesis.vkey b/rust/cardano-chain-follower/src/data/preprod-genesis.vkey
new file mode 100644
index 00000000000..575154ce701
--- /dev/null
+++ b/rust/cardano-chain-follower/src/data/preprod-genesis.vkey
@@ -0,0 +1 @@
+5b3132372c37332c3132342c3136312c362c3133372c3133312c3231332c3230372c3131372c3139382c38352c3137362c3139392c3136322c3234312c36382c3132332c3131392c3134352c31332c3233322c3234332c34392c3232392c322c3234392c3230352c3230352c33392c3233352c34345d
\ No newline at end of file
diff --git a/rust/cardano-chain-follower/src/data/preview-genesis.vkey b/rust/cardano-chain-follower/src/data/preview-genesis.vkey
new file mode 100644
index 00000000000..575154ce701
--- /dev/null
+++ b/rust/cardano-chain-follower/src/data/preview-genesis.vkey
@@ -0,0 +1 @@
+5b3132372c37332c3132342c3136312c362c3133372c3133312c3231332c3230372c3131372c3139382c38352c3137362c3139392c3136322c3234312c36382c3132332c3131392c3134352c31332c3233322c3234332c34392c3232392c322c3234392c3230352c3230352c33392c3233352c34345d
\ No newline at end of file
diff --git a/rust/cardano-chain-follower/src/error.rs b/rust/cardano-chain-follower/src/error.rs
new file mode 100644
index 00000000000..27218bd8265
--- /dev/null
+++ b/rust/cardano-chain-follower/src/error.rs
@@ -0,0 +1,110 @@
+//! Errors defined by this library crate.
+
+use std::{io, path::PathBuf};
+
+use pallas::network::miniprotocols::chainsync;
+use thiserror::Error;
+
+use crate::network::Network;
+
+/// Crate error type.
+#[derive(Debug, Error)]
+pub enum Error {
+ /// Data encoding/decoding error.
+ #[error("Codec error: {0:?}")]
+ Codec(String),
+ /// Client connection error.
+ #[error("Client error: {0:?}")]
+ Client(pallas::network::facades::Error),
+ /// Blockfetch protocol error.
+ #[error("Blockfetch error: {0:?}")]
+ Blockfetch(pallas::network::miniprotocols::blockfetch::ClientError),
+ /// Chainsync protocol error.
+ #[error("Chainsync error: {0:?}")]
+ Chainsync(chainsync::ClientError),
+ /// Backfill Sync error.
+ #[error("Backfill Sync error: {0}")]
+ BackfillSync(String),
+ /// Live Sync error.
+ #[error("Live Sync error: {0:?}")]
+ LiveSync(String),
+ /// Follower failed to set its read pointer.
+ #[error("Failed to set follower read pointer")]
+ SetReadPointer,
+ /// Follower background follow task has stopped.
+ #[error("Follower follow task is not running")]
+ FollowTaskNotRunning,
+ /// Chain Sync already running error.
+ #[error("Chain Sync already running for network: {0}")]
+ ChainSyncAlreadyRunning(Network),
+ /// Mithril snapshot already running error.
+ #[error("Mithril Snapshot Sync already running for network: {0}")]
+ MithrilSnapshotSyncAlreadyRunning(Network),
+ /// Mithril snapshot error.
+ #[error("Failed to read block(s) from Mithril snapshot")]
+ MithrilSnapshot(Option),
+ /// Mithril snapshot chunk error.
+ #[error("Failed to read block(s) from Mithril snapshot")]
+ MithrilSnapshotChunk(pallas_hardano::storage::immutable::chunk::Error),
+ /// Mithril snapshot traversal error.
+ #[error("Failed to traverse block(s) from Mithril snapshot")]
+ MithrilSnapshotTraverse(pallas::ledger::traverse::Error),
+ /// Failed to parse the network.
+ #[error("Failed to parse network")]
+ ParseNetwork,
+ /// Mithril Snapshot path is not a directory
+ #[error("Mithril Snapshot path `{0}` is not a directory")]
+ MithrilSnapshotDirectoryNotFound(String),
+ /// Mithril Snapshot path is already configured for another network
+ #[error("Mithril Snapshot path `{0}` is already configured for network `{1}`")]
+ MithrilSnapshotDirectoryAlreadyConfiguredForNetwork(PathBuf, Network),
+ /// Mithril Snapshot path is already configured for this network
+ #[error("Mithril Snapshot path `{0}` is already configured as `{1}`")]
+ MithrilSnapshotDirectoryAlreadyConfigured(PathBuf, PathBuf),
+ /// Mithril Snapshot path not configured, trying to start auto-update
+ #[error("Mithril Snapshot path is not configured. Can not start Auto Snapshot Update.")]
+ MithrilSnapshotDirectoryNotConfigured,
+ /// Mithril snapshot directory failed to be created.
+ #[error("Mithril Snapshot path `{0}` does not exist, and could not be created. `{1}`")]
+ MithrilSnapshotDirectoryCreation(PathBuf, io::Error),
+ /// Mithril snapshot directory is not writable and we need to be able to update the
+ /// snapshot data.
+ #[error("Mithril Snapshot path `{0}` is not writable, or contains read-only files.")]
+ MithrilSnapshotDirectoryNotWritable(PathBuf),
+ /// Mithril aggregator URL is already defined for a network.
+ #[error("Mithril Aggregator URL `{0}` is already configured as `{1}`")]
+ MithrilAggregatorURLAlreadyConfigured(String, String),
+ /// Mithril aggregator URL is already defined for a network.
+ #[error("Mithril Aggregator URL `{0}` is already configured for network `{1}`")]
+ MithrilAggregatorURLAlreadyConfiguredForNetwork(String, Network),
+ /// Mithril aggregator URL is not a valid URL
+ #[error("Mithril Aggregator URL `{0}` is not a valid URL: `{1}`")]
+ MithrilAggregatorURLParse(String, url::ParseError),
+ /// General Mithril Client Error
+ #[error("Mithril Client Error for {0} @ {1}: {2}")]
+ MithrilClient(Network, String, anyhow::Error),
+ /// General Mithril Index DB Error
+ #[error("Mithril Index DB Error for {0}: {1}")]
+ MithrilIndexDB(Network, anyhow::Error),
+ /// Mithril Aggregator has no Snapshots
+ #[error("Mithril Aggregator does not list any Mithril Snapshots for {0} @ {1}")]
+ MithrilClientNoSnapshots(Network, String),
+ /// Mithril Aggregator mismatch
+ #[error("Mithril Aggregator network mismatch. Wanted {0} Got {1}")]
+ MithrilClientNetworkMismatch(Network, String),
+ /// Mithril genesis VKEY Mismatch
+ #[error("Mithril Genesis VKEY for Network {0} is already set, and can not be changed to a different value.")]
+ MithrilGenesisVKeyMismatch(Network),
+ /// Mithril genesis VKEY is not properly HEX Encoded
+ #[error("Mithril Genesis VKEY for Network {0} is not hex encoded. Needs to be only HEX Ascii characters, and even length.")]
+ MithrilGenesisVKeyNotHex(Network),
+ /// Mithril Auto-update requires an Aggregator and a VKEY and a Path
+ #[error("Mithril Auto Update Network {0} failed to start. No Aggregator and/or Genesis VKEY and/or Path are configured.")]
+ MithrilUpdateRequiresAggregatorAndVkeyAndPath(Network),
+ /// Internal Error
+ #[error("Internal error")]
+ Internal,
+}
+
+/// Crate result type.
+pub type Result<T> = std::result::Result<T, Error>;
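// Editor's sketch (not part of the diff): typical use of the crate `Result` alias and
// error enum above; a caller only matches the variants it can act on.
fn report(result: Result<()>) {
    match result {
        Ok(()) => {},
        Err(Error::ChainSyncAlreadyRunning(network)) => {
            // Benign: something else in this process already started sync for that network.
            tracing::debug!("chain sync already running for {}", network);
        },
        Err(error) => tracing::error!("chain follower error: {}", error),
    }
}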
diff --git a/rust/cardano-chain-follower/src/follow.rs b/rust/cardano-chain-follower/src/follow.rs
index b92a79ee763..fc8c1ce348f 100644
--- a/rust/cardano-chain-follower/src/follow.rs
+++ b/rust/cardano-chain-follower/src/follow.rs
@@ -1,771 +1,428 @@
//! Cardano chain follow module.
-use std::{future::Future, path::PathBuf};
-
-use pallas::network::{facades::PeerClient, miniprotocols::Point};
-use tokio::{
- sync::{mpsc, oneshot},
- task::JoinHandle,
-};
+use pallas::network::miniprotocols::txmonitor::{TxBody, TxId};
+use tokio::sync::broadcast::{self};
+use tracing::{debug, error};
use crate::{
- mithril_snapshot::MithrilSnapshot, Error, MultiEraBlockData, Network, PointOrTip, Result,
+ chain_sync::point_at_tip,
+ chain_sync_live_chains::{find_best_fork_block, get_live_block, live_chain_length},
+ chain_sync_ready::{block_until_sync_ready, get_chain_update_rx_queue},
+ chain_update::{self, ChainUpdate},
+ mithril_snapshot::MithrilSnapshot,
+ mithril_snapshot_data::latest_mithril_snapshot_id,
+ mithril_snapshot_iterator::MithrilSnapshotIterator,
+ network::Network,
+ point::{TIP_POINT, UNKNOWN_POINT},
+ stats::{self, rollback},
+ MultiEraBlock, Point, Statistics,
};
-/// Default [`Follower`] block buffer size.
-const DEFAULT_CHAIN_UPDATE_BUFFER_SIZE: usize = 32;
-
-/// Enum of chain updates received by the follower.
-pub enum ChainUpdate {
- /// New block inserted on chain.
- Block(MultiEraBlockData),
- /// Chain rollback to the given block.
- Rollback(MultiEraBlockData),
-}
-
-impl ChainUpdate {
- /// Gets the chain update's block data.
- #[must_use]
- pub fn block_data(&self) -> &MultiEraBlockData {
- match self {
- ChainUpdate::Block(block_data) | ChainUpdate::Rollback(block_data) => block_data,
- }
- }
-}
-
-/// Builder used to create [`FollowerConfig`]s.
-pub struct FollowerConfigBuilder {
- /// Block buffer size option.
- chain_update_buffer_size: usize,
- /// Where to start following from.
- follow_from: PointOrTip,
- /// Path to the Mithril snapshot the follower should use.
- mithril_snapshot_path: Option<PathBuf>,
-}
-
-impl Default for FollowerConfigBuilder {
- fn default() -> Self {
- Self {
- chain_update_buffer_size: DEFAULT_CHAIN_UPDATE_BUFFER_SIZE,
- follow_from: PointOrTip::Tip,
- mithril_snapshot_path: None,
- }
- }
+/// The Chain Follower
+pub struct ChainFollower {
+ /// The Blockchain network we are following.
+ chain: Network,
+ /// Where we end following.
+ end: Point,
+ /// Block we processed most recently.
+ previous: Point,
+ /// Where we are currently in the following process.
+ current: Point,
+ /// What fork were we last on
+ fork: u64,
+ /// Mithril Snapshot
+ snapshot: MithrilSnapshot,
+ /// Mithril Snapshot Follower
+ mithril_follower: Option<MithrilSnapshotIterator>,
+ /// Mithril TIP Reached
+ mithril_tip: Option<Point>,
+ /// Live Block Updates
+ sync_updates: broadcast::Receiver<chain_update::Kind>,
}
-impl FollowerConfigBuilder {
- /// Sets the size of the chain updates buffer used by the [`Follower`].
+impl ChainFollower {
+ /// Follow a blockchain.
///
/// # Arguments
///
- /// * `chain_update_buffer_size`: Size of the chain updates buffer.
- #[must_use]
- pub fn chain_update_buffer_size(mut self, block_buffer_size: usize) -> Self {
- self.chain_update_buffer_size = block_buffer_size;
- self
- }
-
- /// Sets the point at which the follower will start following from.
+ /// * `chain` - The blockchain network to follow.
+ /// * `start` - The point or tip to start following from (inclusive).
+ /// * `end` - The point or tip to stop following from (inclusive).
///
- /// # Arguments
+ /// # Returns
///
- /// * `from`: Sync starting point.
- #[must_use]
- pub fn follow_from<P>(mut self, from: P) -> Self
- where P: Into<PointOrTip> {
- self.follow_from = from.into();
- self
- }
-
- /// Sets the path of the Mithril snapshot the [`Follower`] will use.
+ /// The Chain Follower that will return blocks in the requested range.
///
- /// # Arguments
+ /// # Notes
///
- /// * `path`: Mithril snapshot path.
- #[must_use]
- pub fn mithril_snapshot_path(mut self, path: PathBuf) -> Self {
- self.mithril_snapshot_path = Some(path);
- self
- }
-
- /// Builds a [`FollowerConfig`].
+ /// IF end < start, the follower will immediately yield no blocks.
+ /// IF end is TIP, then the follower will continue to follow even when TIP is reached.
+ /// Otherwise only blocks in the request range will be returned.
+ ///
+ /// Also, UNLIKE the blockchain itself, the only relevant information is the Slot#.
+ /// The Block hash is not considered.
+ /// If start is not an exact Slot#, then the NEXT Slot immediately following will be
+ /// the first block returned.
+ /// If the end is also not an exact Slot# with a block, then the last block will be
+ /// the one immediately preceding it.
+ ///
+ /// To ONLY follow from TIP, set BOTH start and end to TIP.
#[must_use]
- pub fn build(self) -> FollowerConfig {
- FollowerConfig {
- chain_update_buffer_size: self.chain_update_buffer_size,
- follow_from: self.follow_from,
- mithril_snapshot_path: self.mithril_snapshot_path,
+ pub async fn new(chain: Network, start: Point, end: Point) -> Self {
+ let rx = get_chain_update_rx_queue(chain).await;
+
+ ChainFollower {
+ chain,
+ end,
+ previous: UNKNOWN_POINT,
+ current: start,
+ fork: 1, // This is correct, because Mithril is Fork 0.
+ snapshot: MithrilSnapshot::new(chain),
+ mithril_follower: None,
+ mithril_tip: None,
+ sync_updates: rx,
}
}
-}
-/// Configuration for the Cardano chain follower.
-#[derive(Clone)]
-pub struct FollowerConfig {
- /// Configured chain update buffer size.
- pub chain_update_buffer_size: usize,
- /// Where to start following from.
- pub follow_from: PointOrTip,
- /// Path to the Mithril snapshot the follower should use.
- pub mithril_snapshot_path: Option<PathBuf>,
-}
+ /// If we can, get the next update from the mithril snapshot.
+ async fn next_from_mithril(&mut self) -> Option<ChainUpdate> {
+ let current_mithril_tip = latest_mithril_snapshot_id(self.chain).tip();
-/// Information used to connect to a client.
-#[derive(Clone)]
-struct ClientConnectInfo {
- /// Node's address
- address: String,
- /// Network magic
- network: Network,
-}
+ if current_mithril_tip > self.current {
+ if self.mithril_follower.is_none() {
+ self.mithril_follower = self
+ .snapshot
+ .try_read_blocks_from_point(&self.current)
+ .await;
+ }
-/// Handler for receiving the read block response from the client.
-pub struct ReadBlock(tokio::task::JoinHandle<Result<MultiEraBlockData>>);
-
-impl Future for ReadBlock {
- type Output = Result<MultiEraBlockData>;
-
- fn poll(
- mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Self::Output> {
- let p = &mut self.0;
- // Using tokio pin instead of, e.g., pin-project because we use tokio as the async runtime
- // lib for this crate.
- tokio::pin!(p);
-
- match p.poll(cx) {
- std::task::Poll::Ready(res) => {
- match res {
- Ok(res) => std::task::Poll::Ready(res),
- Err(_) => std::task::Poll::Ready(Err(Error::InternalError)),
+ if let Some(follower) = self.mithril_follower.as_mut() {
+ if let Some(next) = follower.next().await {
+ // debug!("Pre Previous update 3 : {:?}", self.previous);
+ self.previous = self.current.clone();
+ // debug!("Post Previous update 3 : {:?}", self.previous);
+ self.current = next.point();
+ self.fork = 0; // Mithril Immutable data is always Fork 0.
+ let update = ChainUpdate::new(chain_update::Kind::Block, false, next);
+ return Some(update);
}
- },
- std::task::Poll::Pending => std::task::Poll::Pending,
+ }
}
- }
-}
-/// Handler for receiving the read block range response from the client.
-pub struct ReadBlockRange(tokio::task::JoinHandle<Result<Vec<MultiEraBlockData>>>);
-
-impl Future for ReadBlockRange {
- type Output = Result<Vec<MultiEraBlockData>>;
-
- fn poll(
- mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Self::Output> {
- let p = &mut self.0;
- // Using tokio pin instead of, e.g., pin-project because we use tokio as the async runtime
- // lib for this crate.
- tokio::pin!(p);
-
- match p.poll(cx) {
- std::task::Poll::Ready(res) => {
- match res {
- Ok(res) => std::task::Poll::Ready(res),
- Err(_) => std::task::Poll::Ready(Err(Error::InternalError)),
- }
- },
- std::task::Poll::Pending => std::task::Poll::Pending,
+ if (self.mithril_tip.is_none() || current_mithril_tip > self.mithril_tip)
+ && self.current < self.mithril_tip
+ {
+ let snapshot = MithrilSnapshot::new(self.chain);
+ if let Some(block) = snapshot.read_block_at(&current_mithril_tip).await {
+ // The Mithril Tip has moved forwards.
+ self.mithril_tip = Some(current_mithril_tip);
+ // Get the mithril tip block.
+ let update =
+ ChainUpdate::new(chain_update::Kind::ImmutableBlockRollForward, false, block);
+ return Some(update);
+ }
+ error!(
+ tip = ?self.mithril_tip,
+ current = ?current_mithril_tip,
+ "Mithril Tip Block is not in snapshot. Should not happen."
+ );
}
+
+ None
}
-}
-/// Cardano chain follower.
-pub struct Follower {
- /// Client connection information.
- ///
- /// This is used to open more connections when needed.
- client_connect_info: ClientConnectInfo,
- /// Chain update receiver.
- chain_update_rx: mpsc::Receiver<Result<ChainUpdate>>,
- /// Follow task request sender.
- follow_task_request_tx: mpsc::Sender,
- /// Follow task thread join handle.
- follow_task_join_handle: JoinHandle<()>,
- /// Optional Mithril snapshot information.
- mithril_snapshot: Option<MithrilSnapshot>,
-}
+ /// If we can, get the next update from the live chain.
+ async fn next_from_live_chain(&mut self) -> Option<ChainUpdate> {
+ let mut next_block: Option<MultiEraBlock> = None;
+ let mut update_type = chain_update::Kind::Block;
+ let mut rollback_depth: u64 = 0;
+
+ // Special Case: point = TIP_POINT. Just return the latest block in the live chain.
+ if self.current == TIP_POINT {
+ next_block = {
+ let block = get_live_block(self.chain, &self.current, -1, false)?;
+ Some(block)
+ };
+ }
-impl Follower {
- /// Connects the follower to a producer using the node-to-node protocol.
- ///
- /// # Arguments
- ///
- /// * `address`: Address of the node to connect to.
- /// * `network`: The [Network] the client is assuming it's connecting to.
- /// * `config`: Follower's configuration (see [`FollowerConfigBuilder`]).
- ///
- /// # Errors
- ///
- /// Returns Err if the connection could not be established.
- pub async fn connect(address: &str, network: Network, config: FollowerConfig) -> Result<Self> {
- let mut client = PeerClient::connect(address, network.into())
- .await
- .map_err(Error::Client)?;
-
- let Some(follow_from) = set_client_read_pointer(&mut client, config.follow_from).await?
- else {
- return Err(Error::SetReadPointer);
- };
-
- let mithril_snapshot = if let Some(path) = config.mithril_snapshot_path {
- Some(MithrilSnapshot::from_path(path)?)
- } else {
- None
- };
-
- let connect_info = ClientConnectInfo {
- address: address.to_string(),
- network,
- };
-
- let (task_request_tx, chain_update_rx, task_join_handle) = task::FollowTask::spawn(
- client,
- connect_info,
- mithril_snapshot.clone(),
- config.chain_update_buffer_size,
- follow_from,
- );
+ // In most cases we will be able to get the next block.
+ if next_block.is_none() {
+ // If we don't know the previous block, get the block requested.
+ let advance = i64::from(!self.previous.is_unknown());
+ next_block = get_live_block(self.chain, &self.current, advance, true);
+ }
+
+ // If we can't get the next consecutive block, then
+ // Get the best previous block.
+ if next_block.is_none() {
+ debug!("No blocks left in live chain.");
+
+ // IF this is an update still, and not us having caught up, then it WILL be a rollback.
+ update_type = chain_update::Kind::Rollback;
+ next_block = if let Some((block, depth)) =
+ find_best_fork_block(self.chain, &self.current, &self.previous, self.fork)
+ {
+ debug!("Found fork block: {block}");
+ // IF the block is the same as our current previous, there has been no chain
+ // advancement, so just return None.
+ if block.point().strict_eq(&self.current) {
+ None
+ } else {
+ rollback_depth = depth;
+ Some(block)
+ }
+ } else {
+ debug!("No block to find, rewinding to latest mithril tip.");
+ let latest_mithril_point = latest_mithril_snapshot_id(self.chain).tip();
+ if let Some(block) = MithrilSnapshot::new(self.chain)
+ .read_block_at(&latest_mithril_point)
+ .await
+ {
+ rollback_depth = live_chain_length(self.chain) as u64;
+ Some(block)
+ } else {
+ return None;
+ }
+ }
+ }
- let client_connect_info = ClientConnectInfo {
- address: address.to_string(),
- network,
- };
-
- Ok(Self {
- client_connect_info,
- chain_update_rx,
- follow_task_request_tx: task_request_tx,
- follow_task_join_handle: task_join_handle,
- mithril_snapshot,
- })
+ if let Some(next_block) = next_block {
+ // Update rollback stats for the follower if one is reported.
+ if update_type == chain_update::Kind::Rollback {
+ rollback(self.chain, stats::RollbackType::Follower, rollback_depth);
+ }
+ // debug!("Pre Previous update 4 : {:?}", self.previous);
+ self.previous = self.current.clone();
+ // debug!("Post Previous update 4 : {:?}", self.previous);
+ self.current = next_block.point().clone();
+ self.fork = next_block.fork();
+
+ let tip = point_at_tip(self.chain, &self.current).await;
+ let update = ChainUpdate::new(update_type, tip, next_block);
+ return Some(update);
+ }
+
+ None
}
- /// Set the follower's chain read-pointer. Returns None if the point was
- /// not found on the chain.
- ///
- /// # Arguments
- ///
- /// * `at`: Point at which to set the read-pointer.
- ///
- /// # Errors
- ///
- /// Returns Err if something went wrong while communicating with the producer.
- pub async fn set_read_pointer