Skip to content

Commit

Permalink
fix gc for sharding upgrade (#5040)
Browse files Browse the repository at this point in the history
resolve #4710

Also added a python test for gc with sharding upgrade
  • Loading branch information
mzhangmzz authored and bowenwang1996 committed Oct 26, 2021
1 parent c0abbda commit 07d87cb
Show file tree
Hide file tree
Showing 9 changed files with 305 additions and 126 deletions.
17 changes: 13 additions & 4 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,8 +628,11 @@ impl Chain {
break;
} else if prev_block_refcount == 1 {
debug_assert_eq!(blocks_current_height.len(), 1);
chain_store_update
.clear_block_data(*block_hash, GCMode::Canonical(tries.clone()))?;
chain_store_update.clear_block_data(
&*self.runtime_adapter,
*block_hash,
GCMode::Canonical(tries.clone()),
)?;
gc_blocks_remaining -= 1;
} else {
return Err(ErrorKind::GCError(
Expand Down Expand Up @@ -670,8 +673,11 @@ impl Chain {
*chain_store_update.get_block_header(&current_hash)?.prev_hash();

// It's safe to call `clear_block_data` for prev data because it clears fork only here
chain_store_update
.clear_block_data(current_hash, GCMode::Fork(tries.clone()))?;
chain_store_update.clear_block_data(
&*self.runtime_adapter,
current_hash,
GCMode::Fork(tries.clone()),
)?;
chain_store_update.commit()?;
*gc_blocks_remaining -= 1;

Expand Down Expand Up @@ -933,19 +939,22 @@ impl Chain {
let blocks_current_height =
blocks_current_height.values().flatten().cloned().collect::<Vec<_>>();
for block_hash in blocks_current_height {
let runtime_adapter = self.runtime_adapter();
let mut chain_store_update = self.mut_store().store_update();
if !tail_prev_block_cleaned {
let prev_block_hash =
*chain_store_update.get_block_header(&block_hash)?.prev_hash();
if chain_store_update.get_block(&prev_block_hash).is_ok() {
chain_store_update.clear_block_data(
&*runtime_adapter,
prev_block_hash,
GCMode::StateSync { clear_block_info: true },
)?;
}
tail_prev_block_cleaned = true;
}
chain_store_update.clear_block_data(
&*runtime_adapter,
block_hash,
GCMode::StateSync { clear_block_info: block_hash != prev_hash },
)?;
Expand Down
156 changes: 95 additions & 61 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2103,73 +2103,95 @@ impl<'a> ChainStoreUpdate<'a> {
Ok(())
}

fn get_shard_uids_to_gc(
&mut self,
runtime_adapter: &dyn RuntimeAdapter,
block_hash: &CryptoHash,
) -> Vec<ShardUId> {
let block_header =
self.get_block_header(&block_hash).expect("block header must exist").clone();
let shard_layout = runtime_adapter
.get_shard_layout(&block_header.epoch_id())
.expect("epoch info must exist");
// gc shards in this epoch
let mut shard_uids_to_gc: Vec<_> = shard_layout.get_shard_uids();
// gc shards in the shard layout in the next epoch if shards will change in the next epoch
// Suppose shard changes at epoch T, we need to garbage collect the new shard layout
// from the last block in epoch T-2 to the last block in epoch T-1
// Because we need to gc the last block in epoch T-2, we can't simply use
// block_header.epoch_id() as next_epoch_id
let next_epoch_id = runtime_adapter
.get_next_epoch_id_from_prev_block(block_hash)
.expect("block info must exist");
let next_shard_layout =
runtime_adapter.get_shard_layout(&next_epoch_id).expect("epoch info must exist");
if shard_layout != next_shard_layout {
shard_uids_to_gc.extend(next_shard_layout.get_shard_uids());
}
shard_uids_to_gc
}

// Clearing block data of `block_hash`, if on a fork.
// Clearing block data of `block_hash.prev`, if on the Canonical Chain.
pub fn clear_block_data(
&mut self,
runtime_adapter: &dyn RuntimeAdapter,
mut block_hash: CryptoHash,
gc_mode: GCMode,
) -> Result<(), Error> {
let mut store_update = self.store().store_update();
let header = self.get_block_header(&block_hash).expect("block header must exist").clone();

// 1. Apply revert insertions or deletions from ColTrieChanges for Trie
match gc_mode.clone() {
GCMode::Fork(tries) => {
// If the block is on a fork, we delete the state that's the result of applying this block
for shard_id in 0..header.chunk_mask().len() as ShardId {
// TODO: pass in the actual shard version that this block uses
// https://github.com/near/nearcore/issues/4710
let shard_uid = ShardUId { version: 0, shard_id: shard_id as u32 };
self.store()
.get_ser(ColTrieChanges, &get_block_shard_uid(&block_hash, &shard_uid))?
.map(|trie_changes: TrieChanges| {
tries
.revert_insertions(&trie_changes, shard_uid, &mut store_update)
.map(|_| {
self.gc_col(
ColTrieChanges,
&get_block_shard_uid(&block_hash, &shard_uid),
);
self.inc_gc_col_state();
})
.map_err(|err| ErrorKind::Other(err.to_string()))
})
.unwrap_or(Ok(()))?;
{
let shard_uids_to_gc: Vec<_> = self.get_shard_uids_to_gc(runtime_adapter, &block_hash);
match gc_mode.clone() {
GCMode::Fork(tries) => {
// If the block is on a fork, we delete the state that's the result of applying this block
for shard_uid in shard_uids_to_gc {
self.store()
.get_ser(ColTrieChanges, &get_block_shard_uid(&block_hash, &shard_uid))?
.map(|trie_changes: TrieChanges| {
tries
.revert_insertions(&trie_changes, shard_uid, &mut store_update)
.map(|_| {
self.gc_col(
ColTrieChanges,
&get_block_shard_uid(&block_hash, &shard_uid),
);
self.inc_gc_col_state();
})
.map_err(|err| ErrorKind::Other(err.to_string()))
})
.unwrap_or(Ok(()))?;
}
}
}
GCMode::Canonical(tries) => {
// If the block is on canonical chain, we delete the state that's before applying this block
for shard_id in 0..header.chunk_mask().len() as ShardId {
// TODO: pass in the actual shard version that this block uses
// https://github.com/near/nearcore/issues/4710
let shard_uid = ShardUId { version: 0, shard_id: shard_id as u32 };
self.store()
.get_ser(ColTrieChanges, &get_block_shard_uid(&block_hash, &shard_uid))?
.map(|trie_changes: TrieChanges| {
tries
.apply_deletions(&trie_changes, shard_uid, &mut store_update)
.map(|_| {
self.gc_col(
ColTrieChanges,
&get_block_shard_uid(&block_hash, &shard_uid),
);
self.inc_gc_col_state();
})
.map_err(|err| ErrorKind::Other(err.to_string()))
})
.unwrap_or(Ok(()))?;
GCMode::Canonical(tries) => {
// If the block is on canonical chain, we delete the state that's before applying this block
for shard_uid in shard_uids_to_gc {
self.store()
.get_ser(ColTrieChanges, &get_block_shard_uid(&block_hash, &shard_uid))?
.map(|trie_changes: TrieChanges| {
tries
.apply_deletions(&trie_changes, shard_uid, &mut store_update)
.map(|_| {
self.gc_col(
ColTrieChanges,
&get_block_shard_uid(&block_hash, &shard_uid),
);
self.inc_gc_col_state();
})
.map_err(|err| ErrorKind::Other(err.to_string()))
})
.unwrap_or(Ok(()))?;
}
// Set `block_hash` on previous one
block_hash = *self.get_block_header(&block_hash)?.prev_hash();
}
// Set `block_hash` on previous one
block_hash = *self.get_block_header(&block_hash)?.prev_hash();
}
GCMode::StateSync { .. } => {
// Not apply the data from ColTrieChanges
// TODO: pass in the actual shard version that this block uses
// https://github.com/near/nearcore/issues/4710
for shard_id in 0..header.chunk_mask().len() as ShardId {
let shard_uid = ShardUId { version: 0, shard_id: shard_id as u32 };
self.gc_col(ColTrieChanges, &get_block_shard_uid(&block_hash, &shard_uid));
GCMode::StateSync { .. } => {
// Not apply the data from ColTrieChanges
for shard_uid in shard_uids_to_gc {
self.gc_col(ColTrieChanges, &get_block_shard_uid(&block_hash, &shard_uid));
}
}
}
}
Expand All @@ -2188,11 +2210,6 @@ impl<'a> ChainStoreUpdate<'a> {
self.gc_col(ColChunkPerHeightShard, &block_shard_id);
self.gc_col(ColNextBlockWithNewChunk, &block_shard_id);

// TODO: use the real shard version https://github.com/near/nearcore/issues/4710
let shard_uid = ShardUId { version: 0, shard_id: shard_id as u32 };
let block_shard_uid = get_block_shard_uid(&block_hash, &shard_uid);
self.gc_col(ColChunkExtra, &block_shard_uid);

// For incoming State Parts it's done in chain.clear_downloaded_parts()
// The following code is mostly for outgoing State Parts.
// However, if node crashes while State Syncing, it may never clear
Expand All @@ -2211,6 +2228,11 @@ impl<'a> ChainStoreUpdate<'a> {
self.gc_col(ColLastBlockWithNewChunk, &index_to_bytes(shard_id));
}
}
// gc ColChunkExtra based on shard_uid since it's indexed by shard_uid in the storage
for shard_uid in self.get_shard_uids_to_gc(runtime_adapter, &block_hash) {
let block_shard_uid = get_block_shard_uid(&block_hash, &shard_uid);
self.gc_col(ColChunkExtra, &block_shard_uid);
}

// 3. Delete block_hash-indexed data
let block_hash_vec: Vec<u8> = block_hash.as_ref().into();
Expand Down Expand Up @@ -3306,6 +3328,7 @@ mod tests {
#[test]
fn test_clear_old_data() {
let mut chain = get_chain_with_epoch_length(1);
let runtime_adapter = chain.runtime_adapter.clone();
let genesis = chain.get_block_by_height(0).unwrap().clone();
let signer = Arc::new(InMemoryValidatorSigner::from_seed(
"test1".parse().unwrap(),
Expand All @@ -3315,6 +3338,10 @@ mod tests {
let mut prev_block = genesis.clone();
let mut blocks = vec![prev_block.clone()];
for i in 1..15 {
// This is a hack to make the KeyValueRuntime to have epoch information stored
runtime_adapter
.get_next_epoch_id_from_prev_block(prev_block.hash())
.expect("block must exist");
let block = Block::empty_with_height(&prev_block, i, &*signer.clone());
blocks.push(block.clone());
let mut store_update = chain.mut_store().store_update();
Expand Down Expand Up @@ -3403,6 +3430,7 @@ mod tests {
#[test]
fn test_clear_old_data_fixed_height() {
let mut chain = get_chain();
let runtime_adapter = chain.runtime_adapter.clone();
let genesis = chain.get_block_by_height(0).unwrap().clone();
let signer = Arc::new(InMemoryValidatorSigner::from_seed(
"test1".parse().unwrap(),
Expand All @@ -3412,6 +3440,10 @@ mod tests {
let mut prev_block = genesis.clone();
let mut blocks = vec![prev_block.clone()];
for i in 1..10 {
// This is a hack to make the KeyValueRuntime to have epoch information stored
runtime_adapter
.get_next_epoch_id_from_prev_block(prev_block.hash())
.expect("block must exist");
let mut store_update = chain.mut_store().store_update();

let block = Block::empty_with_height(&prev_block, i, &*signer);
Expand Down Expand Up @@ -3448,7 +3480,9 @@ mod tests {

let trie = chain.runtime_adapter.get_tries();
let mut store_update = chain.mut_store().store_update();
assert!(store_update.clear_block_data(*blocks[5].hash(), GCMode::Canonical(trie)).is_ok());
assert!(store_update
.clear_block_data(&*runtime_adapter, *blocks[5].hash(), GCMode::Canonical(trie))
.is_ok());
store_update.commit().unwrap();

assert!(chain.get_block(blocks[4].hash()).is_err());
Expand Down
9 changes: 7 additions & 2 deletions chain/chain/src/store_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,14 @@ impl StoreValidator {
self.check(&validate::chunk_tx_exists, &chunk_hash, &shard_chunk, col);
}
DBCol::ColChunkExtra => {
let (block_hash, _) = get_block_shard_uid_rev(key_ref)?;
let (block_hash, shard_uid) = get_block_shard_uid_rev(key_ref)?;
let chunk_extra = ChunkExtra::try_from_slice(value_ref)?;
self.check(&validate::chunk_extra_block_exists, &block_hash, &chunk_extra, col);
self.check(
&validate::chunk_extra_block_exists,
&(block_hash, shard_uid),
&chunk_extra,
col,
);
}
DBCol::ColTrieChanges => {
let (block_hash, shard_uid) = get_block_shard_uid_rev(key_ref)?;
Expand Down
Loading

0 comments on commit 07d87cb

Please sign in to comment.