Skip to content

Commit

Permalink
Unify stalled writes timeout logic
Browse files Browse the repository at this point in the history
  • Loading branch information
slowli committed Oct 23, 2023
1 parent 2152207 commit 3345c17
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 69 deletions.
16 changes: 7 additions & 9 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,9 @@ pub struct OptionalENConfig {
/// large value (order of 512 MiB) is helpful for large DBs that experience write stalls.
#[serde(default = "OptionalENConfig::default_merkle_tree_memtable_capacity_mb")]
merkle_tree_memtable_capacity_mb: usize,
/// Timeout to wait for the Merkle tree database to run compaction on startup so that it doesn't have
/// stopped writes.
#[serde(default = "OptionalENConfig::default_merkle_tree_init_stopped_writes_timeout_sec")]
merkle_tree_init_stopped_writes_timeout_sec: u64,
/// Timeout to wait for the Merkle tree database to run compaction on stalled writes.
#[serde(default = "OptionalENConfig::default_merkle_tree_stalled_writes_timeout_sec")]
merkle_tree_stalled_writes_timeout_sec: u64,

// Other config settings
/// Port on which the Prometheus exporter server is listening.
Expand Down Expand Up @@ -286,7 +285,7 @@ impl OptionalENConfig {
256
}

const fn default_merkle_tree_init_stopped_writes_timeout_sec() -> u64 {
const fn default_merkle_tree_stalled_writes_timeout_sec() -> u64 {
30
}

Expand Down Expand Up @@ -339,10 +338,9 @@ impl OptionalENConfig {
self.merkle_tree_memtable_capacity_mb * BYTES_IN_MEGABYTE
}

/// Returns the timeout to wait for the Merkle tree database to run compaction on startup so that
/// it doesn't have stopped writes.
pub fn merkle_tree_init_stopped_writes_timeout(&self) -> Duration {
Duration::from_secs(self.merkle_tree_init_stopped_writes_timeout_sec)
/// Returns the timeout to wait for the Merkle tree database to run compaction on stalled writes.
pub fn merkle_tree_stalled_writes_timeout(&self) -> Duration {
Duration::from_secs(self.merkle_tree_stalled_writes_timeout_sec)
}

pub fn api_namespaces(&self) -> Vec<Namespace> {
Expand Down
2 changes: 1 addition & 1 deletion core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async fn init_tasks(
multi_get_chunk_size: config.optional.merkle_tree_multi_get_chunk_size,
block_cache_capacity: config.optional.merkle_tree_block_cache_size(),
memtable_capacity: config.optional.merkle_tree_memtable_capacity(),
init_stopped_writes_timeout: config.optional.merkle_tree_init_stopped_writes_timeout(),
stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(),
})
.await;
healthchecks.push(Box::new(metadata_calculator.tree_health_check()));
Expand Down
26 changes: 12 additions & 14 deletions core/lib/config/src/configs/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ pub struct MerkleTreeConfig {
/// large value (order of 512 MiB) is helpful for large DBs that experience write stalls.
#[serde(default = "MerkleTreeConfig::default_memtable_capacity_mb")]
pub memtable_capacity_mb: usize,
/// Timeout to wait for the Merkle tree database to run compaction on startup so that it doesn't have
/// stopped writes.
#[serde(default = "MerkleTreeConfig::default_init_stopped_writes_timeout_sec")]
pub init_stopped_writes_timeout_sec: u64,
/// Timeout to wait for the Merkle tree database to run compaction on stalled writes.
#[serde(default = "MerkleTreeConfig::default_stalled_writes_timeout_sec")]
pub stalled_writes_timeout_sec: u64,
/// Maximum number of L1 batches to be processed by the Merkle tree at a time.
#[serde(default = "MerkleTreeConfig::default_max_l1_batches_per_iter")]
pub max_l1_batches_per_iter: usize,
Expand All @@ -60,7 +59,7 @@ impl Default for MerkleTreeConfig {
multi_get_chunk_size: Self::default_multi_get_chunk_size(),
block_cache_size_mb: Self::default_block_cache_size_mb(),
memtable_capacity_mb: Self::default_memtable_capacity_mb(),
init_stopped_writes_timeout_sec: Self::default_init_stopped_writes_timeout_sec(),
stalled_writes_timeout_sec: Self::default_stalled_writes_timeout_sec(),
max_l1_batches_per_iter: Self::default_max_l1_batches_per_iter(),
}
}
Expand All @@ -87,7 +86,7 @@ impl MerkleTreeConfig {
256
}

const fn default_init_stopped_writes_timeout_sec() -> u64 {
const fn default_stalled_writes_timeout_sec() -> u64 {
30
}

Expand All @@ -105,10 +104,9 @@ impl MerkleTreeConfig {
self.memtable_capacity_mb * super::BYTES_IN_MEGABYTE
}

/// Returns the timeout to wait for the Merkle tree database to run compaction on startup so that
/// it doesn't have stopped writes.
pub fn init_stopped_writes_timeout(&self) -> Duration {
Duration::from_secs(self.init_stopped_writes_timeout_sec)
/// Returns the timeout to wait for the Merkle tree database to run compaction on stalled writes.
pub fn stalled_writes_timeout(&self) -> Duration {
Duration::from_secs(self.stalled_writes_timeout_sec)
}
}

Expand Down Expand Up @@ -181,7 +179,7 @@ mod tests {
DATABASE_MERKLE_TREE_MODE=lightweight
DATABASE_MERKLE_TREE_MULTI_GET_CHUNK_SIZE=250
DATABASE_MERKLE_TREE_MEMTABLE_CAPACITY_MB=512
DATABASE_MERKLE_TREE_INIT_STOPPED_WRITES_TIMEOUT_SEC=60
DATABASE_MERKLE_TREE_STALLED_WRITES_TIMEOUT_SEC=60
DATABASE_MERKLE_TREE_MAX_L1_BATCHES_PER_ITER=50
DATABASE_BACKUP_COUNT=5
DATABASE_BACKUP_INTERVAL_MS=60000
Expand All @@ -196,7 +194,7 @@ mod tests {
assert_eq!(db_config.merkle_tree.multi_get_chunk_size, 250);
assert_eq!(db_config.merkle_tree.max_l1_batches_per_iter, 50);
assert_eq!(db_config.merkle_tree.memtable_capacity_mb, 512);
assert_eq!(db_config.merkle_tree.init_stopped_writes_timeout_sec, 60);
assert_eq!(db_config.merkle_tree.stalled_writes_timeout_sec, 60);
assert_eq!(db_config.backup_count, 5);
assert_eq!(db_config.backup_interval().as_secs(), 60);
}
Expand All @@ -212,7 +210,7 @@ mod tests {
"DATABASE_MERKLE_TREE_MULTI_GET_CHUNK_SIZE",
"DATABASE_MERKLE_TREE_BLOCK_CACHE_SIZE_MB",
"DATABASE_MERKLE_TREE_MEMTABLE_CAPACITY_MB",
"DATABASE_MERKLE_TREE_INIT_STOPPED_WRITES_TIMEOUT_SEC",
"DATABASE_MERKLE_TREE_STALLED_WRITES_TIMEOUT_SEC",
"DATABASE_MERKLE_TREE_MAX_L1_BATCHES_PER_ITER",
"DATABASE_BACKUP_COUNT",
"DATABASE_BACKUP_INTERVAL_MS",
Expand All @@ -227,7 +225,7 @@ mod tests {
assert_eq!(db_config.merkle_tree.max_l1_batches_per_iter, 20);
assert_eq!(db_config.merkle_tree.block_cache_size_mb, 128);
assert_eq!(db_config.merkle_tree.memtable_capacity_mb, 256);
assert_eq!(db_config.merkle_tree.init_stopped_writes_timeout_sec, 30);
assert_eq!(db_config.merkle_tree.stalled_writes_timeout_sec, 30);
assert_eq!(db_config.backup_count, 5);
assert_eq!(db_config.backup_interval().as_secs(), 60);

Expand Down
86 changes: 53 additions & 33 deletions core/lib/storage/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use rocksdb::{
use std::{
collections::{HashMap, HashSet},
ffi::CStr,
fmt,
fmt, iter,
marker::PhantomData,
ops,
path::Path,
Expand Down Expand Up @@ -176,12 +176,8 @@ impl RocksDBInner {
/// Waits until writes are not stopped for any of the CFs. Writes can stop immediately on DB initialization
/// if there are too many level-0 SST files; in this case, it may help to wait several seconds until
/// these files are compacted.
fn wait_for_writes_to_resume(&self, timeout: Duration) {
const RETRY_INTERVAL: Duration = Duration::from_secs(1);

let started_at = Instant::now();
let mut retry = 0;
while started_at.elapsed() < timeout {
fn wait_for_writes_to_resume(&self, retries: &StalledWritesRetries) {
for (retry_idx, retry_interval) in retries.intervals().enumerate() {
let cfs_with_stopped_writes = self.cf_names.iter().copied().filter(|cf_name| {
let cf = self.db.cf_handle(cf_name).unwrap();
// ^ `unwrap()` is safe (CF existence is checked during DB initialization)
Expand All @@ -193,11 +189,10 @@ impl RocksDBInner {
} else {
tracing::info!(
"Writes are stopped for column families {cfs_with_stopped_writes:?} in DB `{}` \
(retry: {retry})",
(retry #{retry_idx})",
self.db_name
);
thread::sleep(RETRY_INTERVAL);
retry += 1;
thread::sleep(retry_interval);
}
}

Expand All @@ -208,20 +203,22 @@ impl RocksDBInner {
}
}

/// Configuration for retries when RocksDB writes are stalled.
#[derive(Debug, Clone, Copy)]
struct StalledWritesRetries {
pub struct StalledWritesRetries {
max_batch_size: usize,
retry_count: usize,
timeout: Duration,
start_interval: Duration,
max_interval: Duration,
scale_factor: f64,
}

impl Default for StalledWritesRetries {
fn default() -> Self {
impl StalledWritesRetries {
/// Creates retries configuration with the specified timeout.
pub fn new(timeout: Duration) -> Self {
Self {
max_batch_size: 128 << 20, // 128 MiB
retry_count: 20,
timeout,
start_interval: Duration::from_millis(50),
max_interval: Duration::from_secs(2),
scale_factor: 1.5,
Expand All @@ -230,10 +227,20 @@ impl Default for StalledWritesRetries {
}

impl StalledWritesRetries {
fn interval(&self, retry_index: usize) -> Duration {
self.start_interval
.mul_f64(self.scale_factor.powi(retry_index as i32))
.min(self.max_interval)
fn intervals(&self) -> impl Iterator<Item = Duration> {
let &Self {
timeout,
start_interval,
max_interval,
scale_factor,
..
} = self;
let started_at = Instant::now();

iter::successors(Some(start_interval), move |&prev_interval| {
Some(prev_interval.mul_f64(scale_factor).min(max_interval))
})
.take_while(move |_| started_at.elapsed() <= timeout)
}

// **NB.** The error message may change between RocksDB versions!
Expand All @@ -254,16 +261,17 @@ pub struct RocksDBOptions {
/// Setting this to a reasonably large value (order of 512 MiB) is helpful for large DBs that experience
/// write stalls. If not set, large CFs will not be configured specially.
pub large_memtable_capacity: Option<usize>,
/// Timeout to wait for the database to run compaction on startup so that it doesn't have stopped writes.
pub init_stopped_writes_timeout: Duration,
/// Retry configuration (including the overall timeout) used while waiting for the database to run
/// compaction on stalled writes, either during startup or when the corresponding RocksDB error is
/// encountered.
pub stalled_writes_retries: StalledWritesRetries,
}

impl Default for RocksDBOptions {
fn default() -> Self {
Self {
block_cache_capacity: None,
large_memtable_capacity: None,
init_stopped_writes_timeout: Duration::from_secs(10),
stalled_writes_retries: StalledWritesRetries::new(Duration::from_secs(10)),
}
}
}
Expand Down Expand Up @@ -352,11 +360,11 @@ impl<CF: NamedColumnFamily> RocksDB<CF> {
path.display()
);

inner.wait_for_writes_to_resume(options.init_stopped_writes_timeout);
inner.wait_for_writes_to_resume(&options.stalled_writes_retries);
Self {
inner,
sync_writes: false,
stalled_writes_retries: StalledWritesRetries::default(),
stalled_writes_retries: options.stalled_writes_retries,
_cf: PhantomData,
}
}
Expand Down Expand Up @@ -439,24 +447,24 @@ impl<CF: NamedColumnFamily> RocksDB<CF> {
}

let raw_batch_bytes = raw_batch.data().to_vec();
let mut retry_count = 0;
let mut retries = self.stalled_writes_retries.intervals();
loop {
match self.write_inner(raw_batch) {
Ok(()) => return Ok(()),
Err(err) => {
let is_stalled_write = StalledWritesRetries::is_write_stall_error(&err);
if is_stalled_write {
METRICS.report_stalled_write(CF::DB_NAME);
} else {
return Err(err);
}

if is_stalled_write && retry_count < retries.retry_count {
let retry_interval = retries.interval(retry_count);
if let Some(retry_interval) = retries.next() {
tracing::warn!(
"Writes stalled when writing to DB `{}`; will retry after {retry_interval:?}",
CF::DB_NAME
);
thread::sleep(retry_interval);
retry_count += 1;
raw_batch = rocksdb::WriteBatch::from_data(&raw_batch_bytes);
} else {
return Err(err);
Expand Down Expand Up @@ -581,11 +589,23 @@ mod tests {

#[test]
fn retry_interval_computation() {
let retries = StalledWritesRetries::default();
assert_close(retries.interval(0), Duration::from_millis(50));
assert_close(retries.interval(1), Duration::from_millis(75));
assert_close(retries.interval(2), Duration::from_micros(112_500));
assert_close(retries.interval(20), retries.max_interval);
let retries = StalledWritesRetries::new(Duration::from_secs(10));
let intervals: Vec<_> = retries.intervals().take(20).collect();
assert_close(intervals[0], Duration::from_millis(50));
assert_close(intervals[1], Duration::from_millis(75));
assert_close(intervals[2], Duration::from_micros(112_500));
assert_close(intervals[19], retries.max_interval);
}

// Checks that the iterator returned by `intervals()` stops yielding once the configured
// timeout has elapsed, rather than producing retry intervals indefinitely.
#[test]
fn retries_iterator_is_finite() {
// A deliberately short (10 ms) timeout so that the test finishes quickly.
let retries = StalledWritesRetries::new(Duration::from_millis(10));
let mut retry_count = 0;
for _ in retries.intervals() {
// Stands in for the wait between retries; each iteration uses up about half of the timeout.
thread::sleep(Duration::from_millis(5));
retry_count += 1;
}
// With a 5 ms sleep per iteration, the 10 ms timeout should expire after at most two iterations.
assert!(retry_count <= 2);
}

fn assert_close(lhs: Duration, rhs: Duration) {
Expand Down
2 changes: 1 addition & 1 deletion core/lib/storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod db;
mod metrics;

pub use db::{RocksDB, RocksDBOptions};
pub use db::{RocksDB, RocksDBOptions, StalledWritesRetries};
pub use rocksdb;
13 changes: 7 additions & 6 deletions core/lib/zksync_core/src/metadata_calculator/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use zksync_merkle_tree::{
domain::{TreeMetadata, ZkSyncTree, ZkSyncTreeReader},
Key, MerkleTreeColumnFamily, NoVersionError, TreeEntryWithProof,
};
use zksync_storage::{RocksDB, RocksDBOptions};
use zksync_storage::{RocksDB, RocksDBOptions, StalledWritesRetries};
use zksync_types::{block::L1BatchHeader, L1BatchNumber, StorageLog, H256};

use super::metrics::{LoadChangesStage, TreeUpdateStage, METRICS};
Expand Down Expand Up @@ -62,11 +62,12 @@ impl AsyncTree {
multi_get_chunk_size: usize,
block_cache_capacity: usize,
memtable_capacity: usize,
init_stopped_writes_timeout: Duration,
stalled_writes_timeout: Duration,
) -> Self {
tracing::info!(
"Initializing Merkle tree at `{db_path}` with {multi_get_chunk_size} multi-get chunk size, \
{block_cache_capacity}B block cache, {memtable_capacity}B memtable capacity",
{block_cache_capacity}B block cache, {memtable_capacity}B memtable capacity, \
{stalled_writes_timeout:?} stalled writes timeout",
db_path = db_path.display()
);

Expand All @@ -75,7 +76,7 @@ impl AsyncTree {
&db_path,
block_cache_capacity,
memtable_capacity,
init_stopped_writes_timeout,
stalled_writes_timeout,
);
match mode {
MerkleTreeMode::Full => ZkSyncTree::new(db),
Expand All @@ -96,14 +97,14 @@ impl AsyncTree {
path: &Path,
block_cache_capacity: usize,
memtable_capacity: usize,
init_stopped_writes_timeout: Duration,
stalled_writes_timeout: Duration,
) -> RocksDB<MerkleTreeColumnFamily> {
let db = RocksDB::with_options(
path,
RocksDBOptions {
block_cache_capacity: Some(block_cache_capacity),
large_memtable_capacity: Some(memtable_capacity),
init_stopped_writes_timeout,
stalled_writes_retries: StalledWritesRetries::new(stalled_writes_timeout),
},
);
if cfg!(test) {
Expand Down
7 changes: 3 additions & 4 deletions core/lib/zksync_core/src/metadata_calculator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,8 @@ pub struct MetadataCalculatorConfig<'a> {
/// Capacity of RocksDB memtables. Can be set to a reasonably large value (order of 512 MiB)
/// to mitigate write stalls.
pub memtable_capacity: usize,
/// Timeout to wait for the Merkle tree database to run compaction on startup so that it doesn't have
/// stopped writes.
pub init_stopped_writes_timeout: Duration,
/// Timeout to wait for the Merkle tree database to run compaction on stalled writes.
pub stalled_writes_timeout: Duration,
}

impl<'a> MetadataCalculatorConfig<'a> {
Expand All @@ -94,7 +93,7 @@ impl<'a> MetadataCalculatorConfig<'a> {
multi_get_chunk_size: db_config.merkle_tree.multi_get_chunk_size,
block_cache_capacity: db_config.merkle_tree.block_cache_size(),
memtable_capacity: db_config.merkle_tree.memtable_capacity(),
init_stopped_writes_timeout: db_config.merkle_tree.init_stopped_writes_timeout(),
stalled_writes_timeout: db_config.merkle_tree.stalled_writes_timeout(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/lib/zksync_core/src/metadata_calculator/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl TreeUpdater {
config.multi_get_chunk_size,
config.block_cache_capacity,
config.memtable_capacity,
config.init_stopped_writes_timeout,
config.stalled_writes_timeout,
)
.await;
Self {
Expand Down

0 comments on commit 3345c17

Please sign in to comment.