Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
alethea-io committed May 30, 2023
2 parents 01e330a + 2799ce1 commit 26cb94a
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 0 deletions.
43 changes: 43 additions & 0 deletions src/model.rs
Expand Up @@ -138,6 +138,9 @@ pub enum CRDTCommand {
AnyWriteWins(Key, Value),
// TODO make sure Value is a generic not stringly typed
PNCounter(Key, Delta),
HashCounter(Key, Member, Delta),
HashSetValue(Key, Member, Value),
HashUnsetKey(Key, Member),
BlockFinished(Point),
}

Expand Down Expand Up @@ -225,6 +228,46 @@ impl CRDTCommand {
CRDTCommand::LastWriteWins(key, value.into(), ts)
}

pub fn hash_set_value<V>(
prefix: Option<&str>,
key: &str,
member: String,
value: V,
) -> CRDTCommand
where
V: Into<Value>,
{
let key = match prefix {
Some(prefix) => format!("{}.{}", prefix, key.to_string()),
None => key.to_string(),
};

CRDTCommand::HashSetValue(key, member, value.into())
}

pub fn hash_del_key(prefix: Option<&str>, key: &str, member: String) -> CRDTCommand {
let key = match prefix {
Some(prefix) => format!("{}.{}", prefix, key.to_string()),
None => key.to_string(),
};

CRDTCommand::HashUnsetKey(key, member)
}

pub fn hash_counter(
prefix: Option<&str>,
key: &str,
member: String,
delta: i64,
) -> CRDTCommand {
let key = match prefix {
Some(prefix) => format!("{}.{}", prefix, key.to_string()),
None => key.to_string(),
};

CRDTCommand::HashCounter(key, member, delta)
}

pub fn block_finished(block: &MultiEraBlock) -> CRDTCommand {
let hash = block.hash();
let slot = block.slot();
Expand Down
139 changes: 139 additions & 0 deletions src/reducers/full_utxos_by_address.rs
@@ -0,0 +1,139 @@
use pallas::codec::utils::CborWrap;
use pallas::crypto::hash::Hash;
use pallas::ledger::primitives::babbage::{DatumOption, PlutusData};
use pallas::ledger::primitives::Fragment;
use pallas::ledger::traverse::{Asset, MultiEraBlock, MultiEraTx};
use pallas::ledger::traverse::{MultiEraOutput, OriginalHash};
use serde::Deserialize;
use serde_json::json;

use crate::{crosscut, model, prelude::*};

#[derive(Deserialize)]
pub struct Config {
pub filter: Vec<String>,
pub prefix: Option<String>,
pub address_as_key: Option<bool>,
}

pub struct Reducer {
config: Config,
policy: crosscut::policies::RuntimePolicy,
}

pub fn resolve_datum(utxo: &MultiEraOutput, tx: &MultiEraTx) -> Result<PlutusData, ()> {
match utxo.datum() {
Some(DatumOption::Data(CborWrap(pd))) => Ok(pd),
Some(DatumOption::Hash(datum_hash)) => {
for raw_datum in tx.clone().plutus_data() {
if raw_datum.original_hash().eq(&datum_hash) {
return Ok(raw_datum.clone().unwrap());
}
}

return Err(());
}
_ => Err(()),
}
}

impl Reducer {
fn get_key_value(
&self,
utxo: &MultiEraOutput,
tx: &MultiEraTx,
output_ref: &(Hash<32>, u64),
) -> Option<(String, String)> {
if let Some(address) = utxo.address().map(|addr| addr.to_string()).ok() {
if self.config.filter.iter().any(|addr| address.eq(addr)) {
let mut data = serde_json::Value::Object(serde_json::Map::new());
let address_as_key = self.config.address_as_key.unwrap_or(false);
let key: String;

if address_as_key {
key = address;
data["tx_hash"] = serde_json::Value::String(hex::encode(output_ref.0.to_vec()));
data["output_index"] =
serde_json::Value::from(serde_json::Number::from(output_ref.1));
} else {
key = format!("{}#{}", hex::encode(output_ref.0.to_vec()), output_ref.1);
data["address"] = serde_json::Value::String(address);
}

if let Some(datum) = resolve_datum(utxo, tx).ok() {
data["datum"] = serde_json::Value::String(hex::encode(
datum.encode_fragment().ok().unwrap(),
));
} else if let Some(DatumOption::Hash(h)) = utxo.datum() {
data["datum_hash"] = serde_json::Value::String(hex::encode(h.to_vec()));
}

let mut assets: Vec<serde_json::Value> = Vec::new();
for asset in utxo.non_ada_assets() {
match asset {
Asset::Ada(lovelace_amt) => {
assets.push(json!({
"unit": "lovelace",
"quantity": format!("{}", lovelace_amt)
}));
}
Asset::NativeAsset(cs, tkn, amt) => {
let unit = format!("{}{}", hex::encode(cs.to_vec()), hex::encode(tkn));
assets.push(json!({
"unit": unit,
"quantity": format!("{}", amt)
}));
}
}
}

data["amount"] = serde_json::Value::Array(assets);
return Some((key, data.to_string()));
}
}

None
}

pub fn reduce_block<'b>(
&mut self,
block: &'b MultiEraBlock<'b>,
ctx: &model::BlockContext,
output: &mut super::OutputPort,
) -> Result<(), gasket::error::Error> {
let prefix = self.config.prefix.as_deref();
for tx in block.txs().into_iter() {
for consumed in tx.consumes().iter().map(|i| i.output_ref()) {
if let Some(Some(utxo)) = ctx.find_utxo(&consumed).apply_policy(&self.policy).ok() {
if let Some((key, value)) =
self.get_key_value(&utxo, &tx, &(consumed.hash().clone(), consumed.index()))
{
output.send(
model::CRDTCommand::set_remove(prefix, &key.as_str(), value).into(),
)?;
}
}
}

for (index, produced) in tx.produces() {
let output_ref = (tx.hash().clone(), index as u64);
if let Some((key, value)) = self.get_key_value(&produced, &tx, &output_ref) {
output.send(model::CRDTCommand::set_add(None, &key, value).into())?;
}
}
}

Ok(())
}
}

impl Config {
pub fn plugin(self, policy: &crosscut::policies::RuntimePolicy) -> super::Reducer {
let reducer = Reducer {
config: self,
policy: policy.clone(),
};

super::Reducer::FullUtxosByAddress(reducer)
}
}
5 changes: 5 additions & 0 deletions src/reducers/mod.rs
Expand Up @@ -9,6 +9,7 @@ use crate::{bootstrap, crosscut, model};
type InputPort = gasket::messaging::TwoPhaseInputPort<model::EnrichedBlockPayload>;
type OutputPort = gasket::messaging::OutputPort<model::CRDTCommand>;

pub mod full_utxos_by_address;
pub mod macros;
pub mod point_by_tx;
pub mod pool_by_stake;
Expand Down Expand Up @@ -57,6 +58,7 @@ pub mod assets_by_stake_key;
#[derive(Deserialize)]
#[serde(tag = "type")]
pub enum Config {
FullUtxosByAddress(full_utxos_by_address::Config),
UtxoByAddress(utxo_by_address::Config),
PointByTx(point_by_tx::Config),
PoolByStake(pool_by_stake::Config),
Expand Down Expand Up @@ -108,6 +110,7 @@ impl Config {
policy: &crosscut::policies::RuntimePolicy,
) -> Reducer {
match self {
Config::FullUtxosByAddress(c) => c.plugin(policy),
Config::UtxoByAddress(c) => c.plugin(policy),
Config::PointByTx(c) => c.plugin(),
Config::PoolByStake(c) => c.plugin(),
Expand Down Expand Up @@ -200,6 +203,7 @@ impl Bootstrapper {
}

pub enum Reducer {
FullUtxosByAddress(full_utxos_by_address::Reducer),
UtxoByAddress(utxo_by_address::Reducer),
PointByTx(point_by_tx::Reducer),
PoolByStake(pool_by_stake::Reducer),
Expand Down Expand Up @@ -252,6 +256,7 @@ impl Reducer {
output: &mut OutputPort,
) -> Result<(), gasket::error::Error> {
match self {
Reducer::FullUtxosByAddress(x) => x.reduce_block(block, ctx, output),
Reducer::UtxoByAddress(x) => x.reduce_block(block, ctx, output),
Reducer::PointByTx(x) => x.reduce_block(block, output),
Reducer::PoolByStake(x) => x.reduce_block(block, output),
Expand Down
27 changes: 27 additions & 0 deletions src/storage/redis.rs
Expand Up @@ -242,6 +242,33 @@ impl gasket::runtime::Worker for Worker {
.incr(key, value)
.or_restart()?;
}
model::CRDTCommand::HashSetValue(key, member, value) => {
log::debug!("setting hash key {} member {}", key, member);

self.connection
.as_mut()
.unwrap()
.hset(key, member, value)
.or_restart()?;
}
model::CRDTCommand::HashCounter(key, member, delta) => {
log::debug!("increasing hash key {} member {} by {}", key, member, delta);

self.connection
.as_mut()
.unwrap()
.hincr(key, member, delta)
.or_restart()?;
}
model::CRDTCommand::HashUnsetKey(key, member) => {
log::debug!("deleting hash key {} member {}", key, member);

self.connection
.as_mut()
.unwrap()
.hdel(member, key)
.or_restart()?;
}
model::CRDTCommand::BlockFinished(point) => {
let cursor_str = crosscut::PointArg::from(point).to_string();

Expand Down
9 changes: 9 additions & 0 deletions src/storage/skip.rs
Expand Up @@ -128,6 +128,15 @@ impl gasket::runtime::Worker for Worker {
model::CRDTCommand::PNCounter(key, value) => {
log::debug!("increasing counter [{}], by [{}]", key, value);
}
model::CRDTCommand::HashSetValue(key, member, _) => {
log::debug!("setting hash key {} member {}", key, member);
}
model::CRDTCommand::HashCounter(key, member, delta) => {
log::debug!("increasing hash key {} member {} by {}", key, member, delta);
}
model::CRDTCommand::HashUnsetKey(key, member) => {
log::debug!("deleting hash key {} member {}", key, member);
}
model::CRDTCommand::BlockFinished(point) => {
log::debug!("block finished {:?}", point);
let mut last_point = self.last_point.lock().unwrap();
Expand Down
25 changes: 25 additions & 0 deletions testdrive/full_utxos_by_address/daemon.toml
@@ -0,0 +1,25 @@
[source]
type = "N2N"
address = "relays-new.cardano-mainnet.iohk.io:3001"

[enrich]
type = "Sled"
db_path = "./data/sled_db"

[[reducers]]
type = "FullUtxosByAddress"
filter = ["addr1z8snz7c4974vzdpxu65ruphl3zjdvtxw8strf2c2tmqnxz2j2c79gy9l76sdg0xwhd7r0c0kna0tycz4y5s6mlenh8pq0xmsha"]
# address_as_key = false

[storage]
type = "Redis"
connection_params = "redis://redis:6379"

[intersect]
type = "Tip"

[chain]
type = "Mainnet"

[policy]
missing_data = "Skip"
1 change: 1 addition & 0 deletions testdrive/full_utxos_by_address/data/.gitignore
@@ -0,0 +1 @@
dump.rdb
19 changes: 19 additions & 0 deletions testdrive/full_utxos_by_address/docker-compose.yml
@@ -0,0 +1,19 @@
version: "3.7"

services:
scrolls:
image: local/scrolls:staging
command: ["daemon"]
environment:
- RUST_LOG=info
volumes:
- ./daemon.toml:/etc/scrolls/daemon.toml
links:
- redis
redis:
image: redis/redis-stack:latest
volumes:
- ./data:/data
ports:
- "6379:6379"
- "8001:8001"

0 comments on commit 26cb94a

Please sign in to comment.