Skip to content

Commit

Permalink
feat: State sync from local filesystem (#8913)
Browse files Browse the repository at this point in the history
Also adds a unit test for state dump and an integration test for state dump and state sync from that dump.
  • Loading branch information
nikurt committed Apr 25, 2023
1 parent c271196 commit 4cd60f2
Show file tree
Hide file tree
Showing 23 changed files with 904 additions and 477 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@

### Non-protocol Changes

* Node can sync State from S3. [#8789](https://github.com/near/nearcore/pull/8789)
* The contract runtime switched to using our fork of wasmer, with various improvements.
* undo-block tool to reset the chain head from current head to its prev block. Use the tool by running: `./target/release/neard --home {path_to_config_directory} undo-block`. [#8681](https://github.com/near/nearcore/pull/8681)
* Node can sync State from S3. [#8789](https://github.com/near/nearcore/pull/8789)
* Node can sync State from local filesystem. [#8913](https://github.com/near/nearcore/pull/8913)
* Add per shard granularity for chunks in validator info metric. [#8934](https://github.com/near/nearcore/pull/8934)
* Add prometheus metrics for expected number of blocks/chunks at the end of the epoch. [#8759](https://github.com/near/nearcore/pull/8759)

Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ pub enum AccountOrPeerIdOrHash {
AccountId(AccountId),
PeerId(PeerId),
Hash(CryptoHash),
ExternalStorage,
}

#[derive(Debug, serde::Serialize)]
Expand All @@ -63,7 +62,8 @@ pub struct DownloadStatus {
pub state_requests_count: u64,
pub last_target: Option<AccountOrPeerIdOrHash>,
#[serde(skip_serializing, skip_deserializing)]
pub response: Arc<Mutex<Option<Result<(u16, Vec<u8>), String>>>>,
// Use type `String` as an error to avoid a dependency on the `rust-s3` or `anyhow` crates.
pub response: Arc<Mutex<Option<Result<Vec<u8>, String>>>>,
}

impl DownloadStatus {
Expand Down
22 changes: 12 additions & 10 deletions chain/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition.workspace = true
actix-rt.workspace = true
actix.workspace = true
ansi_term.workspace = true
anyhow.workspace = true
async-trait.workspace = true
borsh.workspace = true
chrono.workspace = true
Expand All @@ -20,6 +21,7 @@ num-rational.workspace = true
once_cell.workspace = true
rand.workspace = true
reed-solomon-erasure.workspace = true
regex.workspace = true
rust-s3.workspace = true
serde_json.workspace = true
strum.workspace = true
Expand All @@ -28,24 +30,24 @@ thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true

delay-detector.workspace = true
near-async.workspace = true
near-chain-primitives.workspace = true
near-crypto.workspace = true
near-primitives.workspace = true
near-store.workspace = true
near-chain-configs.workspace = true
near-chain-primitives.workspace = true
near-chain.workspace = true
near-chunks.workspace = true
near-client-primitives.workspace = true
near-crypto.workspace = true
near-dyn-configs.workspace = true
near-epoch-manager.workspace = true
near-network.workspace = true
near-pool.workspace = true
near-chunks.workspace = true
near-telemetry.workspace = true
near-o11y.workspace = true
near-performance-metrics.workspace = true
near-performance-metrics-macros.workspace = true
near-epoch-manager.workspace = true
delay-detector.workspace = true
near-performance-metrics.workspace = true
near-pool.workspace = true
near-primitives.workspace = true
near-store.workspace = true
near-telemetry.workspace = true

[dev-dependencies]
assert_matches.workspace = true
Expand Down
10 changes: 2 additions & 8 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,7 @@ impl Client {
network_adapter.clone(),
config.state_sync_timeout,
&config.chain_id,
config.state_sync_from_s3_enabled,
&config.state_sync_s3_bucket,
&config.state_sync_s3_region,
config.state_sync_num_concurrent_s3_requests,
&config.state_sync.sync,
);
let num_block_producer_seats = config.num_block_producer_seats as usize;
let data_parts = runtime_adapter.num_data_parts();
Expand Down Expand Up @@ -2131,10 +2128,7 @@ impl Client {
network_adapter1,
state_sync_timeout,
&self.config.chain_id,
self.config.state_sync_from_s3_enabled,
&self.config.state_sync_s3_bucket,
&self.config.state_sync_s3_region,
self.config.state_sync_num_concurrent_s3_requests,
&self.config.state_sync.sync,
),
new_shard_sync,
BlocksCatchUpState::new(sync_hash, epoch_id),
Expand Down
6 changes: 5 additions & 1 deletion chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,11 @@ impl Handler<WithSpanContext<Status>> for ClientActor {
sync_status: format!(
"{} ({})",
self.client.sync_status.as_variant_name().to_string(),
display_sync_status(&self.client.sync_status, &self.client.chain.head()?,),
display_sync_status(
&self.client.sync_status,
&self.client.chain.head()?,
&self.client.config.state_sync.sync,
),
),
catchup_status: self.client.get_catchup_status()?,
current_head_status: head.clone().into(),
Expand Down
32 changes: 19 additions & 13 deletions chain/client/src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::config_updater::ConfigUpdater;
use crate::{metrics, SyncStatus};
use actix::Addr;
use itertools::Itertools;
use near_chain_configs::{ClientConfig, LogSummaryStyle};
use near_chain_configs::{ClientConfig, LogSummaryStyle, SyncConfig};
use near_network::types::NetworkInfo;
use near_primitives::block::Tip;
use near_primitives::network::PeerId;
Expand Down Expand Up @@ -343,10 +343,9 @@ impl InfoHelper {

let s = |num| if num == 1 { "" } else { "s" };

let sync_status_log = Some(display_sync_status(sync_status, head));

let sync_status_log =
Some(display_sync_status(sync_status, head, &client_config.state_sync.sync));
let catchup_status_log = display_catchup_status(catchup_status);

let validator_info_log = validator_info.as_ref().map(|info| {
format!(
" {}{} validator{}",
Expand Down Expand Up @@ -564,7 +563,11 @@ pub fn display_catchup_status(catchup_status: Vec<CatchupStatusView>) -> String
.join("\n")
}

pub fn display_sync_status(sync_status: &SyncStatus, head: &Tip) -> String {
pub fn display_sync_status(
sync_status: &SyncStatus,
head: &Tip,
state_sync_config: &SyncConfig,
) -> String {
metrics::SYNC_STATUS.set(sync_status.repr() as i64);
match sync_status {
SyncStatus::AwaitingPeers => format!("#{:>8} Waiting for peers", head.height),
Expand Down Expand Up @@ -609,14 +612,17 @@ pub fn display_sync_status(sync_status: &SyncStatus, head: &Tip) -> String {
for (shard_id, shard_status) in shard_statuses {
write!(res, "[{}: {}]", shard_id, shard_status.status.to_string(),).unwrap();
}
// TODO #8719
tracing::warn!(target: "stats",
"The node is syncing its State. The current implementation of this mechanism is known to be unreliable. It may never complete, or fail randomly and corrupt the DB.\n\
Suggestions:\n\
* Download a recent data snapshot and restart the node.\n\
* Disable state sync in the config. Add `\"state_sync_enabled\": false` to `config.json`.\n\
\n\
A better implementation of State Sync is work in progress.");
if matches!(state_sync_config, SyncConfig::Peers) {
// TODO #8719
tracing::warn!(
target: "stats",
"The node is syncing its State. The current implementation of this mechanism is known to be unreliable. It may never complete, or fail randomly and corrupt the DB.\n\
Suggestions:\n\
* Download a recent data snapshot and restart the node.\n\
* Disable state sync in the config. Add `\"state_sync_enabled\": false` to `config.json`.\n\
\n\
A better implementation of State Sync is work in progress.");
}
res
}
SyncStatus::StateSyncDone => "State sync done".to_string(),
Expand Down
115 changes: 55 additions & 60 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ pub(crate) static NODE_PROTOCOL_UPGRADE_VOTING_START: Lazy<IntGauge> = Lazy::new
.unwrap()
});

pub static PRODUCE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> = Lazy::new(|| {
pub(crate) static PRODUCE_CHUNK_TIME: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_produce_chunk_time",
"Time taken to produce a chunk",
Expand All @@ -351,7 +351,7 @@ pub static PRODUCE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> = Lazy::ne
.unwrap()
});

pub static VIEW_CLIENT_MESSAGE_TIME: Lazy<near_o11y::metrics::HistogramVec> = Lazy::new(|| {
pub(crate) static VIEW_CLIENT_MESSAGE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_view_client_messages_processing_time",
"Time that view client takes to handle different messages",
Expand All @@ -361,16 +361,15 @@ pub static VIEW_CLIENT_MESSAGE_TIME: Lazy<near_o11y::metrics::HistogramVec> = La
.unwrap()
});

pub static PRODUCE_AND_DISTRIBUTE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> =
Lazy::new(|| {
try_create_histogram_vec(
"near_produce_and_distribute_chunk_time",
"Time to produce a chunk and distribute it to peers",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 16).unwrap()),
)
.unwrap()
});
/// Histogram metric `near_produce_and_distribute_chunk_time`, labelled by `shard_id`.
///
/// Records the time to produce a chunk and distribute it to peers
/// (unit not stated in the metric name; presumably seconds — TODO confirm).
/// Buckets are built with `exponential_buckets(0.001, 2.0, 16)`.
///
/// # Panics
///
/// The initializer unwraps both the bucket construction and the metric
/// creation, so a failure in either panics when the `Lazy` is initialized.
pub(crate) static PRODUCE_AND_DISTRIBUTE_CHUNK_TIME: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_produce_and_distribute_chunk_time",
"Time to produce a chunk and distribute it to peers",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 16).unwrap()),
)
.unwrap()
});
/// Exports neard, protocol and database versions via Prometheus metrics.
///
/// Sets metrics which export node’s max supported protocol version, used
Expand All @@ -391,7 +390,7 @@ pub(crate) fn export_version(neard_version: &near_primitives::version::Version)
.inc();
}

pub static STATE_SYNC_STAGE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
pub(crate) static STATE_SYNC_STAGE: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_stage",
"Stage of state sync per shard",
Expand All @@ -400,7 +399,7 @@ pub static STATE_SYNC_STAGE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|
.unwrap()
});

pub static STATE_SYNC_RETRY_PART: Lazy<near_o11y::metrics::IntCounterVec> = Lazy::new(|| {
pub(crate) static STATE_SYNC_RETRY_PART: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_retry_part_total",
"Number of part requests retried",
Expand All @@ -409,7 +408,7 @@ pub static STATE_SYNC_RETRY_PART: Lazy<near_o11y::metrics::IntCounterVec> = Lazy
.unwrap()
});

pub static STATE_SYNC_PARTS_DONE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
pub(crate) static STATE_SYNC_PARTS_DONE: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_parts_done",
"Number of parts downloaded",
Expand All @@ -418,16 +417,16 @@ pub static STATE_SYNC_PARTS_DONE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::
.unwrap()
});

pub static STATE_SYNC_PARTS_TOTAL: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
pub(crate) static STATE_SYNC_PARTS_TOTAL: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_parts_per_shard",
"Number of parts that need to be downloaded for the shard",
"Number of parts in the shard",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_DISCARD_PARTS: Lazy<near_o11y::metrics::IntCounterVec> = Lazy::new(|| {
pub(crate) static STATE_SYNC_DISCARD_PARTS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_discard_parts_total",
"Number of times all downloaded parts were discarded to try again",
Expand All @@ -436,54 +435,50 @@ pub static STATE_SYNC_DISCARD_PARTS: Lazy<near_o11y::metrics::IntCounterVec> = L
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_done_total",
"Number of parts successfully retrieved from an external storage",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_failed_total",
"Number of parts failed attempts to retrieve parts from an external storage",
&["shard_id"],
)
.unwrap()
});
/// Counter metric `near_state_sync_external_parts_done_total`, labelled by `shard_id`.
///
/// Counts state parts retrieved from external storage.
///
/// # Panics
///
/// The initializer unwraps the result of `try_create_int_counter_vec`, so a
/// creation failure panics when the `Lazy` is initialized.
pub(crate) static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_done_total",
"Number of parts retrieved from external storage",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_SCHEDULING_DELAY: Lazy<near_o11y::metrics::HistogramVec> =
Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_external_parts_scheduling_delay_sec",
"Delay for a request for parts from an external storage",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});
/// Counter metric `near_state_sync_external_parts_failed_total`, labelled by `shard_id`.
///
/// Counts failed attempts to retrieve state parts from external storage.
///
/// # Panics
///
/// The initializer unwraps the result of `try_create_int_counter_vec`, so a
/// creation failure panics when the `Lazy` is initialized.
pub(crate) static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_failed_total",
"Failed retrieval attempts from external storage",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy<near_o11y::metrics::HistogramVec> =
Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_external_parts_request_delay_sec",
"Latency of state part requests to external storage",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});
/// Histogram metric `near_state_sync_external_parts_request_delay_sec`, labelled by `shard_id`.
///
/// Records the latency of state part requests to external storage; the
/// `_sec` suffix in the metric name indicates values in seconds.
/// Buckets are built with `exponential_buckets(0.001, 2.0, 20)`.
///
/// # Panics
///
/// The initializer unwraps both the bucket construction and the metric
/// creation, so a failure in either panics when the `Lazy` is initialized.
pub(crate) static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_external_parts_request_delay_sec",
"Latency of state part requests to external storage",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy<near_o11y::metrics::IntCounterVec> =
pub(crate) static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy<IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_size_downloaded_bytes_total",
"Amount of bytes downloaded from an external storage when requesting state parts for a shard",
"Bytes downloaded from an external storage",
&["shard_id"],
)
.unwrap()
.unwrap()
});

/// Histogram metric `near_state_sync_dump_put_object_elapsed_sec`, labelled by `shard_id`.
///
/// Records the latency of writes to external storage; the `_sec` suffix in
/// the metric name indicates values in seconds.
/// Buckets are built with `exponential_buckets(0.001, 1.6, 25)`, i.e. a
/// smaller growth factor and more buckets than the request-delay histogram.
///
/// # Panics
///
/// The initializer unwraps both the bucket construction and the metric
/// creation, so a failure in either panics when the `Lazy` is initialized.
pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_dump_put_object_elapsed_sec",
"Latency of writes to external storage",
&["shard_id"],
Some(exponential_buckets(0.001, 1.6, 25).unwrap()),
)
.unwrap()
});
Loading

0 comments on commit 4cd60f2

Please sign in to comment.