Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion modules/chain_store/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ impl ChainStore {
// Get promise of params message so the params queue is cleared and
// the message is ready as soon as possible when we need it
let mut params_message = params_subscription.read();

// Validate the first stored block matches what is already persisted when clear-on-start is false
let Ok((_, first_block_message)) = new_blocks_subscription.read().await else {
return;
};
if let Err(err) = Self::handle_first_block(&store, &first_block_message) {
panic!("Corrupted DB: {err}")
};

loop {
let Ok((_, message)) = new_blocks_subscription.read().await else {
return;
Expand Down Expand Up @@ -129,7 +138,38 @@ impl ChainStore {
bail!("Unexpected message type: {message:?}");
};

store.insert_block(info, &raw_block.body)
if store.should_persist(info.number) {
store.insert_block(info, &raw_block.body)?;
}

Ok(())
}

fn handle_first_block(store: &Arc<dyn Store>, message: &Message) -> Result<()> {
let Message::Cardano((block_info, CardanoMessage::BlockAvailable(raw_block))) = message
else {
bail!("Unexpected message type: {message:?}");
};

if !store.should_persist(block_info.number) {
if let Some(existing) = store.get_block_by_number(block_info.number)? {
if existing.bytes != raw_block.body {
return Err(anyhow::anyhow!(
"Stored block {} does not match. Set clear-store to true",
block_info.number
));
}
} else {
return Err(anyhow::anyhow!(
"Unable to retrieve block {}. Set clear-store to true",
block_info.number
));
}
}

Self::handle_new_block(store, message)?;

Ok(())
}

fn handle_blocks_query(
Expand Down
19 changes: 19 additions & 0 deletions modules/chain_store/src/stores/fjall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct FjallStore {
keyspace: Keyspace,
blocks: FjallBlockStore,
txs: FjallTXStore,
last_persisted_block: Option<u64>,
}

const DEFAULT_DATABASE_PATH: &str = "fjall-blocks";
Expand All @@ -33,10 +34,20 @@ impl FjallStore {
let keyspace = fjall_config.open()?;
let blocks = FjallBlockStore::new(&keyspace)?;
let txs = FjallTXStore::new(&keyspace)?;

let last_persisted_block = if !clear {
blocks.block_hashes_by_number.iter().next_back().and_then(|res| {
res.ok().and_then(|(key, _)| key.as_ref().try_into().ok().map(u64::from_be_bytes))
})
} else {
None
};

Ok(Self {
keyspace,
blocks,
txs,
last_persisted_block,
})
}
}
Expand Down Expand Up @@ -69,6 +80,13 @@ impl super::Store for FjallStore {
Ok(())
}

fn should_persist(&self, block_number: u64) -> bool {
match self.last_persisted_block {
Some(last) => block_number > last,
None => false,
}
}

fn get_block_by_hash(&self, hash: &[u8]) -> Result<Option<Block>> {
self.blocks.get_by_hash(hash)
}
Expand Down Expand Up @@ -117,6 +135,7 @@ impl FjallBlockStore {
BLOCK_HASHES_BY_EPOCH_SLOT_PARTITION,
fjall::PartitionCreateOptions::default(),
)?;

Ok(Self {
blocks,
block_hashes_by_slot,
Expand Down
1 change: 1 addition & 0 deletions modules/chain_store/src/stores/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod fjall;

pub trait Store: Send + Sync {
fn insert_block(&self, info: &BlockInfo, block: &[u8]) -> Result<()>;
fn should_persist(&self, block_number: u64) -> bool;

fn get_block_by_hash(&self, hash: &[u8]) -> Result<Option<Block>>;
fn get_block_by_slot(&self, slot: u64) -> Result<Option<Block>>;
Expand Down