Skip to content

Commit

Permalink
fix(cardano-chain-follower): Refactor cardano-chain-follower to fix c…
Browse files Browse the repository at this point in the history
…lient concurrency bug (#182)

* fix(cardano-chain-follower): Refactor cardano-chain-follower to fix client concurrency bug

* chore(cardano-chain-follower): fix fmt

* chore(cardano-chain-follower): fix spelling

* chore(cardano-chain-follower): Add set read pointer example in cardano-chain-follower

* chore(cardano-chain-follower): Add missing docs for struct fields

* chore(cardano-chain-follower): Remove read task in favor of spawning read block and read block range futures

* chore(cardano-chain-follower): Fix set_read_pointer example

* chore(cardano-chain-follower): Fix examples docs

* chore(cardano-chain-follower): Add example showing how to execute concurrent reads

* chore(cardano-chain-follower): Update architecture docs

* Update hermes/crates/cardano-chain-follower/src/follow.rs

* Update hermes/crates/cardano-chain-follower/examples/set_read_pointer.rs

---------

Co-authored-by: Steven Johnson <stevenj@users.noreply.github.com>
  • Loading branch information
FelipeRosa and stevenj committed Mar 18, 2024
1 parent 9e17eaa commit a9516fc
Show file tree
Hide file tree
Showing 10 changed files with 640 additions and 618 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ The chain follower is capable of receiving chain updates from a Cardano node usi
### Read pointer

The read pointer points at the location the chain is being read by a client connection.
Although the Cardano node maintains a read pointer for each client, the chain follower manages
its own copy of the read pointer in order to follow the chain even when it's reading data from a Mithril snapshot.
The follower's read pointer gets updated every time it receives a chain update.

### Chain Updates

Expand Down
78 changes: 78 additions & 0 deletions hermes/crates/cardano-chain-follower/examples/concurrent_reads.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//! This example shows how to use the chain follower to download arbitrary blocks
//! from the chain concurrently.

use std::error::Error;

use cardano_chain_follower::{Follower, FollowerConfigBuilder, Network, Point};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy(),
)
.init();

let config = FollowerConfigBuilder::default().build();

let follower = Follower::connect(
"relays-new.cardano-mainnet.iohk.io:3001",
Network::Mainnet,
config,
)
.await?;

let points = vec![
Point::Specific(
110_908_236,
hex::decode("ad3798a1db2b6097c71f35609399e4b2ff834f0f45939803d563bf9d660df2f2")?,
),
Point::Specific(
110_908_582,
hex::decode("16e97a73e866280582ee1201a5e1815993978eede956af1869b0733bedc131f2")?,
),
];
let mut point_count = points.len();

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

for p in points {
let slot_no = p.slot_or_default();
let r = follower.read_block(p);
let r_tx = tx.clone();

tokio::spawn(async move {
tracing::info!(slot_no, "Reading block");
let result = r.await;
drop(r_tx.send(result));
});
}

while let Some(result) = rx.recv().await {
let block_data = result?;
let block = block_data.decode()?;

let total_fee = block
.txs()
.iter()
.map(|tx| tx.fee().unwrap_or_default())
.sum::<u64>();

println!(
"Block {} (slot {}) => total fee: {total_fee}",
block.number(),
block.slot()
);

point_count -= 1;
if point_count == 0 {
break;
}
}

Ok(())
}
3 changes: 1 addition & 2 deletions hermes/crates/cardano-chain-follower/examples/read_block.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! This example shows how to use the chain reader to download arbitrary blocks
//! This example shows how to use the chain follower to download arbitrary blocks
//! from the chain.

use std::error::Error;
Expand Down Expand Up @@ -31,7 +31,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
110_908_236,
hex::decode("ad3798a1db2b6097c71f35609399e4b2ff834f0f45939803d563bf9d660df2f2")?,
))
.read()
.await?;

let block = data.decode()?;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! This example shows how to use the chain reader to read arbitrary blocks
//! This example shows how to use the chain follower to read arbitrary blocks
//! from Mithril snapshot files.

// Allowing since this is example code.
Expand Down Expand Up @@ -40,7 +40,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
49_075_418,
hex::decode("bdb5ce7788850c30342794f252b1d955086862e8f7cb90a32a8f560b693ca78a")?,
))
.read()
.await?;

let block = data.decode()?;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! This example shows how to use the chain reader to download arbitrary blocks
//! This example shows how to use the chain follower to download arbitrary blocks
//! from the chain.

use std::error::Error;
Expand Down Expand Up @@ -37,7 +37,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
hex::decode("16e97a73e866280582ee1201a5e1815993978eede956af1869b0733bedc131f2")?,
),
)
.read()
.await?;

let mut total_txs = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! This example shows how to use the chain reader to read arbitrary blocks
//! This example shows how to use the chain follower to read arbitrary blocks
//! from Mithril snapshot files.

// Allowing since this is example code.
Expand Down Expand Up @@ -48,7 +48,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
hex::decode("b7639b523f320643236ab0fc04b7fd381dedd42c8d6b6433b5965a5062411396")?,
),
)
.read()
.await?;

for data in data_vec {
Expand Down
74 changes: 74 additions & 0 deletions hermes/crates/cardano-chain-follower/examples/set_read_pointer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//! This example shows how to set the follower's read pointer without stopping it.

use std::error::Error;

use cardano_chain_follower::{ChainUpdate, Follower, FollowerConfigBuilder, Network, Point};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy(),
)
.init();

// Defaults to start following from the tip.
let config = FollowerConfigBuilder::default().build();

let mut follower = Follower::connect(
"relays-new.cardano-mainnet.iohk.io:3001",
Network::Mainnet,
config,
)
.await?;

let (tx, mut rx) = tokio::sync::oneshot::channel::<()>();
let mut pointer_set = false;
tokio::spawn(async move {
let _tx = tx;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
});

loop {
tokio::select! {
_ = &mut rx, if !pointer_set => {
follower.set_read_pointer(Point::Specific(
110_908_236,
hex::decode("ad3798a1db2b6097c71f35609399e4b2ff834f0f45939803d563bf9d660df2f2")?,
)).await?;
println!("set read pointer");

pointer_set = true;
}

chain_update = follower.next() => {
match chain_update? {
ChainUpdate::Block(data) => {
let block = data.decode()?;

println!(
"New block NUMBER={} SLOT={} HASH={}",
block.number(),
block.slot(),
hex::encode(block.hash()),
);
},
ChainUpdate::Rollback(data) => {
let block = data.decode()?;

println!(
"Rollback block NUMBER={} SLOT={} HASH={}",
block.number(),
block.slot(),
hex::encode(block.hash()),
);
},
}
}
}
}
}

0 comments on commit a9516fc

Please sign in to comment.