Skip to content

Commit

Permalink
Fix #11135 (#11153)
Browse files Browse the repository at this point in the history
@Longarithm fixed #11135 by forcing flat storage head to move after
state sync. The pytest `single_shard_tracking` exposes this issue.

Instructions to run the test
```
cargo build -p neard --features test_features,statelessnet_protocol
python3 pytest/tests/sanity/single_shard_tracking.py
```

---------

Co-authored-by: Longarithm <the.aleksandr.logunov@gmail.com>
  • Loading branch information
bowenwang1996 and Longarithm committed Apr 25, 2024
1 parent 6aa1aad commit 4ea04e4
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 4 deletions.
14 changes: 13 additions & 1 deletion chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1860,6 +1860,12 @@ impl Chain {
true,
)
};
tracing::debug!(
target: "chain",
"Updating flat storage for shard {} need_flat_storage_update: {}",
shard_id,
need_flat_storage_update
);

if need_flat_storage_update {
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?;
Expand Down Expand Up @@ -2796,7 +2802,7 @@ impl Chain {
let shard_state_header = self.get_state_header(shard_id, sync_hash)?;
let mut height = shard_state_header.chunk_height_included();
let mut chain_update = self.chain_update();
chain_update.set_state_finalize(shard_id, sync_hash, shard_state_header)?;
let shard_uid = chain_update.set_state_finalize(shard_id, sync_hash, shard_state_header)?;
chain_update.commit()?;

// We restored the state on height `shard_state_header.chunk.header.height_included`.
Expand All @@ -2813,6 +2819,12 @@ impl Chain {
}
}

let flat_storage_manager = self.runtime_adapter.get_flat_storage_manager();
if let Some(flat_storage) = flat_storage_manager.get_flat_storage_for_shard(shard_uid) {
let header = self.get_block_header(&sync_hash)?;
flat_storage.update_flat_head(header.prev_hash(), true).unwrap();
}

Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions chain/chain/src/chain_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ impl<'a> ChainUpdate<'a> {
shard_id: ShardId,
sync_hash: CryptoHash,
shard_state_header: ShardStateSyncResponseHeader,
) -> Result<(), Error> {
) -> Result<ShardUId, Error> {
let _span =
tracing::debug_span!(target: "sync", "chain_update_set_state_finalize").entered();
let (chunk, incoming_receipts_proofs) = match shard_state_header {
Expand Down Expand Up @@ -805,7 +805,7 @@ impl<'a> ChainUpdate<'a> {
receipt_proof_response.1,
);
}
Ok(())
Ok(shard_uid)
}

pub fn set_state_finalize_on_height(
Expand Down
2 changes: 2 additions & 0 deletions core/store/src/flat/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ impl FlatStorageManager {
}
}
});
} else {
tracing::debug!(target: "store", ?shard_uid, block_height=?block.header().height(), "No flat storage!!!");
}
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions core/store/src/flat/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ impl FlatStorage {

let new_head = guard.get_new_flat_head(*block_hash, strict)?;
if new_head == guard.flat_head.hash {
tracing::debug!(target: "store", "update_flat_head, shard id {}, flat head already at block {}", guard.shard_uid.shard_id(), guard.flat_head.height);
return Ok(());
}

Expand Down
2 changes: 2 additions & 0 deletions nightly/pytest-sanity.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pytest --timeout=600 sanity/state_sync_routed.py manytx 115 --features nightly
# working on a fix / deciding whether to remove them.
#pytest --timeout=300 sanity/state_sync_late.py notx
#pytest --timeout=300 sanity/state_sync_late.py notx --features nightly
pytest sanity/single_shard_tracking.py
pytest sanity/single_shard_tracking.py --features nightly

pytest --timeout=3600 sanity/state_sync_massive.py
pytest --timeout=3600 sanity/state_sync_massive_validator.py
Expand Down
3 changes: 2 additions & 1 deletion pytest/lib/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,8 @@ def apply_config_changes(node_dir, client_config_change):
'save_trie_changes', 'split_storage',
'state_sync', 'state_sync_enabled',
'store.state_snapshot_enabled',
'tracked_shard_schedule', 'cold_store')
'tracked_shard_schedule', 'cold_store',
'store.load_mem_tries_for_tracked_shards')

for k, v in client_config_change.items():
if not (k in allowed_missing_configs or k in config_json):
Expand Down
135 changes: 135 additions & 0 deletions pytest/tests/sanity/single_shard_tracking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#!/usr/bin/env python3
# Spins up 4 validating nodes and 1 non-validating node. There are four shards in this test.
# Send random transactions between shards.
# Stop all validating nodes at random times and restart them.
# Repeat the process a few times and make sure the network can progress over a few epochs.

import pathlib
import random
import sys
import time

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))

from cluster import start_cluster, spin_up_node, load_config, apply_config_changes
import account
import state_sync_lib
import transaction
import utils

from configured_logger import logger

EPOCH_LENGTH = 20


def random_u64():
    """Return 8 random bytes drawn from the module-level RNG."""
    value = bytearray()
    for _ in range(8):
        value.append(random.randint(0, 255))
    return bytes(value)


# Generates traffic for all possible shards.
# Assumes that `test0`, `test1`, `near` all belong to different shards.
def random_workload_until(target, nonce, keys, node0, node1, target_node):
    """Drive a random mix of transfers and contract calls until
    `target_node` reports a block height above `target`.

    Returns the updated ``(nonce, keys)`` pair so a caller can resume the
    workload later without nonce collisions or losing the populated keys.
    """
    prev_logged_height = -1
    while True:
        nonce += 1

        head = target_node.get_latest_block()
        if head.height > target:
            break
        if head.height != prev_logged_height:
            logger.info(
                f'@{head.height}, epoch_height: {state_sync_lib.approximate_epoch_height(head.height, EPOCH_LENGTH)}'
            )
            prev_logged_height = head.height

        anchor_hash = node0.get_latest_block().hash_bytes
        if random.random() < 0.5:
            # Make a transfer between shards.
            # The goal is to generate cross-shard receipts.
            sender_key = random.choice([node0, node1]).signer_key
            recipient = random.choice([
                node0.signer_key.account_id, node1.signer_key.account_id, "near"
            ])
            payment_tx = transaction.sign_payment_tx(sender_key, recipient, 1,
                                                     nonce, anchor_hash)
            node0.send_tx(payment_tx).get('result')
        elif (len(keys) > 100 and random.random() < 0.5) or len(keys) > 1000:
            # Do some flat storage reads, but only if we have enough keys populated.
            chosen = keys[random.randint(0, len(keys) - 1)]
            for submitter in (node0, node1):
                call_function('read', chosen, nonce, submitter.signer_key,
                              anchor_hash, node0)
                call_function('read', chosen, nonce, submitter.signer_key,
                              anchor_hash, node0)
        else:
            # Generate some data for flat storage reads
            fresh_key = random_u64()
            keys.append(fresh_key)
            for submitter in (node0, node1):
                call_function('write', fresh_key, nonce, submitter.signer_key,
                              anchor_hash, node0)
    return nonce, keys


def call_function(op, key, nonce, signer_key, last_block_hash, node):
    """Submit one call on the test contract via `node` and return the result.

    `op == 'read'` invokes `read_value` with `key` as the argument; any
    other value invokes `write_key_value` with `key` followed by a fresh
    random 8-byte value.
    """
    if op == 'read':
        fn, args = 'read_value', key
    else:
        fn, args = 'write_key_value', key + random_u64()

    tx = transaction.sign_function_call_tx(signer_key, signer_key.account_id,
                                           fn, args, 300 * account.TGAS, 0,
                                           nonce, last_block_hash)
    return node.send_tx(tx).get('result')


def main():
    """Spin up 4 validators + 1 dump node over 4 shards, generate random
    cross-shard traffic, and repeatedly kill/restart all validators so they
    must recover via state sync (regression test for flat storage head
    movement after state sync)."""
    node_config_dump, node_config_sync = state_sync_lib.get_state_sync_configs_pair(
    )
    # Validators track no shards statically; shard shuffling forces them to
    # state-sync into newly assigned shards each epoch.
    node_config_sync["tracked_shards"] = []
    node_config_sync["store.load_mem_tries_for_tracked_shards"] = True
    node_config_dump["store.load_mem_tries_for_tracked_shards"] = True
    # Give every validator its own copy of the config: the original code
    # mapped all four indices to the SAME dict object, so any later
    # per-node tweak would silently leak to the others via aliasing.
    configs = {x: dict(node_config_sync) for x in range(4)}
    configs[4] = node_config_dump

    nodes = start_cluster(
        4, 1, 4, None, [["epoch_length", EPOCH_LENGTH],
                        ["shuffle_shard_assignment_for_chunk_producers", True],
                        ["block_producer_kickout_threshold", 20],
                        ["chunk_producer_kickout_threshold", 20]], configs)

    for node in nodes:
        node.stop_checking_store()

    # Use the file's logger (see `configured_logger`) rather than bare print
    # so output is captured consistently with the rest of the run.
    logger.info("nodes started")
    contract = utils.load_test_contract()

    latest_block_hash = nodes[0].get_latest_block().hash_bytes
    deploy_contract_tx = transaction.sign_deploy_contract_tx(
        nodes[0].signer_key, contract, 1, latest_block_hash)
    result = nodes[0].send_tx_and_wait(deploy_contract_tx, 10)
    assert 'result' in result and 'error' not in result, (
        'Expected "result" and no "error" in response, got: {}'.format(result))

    nonce = 2
    keys = []
    # Warm up: run the workload for two full epochs before any restarts.
    nonce, keys = random_workload_until(EPOCH_LENGTH * 2, nonce, keys, nodes[0],
                                        nodes[1], nodes[4])
    for i in range(2, 6):
        logger.info(f"iteration {i} starts")
        # Stop at a random point inside the epoch so restarts land mid-epoch.
        stop_height = random.randint(1, EPOCH_LENGTH)
        nonce, keys = random_workload_until(EPOCH_LENGTH * i + stop_height,
                                            nonce, keys, nodes[i // 5],
                                            nodes[(i + 1) // 5], nodes[4])
        # `j`, not `i`: the original inner loops shadowed the outer loop
        # variable (harmless in Python's for-loop semantics, but fragile).
        for j in range(4):
            nodes[j].kill()
        time.sleep(2)
        for j in range(4):
            nodes[j].start(boot_node=nodes[4])


# Script entry point: run the full cluster scenario when invoked directly.
if __name__ == "__main__":
    main()

0 comments on commit 4ea04e4

Please sign in to comment.