Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 33 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Acropolis indexer module

[package]
name = "acropolis_module_indexer"
name = "acropolis_module_custom_indexer"
version = "0.1.0"
edition = "2021"
authors = ["William Hankins <william@sundae.fi>"]
Expand All @@ -14,9 +14,13 @@ acropolis_common = { path = "../../common" }
caryatid_sdk = { workspace = true }

anyhow = { workspace = true }
bincode = "1"
config = { workspace = true }
fjall = "2.7.0"
pallas = { workspace = true}
serde = { workspace = true, features = ["rc"] }
tokio = { workspace = true }
tracing = { workspace = true }

[lib]
path = "src/indexer.rs"
path = "src/custom_indexer.rs"
4 changes: 4 additions & 0 deletions modules/custom_indexer/config.default.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# The topic to publish sync commands on
sync-command-publisher-topic = "cardano.sync.command"
# The topic to receive txs on
txs-subscribe-topic = "cardano.txs"
19 changes: 19 additions & 0 deletions modules/custom_indexer/src/chain_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use acropolis_common::{BlockInfo, Point};
use anyhow::Result;
use caryatid_sdk::async_trait;
use pallas::ledger::traverse::MultiEraTx;

#[async_trait]
pub trait ChainIndex: Send + Sync + 'static {
fn name(&self) -> String;

async fn handle_onchain_tx(&mut self, info: &BlockInfo, tx: &MultiEraTx<'_>) -> Result<()> {
let _ = (info, tx);
Ok(())
}

async fn handle_rollback(&mut self, point: &Point) -> Result<()> {
let _ = point;
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use config::Config;

#[derive(serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct IndexerConfig {
pub sync_command_topic: String,
pub struct CustomIndexerConfig {
pub sync_command_publisher_topic: String,
pub txs_subscribe_topic: String,
}

impl IndexerConfig {
impl CustomIndexerConfig {
pub fn try_load(config: &Config) -> Result<Self> {
let full_config = Config::builder()
.add_source(config::File::from_str(
Expand Down
79 changes: 79 additions & 0 deletions modules/custom_indexer/src/cursor_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::{future::Future, path::Path, sync::Mutex};

use acropolis_common::Point;
use anyhow::Result;
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions};

pub trait CursorStore: Send + Sync + 'static {
fn load(&self) -> impl Future<Output = Result<Option<Point>>> + Send;
fn save(&self, point: &Point) -> impl Future<Output = Result<()>> + Send;
}

// In memory cursor store (Not persisted across runs)
pub struct InMemoryCursorStore {
cursor: Mutex<Option<Point>>,
}
impl InMemoryCursorStore {
pub fn new(point: Point) -> Self {
Self {
cursor: Mutex::new(Some(point)),
}
}
}
impl CursorStore for InMemoryCursorStore {
async fn load(&self) -> Result<Option<Point>> {
let guard = self.cursor.lock().map_err(|_| anyhow::anyhow!("cursor mutex poisoned"))?;
Ok(guard.as_ref().cloned())
}

async fn save(&self, point: &Point) -> Result<()> {
let mut guard = self.cursor.lock().map_err(|_| anyhow::anyhow!("cursor mutex poisoned"))?;
*guard = Some(point.clone());
Ok(())
}
}

// Fjall backed cursor store (Retains last stored point)
pub struct FjallCursorStore {
cursor: Partition,
}

impl FjallCursorStore {
pub fn new(path: impl AsRef<Path>, point: Point) -> Result<Self> {
let cfg = Config::new(path);
let keyspace = Keyspace::open(cfg)?;
let partition = keyspace.open_partition("cursor", PartitionCreateOptions::default())?;

// Use stored point if exists or initialize with provided point
match partition.get("cursor")? {
Some(_) => Ok(Self { cursor: partition }),
None => {
let raw = bincode::serialize(&point)?;
partition.insert("cursor", raw)?;
Ok(Self { cursor: partition })
}
}
}
}

impl CursorStore for FjallCursorStore {
async fn load(&self) -> Result<Option<Point>> {
let raw = self.cursor.get("cursor")?;

let Some(bytes) = raw else {
return Ok(None);
};

let point: Point = bincode::deserialize(&bytes)?;

Ok(Some(point))
}

async fn save(&self, point: &Point) -> Result<()> {
let raw = bincode::serialize(point)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not likely to be a problem here as I would expect the structure of Point to be stable (it's a simple two-variant enum), however for less stable types that you're encoding/decoding, I like to include a version prefix constant to ensure compatibility with previous versions of a struct's encoding/decoding. This is particularly relevant with bincode since it's a non-self-describing format—it doesn't store field names or type information, so changes to field order or structure can cause silent deserialization failures.

Something like the following:

const CURSOR_VERSION: u8 = 1;

async fn save(&self, point: &Point) -> Result<()> {
    let mut raw = vec![CURSOR_VERSION];
    raw.extend(bincode::serialize(point)?);
    self.cursor.insert("cursor", raw)?;
    Ok(())
}

async fn load(&self) -> Result<Option<Point>> {
    let Some(bytes) = self.cursor.get("cursor")? else {
        return Ok(None);
    };
    
    match bytes.first() {
        Some(1) => Ok(Some(bincode::deserialize(&bytes[1..])?)),
        Some(v) => anyhow::bail!("unsupported cursor version: {v}"),
        None => anyhow::bail!("empty cursor data"),
    }
}

Again, this is more pertinent to data that could be subject to change in the future.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could totally change in the future. A Point is just two fields, but as we index more forms of data we may need more info in these cursors.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense 👍🏼

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this'll be handed in a followup


self.cursor.insert("cursor", raw)?;

Ok(())
}
}
Loading