Stop accepting transactions when number of delayed receipts is large (#9222)

This PR introduces a soft limit on the number of delayed receipts in each shard. The limit is around 10000 delayed receipts, which is enough to saturate the chunk capacity even when receipts are very small. For comparison, the largest number of delayed receipts we've seen in the last 3 months is around 400, and it stays at 0 most of the time.

In the future, we will also use other signals, such as the total size of delayed receipts, but that information first has to be computed and stored in the chunk headers/trie store; this will be done in #9228.

Addresses: #8877
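
For intuition, here is a minimal, hypothetical Rust sketch (not the PR's exact code) of how the count limit follows from the chunk gas limit and the minimal execution cost of an action receipt; the concrete numbers below are illustrative assumptions only:

```rust
/// Hypothetical sketch of the delayed-receipt count limit derivation.
/// `gas_limit` is the chunk gas capacity; `min_receipt_exec_fee` is the smallest
/// possible execution cost of an action receipt.
fn delayed_receipt_count_limit(gas_limit: u64, min_receipt_exec_fee: u64) -> u64 {
    // Round up so that at least one receipt always fits into a chunk.
    let max_receipts_per_chunk = (gas_limit + min_receipt_exec_fee - 1) / min_receipt_exec_fee;
    // Keep up to two chunks' worth of delayed receipts buffered
    // (see the runtime change below for the rationale).
    max_receipts_per_chunk * 2
}

fn main() {
    // Illustrative values: ~1 PGas chunk capacity, ~100 GGas minimal receipt cost,
    // giving roughly 10,000 receipts processed per chunk and a buffer of twice that.
    let limit = delayed_receipt_count_limit(1_000_000_000_000_000, 100_000_000_000);
    println!("delayed receipt count limit = {limit}");
}
```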
akashin authored and nikurt committed Aug 24, 2023
1 parent 89e6a16 commit b7ec41a
Showing 2 changed files with 126 additions and 78 deletions.
170 changes: 95 additions & 75 deletions integration-tests/src/tests/client/process_blocks.rs
@@ -46,8 +46,8 @@ use near_o11y::WithSpanContextExt;
use near_primitives::block::{Approval, ApprovalInner};
use near_primitives::block_header::BlockHeader;
use near_primitives::epoch_manager::RngSeed;
use near_primitives::errors::InvalidTxError;
use near_primitives::errors::TxExecutionError;
use near_primitives::errors::{ActionError, ActionErrorKind, InvalidTxError};
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::merkle::{verify_hash, PartialMerkleTree};
use near_primitives::receipt::DelayedReceiptIndices;
@@ -2775,6 +2775,8 @@ fn test_block_execution_outcomes() {
assert!(execution_outcomes_from_block[0].outcome_with_id.id == delayed_receipt_id[0]);
}

// This test verifies that gas consumed for processing refund receipts is taken into account
// for the purpose of limiting the size of the chunk.
#[test]
fn test_refund_receipts_processing() {
init_test_logger();
@@ -2788,8 +2790,8 @@ fn test_refund_receipts_processing() {
);
genesis.config.epoch_length = epoch_length;
genesis.config.min_gas_price = min_gas_price;
// Set gas limit to be small enough to produce some delay receipts, but
// large enough for transactions to get through.
// Set gas limit to be small enough to produce some delayed receipts, but large enough for
// transactions to get through.
genesis.config.gas_limit = 100_000_000;
let chain_genesis = ChainGenesis::new(&genesis);
let mut env = TestEnv::builder(chain_genesis)
@@ -2799,9 +2801,9 @@
let genesis_block = env.clients[0].chain.get_block_by_height(0).unwrap();
let signer = InMemorySigner::from_seed("test0".parse().unwrap(), KeyType::ED25519, "test0");
let mut tx_hashes = vec![];
// send transactions to a non-existing account to generate refund
// Send transactions to a non-existing account to generate refunds.
for i in 0..3 {
// send transaction to the same account to generate local receipts
// Send transaction from the same account to generate local receipts.
let tx = SignedTransaction::send_money(
i + 1,
"test0".parse().unwrap(),
@@ -2814,84 +2816,102 @@
assert_eq!(env.clients[0].process_tx(tx, false, false), ProcessTxResponse::ValidTx);
}

env.produce_block(0, 3);
env.produce_block(0, 4);
let mut block_height = 5;
// Make sure all transactions are processed.
for i in 3..16 {
env.produce_block(0, i);
}

let test_shard_uid = ShardUId { version: 1, shard_id: 0 };
loop {
env.produce_block(0, block_height);
let block = env.clients[0].chain.get_block_by_height(block_height).unwrap().clone();
let prev_block =
env.clients[0].chain.get_block_by_height(block_height - 1).unwrap().clone();
let chunk_extra = env.clients[0]
.chain
.get_chunk_extra(prev_block.hash(), &test_shard_uid)
.unwrap()
.clone();
let state_update = env.clients[0]
.runtime_adapter
.get_tries()
.new_trie_update(test_shard_uid, *chunk_extra.state_root());
let delayed_indices: Option<DelayedReceiptIndices> =
get(&state_update, &TrieKey::DelayedReceiptIndices).unwrap();
let finished_all_delayed_receipts = match delayed_indices {
None => false,
Some(delayed_indices) => {
delayed_indices.next_available_index > 0
&& delayed_indices.first_index == delayed_indices.next_available_index
}
};
let chunk =
env.clients[0].chain.get_chunk(&block.chunks()[0].chunk_hash()).unwrap().clone();
if chunk.receipts().is_empty()
&& chunk.transactions().is_empty()
&& finished_all_delayed_receipts
{
break;
for tx_hash in tx_hashes {
let tx_outcome = env.clients[0].chain.get_execution_outcome(&tx_hash).unwrap();
assert_eq!(tx_outcome.outcome_with_id.outcome.receipt_ids.len(), 1);
if let ExecutionStatus::SuccessReceiptId(id) = tx_outcome.outcome_with_id.outcome.status {
let receipt_outcome = env.clients[0].chain.get_execution_outcome(&id).unwrap();
assert_matches!(
receipt_outcome.outcome_with_id.outcome.status,
ExecutionStatus::Failure(TxExecutionError::ActionError(ActionError {
kind: ActionErrorKind::AccountDoesNotExist { .. },
..
}))
);
let execution_outcomes_from_block = env.clients[0]
.chain
.store()
.get_block_execution_outcomes(&receipt_outcome.block_hash)
.unwrap()
.remove(&0)
.unwrap();
assert_eq!(execution_outcomes_from_block.len(), 1);
let chunk_extra = env.clients[0]
.chain
.get_chunk_extra(&receipt_outcome.block_hash, &test_shard_uid)
.unwrap()
.clone();
assert!(chunk_extra.gas_used() >= chunk_extra.gas_limit());
} else {
unreachable!("Transaction must succeed");
}
block_height += 1;
}
}

let mut refund_receipt_ids = HashSet::new();
for (_, id) in tx_hashes.into_iter().enumerate() {
let execution_outcome = env.clients[0].chain.get_execution_outcome(&id).unwrap();
assert_eq!(execution_outcome.outcome_with_id.outcome.receipt_ids.len(), 1);
match execution_outcome.outcome_with_id.outcome.status {
ExecutionStatus::SuccessReceiptId(id) => {
let receipt_outcome = env.clients[0].chain.get_execution_outcome(&id).unwrap();
assert_matches!(
receipt_outcome.outcome_with_id.outcome.status,
ExecutionStatus::Failure(TxExecutionError::ActionError(_))
);
for id in receipt_outcome.outcome_with_id.outcome.receipt_ids.iter() {
refund_receipt_ids.insert(*id);
}
}
_ => assert!(false),
};
// Tests that the number of delayed receipts in each shard is bounded based on the gas limit of
// the chunk and any new receipts are not included if there are too many delayed receipts.
#[test]
fn test_delayed_receipt_count_limit() {
init_test_logger();

let epoch_length = 5;
let min_gas_price = 10000;
let mut genesis = Genesis::test_sharded_new_version(vec!["test0".parse().unwrap()], 1, vec![1]);
genesis.config.epoch_length = epoch_length;
genesis.config.min_gas_price = min_gas_price;
// Set gas limit to be small enough to produce some delayed receipts, but large enough for
// transactions to get through.
// This will result in delayed receipt count limit of 20.
let transaction_costs = RuntimeConfig::test().fees;
let chunk_gas_limit = 10 * transaction_costs.fee(ActionCosts::new_action_receipt).exec_fee();
genesis.config.gas_limit = chunk_gas_limit;
let chain_genesis = ChainGenesis::new(&genesis);
let mut env = TestEnv::builder(chain_genesis)
.real_epoch_managers(&genesis.config)
.nightshade_runtimes(&genesis)
.build();
let genesis_block = env.clients[0].chain.get_block_by_height(0).unwrap();

let signer = InMemorySigner::from_seed("test0".parse().unwrap(), KeyType::ED25519, "test0");
// Send enough transactions to saturate delayed receipts capacity.
let total_tx_count = 200usize;
for i in 0..total_tx_count {
let tx = SignedTransaction::from_actions(
(i + 1) as u64,
"test0".parse().unwrap(),
"test0".parse().unwrap(),
&signer,
vec![Action::DeployContract(DeployContractAction { code: vec![92; 10000] })],
*genesis_block.hash(),
);
assert_eq!(env.clients[0].process_tx(tx, false, false), ProcessTxResponse::ValidTx);
}

let ending_block_height = block_height - 1;
let begin_block_height = ending_block_height - refund_receipt_ids.len() as u64 + 1;
let mut processed_refund_receipt_ids = HashSet::new();
for i in begin_block_height..=ending_block_height {
let block = env.clients[0].chain.get_block_by_height(i).unwrap().clone();
let execution_outcomes_from_block = env.clients[0]
.chain
.store()
.get_block_execution_outcomes(block.hash())
.unwrap()
.remove(&0)
.unwrap();
for outcome in execution_outcomes_from_block.iter() {
processed_refund_receipt_ids.insert(outcome.outcome_with_id.id);
let mut included_tx_count = 0;
let mut height = 1;
while included_tx_count < total_tx_count {
env.produce_block(0, height);
let block = env.clients[0].chain.get_block_by_height(height).unwrap();
let chunk = env.clients[0].chain.get_chunk(&block.chunks()[0].chunk_hash()).unwrap();
// These checks are useful to ensure that we didn't mess up the test setup.
assert!(chunk.receipts().len() <= 1);
assert!(chunk.transactions().len() <= 5);

// Because all transactions are in the transactions pool, this means we have not included
// some transactions due to the delayed receipt count limit.
if included_tx_count > 0 && chunk.transactions().is_empty() {
break;
}
let chunk_extra =
env.clients[0].chain.get_chunk_extra(block.hash(), &test_shard_uid).unwrap().clone();
assert_eq!(execution_outcomes_from_block.len(), 1);
assert!(chunk_extra.gas_used() >= chunk_extra.gas_limit());
included_tx_count += chunk.transactions().len();
height += 1;
}
assert_eq!(processed_refund_receipt_ids, refund_receipt_ids);
assert!(included_tx_count < total_tx_count);
}

#[test]
34 changes: 31 additions & 3 deletions nearcore/src/runtime/mod.rs
@@ -18,7 +18,7 @@ use near_primitives::config::ExtCosts;
use near_primitives::contract::ContractCode;
use near_primitives::errors::{InvalidTxError, RuntimeError, StorageError};
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::receipt::Receipt;
use near_primitives::receipt::{DelayedReceiptIndices, Receipt};
use near_primitives::runtime::config_store::RuntimeConfigStore;
use near_primitives::runtime::migration_data::{MigrationData, MigrationFlags};
use near_primitives::sandbox::state_patch::SandboxStatePatch;
Expand All @@ -27,6 +27,7 @@ use near_primitives::shard_layout::{
};
use near_primitives::state_part::PartId;
use near_primitives::transaction::SignedTransaction;
use near_primitives::trie_key::TrieKey;
use near_primitives::types::validator_stake::ValidatorStakeIter;
use near_primitives::types::{
AccountId, Balance, BlockHeight, EpochHeight, EpochId, EpochInfoProvider, Gas, MerkleHash,
@@ -43,7 +44,7 @@ use near_store::{
ApplyStatePartResult, DBCol, PartialStorage, ShardTries, StateSnapshotConfig, Store,
StoreCompiledContractCache, Trie, TrieConfig, WrappedTrieChanges, COLD_HEAD_KEY,
};
use near_vm_runner::logic::CompiledContractCache;
use near_vm_runner::logic::{ActionCosts, CompiledContractCache};
use near_vm_runner::precompile_contract;
use node_runtime::adapter::ViewRuntimeAdapter;
use node_runtime::state_viewer::TrieViewer;
@@ -689,6 +690,30 @@ impl RuntimeAdapter for NightshadeRuntime {

let runtime_config = self.runtime_config_store.get_config(current_protocol_version);

// To avoid limiting the throughput of the network, we want to include enough receipts to
// saturate the capacity of the chunk even in case when all of these receipts end up using
// the smallest possible amount of gas, which is at least the cost of execution of action
// receipt.
// Currently, the min execution cost is ~100 GGas and the chunk capacity is 1 PGas, giving
// a bound of at most 10000 receipts processed in a chunk.
let delayed_receipts_indices: DelayedReceiptIndices =
near_store::get(&state_update, &TrieKey::DelayedReceiptIndices)?.unwrap_or_default();
let min_fee = runtime_config.fees.fee(ActionCosts::new_action_receipt).exec_fee();
let new_receipt_count_limit = if min_fee > 0 {
// Round up to include at least one receipt.
let max_processed_receipts_in_chunk = (gas_limit + min_fee - 1) / min_fee;
// Allow at most 2 chunks worth of delayed receipts. This way under congestion,
// after processing a single chunk, we will still have at least 1 chunk worth of
// delayed receipts, ensuring the high throughput even if the next chunk producer
// does not include any receipts.
// This buffer size is a trade-off between the max queue size and system efficiency
// under congestion.
let delayed_receipt_count_limit = max_processed_receipts_in_chunk * 2;
delayed_receipt_count_limit.saturating_sub(delayed_receipts_indices.len()) as usize
} else {
usize::MAX
};

// In general, we limit the number of transactions via send_fees.
// However, as a second line of defense, we want to limit the byte size
// of transaction as well. Rather than introducing a separate config for
@@ -700,7 +725,10 @@
/ (runtime_config.wasm_config.ext_costs.gas_cost(ExtCosts::storage_write_value_byte)
+ runtime_config.wasm_config.ext_costs.gas_cost(ExtCosts::storage_read_value_byte));

while total_gas_burnt < transactions_gas_limit && total_size < size_limit {
while total_gas_burnt < transactions_gas_limit
&& total_size < size_limit
&& transactions.len() < new_receipt_count_limit
{
if let Some(iter) = pool_iterator.next() {
while let Some(tx) = iter.next() {
num_checked_transactions += 1;
