Skip to content

Commit

Permalink
feat: configure num threads for flat storage creation (#8088)
Browse files Browse the repository at this point in the history
Make number of threads for background flat storage creation configurable. In the node config we set it to 8, but we also allow node owners to modify it if they observe that BP slows down or creation is too slow until FS becomes a requirement. In the tests we limit number of threads to 1.

I think introducing some general structure as `ChainConfig` makes sense here, as we already use a configuration option `save_trie_changes`.

I'm open to other opinions how to organize this. In general `NearConfig` structure looks a bit suspicious to me, because it includes `config` taken from `config.json` and many other configs which are just derived from `config`.

## Testing

Checking that parameter is taken correctly by the node.
  • Loading branch information
Longarithm committed Nov 21, 2022
1 parent 33b2249 commit 0040e92
Show file tree
Hide file tree
Showing 16 changed files with 107 additions and 29 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@
to set limits on the trie cache. Deprecates the never announced
`store.trie_cache_capacities` option which was mentioned in previous change.
[#7578](https://github.com/near/nearcore/pull/7578)
* New option `store.background_migration_threads` in `config.json`. Defines
number of threads to execute background migrations of storage. Currently used
for flat storage migration. Set to 8 by default, can be reduced if it slows down
block processing too much or increased if you want to speed up migration.
* Tracing of work across actix workers within a process:
[#7866](https://github.com/near/nearcore/pull/7866),
[#7819](https://github.com/near/nearcore/pull/7819),
Expand Down
13 changes: 9 additions & 4 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, GCMode};
use crate::types::{
AcceptedBlock, ApplySplitStateResult, ApplySplitStateResultOrStateChanges,
ApplyTransactionResult, Block, BlockEconomicsConfig, BlockHeader, BlockHeaderInfo, BlockStatus,
ChainGenesis, Provenance, RuntimeAdapter,
ChainConfig, ChainGenesis, Provenance, RuntimeAdapter,
};
use crate::validate::{
validate_challenge, validate_chunk_proofs, validate_chunk_with_chunk_extra,
Expand Down Expand Up @@ -545,11 +545,12 @@ impl Chain {
runtime_adapter: Arc<dyn RuntimeAdapter>,
chain_genesis: &ChainGenesis,
doomslug_threshold_mode: DoomslugThresholdMode,
save_trie_changes: bool,
chain_config: ChainConfig,
) -> Result<Chain, Error> {
// Get runtime initial state and create genesis block out of it.
let (store, state_roots) = runtime_adapter.genesis_state();
let mut store = ChainStore::new(store, chain_genesis.height, save_trie_changes);
let mut store =
ChainStore::new(store, chain_genesis.height, chain_config.save_trie_changes);
let genesis_chunks = genesis_chunks(
state_roots.clone(),
runtime_adapter.num_shards(&EpochId::default())?,
Expand Down Expand Up @@ -653,7 +654,11 @@ impl Chain {
store_update.commit()?;

// Create flat storage or initiate migration to flat storage.
let flat_storage_creator = FlatStorageCreator::new(runtime_adapter.clone(), &store);
let flat_storage_creator = FlatStorageCreator::new(
runtime_adapter.clone(),
&store,
chain_config.background_migration_threads,
);

info!(target: "chain", "Init: header head @ #{} {}; block head @ #{} {}",
header_head.height, header_head.last_block_hash,
Expand Down
13 changes: 7 additions & 6 deletions chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use near_chain_primitives::Error;
use near_primitives::types::{BlockHeight, ShardId};
#[cfg(feature = "protocol_feature_flat_state")]
use near_store::flat_state::store_helper;
use near_store::flat_state::{FlatStorageStateStatus, NUM_PARTS_IN_ONE_STEP};
use near_store::flat_state::FlatStorageStateStatus;
use std::sync::Arc;
#[cfg(feature = "protocol_feature_flat_state")]
use tracing::debug;
Expand Down Expand Up @@ -109,7 +109,11 @@ pub struct FlatStorageCreator {
}

impl FlatStorageCreator {
pub fn new(runtime_adapter: Arc<dyn RuntimeAdapter>, chain_store: &ChainStore) -> Option<Self> {
pub fn new(
runtime_adapter: Arc<dyn RuntimeAdapter>,
chain_store: &ChainStore,
num_threads: usize,
) -> Option<Self> {
let chain_head = chain_store.head().unwrap();
let num_shards = runtime_adapter.num_shards(&chain_head.epoch_id).unwrap();
let start_height = chain_head.height;
Expand Down Expand Up @@ -138,10 +142,7 @@ impl FlatStorageCreator {
if creation_needed {
Some(Self {
shard_creators,
pool: rayon::ThreadPoolBuilder::new()
.num_threads(NUM_PARTS_IN_ONE_STEP as usize)
.build()
.unwrap(),
pool: rayon::ThreadPoolBuilder::new().num_threads(num_threads).build().unwrap(),
})
} else {
None
Expand Down
10 changes: 8 additions & 2 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2990,6 +2990,7 @@ mod tests {
use crate::store::{ChainStoreAccess, GCMode};
use crate::store_validator::StoreValidator;
use crate::test_utils::{KeyValueRuntime, ValidatorSchedule};
use crate::types::ChainConfig;
use crate::{Chain, ChainGenesis, DoomslugThresholdMode, RuntimeAdapter};

fn get_chain() -> Chain {
Expand All @@ -3003,8 +3004,13 @@ mod tests {
.block_producers_per_epoch(vec![vec!["test1".parse().unwrap()]]);
let runtime_adapter =
Arc::new(KeyValueRuntime::new_with_validators(store, vs, epoch_length));
Chain::new(runtime_adapter, &chain_genesis, DoomslugThresholdMode::NoApprovals, true)
.unwrap()
Chain::new(
runtime_adapter,
&chain_genesis,
DoomslugThresholdMode::NoApprovals,
ChainConfig::test(),
)
.unwrap()
}

#[test]
Expand Down
3 changes: 2 additions & 1 deletion chain/chain/src/store_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ mod tests {
use near_store::test_utils::create_test_store;

use crate::test_utils::KeyValueRuntime;
use crate::types::ChainConfig;
use crate::{Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode};

use super::*;
Expand All @@ -395,7 +396,7 @@ mod tests {
runtime_adapter.clone(),
&chain_genesis,
DoomslugThresholdMode::NoApprovals,
true,
ChainConfig::test(),
)
.unwrap();
(chain, StoreValidator::new(None, genesis, runtime_adapter, store, false))
Expand Down
7 changes: 4 additions & 3 deletions chain/chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ use crate::block_processing_utils::BlockNotInPoolError;
use crate::chain::Chain;
use crate::store::ChainStoreAccess;
use crate::types::{
AcceptedBlock, ApplySplitStateResult, ApplyTransactionResult, BlockHeaderInfo, ChainGenesis,
AcceptedBlock, ApplySplitStateResult, ApplyTransactionResult, BlockHeaderInfo, ChainConfig,
ChainGenesis,
};
use crate::{BlockHeader, DoomslugThresholdMode, RuntimeAdapter};
use crate::{BlockProcessingArtifact, Provenance};
Expand Down Expand Up @@ -1425,7 +1426,7 @@ pub fn setup_with_tx_validity_period(
protocol_version: PROTOCOL_VERSION,
},
DoomslugThresholdMode::NoApprovals,
true,
ChainConfig::test(),
)
.unwrap();
let test_account = "test".parse::<AccountId>().unwrap();
Expand Down Expand Up @@ -1465,7 +1466,7 @@ pub fn setup_with_validators(
protocol_version: PROTOCOL_VERSION,
},
DoomslugThresholdMode::NoApprovals,
true,
ChainConfig::test(),
)
.unwrap();
(chain, runtime, signers)
Expand Down
10 changes: 8 additions & 2 deletions chain/chain/src/tests/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use crate::chain::Chain;
use crate::test_utils::{KeyValueRuntime, ValidatorSchedule};
use crate::types::{ChainGenesis, Tip};
use crate::types::{ChainConfig, ChainGenesis, Tip};
use crate::DoomslugThresholdMode;

use near_chain_configs::GCConfig;
Expand Down Expand Up @@ -30,7 +30,13 @@ fn get_chain_with_epoch_length_and_num_shards(
.block_producers_per_epoch(vec![vec!["test1".parse().unwrap()]])
.num_shards(num_shards);
let runtime_adapter = Arc::new(KeyValueRuntime::new_with_validators(store, vs, epoch_length));
Chain::new(runtime_adapter, &chain_genesis, DoomslugThresholdMode::NoApprovals, true).unwrap()
Chain::new(
runtime_adapter,
&chain_genesis,
DoomslugThresholdMode::NoApprovals,
ChainConfig::test(),
)
.unwrap()
}

// Build a chain of num_blocks on top of prev_block
Expand Down
14 changes: 14 additions & 0 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,20 @@ pub struct ChainGenesis {
pub protocol_version: ProtocolVersion,
}

pub struct ChainConfig {
/// Whether to save `TrieChanges` on disk or not.
pub save_trie_changes: bool,
/// Number of threads to execute background migration work.
/// Currently used for flat storage background creation.
pub background_migration_threads: usize,
}

impl ChainConfig {
pub fn test() -> Self {
Self { save_trie_changes: true, background_migration_threads: 1 }
}
}

impl ChainGenesis {
pub fn new(genesis: &Genesis) -> Self {
Self {
Expand Down
7 changes: 5 additions & 2 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use near_chain::chain::{
OrphanMissingChunks, StateSplitRequest, TX_ROUTING_HEIGHT_HORIZON,
};
use near_chain::test_utils::format_hash;
use near_chain::types::LatestKnown;
use near_chain::types::{ChainConfig, LatestKnown};
use near_chain::{
BlockProcessingArtifact, BlockStatus, Chain, ChainGenesis, ChainStoreAccess,
DoneApplyChunkCallback, Doomslug, DoomslugThresholdMode, Provenance, RuntimeAdapter,
Expand Down Expand Up @@ -188,7 +188,10 @@ impl Client {
runtime_adapter.clone(),
&chain_genesis,
doomslug_threshold_mode,
!config.archive,
ChainConfig {
save_trie_changes: !config.archive,
background_migration_threads: config.client_background_migration_threads,
},
)?;
let me = validator_signer.as_ref().map(|x| x.validator_id().clone());
let shards_mgr = ShardsManager::new(
Expand Down
10 changes: 8 additions & 2 deletions chain/client/src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ mod tests {
use super::*;
use assert_matches::assert_matches;
use near_chain::test_utils::{KeyValueRuntime, ValidatorSchedule};
use near_chain::types::ChainConfig;
use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode};
use near_network::test_utils::peer_id_from_seed;
use near_primitives::version::PROTOCOL_VERSION;
Expand Down Expand Up @@ -539,8 +540,13 @@ mod tests {
protocol_version: PROTOCOL_VERSION,
};
let doomslug_threshold_mode = DoomslugThresholdMode::TwoThirds;
let chain =
Chain::new(runtime.clone(), &chain_genesis, doomslug_threshold_mode, true).unwrap();
let chain = Chain::new(
runtime.clone(),
&chain_genesis,
doomslug_threshold_mode,
ChainConfig::test(),
)
.unwrap();

let telemetry = info_helper.telemetry_info(
&chain.head().unwrap(),
Expand Down
18 changes: 15 additions & 3 deletions chain/client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use near_chain::test_utils::{
wait_for_all_blocks_in_processing, wait_for_block_in_processing, KeyValueRuntime,
ValidatorSchedule,
};
use near_chain::types::ChainConfig;
use near_chain::{
Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode, Provenance, RuntimeAdapter,
};
Expand Down Expand Up @@ -217,8 +218,13 @@ pub fn setup(
} else {
DoomslugThresholdMode::NoApprovals
};
let chain =
Chain::new(runtime.clone(), &chain_genesis, doomslug_threshold_mode, !archive).unwrap();
let chain = Chain::new(
runtime.clone(),
&chain_genesis,
doomslug_threshold_mode,
ChainConfig { save_trie_changes: !archive, background_migration_threads: 1 },
)
.unwrap();
let genesis_block = chain.get_block(&chain.genesis().hash().clone()).unwrap();

let signer = Arc::new(InMemoryValidatorSigner::from_seed(
Expand Down Expand Up @@ -302,7 +308,13 @@ pub fn setup_only_view(
} else {
DoomslugThresholdMode::NoApprovals
};
Chain::new(runtime.clone(), &chain_genesis, doomslug_threshold_mode, !archive).unwrap();
Chain::new(
runtime.clone(),
&chain_genesis,
doomslug_threshold_mode,
ChainConfig { save_trie_changes: !archive, background_migration_threads: 1 },
)
.unwrap();

let signer = Arc::new(InMemoryValidatorSigner::from_seed(
account_id.clone(),
Expand Down
3 changes: 3 additions & 0 deletions core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ pub struct ClientConfig {
pub max_gas_burnt_view: Option<Gas>,
/// Re-export storage layer statistics as prometheus metrics.
pub enable_statistics_export: bool,
/// Number of threads to execute background migration work in client.
pub client_background_migration_threads: usize,
}

impl ClientConfig {
Expand Down Expand Up @@ -215,6 +217,7 @@ impl ClientConfig {
trie_viewer_state_size_limit: None,
max_gas_burnt_view: None,
enable_statistics_export: true,
client_background_migration_threads: 1,
}
}
}
9 changes: 9 additions & 0 deletions core/store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ pub struct StoreConfig {
/// be copied between the databases.
#[serde(skip_serializing_if = "MigrationSnapshot::is_default")]
pub migration_snapshot: MigrationSnapshot,

/// Number of threads to execute storage background migrations.
/// Needed to create flat storage which need to happen in parallel
/// with block processing.
pub background_migration_threads: usize,
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -210,6 +215,10 @@ impl Default for StoreConfig {
],

migration_snapshot: Default::default(),

// We checked that this number of threads doesn't impact
// regular block processing significantly.
background_migration_threads: 8,
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions integration-tests/src/genesis_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use tempfile::tempdir;

use near_chain::types::ChainConfig;
use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode};
use near_chain_configs::Genesis;
use near_primitives::block::{Block, BlockHeader};
Expand All @@ -21,7 +22,8 @@ pub fn genesis_header(genesis: &Genesis) -> BlockHeader {
let chain_genesis = ChainGenesis::new(genesis);
let runtime = Arc::new(NightshadeRuntime::test(dir.path(), store, genesis));
let chain =
Chain::new(runtime, &chain_genesis, DoomslugThresholdMode::TwoThirds, true).unwrap();
Chain::new(runtime, &chain_genesis, DoomslugThresholdMode::TwoThirds, ChainConfig::test())
.unwrap();
chain.genesis().clone()
}

Expand All @@ -32,6 +34,7 @@ pub fn genesis_block(genesis: &Genesis) -> Block {
let chain_genesis = ChainGenesis::new(genesis);
let runtime = Arc::new(NightshadeRuntime::test(dir.path(), store, genesis));
let chain =
Chain::new(runtime, &chain_genesis, DoomslugThresholdMode::TwoThirds, true).unwrap();
Chain::new(runtime, &chain_genesis, DoomslugThresholdMode::TwoThirds, ChainConfig::test())
.unwrap();
chain.get_block(&chain.genesis().hash().clone()).unwrap()
}
1 change: 1 addition & 0 deletions nearcore/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ impl NearConfig {
trie_viewer_state_size_limit: config.trie_viewer_state_size_limit,
max_gas_burnt_view: config.max_gas_burnt_view,
enable_statistics_export: config.store.enable_statistics_export,
client_background_migration_threads: config.store.background_migration_threads,
},
network_config: NetworkConfig::new(
config.network,
Expand Down
7 changes: 5 additions & 2 deletions tools/speedy_sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use borsh::{BorshDeserialize, BorshSerialize};
use clap::Parser;
use near_chain::types::Tip;
use near_chain::types::{ChainConfig, Tip};
use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode};
use near_chain_configs::GenesisValidationMode;
use near_epoch_manager::types::EpochInfoAggregator;
Expand Down Expand Up @@ -237,7 +237,10 @@ fn load_snapshot(load_cmd: LoadCmd) {
runtime.clone(),
&chain_genesis,
DoomslugThresholdMode::TwoThirds,
!config.client_config.archive,
ChainConfig {
save_trie_changes: !config.client_config.archive,
background_migration_threads: 1,
},
)
.unwrap();

Expand Down

0 comments on commit 0040e92

Please sign in to comment.