Skip to content

Commit

Permalink
feat: Introduce Supply by Asset reducer
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Feb 8, 2023
2 parents 7bec205 + ef5b147 commit e05f460
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/crosscut/mod.rs
Expand Up @@ -2,5 +2,6 @@ mod args;
pub mod epochs;
pub mod filters;
pub mod policies;
pub mod time;

pub use args::*;
72 changes: 72 additions & 0 deletions src/crosscut/time.rs
@@ -0,0 +1,72 @@
use super::ChainWellKnownInfo;

#[inline]
fn compute_linear_timestamp(
known_slot: u64,
known_time: u64,
slot_length: u64,
query_slot: u64,
) -> u64 {
known_time + (query_slot - known_slot) * slot_length
}

#[inline]
fn compute_era_epoch(era_slot: u64, era_slot_length: u64, era_epoch_length: u64) -> (u64, u64) {
let epoch = (era_slot * era_slot_length) / era_epoch_length;
let reminder = era_slot % era_epoch_length;

(epoch, reminder)
}

/// A naive, standalone implementation of a time provider
///
/// This time provider doesn't require any external resources other than an
/// initial config. It works by applying simple slot => wallclock conversion
/// logic from a well-known configured point in the chain, assuming homogeneous
/// slot length from that point forward.
#[derive(Clone)]
pub(crate) struct NaiveProvider {
config: ChainWellKnownInfo,
}

impl NaiveProvider {
pub fn new(config: ChainWellKnownInfo) -> Self {
assert!(
config.byron_epoch_length > 0,
"byron epoch length needs to be greater than zero"
);

assert!(
config.shelley_epoch_length > 0,
"shelley epoch length needs to be greater than zero"
);

let (shelley_start_epoch, _) = compute_era_epoch(
config.shelley_known_slot,
config.byron_slot_length as u64,
config.byron_epoch_length as u64,
);

NaiveProvider { config }
}

pub fn slot_to_wallclock(&self, slot: u64) -> u64 {
let NaiveProvider { config, .. } = self;

if slot < config.shelley_known_slot {
compute_linear_timestamp(
config.byron_known_slot,
config.byron_known_time,
config.byron_slot_length as u64,
slot,
)
} else {
compute_linear_timestamp(
config.shelley_known_slot,
config.shelley_known_time,
config.shelley_slot_length as u64,
slot,
)
}
}
}
12 changes: 11 additions & 1 deletion src/reducers/mod.rs
Expand Up @@ -28,6 +28,8 @@ pub mod block_header_by_hash;
#[cfg(feature = "unstable")]
pub mod last_block_parameters;
#[cfg(feature = "unstable")]
pub mod supply_by_asset;
#[cfg(feature = "unstable")]
pub mod tx_by_hash;
#[cfg(feature = "unstable")]
pub mod tx_count_by_address;
Expand Down Expand Up @@ -67,6 +69,8 @@ pub enum Config {
UtxosByAsset(utxos_by_asset::Config),
#[cfg(feature = "unstable")]
UtxoByStake(utxo_by_stake::Config),
#[cfg(feature = "unstable")]
SupplyByAsset(supply_by_asset::Config),
}

impl Config {
Expand All @@ -85,7 +89,7 @@ impl Config {
#[cfg(feature = "unstable")]
Config::BalanceByAddress(c) => c.plugin(policy),
#[cfg(feature = "unstable")]
Config::TxByHash(c) => c.plugin(policy),
Config::TxByHash(c) => c.plugin(chain, policy),
#[cfg(feature = "unstable")]
Config::TxCountByAddress(c) => c.plugin(policy),
#[cfg(feature = "unstable")]
Expand All @@ -102,6 +106,8 @@ impl Config {
Config::UtxosByAsset(c) => c.plugin(policy),
#[cfg(feature = "unstable")]
Config::UtxoByStake(c) => c.plugin(policy),
#[cfg(feature = "unstable")]
Config::SupplyByAsset(c) => c.plugin(policy),
}
}
}
Expand Down Expand Up @@ -178,6 +184,8 @@ pub enum Reducer {
UtxosByAsset(utxos_by_asset::Reducer),
#[cfg(feature = "unstable")]
UtxoByStake(utxo_by_stake::Reducer),
#[cfg(feature = "unstable")]
SupplyByAsset(supply_by_asset::Reducer),
}

impl Reducer {
Expand Down Expand Up @@ -214,6 +222,8 @@ impl Reducer {
Reducer::UtxosByAsset(x) => x.reduce_block(block, ctx, output),
#[cfg(feature = "unstable")]
Reducer::UtxoByStake(x) => x.reduce_block(block, ctx, output),
#[cfg(feature = "unstable")]
Reducer::SupplyByAsset(x) => x.reduce_block(block, ctx, output),
}
}
}
102 changes: 102 additions & 0 deletions src/reducers/supply_by_asset.rs
@@ -0,0 +1,102 @@
use std::str::FromStr;

use gasket::error::AsWorkError;
use pallas::crypto::hash::Hash;
use pallas::ledger::traverse::Asset;
use pallas::ledger::traverse::{MultiEraBlock, MultiEraTx};
use serde::Deserialize;

use crate::{crosscut, model};

#[derive(Deserialize)]
pub struct Config {
pub key_prefix: Option<String>,
pub policy_ids_hex: Option<Vec<String>>,
}

pub struct Reducer {
config: Config,
policy: crosscut::policies::RuntimePolicy,
policy_ids: Option<Vec<Hash<28>>>,
}

impl Reducer {
fn is_policy_id_accepted(&self, policy_id: &Hash<28>) -> bool {
return match &self.policy_ids {
Some(pids) => pids.contains(&policy_id),
None => true,
};
}

fn process_asset(
&mut self,
policy: Hash<28>,
asset: Vec<u8>,
qty: i64,
output: &mut super::OutputPort,
) -> Result<(), gasket::error::Error> {
if !self.is_policy_id_accepted(&policy) {
return Ok(());
}
let prefix = self.config.key_prefix.as_deref();
let asset_id = &format!("{}{}", policy, hex::encode(asset));

let key = match &self.config.key_prefix {
Some(prefix) => format!("{}.{}", prefix, asset_id),
None => format!("{}.{}", "supply_by_asset".to_string(), asset_id),
};

let crdt = model::CRDTCommand::PNCounter(key, qty);

output.send(crdt.into())
}

pub fn reduce_block<'b>(
&mut self,
block: &'b MultiEraBlock<'b>,
ctx: &model::BlockContext,
output: &mut super::OutputPort,
) -> Result<(), gasket::error::Error> {
for tx in block.txs().into_iter() {
for (idx, txo) in ctx.find_consumed_txos(&tx, &self.policy).or_panic()? {
for asset in txo.assets() {
if let Asset::NativeAsset(policy, asset, qty) = asset {
self.process_asset(policy,asset, -1 * qty as i64, output)?;
}
}
}

for (idx, txo) in tx.produces() {
for asset in txo.assets() {
if let Asset::NativeAsset(policy, asset, qty) = asset {
self.process_asset(policy, asset, qty as i64, output)?;
}
}
}
}

Ok(())
}
}

impl Config {
pub fn plugin(self, policy: &crosscut::policies::RuntimePolicy) -> super::Reducer {
let policy_ids: Option<Vec<Hash<28>>> = match &self.policy_ids_hex {
Some(pids) => {
let ps = pids
.iter()
.map(|pid| Hash::<28>::from_str(pid).expect("invalid policy_id"))
.collect();

Some(ps)
}
None => None,
};
let reducer = Reducer {
config: self,
policy: policy.clone(),
};

super::Reducer::UtxosByAsset(reducer)
}
}
20 changes: 17 additions & 3 deletions src/reducers/tx_by_hash.rs
@@ -1,5 +1,6 @@
use pallas::ledger::traverse::{MultiEraBlock, MultiEraTx};
use serde::Deserialize;
use serde_json::json;

use crate::prelude::*;
use crate::{crosscut, model};
Expand All @@ -26,11 +27,13 @@ pub struct Config {
pub struct Reducer {
config: Config,
policy: crosscut::policies::RuntimePolicy,
time: crosscut::time::NaiveProvider,
}

impl Reducer {
fn send(
&mut self,
block: &MultiEraBlock,
tx: &MultiEraTx,
output: &mut super::OutputPort,
) -> Result<(), gasket::error::Error> {
Expand All @@ -40,7 +43,13 @@ impl Reducer {
let cbor = tx.encode();
model::CRDTCommand::any_write_wins(key_prefix, tx.hash(), cbor)
}
Projection::Json => todo!(),
Projection::Json => {
let cbor = tx.encode();
let slot = block.slot();
let ts = self.time.slot_to_wallclock(slot);
let json = json!({ "cbor": hex::encode(cbor), "slot": slot, "time": ts});
model::CRDTCommand::any_write_wins(key_prefix, tx.hash(), json.to_string())
}
};

output.send(gasket::messaging::Message::from(crdt))?;
Expand All @@ -56,7 +65,7 @@ impl Reducer {
) -> Result<(), gasket::error::Error> {
for tx in &block.txs() {
if filter_matches!(self, block, &tx, ctx) {
self.send(tx, output)?;
self.send(block, tx, output)?;
}
}

Expand All @@ -65,10 +74,15 @@ impl Reducer {
}

impl Config {
pub fn plugin(self, policy: &crosscut::policies::RuntimePolicy) -> super::Reducer {
pub fn plugin(
self,
chain: &crosscut::ChainWellKnownInfo,
policy: &crosscut::policies::RuntimePolicy,
) -> super::Reducer {
let worker = Reducer {
config: self,
policy: policy.clone(),
time: crosscut::time::NaiveProvider::new(chain.clone()),
};
super::Reducer::TxByHash(worker)
}
Expand Down

0 comments on commit e05f460

Please sign in to comment.