-
Notifications
You must be signed in to change notification settings - Fork 40
/
balance_by_address.rs
108 lines (84 loc) · 3.05 KB
/
balance_by_address.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use pallas::ledger::traverse::MultiEraOutput;
use pallas::ledger::traverse::{MultiEraBlock, OutputRef};
use serde::Deserialize;
use crate::{crosscut, model, prelude::*};
/// Settings for the balance-by-address reducer, deserialized via serde.
#[derive(Deserialize)]
pub struct Config {
/// Prefix placed before the address in every emitted key (`<prefix>.<address>`).
/// When unset, the reducer falls back to `"balance_by_address"`.
pub key_prefix: Option<String>,
/// Optional predicate restricting which transactions are reduced.
/// NOTE(review): presumably read by the `filter_matches!` macro used in
/// `reduce_block` — confirm against the macro definition.
pub filter: Option<crosscut::filters::Predicate>,
}
/// Reducer that emits per-address balance deltas as `PNCounter` CRDT commands.
pub struct Reducer {
// Key prefix and optional filter supplied by the user.
config: Config,
// Runtime policy applied to the result of UTxO lookups for consumed inputs.
policy: crosscut::policies::RuntimePolicy,
}
impl Reducer {
    /// Builds the storage key for `address` as `<prefix>.<address>`.
    ///
    /// The prefix is the configured `key_prefix`, or `"balance_by_address"`
    /// when none was configured.
    fn key_for(&self, address: &str) -> String {
        let prefix = self
            .config
            .key_prefix
            .as_deref()
            .unwrap_or("balance_by_address");

        format!("{}.{}", prefix, address)
    }

    /// Handles a consumed input by emitting a negative balance delta for the
    /// address that owned the referenced UTxO.
    ///
    /// Nothing is emitted when:
    /// * the UTxO lookup (after applying the runtime policy) yields `None`;
    /// * the UTxO's address cannot be decoded; or
    /// * the address does not carry a script (`has_script()` is false).
    ///
    /// # Errors
    ///
    /// Returns an error when the UTxO lookup fails in a way the policy does
    /// not absorb, or when sending on `output` fails.
    fn process_inbound_txo(
        &mut self,
        ctx: &model::BlockContext,
        input: &OutputRef,
        output: &mut super::OutputPort,
    ) -> Result<(), gasket::error::Error> {
        let utxo = match ctx.find_utxo(input).apply_policy(&self.policy).or_panic()? {
            Some(found) => found,
            None => return Ok(()),
        };

        // Decode the address once; undecodable and non-script addresses are
        // both skipped silently, exactly as before.
        let address = match utxo.address() {
            Ok(addr) if addr.has_script() => addr.to_string(),
            _ => return Ok(()),
        };

        // Spending a UTxO lowers the balance, hence the negated amount.
        // NOTE(review): the `as i64` cast assumes the amount fits in an i64 —
        // confirm against the return type of `ada_amount()`.
        let delta = -(utxo.ada_amount() as i64);
        let crdt = model::CRDTCommand::PNCounter(self.key_for(&address), delta);

        output.send(gasket::messaging::Message::from(crdt))?;

        Ok(())
    }

    /// Handles a produced output by emitting a positive balance delta for the
    /// address receiving it.
    ///
    /// Nothing is emitted when the output's address cannot be decoded or does
    /// not carry a script (`has_script()` is false).
    ///
    /// # Errors
    ///
    /// Returns an error when sending on `output` fails.
    fn process_outbound_txo(
        &mut self,
        tx_output: &MultiEraOutput,
        output: &mut super::OutputPort,
    ) -> Result<(), gasket::error::Error> {
        // Decode the address once; undecodable and non-script addresses are
        // both skipped silently, exactly as before.
        let address = match tx_output.address() {
            Ok(addr) if addr.has_script() => addr.to_string(),
            _ => return Ok(()),
        };

        // NOTE(review): the `as i64` cast assumes the amount fits in an i64 —
        // confirm against the return type of `ada_amount()`.
        let delta = tx_output.ada_amount() as i64;
        let crdt = model::CRDTCommand::PNCounter(self.key_for(&address), delta);

        output.send(gasket::messaging::Message::from(crdt))?;

        Ok(())
    }

    /// Reduces a block: for every transaction accepted by `filter_matches!`,
    /// processes all of its inputs (balance decreases) and then all of its
    /// outputs (balance increases).
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error raised while evaluating the
    /// filter or while processing an input or output.
    pub fn reduce_block<'b>(
        &mut self,
        block: &'b MultiEraBlock<'b>,
        ctx: &model::BlockContext,
        output: &mut super::OutputPort,
    ) -> Result<(), gasket::error::Error> {
        for tx in block.txs() {
            if filter_matches!(self, block, &tx, ctx) {
                for input in tx.inputs().iter().map(|i| i.output_ref()) {
                    self.process_inbound_txo(ctx, &input, output)?;
                }

                for tx_output in tx.outputs().iter() {
                    self.process_outbound_txo(tx_output, output)?;
                }
            }
        }

        Ok(())
    }
}
impl Config {
    /// Consumes this configuration and wraps a new [`Reducer`] in the
    /// `BalanceByAddress` variant of the parent module's reducer enum.
    ///
    /// The supplied runtime `policy` is cloned into the reducer.
    pub fn plugin(self, policy: &crosscut::policies::RuntimePolicy) -> super::Reducer {
        super::Reducer::BalanceByAddress(Reducer {
            policy: policy.clone(),
            config: self,
        })
    }
}