Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cardano-chain-follower): Refactor cardano-chain-follower to fix client concurrency bug #182

Merged
merged 12 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ The chain follower is capable of receiving chain updates from a Cardano node usi
### Read pointer

The read pointer points at the location the chain is being read by a client connection.
Although the Cardano node maintains a read pointer for each client, the chain follower manages
its own copy of the read pointer in order to follow the chain even when it's reading data from a Mithril snapshot.
The follower's read pointer gets updated every time it receives a chain update.

### Chain Updates

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//! This example shows how to use the chain follower to download arbitrary blocks
//! from the chain concurrently.

use std::error::Error;

use cardano_chain_follower::{Follower, FollowerConfigBuilder, Network, Point};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy(),
)
.init();

let config = FollowerConfigBuilder::default().build();

let follower = Follower::connect(
"relays-new.cardano-mainnet.iohk.io:3001",
Network::Mainnet,
config,
)
.await?;

let points = vec![
Point::Specific(
110_908_236,
hex::decode("ad3798a1db2b6097c71f35609399e4b2ff834f0f45939803d563bf9d660df2f2")?,
),
Point::Specific(
110_908_582,
hex::decode("16e97a73e866280582ee1201a5e1815993978eede956af1869b0733bedc131f2")?,
),
];
let mut point_count = points.len();

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

for p in points {
let slot_no = p.slot_or_default();
let r = follower.read_block(p);
let r_tx = tx.clone();

tokio::spawn(async move {
tracing::info!(slot_no, "Reading block");
let result = r.await;
drop(r_tx.send(result));
});
}

while let Some(result) = rx.recv().await {
let block_data = result?;
let block = block_data.decode()?;

let total_fee = block
.txs()
.iter()
.map(|tx| tx.fee().unwrap_or_default())
.sum::<u64>();

println!(
"Block {} (slot {}) => total fee: {total_fee}",
block.number(),
block.slot()
);

point_count -= 1;
if point_count == 0 {
break;
}
}

Ok(())
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! This example shows how to use the chain reader to download arbitrary blocks
//! This example shows how to use the chain follower to download arbitrary blocks
//! from the chain.

use std::error::Error;
Expand Down Expand Up @@ -31,7 +31,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
110_908_236,
hex::decode("ad3798a1db2b6097c71f35609399e4b2ff834f0f45939803d563bf9d660df2f2")?,
))
.read()
.await?;

let block = data.decode()?;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! This example shows how to use the chain reader to read arbitrary blocks
//! This example shows how to use the chain follower to read arbitrary blocks
//! from Mithril snapshot files.

// Allowing since this is example code.
Expand Down Expand Up @@ -40,7 +40,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
49_075_418,
hex::decode("bdb5ce7788850c30342794f252b1d955086862e8f7cb90a32a8f560b693ca78a")?,
))
.read()
.await?;

let block = data.decode()?;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! This example shows how to use the chain reader to download arbitrary blocks
//! This example shows how to use the chain follower to download arbitrary blocks
//! from the chain.

use std::error::Error;
Expand Down Expand Up @@ -37,7 +37,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
hex::decode("16e97a73e866280582ee1201a5e1815993978eede956af1869b0733bedc131f2")?,
),
)
.read()
.await?;

let mut total_txs = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! This example shows how to use the chain reader to read arbitrary blocks
//! This example shows how to use the chain follower to read arbitrary blocks
//! from Mithril snapshot files.

// Allowing since this is example code.
Expand Down Expand Up @@ -48,7 +48,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
hex::decode("b7639b523f320643236ab0fc04b7fd381dedd42c8d6b6433b5965a5062411396")?,
),
)
.read()
.await?;

for data in data_vec {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//! This example shows how to set the follower's read pointer without stopping it.

use std::error::Error;

use cardano_chain_follower::{ChainUpdate, Follower, FollowerConfigBuilder, Network, Point};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy(),
)
.init();

// Defaults to start following from the tip.
let config = FollowerConfigBuilder::default().build();

let mut follower = Follower::connect(
"relays-new.cardano-mainnet.iohk.io:3001",
Network::Mainnet,
config,
)
.await?;

let (tx, mut rx) = tokio::sync::oneshot::channel::<()>();
let mut pointer_set = false;
tokio::spawn(async move {
let _tx = tx;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
});

loop {
tokio::select! {
_ = &mut rx, if !pointer_set => {
follower.set_read_pointer(Point::Specific(
110_908_236,
hex::decode("ad3798a1db2b6097c71f35609399e4b2ff834f0f45939803d563bf9d660df2f2")?,
)).await?;
println!("set read pointer");

pointer_set = true;
}

chain_update = follower.next() => {
match chain_update? {
ChainUpdate::Block(data) => {
let block = data.decode()?;

println!(
"New block NUMBER={} SLOT={} HASH={}",
block.number(),
block.slot(),
hex::encode(block.hash()),
);
},
ChainUpdate::Rollback(data) => {
let block = data.decode()?;

println!(
"Rollback block NUMBER={} SLOT={} HASH={}",
block.number(),
block.slot(),
hex::encode(block.hash()),
);
},
}
}
}
}
}