Skip to content

Commit

Permalink
Merge pull request #1513 from txpipe/feat/stake-distribution
Browse files Browse the repository at this point in the history
add pallas stake snapshots integration
  • Loading branch information
jpraynaud committed Feb 22, 2024
2 parents 17afb2c + bd8cbcd commit cff2c64
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 22 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mithril-common/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "mithril-common"
version = "0.3.6"
version = "0.3.7"
description = "Common types, interfaces, and utilities for Mithril nodes."
authors = { workspace = true }
edition = { workspace = true }
Expand Down
208 changes: 192 additions & 16 deletions mithril-common/src/chain_observer/pallas_observer.rs
@@ -1,18 +1,24 @@
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use bech32::{self, ToBase32, Variant};
use pallas_addresses::Address;
use pallas_codec::utils::{Bytes, CborWrap, TagWrap};
use pallas_network::{
facades::NodeClient,
miniprotocols::localstate::{
queries_v16::{
self, Addr, Addrs, PostAlonsoTransactionOutput, TransactionOutput, UTxOByAddress,
self, Addr, Addrs, PostAlonsoTransactionOutput, StakeSnapshot, Stakes,
TransactionOutput, UTxOByAddress,
},
Client,
},
};

use pallas_primitives::ToCanonicalJson;
use std::path::{Path, PathBuf};
use std::{
collections::BTreeSet,
path::{Path, PathBuf},
};

use crate::{
chain_observer::{interface::*, ChainAddress, TxDatum},
Expand Down Expand Up @@ -175,6 +181,64 @@ impl PallasChainObserver {
Ok(utxo)
}

/// Fetches the current stake distribution using the provided `statequery` client.
async fn do_stake_snapshots_state_query(
&self,
statequery: &mut Client,
) -> StdResult<StakeSnapshot> {
statequery
.acquire(None)
.await
.map_err(|err| anyhow!(err))
.with_context(|| "PallasChainObserver failed to acquire statequery")?;

let era = queries_v16::get_current_era(statequery)
.await
.map_err(|err| anyhow!(err))
.with_context(|| "PallasChainObserver failed to get current era")?;

let state_snapshot = queries_v16::get_stake_snapshots(statequery, era, BTreeSet::new())
.await
.map_err(|err| anyhow!(err))
.with_context(|| "PallasChainObserver failed to get stake snapshot")?;

Ok(state_snapshot)
}

/// Returns the stake pool hash from the given bytestring.
fn get_stake_pool_hash(&self, key: &Bytes) -> Result<String, ChainObserverError> {
let pool_hash = bech32::encode("pool", key.to_base32(), Variant::Bech32)
.map_err(|err| anyhow!(err))
.with_context(|| "PallasChainObserver failed to encode stake pool hash")?;

Ok(pool_hash)
}

/// Fetches the current stake distribution using the provided `statequery` client.
async fn get_stake_distribution(
&self,
client: &mut NodeClient,
) -> Result<Option<StakeDistribution>, ChainObserverError> {
let statequery = client.statequery();

let stake_snapshot = self.do_stake_snapshots_state_query(statequery).await?;

let mut stake_distribution = StakeDistribution::new();

let have_stakes_in_two_epochs = |stakes: &Stakes| stakes.snapshot_mark_pool > 0;
for (key, stakes) in stake_snapshot
.snapshots
.stake_snapshots
.iter()
.filter(|(_, stakes)| have_stakes_in_two_epochs(stakes))
{
let pool_hash = self.get_stake_pool_hash(key)?;
stake_distribution.insert(pool_hash, stakes.snapshot_mark_pool);
}

Ok(Some(stake_distribution))
}

/// Processes a state query with the `NodeClient`, releasing the state query.
async fn process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
let statequery = client.statequery();
Expand Down Expand Up @@ -245,8 +309,15 @@ impl ChainObserver for PallasChainObserver {
async fn get_current_stake_distribution(
&self,
) -> Result<Option<StakeDistribution>, ChainObserverError> {
let fallback = self.get_fallback();
fallback.get_current_stake_distribution().await
let mut client = self.get_client().await?;

let stake_distribution = self.get_stake_distribution(&mut client).await?;

self.post_process_statequery(&mut client).await?;

client.abort().await;

Ok(stake_distribution)
}

async fn get_current_kes_period(
Expand All @@ -264,7 +335,12 @@ mod tests {

use pallas_codec::utils::{AnyCbor, AnyUInt, KeyValuePairs, TagWrap};
use pallas_crypto::hash::Hash;
use pallas_network::miniprotocols::localstate::{queries_v16::Value, ClientQueryRequest};
use pallas_network::miniprotocols::localstate::{
queries_v16::{
BlockQuery, HardForkQuery, LedgerQuery, Request, Snapshots, StakeSnapshot, Value,
},
ClientQueryRequest,
};
use tokio::net::UnixListener;

use super::*;
Expand Down Expand Up @@ -302,6 +378,64 @@ mod tests {
UTxOByAddress { utxo }
}

fn get_fake_stake_snapshot() -> StakeSnapshot {
let stake_snapshots = KeyValuePairs::from(vec![
(
Bytes::from(
hex::decode("00000036d515e12e18cd3c88c74f09a67984c2c279a5296aa96efe89")
.unwrap(),
),
Stakes {
snapshot_mark_pool: 300000000001,
snapshot_set_pool: 300000000002,
snapshot_go_pool: 300000000000,
},
),
(
Bytes::from(
hex::decode("000000f66e28b0f18aef20555f4c4954234e3270dfbbdcc13f54e799")
.unwrap(),
),
Stakes {
snapshot_mark_pool: 600000000001,
snapshot_set_pool: 600000000002,
snapshot_go_pool: 600000000000,
},
),
(
Bytes::from(
hex::decode("00000110093effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
.unwrap(),
),
Stakes {
snapshot_mark_pool: 1200000000001,
snapshot_set_pool: 1200000000002,
snapshot_go_pool: 1200000000000,
},
),
(
Bytes::from(
hex::decode("00000ffff93effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
.unwrap(),
),
Stakes {
snapshot_mark_pool: 0,
snapshot_set_pool: 1300000000002,
snapshot_go_pool: 0,
},
),
]);

StakeSnapshot {
snapshots: Snapshots {
stake_snapshots,
snapshot_stake_mark_total: 2100000000003,
snapshot_stake_set_total: 2100000000006,
snapshot_stake_go_total: 2100000000000,
},
}
}

/// pallas responses mock server.
async fn mock_server(server: &mut pallas_network::facades::NodeServer) -> AnyCbor {
let query: queries_v16::Request =
Expand All @@ -311,27 +445,34 @@ mod tests {
};

match query {
queries_v16::Request::LedgerQuery(queries_v16::LedgerQuery::HardForkQuery(
queries_v16::HardForkQuery::GetCurrentEra,
)) => AnyCbor::from_encode(4),
queries_v16::Request::LedgerQuery(queries_v16::LedgerQuery::BlockQuery(
_,
queries_v16::BlockQuery::GetEpochNo,
)) => AnyCbor::from_encode([8]),
queries_v16::Request::LedgerQuery(queries_v16::LedgerQuery::BlockQuery(
_,
queries_v16::BlockQuery::GetUTxOByAddress(_),
)) => AnyCbor::from_encode(get_fake_utxo_by_address()),
Request::LedgerQuery(LedgerQuery::HardForkQuery(HardForkQuery::GetCurrentEra)) => {
AnyCbor::from_encode(4)
}
Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetEpochNo)) => {
AnyCbor::from_encode([8])
}
Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetUTxOByAddress(_))) => {
AnyCbor::from_encode(get_fake_utxo_by_address())
}
Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetStakeSnapshots(_))) => {
AnyCbor::from_encode(get_fake_stake_snapshot())
}
_ => panic!("unexpected query from client: {query:?}"),
}
}

/// Creates a new work directory in the system's temporary folder.
fn create_temp_dir(folder_name: &str) -> PathBuf {
#[cfg(not(target_os = "macos"))]
let temp_dir = std::env::temp_dir()
.join("mithril_test")
.join("pallas_chain_observer_test")
.join(folder_name);

// macOS-domain addresses are variable-length filesystem pathnames of at most 104 characters.
#[cfg(target_os = "macos")]
let temp_dir: PathBuf = std::env::temp_dir().join(folder_name);

if temp_dir.exists() {
fs::remove_dir_all(&temp_dir).expect("Previous work dir removal failed");
}
Expand Down Expand Up @@ -403,4 +544,39 @@ mod tests {
let datums = client_res.expect("Client failed");
assert_eq!(vec![TxDatum(r#"{"constructor":0,"fields":[{"bytes":"7b226d61726b657273223a5b7b226e616d65223a227468616c6573222c2265706f6368223a307d5d2c227369676e6174757265223a2238356632326562626164"},{"bytes":"33333537633865613264663036323039376639613138306464333564396633626131643236383263373263386431323238386661643863623864306365656562"},{"bytes":"366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227d"}]}"#.to_string())], datums);
}

#[tokio::test]
async fn get_current_stake_distribution_with_fallback() {
let socket_path =
create_temp_dir("get_current_stake_distribution_with_fallback").join("node.socket");
let server = setup_server(socket_path.clone()).await;
let client = tokio::spawn(async move {
let fallback = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
let observer = super::PallasChainObserver::new(
socket_path.as_path(),
CardanoNetwork::TestNet(10),
fallback,
);
observer.get_current_stake_distribution().await.unwrap()
});

let (_, client_res) = tokio::join!(server, client);
let computed_stake_distribution = client_res.unwrap().unwrap();

let mut expected_stake_distribution = StakeDistribution::new();
expected_stake_distribution.insert(
"pool1qqqqqdk4zhsjuxxd8jyvwncf5eucfskz0xjjj64fdmlgj735lr9".to_string(),
300000000001,
);
expected_stake_distribution.insert(
"pool1qqqqpanw9zc0rzh0yp247nzf2s35uvnsm7aaesfl2nnejaev0uc".to_string(),
600000000001,
);
expected_stake_distribution.insert(
"pool1qqqqzyqf8mlm70883zht60n4q6uqxg4a8x266sewv8ad2grkztl".to_string(),
1200000000001,
);

assert_eq!(expected_stake_distribution, computed_stake_distribution);
}
}

0 comments on commit cff2c64

Please sign in to comment.