Skip to content

Commit

Permalink
refactor(indexer): Indexer Framework tries to travel back to find mis…
Browse files Browse the repository at this point in the history
…sing local receipt in case of delayed receipt (#4316)

* Indexer Framework tries to travel back to find missing local receipt in case of delayed receipt

* Update Cargo.lock

* Update CHANGELOG

* Improve CHANGELOG
  • Loading branch information
khorolets committed May 25, 2021
1 parent 9a7d172 commit ca39084
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 13 deletions.
14 changes: 13 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion chain/indexer/CHANGELOG.md
@@ -1,6 +1,10 @@
# Changelog

## 0.9.0
## 0.9.1

* Introduce a hot-fix. Execution outcome for local receipt might appear not in the same block as the receipt. Local receipts are not saved in database and unable to be fetched. To include a receipt in `IndexerExecutionOutcomeWithReceipt` and prevent NEAR Indexer Framework from panic we fetch previous blocks to find corresponding local receipt to include.

## 0.9.0 (do not use this version, it contains a bug)

* Introduce `IndexerShard` structure which contains corresponding chunks and `IndexerExecutionOutcomeWithReceipt`
* `receipt` field in `IndexerExecutionOutcomeWithReceipt` is no longer optional as it used to be always set anyway,
Expand Down
3 changes: 2 additions & 1 deletion chain/indexer/Cargo.toml
@@ -1,11 +1,12 @@
[package]
name = "near-indexer"
version = "0.8.1"
version = "0.9.1"
authors = ["Near Inc <hello@nearprotocol.com>"]
edition = "2018"

[dependencies]
actix = "0.11.0-beta.2"
async-recursion = "0.3.2"
tracing = "0.1.13"
futures = "0.3.5"
rocksdb = "0.15.0"
Expand Down
60 changes: 50 additions & 10 deletions chain/indexer/src/streamer/mod.rs
@@ -1,6 +1,7 @@
use std::time::Duration;

use actix::Addr;
use async_recursion::async_recursion;
use rocksdb::DB;
use tokio::sync::mpsc;
use tokio::time;
Expand All @@ -12,8 +13,8 @@ use crate::{AwaitForNodeSyncedEnum, IndexerConfig};

use self::errors::FailedToFetchData;
use self::fetchers::{
fetch_block_by_height, fetch_chunks, fetch_latest_block, fetch_outcomes, fetch_state_changes,
fetch_status,
fetch_block_by_hash, fetch_block_by_height, fetch_chunks, fetch_latest_block, fetch_outcomes,
fetch_state_changes, fetch_status,
};
pub use self::types::{
IndexerChunkView, IndexerExecutionOutcomeWithOptionalReceipt,
Expand All @@ -34,6 +35,7 @@ const INTERVAL: Duration = Duration::from_millis(500);
/// This function supposed to return the entire `StreamerMessage`.
/// It fetches the block and all related parts (chunks, outcomes, state changes etc.)
/// and returns everything together in one struct
#[async_recursion]
async fn build_streamer_message(
client: &Addr<near_client::ViewClientActor>,
block: views::BlockView,
Expand Down Expand Up @@ -116,15 +118,53 @@ async fn build_streamer_message(

let shard_id = header.shard_id.clone() as usize;

indexer_shards[shard_id].receipt_execution_outcomes = receipt_outcomes
.into_iter()
.map(|IndexerExecutionOutcomeWithOptionalReceipt { execution_outcome, receipt }| {
IndexerExecutionOutcomeWithReceipt {
execution_outcome,
receipt: receipt.expect("`receipt` must be present at this moment"),
let mut receipt_execution_outcomes: Vec<IndexerExecutionOutcomeWithReceipt> = vec![];
for outcome in receipt_outcomes {
let IndexerExecutionOutcomeWithOptionalReceipt { execution_outcome, receipt } = outcome;
let receipt = if let Some(receipt) = receipt {
receipt
} else {
// TODO(#4317): optimize it https://github.com/near/nearcore/issues/4317
// Receipt might be missing only in case of delayed local receipt
// that appeared in some of the previous blocks
// we will be iterating over previous blocks until we found the receipt
let mut prev_block_tried = 0u16;
let mut prev_block_hash = block.header.prev_hash;
'find_local_receipt: loop {
if prev_block_tried > 1000 {
panic!("Failed to find local receipt in 1000 prev blocks");
}
let prev_block = match fetch_block_by_hash(&client, prev_block_hash).await {
Ok(block) => block,
Err(err) => panic!("Unable to get previous block: {:?}", err),
};

prev_block_hash = prev_block.header.prev_hash;

let streamer_message = match build_streamer_message(&client, prev_block).await {
Ok(response) => response,
Err(err) => {
panic!("Unable to build streamer message for previous block: {:?}", err)
}
};
for shard in streamer_message.shards {
if let Some(chunk) = shard.chunk {
if let Some(receipt) = chunk
.receipts
.into_iter()
.find(|rec| rec.receipt_id == execution_outcome.id)
{
break 'find_local_receipt receipt;
}
}
}
prev_block_tried += 1;
}
})
.collect();
};
receipt_execution_outcomes
.push(IndexerExecutionOutcomeWithReceipt { execution_outcome, receipt: receipt });
}
indexer_shards[shard_id].receipt_execution_outcomes = receipt_execution_outcomes;

// Put the chunk into corresponding indexer shard
indexer_shards[shard_id].chunk = Some(IndexerChunkView {
Expand Down

0 comments on commit ca39084

Please sign in to comment.