Skip to content

Commit

Permalink
Merge branch 'main' into reducer-fixes3
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Aug 7, 2022
2 parents 9c64dce + a2a4e7d commit 98546ff
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 50 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -22,7 +22,7 @@ clap = { version = "3.2.6", features = ["derive"] }
log = "0.4.14"
env_logger = "0.9.0"
merge = "0.1.0"
config = { version = "0.13.0", default-features = false, features = ["toml"] }
config = { version = "0.13.0", default-features = false, features = ["toml", "json"] }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
minicbor = "0.14.1"
Expand Down
1 change: 1 addition & 0 deletions assets/test.block

Large diffs are not rendered by default.

13 changes: 0 additions & 13 deletions src/crosscut/addresses.rs

This file was deleted.

223 changes: 223 additions & 0 deletions src/crosscut/filters.rs
@@ -0,0 +1,223 @@
use pallas::ledger::{
addresses::Address,
traverse::{MultiEraBlock, MultiEraTx},
};
use serde::Deserialize;

use crate::prelude::*;
use crate::{crosscut, model};

#[derive(Deserialize, Clone, Default)]
pub struct AddressPattern {
pub exact: Option<String>,
pub payment: Option<String>,
pub stake: Option<String>,
pub is_script: Option<bool>,
}

impl AddressPattern {
pub fn matches(&self, addr: Address) -> bool {
if let Some(x) = &self.exact {
if addr.to_string().eq(x) {
return true;
}
}

if let Some(_) = &self.payment {
todo!();
}

if let Some(_) = &self.stake {
todo!();
}

if let Some(x) = &self.is_script {
return addr.has_script() == *x;
}

false
}
}

#[derive(Deserialize, Clone)]
pub struct AssetPattern {
pub policy: Option<String>,
pub name: Option<String>,
pub subject: Option<String>,
}

#[derive(Deserialize, Clone)]
pub struct BlockPattern {
pub slot_before: Option<u64>,
pub slot_after: Option<u64>,
}

#[derive(Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum Predicate {
AllOf(Vec<Predicate>),
AnyOf(Vec<Predicate>),
Not(Box<Predicate>),
InputAddress(AddressPattern),
OutputAddress(AddressPattern),
WithdrawalTo(AddressPattern),
InputAsset(AssetPattern),
OutputAsset(AssetPattern),
Block(BlockPattern),
}

impl Predicate {
pub fn and(&self, other: &Self) -> Self {
Predicate::AllOf(vec![self.clone(), other.clone()])
}
}

#[inline]
fn eval_payment_to(tx: &MultiEraTx, pattern: &AddressPattern) -> Result<bool, crate::Error> {
let x = tx
.outputs()
.iter()
.filter_map(|o| o.address().ok())
.any(|a| pattern.matches(a));

Ok(x)
}

#[inline]
fn eval_payment_from(
tx: &MultiEraTx,
ctx: &model::BlockContext,
pattern: &AddressPattern,
policy: &crosscut::policies::RuntimePolicy,
) -> Result<bool, crate::Error> {
for input in tx.inputs() {
let utxo = ctx.find_utxo(&input.output_ref()).apply_policy(policy)?;
if let Some(utxo) = utxo {
if let Some(addr) = utxo.address().ok() {
if pattern.matches(addr) {
return Ok(true);
}
}
}
}

Ok(false)
}

#[inline]
fn eval_any_of(
predicates: &[Predicate],
block: &MultiEraBlock,
tx: &MultiEraTx,
ctx: &model::BlockContext,
policy: &crosscut::policies::RuntimePolicy,
) -> Result<bool, crate::Error> {
for p in predicates.iter() {
if eval_predicate(p, block, tx, ctx, policy)? {
return Ok(true);
}
}

Ok(false)
}

#[inline]
fn eval_all_of(
predicates: &[Predicate],
block: &MultiEraBlock,
tx: &MultiEraTx,
ctx: &model::BlockContext,
policy: &crosscut::policies::RuntimePolicy,
) -> Result<bool, crate::Error> {
for p in predicates.iter() {
if !eval_predicate(p, block, tx, ctx, policy)? {
return Ok(false);
}
}

Ok(true)
}

pub fn eval_predicate(
predicate: &Predicate,
block: &MultiEraBlock,
tx: &MultiEraTx,
ctx: &model::BlockContext,
policy: &crosscut::policies::RuntimePolicy,
) -> Result<bool, crate::Error> {
match predicate {
Predicate::Not(x) => eval_predicate(x, block, tx, ctx, policy).map(|x| !x),
Predicate::AnyOf(x) => eval_any_of(x, block, tx, ctx, policy),
Predicate::AllOf(x) => eval_all_of(x, block, tx, ctx, policy),
Predicate::OutputAddress(x) => eval_payment_to(tx, x),
Predicate::InputAddress(x) => eval_payment_from(tx, ctx, x, policy),
Predicate::WithdrawalTo(_) => todo!(),
Predicate::InputAsset(_) => todo!(),
Predicate::OutputAsset(_) => todo!(),
Predicate::Block(_) => todo!(),
}
}

#[cfg(test)]
mod tests {
use pallas::ledger::traverse::MultiEraBlock;

use crate::{crosscut::policies::RuntimePolicy, model::BlockContext};

use super::{eval_predicate, AddressPattern, Predicate};

fn test_predicate_in_block(predicate: &Predicate, expected_txs: &[usize]) {
let cbor = include_str!("../../assets/test.block");
let bytes = hex::decode(cbor).unwrap();
let block = MultiEraBlock::decode(&bytes).unwrap();
let ctx = BlockContext::default();
let policy = RuntimePolicy::default();

let idxs: Vec<_> = block
.txs()
.iter()
.enumerate()
.filter(|(_, tx)| eval_predicate(predicate, &block, tx, &ctx, &policy).unwrap())
.map(|(idx, _)| idx)
.collect();

assert_eq!(idxs, expected_txs);
}

#[test]
fn payment_to_exact_address() {
let x = Predicate::OutputAddress(AddressPattern {
exact: Some("addr1q8fukvydr8m5y3gztte3d4tnw0v5myvshusmu45phf20h395kqnygcykgjy42m29tksmwnd0js0z8p3swm5ntryhfu8sg7835c".into()),
..Default::default()
});

test_predicate_in_block(&x, &[0]);
}

#[test]
fn payment_to_script_address() {
let x = Predicate::OutputAddress(AddressPattern {
is_script: Some(true),
..Default::default()
});

test_predicate_in_block(&x, &[]);
}

#[test]
fn any_of() {
let a = Predicate::OutputAddress(AddressPattern {
exact: Some("addr1q8fukvydr8m5y3gztte3d4tnw0v5myvshusmu45phf20h395kqnygcykgjy42m29tksmwnd0js0z8p3swm5ntryhfu8sg7835c".into()),
..Default::default()
});

let b = Predicate::OutputAddress(AddressPattern {
is_script: Some(true),
..Default::default()
});

let x = Predicate::AnyOf(vec![a, b]);

test_predicate_in_block(&x, &[0]);
}
}
2 changes: 1 addition & 1 deletion src/crosscut/mod.rs
@@ -1,6 +1,6 @@
pub mod addresses;
mod args;
pub mod epochs;
pub mod filters;
pub mod policies;

pub use args::*;
1 change: 1 addition & 0 deletions src/prelude.rs
@@ -1,2 +1,3 @@
pub use crate::crosscut::policies::AppliesPolicy;
pub(crate) use crate::reducers::macros::*;
pub use gasket::error::AsWorkError;
31 changes: 17 additions & 14 deletions src/reducers/address_by_txo.rs
Expand Up @@ -3,16 +3,18 @@ use pallas::crypto::hash::Hash;
use pallas::ledger::traverse::MultiEraBlock;
use serde::Deserialize;

use crate::model;
use crate::prelude::*;
use crate::{crosscut, model};

#[derive(Deserialize)]
pub struct Config {
pub key_prefix: Option<String>,
pub filter: Option<Vec<String>>,
pub filter: Option<crosscut::filters::Predicate>,
}

pub struct Reducer {
config: Config,
policy: crosscut::policies::RuntimePolicy,
}

impl Reducer {
Expand All @@ -24,12 +26,6 @@ impl Reducer {
output_idx: usize,
output: &mut super::OutputPort,
) -> Result<(), gasket::error::Error> {
if let Some(addresses) = &self.config.filter {
if let Err(_) = addresses.binary_search(&address.to_string()) {
return Ok(());
}
}

let crdt = model::CRDTCommand::last_write_wins(
self.config.key_prefix.as_deref(),
&format!("{}#{}", tx_hash, output_idx),
Expand All @@ -45,17 +41,20 @@ impl Reducer {
pub fn reduce_block(
&mut self,
block: &MultiEraBlock,
ctx: &model::BlockContext,
output: &mut super::OutputPort,
) -> Result<(), gasket::error::Error> {
let slot = block.slot();

for tx in block.txs() {
let tx_hash = tx.hash();
if filter_matches!(self, block, &tx, ctx) {
let tx_hash = tx.hash();

for (output_idx, tx_out) in tx.outputs().iter().enumerate() {
let address = tx_out.address().map(|x| x.to_string()).or_panic()?;
for (output_idx, tx_out) in tx.outputs().iter().enumerate() {
let address = tx_out.address().map(|x| x.to_string()).or_panic()?;

self.send(slot, &address, tx_hash, output_idx, output)?;
self.send(slot, &address, tx_hash, output_idx, output)?;
}
}
}

Expand All @@ -64,8 +63,12 @@ impl Reducer {
}

impl Config {
pub fn plugin(self) -> super::Reducer {
let reducer = Reducer { config: self };
pub fn plugin(self, policy: &crosscut::policies::RuntimePolicy) -> super::Reducer {
let reducer = Reducer {
config: self,
policy: policy.clone(),
};

super::Reducer::AddressByTxo(reducer)
}
}
28 changes: 9 additions & 19 deletions src/reducers/balance_by_address.rs
Expand Up @@ -7,7 +7,7 @@ use crate::{crosscut, model, prelude::*};
#[derive(Deserialize)]
pub struct Config {
pub key_prefix: Option<String>,
pub filter: Option<Vec<String>>,
pub filter: Option<crosscut::filters::Predicate>,
}

pub struct Reducer {
Expand Down Expand Up @@ -37,12 +37,6 @@ impl Reducer {

let address = utxo.address().map(|x| x.to_string()).or_panic()?;

if let Some(addresses) = &self.config.filter {
if let Err(_) = addresses.binary_search(&address.to_string()) {
return Ok(());
}
}

let key = match &self.config.key_prefix {
Some(prefix) => format!("{}.{}", prefix, address),
None => format!("{}.{}", "balance_by_address".to_string(), address),
Expand All @@ -68,12 +62,6 @@ impl Reducer {

let address = tx_output.address().map(|x| x.to_string()).or_panic()?;

if let Some(addresses) = &self.config.filter {
if let Err(_) = addresses.binary_search(&address) {
return Ok(());
}
}

let key = match &self.config.key_prefix {
Some(prefix) => format!("{}.{}", prefix, address),
None => format!("{}.{}", "balance_by_address".to_string(), address),
Expand All @@ -93,12 +81,14 @@ impl Reducer {
output: &mut super::OutputPort,
) -> Result<(), gasket::error::Error> {
for tx in block.txs().into_iter() {
for input in tx.inputs().iter().map(|i| i.output_ref()) {
self.process_inbound_txo(&ctx, &input, output)?;
}

for (_idx, tx_output) in tx.outputs().iter().enumerate() {
self.process_outbound_txo(tx_output, output)?;
if filter_matches!(self, block, &tx, ctx) {
for input in tx.inputs().iter().map(|i| i.output_ref()) {
self.process_inbound_txo(&ctx, &input, output)?;
}

for (_idx, tx_output) in tx.outputs().iter().enumerate() {
self.process_outbound_txo(tx_output, output)?;
}
}
}

Expand Down

0 comments on commit 98546ff

Please sign in to comment.