-
Notifications
You must be signed in to change notification settings - Fork 5
feat: custom indexer module (single index) #443
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: William Hankins <william@sundae.fi>
Signed-off-by: William Hankins <william@sundae.fi>
Signed-off-by: William Hankins <william@sundae.fi>
Signed-off-by: William Hankins <william@sundae.fi>
Signed-off-by: William Hankins <william@sundae.fi>
Signed-off-by: William Hankins <william@sundae.fi>
| if let Err(e) = idx.handle_onchain_tx(block, &tx).await | ||
| { | ||
| warn!( | ||
| "Failed to index tx {} in block {}: {e:#}", | ||
| tx_index, block.number | ||
| ); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that we're interfacing with code outside of acropolis itself, I think we need a new error-handling strategy. Specifically, if a custom index returns an error when processing a TX, we should not keep passing data to that index.
The reason for that is, the implementation of an index is coming from user code. If that user code throws an error, something is wrong which its code can't account for; maybe a datum is malformed, maybe their DB is down, maybe it's some bug. But the application's operator should be able to fix their code or infrastructure, and after they do, the application should resume indexing from wherever it left off.
| warn!( | ||
| "Failed to decode tx {} in block {}", | ||
| tx_index, block.number | ||
| ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For similar reasons to the ones described above, if we can't decode a transaction, we should not apply future transactions. If someone submits a "malformed" transaction on-chain, we shouldn't just skip it; we should stop processing transactions until it gets rolled back or our decoder gets fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this'll be handled in a followup
| } | ||
|
|
||
| async fn save(&self, point: &Point) -> Result<()> { | ||
| let raw = bincode::serialize(point)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not likely to be a problem here as I would expect the structure of Point to be stable (it's a simple two-variant enum), however for less stable types that you're encoding/decoding, I like to include a version prefix constant to ensure compatibility with previous versions of a struct's encoding/decoding. This is particularly relevant with bincode since it's a non-self-describing format—it doesn't store field names or type information, so changes to field order or structure can cause silent deserialization failures.
Something like the following:
const CURSOR_VERSION: u8 = 1;
async fn save(&self, point: &Point) -> Result<()> {
let mut raw = vec![CURSOR_VERSION];
raw.extend(bincode::serialize(point)?);
self.cursor.insert("cursor", raw)?;
Ok(())
}
async fn load(&self) -> Result<Option<Point>> {
let Some(bytes) = self.cursor.get("cursor")? else {
return Ok(None);
};
match bytes.first() {
Some(1) => Ok(Some(bincode::deserialize(&bytes[1..])?)),
Some(v) => anyhow::bail!("unsupported cursor version: {v}"),
None => anyhow::bail!("empty cursor data"),
}
}Again, this is more pertinent to data that could be subject to change in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could totally change in the future. A Point is just two fields, but as we index more forms of data we may need more info in these cursors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense 👍🏼
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this'll be handed in a followup
Signed-off-by: William Hankins <william@sundae.fi>
|
Merging. Simon's feedback regarding how the indexer handles decode, rollback, and tx handling errors will be included in the follow up PR. |
Description
This PR introduces the initial custom indexer module. It enables downstream code to implement the
ChainIndextrait and receive transaction events. The module currently supports a single index and a follow up PR will extend this to handle multiple indices within one module.It also includes 2
CursorStoreimplementations (Fjall and in memory). Example usage is provided inprocesses/indexer/main.rs, including a simple pool cost index to demonstrate how to construct and run an indexer.Related Issue(s)
Completes #375
How was this tested?
CursorStorerestores the last persisted point and resumes sync from it.Checklist
Impact / Side effects
No side effects for the omnibus process.
Reviewer notes / Areas to focus
custom_indexer.rs, especially theinitflow andcontext.runbehavior.