fix(db): Fix write stalls in RocksDB (for real this time) #292

Merged
merged 4 commits on Oct 24, 2023
13 changes: 12 additions & 1 deletion core/bin/external_node/src/config/mod.rs
@@ -190,11 +190,13 @@ pub struct OptionalENConfig {
/// The default value is 128 MiB.
#[serde(default = "OptionalENConfig::default_merkle_tree_block_cache_size_mb")]
merkle_tree_block_cache_size_mb: usize,

/// Byte capacity of memtables (recent, non-persisted changes to RocksDB). Setting this to a reasonably
/// large value (order of 512 MiB) is helpful for large DBs that experience write stalls.
#[serde(default = "OptionalENConfig::default_merkle_tree_memtable_capacity_mb")]
merkle_tree_memtable_capacity_mb: usize,
/// Timeout to wait for the Merkle tree database to run compaction on stalled writes.
#[serde(default = "OptionalENConfig::default_merkle_tree_stalled_writes_timeout_sec")]
merkle_tree_stalled_writes_timeout_sec: u64,

// Other config settings
/// Port on which the Prometheus exporter server is listening.
@@ -283,6 +285,10 @@ impl OptionalENConfig {
256
}

const fn default_merkle_tree_stalled_writes_timeout_sec() -> u64 {
30
}

const fn default_fee_history_limit() -> u64 {
1_024
}
@@ -332,6 +338,11 @@ impl OptionalENConfig {
self.merkle_tree_memtable_capacity_mb * BYTES_IN_MEGABYTE
}

/// Returns the timeout to wait for the Merkle tree database to run compaction on stalled writes.
pub fn merkle_tree_stalled_writes_timeout(&self) -> Duration {
Duration::from_secs(self.merkle_tree_stalled_writes_timeout_sec)
}

pub fn api_namespaces(&self) -> Vec<Namespace> {
self.api_namespaces
.clone()
1 change: 1 addition & 0 deletions core/bin/external_node/src/main.rs
@@ -146,6 +146,7 @@ async fn init_tasks(
multi_get_chunk_size: config.optional.merkle_tree_multi_get_chunk_size,
block_cache_capacity: config.optional.merkle_tree_block_cache_size(),
memtable_capacity: config.optional.merkle_tree_memtable_capacity(),
stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(),
})
.await;
healthchecks.push(Box::new(metadata_calculator.tree_health_check()));
21 changes: 21 additions & 0 deletions core/lib/config/src/configs/database.rs
@@ -42,6 +42,9 @@ pub struct MerkleTreeConfig {
/// large value (order of 512 MiB) is helpful for large DBs that experience write stalls.
#[serde(default = "MerkleTreeConfig::default_memtable_capacity_mb")]
pub memtable_capacity_mb: usize,
/// Timeout to wait for the Merkle tree database to run compaction on stalled writes.
#[serde(default = "MerkleTreeConfig::default_stalled_writes_timeout_sec")]
pub stalled_writes_timeout_sec: u64,
/// Maximum number of L1 batches to be processed by the Merkle tree at a time.
#[serde(default = "MerkleTreeConfig::default_max_l1_batches_per_iter")]
pub max_l1_batches_per_iter: usize,
@@ -56,6 +59,7 @@ impl Default for MerkleTreeConfig {
multi_get_chunk_size: Self::default_multi_get_chunk_size(),
block_cache_size_mb: Self::default_block_cache_size_mb(),
memtable_capacity_mb: Self::default_memtable_capacity_mb(),
stalled_writes_timeout_sec: Self::default_stalled_writes_timeout_sec(),
max_l1_batches_per_iter: Self::default_max_l1_batches_per_iter(),
}
}
@@ -82,6 +86,10 @@ impl MerkleTreeConfig {
256
}

const fn default_stalled_writes_timeout_sec() -> u64 {
30
}

const fn default_max_l1_batches_per_iter() -> usize {
20
}
@@ -95,6 +103,11 @@ impl MerkleTreeConfig {
pub fn memtable_capacity(&self) -> usize {
self.memtable_capacity_mb * super::BYTES_IN_MEGABYTE
}

/// Returns the timeout to wait for the Merkle tree database to run compaction on stalled writes.
pub fn stalled_writes_timeout(&self) -> Duration {
Duration::from_secs(self.stalled_writes_timeout_sec)
}
}

/// Database configuration.
@@ -165,6 +178,8 @@ mod tests {
DATABASE_MERKLE_TREE_PATH="/db/tree"
DATABASE_MERKLE_TREE_MODE=lightweight
DATABASE_MERKLE_TREE_MULTI_GET_CHUNK_SIZE=250
DATABASE_MERKLE_TREE_MEMTABLE_CAPACITY_MB=512
DATABASE_MERKLE_TREE_STALLED_WRITES_TIMEOUT_SEC=60
DATABASE_MERKLE_TREE_MAX_L1_BATCHES_PER_ITER=50
DATABASE_BACKUP_COUNT=5
DATABASE_BACKUP_INTERVAL_MS=60000
@@ -178,6 +193,8 @@ mod tests {
assert_eq!(db_config.merkle_tree.mode, MerkleTreeMode::Lightweight);
assert_eq!(db_config.merkle_tree.multi_get_chunk_size, 250);
assert_eq!(db_config.merkle_tree.max_l1_batches_per_iter, 50);
assert_eq!(db_config.merkle_tree.memtable_capacity_mb, 512);
assert_eq!(db_config.merkle_tree.stalled_writes_timeout_sec, 60);
assert_eq!(db_config.backup_count, 5);
assert_eq!(db_config.backup_interval().as_secs(), 60);
}
@@ -192,6 +209,8 @@
"DATABASE_MERKLE_TREE_MODE",
"DATABASE_MERKLE_TREE_MULTI_GET_CHUNK_SIZE",
"DATABASE_MERKLE_TREE_BLOCK_CACHE_SIZE_MB",
"DATABASE_MERKLE_TREE_MEMTABLE_CAPACITY_MB",
"DATABASE_MERKLE_TREE_STALLED_WRITES_TIMEOUT_SEC",
"DATABASE_MERKLE_TREE_MAX_L1_BATCHES_PER_ITER",
"DATABASE_BACKUP_COUNT",
"DATABASE_BACKUP_INTERVAL_MS",
@@ -205,6 +224,8 @@
assert_eq!(db_config.merkle_tree.multi_get_chunk_size, 500);
assert_eq!(db_config.merkle_tree.max_l1_batches_per_iter, 20);
assert_eq!(db_config.merkle_tree.block_cache_size_mb, 128);
assert_eq!(db_config.merkle_tree.memtable_capacity_mb, 256);
assert_eq!(db_config.merkle_tree.stalled_writes_timeout_sec, 30);
assert_eq!(db_config.backup_count, 5);
assert_eq!(db_config.backup_interval().as_secs(), 60);

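Beyond the `DATABASE_MERKLE_TREE_*` environment variables exercised in the tests above, the new fields are public and fall back to serde/`Default` values, so they can also be set programmatically. A minimal sketch, assuming the crate path `zksync_config::configs::database` (not shown in this diff) and that `BYTES_IN_MEGABYTE` is 2^20, as the MiB wording in the doc comments suggests:

use std::time::Duration;
// Crate path assumed for illustration; it is not part of this diff.
use zksync_config::configs::database::MerkleTreeConfig;

fn tuned_tree_config() -> MerkleTreeConfig {
    // Override only the new knobs; every other field keeps its default.
    MerkleTreeConfig {
        memtable_capacity_mb: 512,
        stalled_writes_timeout_sec: 60,
        ..MerkleTreeConfig::default()
    }
}

fn main() {
    let config = tuned_tree_config();
    assert_eq!(config.stalled_writes_timeout(), Duration::from_secs(60));
    assert_eq!(config.memtable_capacity(), 512 << 20);
}
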
97 changes: 67 additions & 30 deletions core/lib/storage/src/db.rs
@@ -6,13 +6,13 @@ use rocksdb::{
use std::{
collections::{HashMap, HashSet},
ffi::CStr,
fmt,
fmt, iter,
marker::PhantomData,
ops,
path::Path,
sync::{Arc, Condvar, Mutex},
thread,
time::Duration,
time::{Duration, Instant},
};

use crate::metrics::{RocksdbLabels, RocksdbSizeMetrics, METRICS};
@@ -176,11 +176,8 @@ impl RocksDBInner {
/// Waits until writes are not stopped for any of the CFs. Writes can stop immediately on DB initialization
/// if there are too many level-0 SST files; in this case, it may help waiting several seconds until
/// these files are compacted.
fn wait_for_writes_to_resume(&self) {
const RETRY_COUNT: usize = 10;
const RETRY_INTERVAL: Duration = Duration::from_secs(1);

for retry in 0..RETRY_COUNT {
fn wait_for_writes_to_resume(&self, retries: &StalledWritesRetries) {
for (retry_idx, retry_interval) in retries.intervals().enumerate() {
let cfs_with_stopped_writes = self.cf_names.iter().copied().filter(|cf_name| {
let cf = self.db.cf_handle(cf_name).unwrap();
// ^ `unwrap()` is safe (CF existence is checked during DB initialization)
@@ -192,44 +189,58 @@
} else {
tracing::info!(
"Writes are stopped for column families {cfs_with_stopped_writes:?} in DB `{}` \
(retry: {retry}/{RETRY_COUNT})",
(retry #{retry_idx})",
self.db_name
);
thread::sleep(RETRY_INTERVAL);
thread::sleep(retry_interval);
}
}

tracing::warn!(
"Exceeded {RETRY_COUNT} retries waiting for writes to resume in DB `{}`; \
proceeding with stopped writes",
"Exceeded retries waiting for writes to resume in DB `{}`; proceeding with stopped writes",
self.db_name
);
}
}

/// Configuration for retries when RocksDB writes are stalled.
#[derive(Debug, Clone, Copy)]
struct StalledWritesRetries {
pub struct StalledWritesRetries {
max_batch_size: usize,
retry_count: usize,
timeout: Duration,
start_interval: Duration,
max_interval: Duration,
scale_factor: f64,
}

impl Default for StalledWritesRetries {
fn default() -> Self {
impl StalledWritesRetries {
/// Creates retries configuration with the specified timeout.
pub fn new(timeout: Duration) -> Self {
Self {
max_batch_size: 128 << 20, // 128 MiB
retry_count: 10,
timeout,
start_interval: Duration::from_millis(50),
max_interval: Duration::from_secs(2),
scale_factor: 1.5,
}
}
}

impl StalledWritesRetries {
fn interval(&self, retry_index: usize) -> Duration {
self.start_interval
.mul_f64(self.scale_factor.powi(retry_index as i32))
fn intervals(&self) -> impl Iterator<Item = Duration> {
let &Self {
timeout,
start_interval,
max_interval,
scale_factor,
..
} = self;
let started_at = Instant::now();

iter::successors(Some(start_interval), move |&prev_interval| {
Some(prev_interval.mul_f64(scale_factor).min(max_interval))
})
.take_while(move |_| started_at.elapsed() <= timeout)
}

// **NB.** The error message may change between RocksDB versions!
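The interval schedule above replaces the old fixed retry count: intervals start at 50 ms, grow by a factor of 1.5, are capped at 2 s, and stop being yielded once the configured timeout has elapsed. The elapsed time is measured from the moment `intervals()` is called, so the budget also covers the sleeps and write attempts performed between `next()` calls; a zero timeout yields essentially no intervals, which is why the metadata calculator test in `helpers.rs` further down passes `Duration::ZERO`. A standalone restatement of the schedule for illustration, with the constructor defaults spelled out:

use std::{
    iter,
    time::{Duration, Instant},
};

// Mirrors `StalledWritesRetries::intervals()`: exponential backoff with a cap,
// bounded by an overall wall-clock timeout.
fn backoff_intervals(
    start_interval: Duration,
    max_interval: Duration,
    scale_factor: f64,
    timeout: Duration,
) -> impl Iterator<Item = Duration> {
    let started_at = Instant::now();
    iter::successors(Some(start_interval), move |&prev| {
        Some(prev.mul_f64(scale_factor).min(max_interval))
    })
    .take_while(move |_| started_at.elapsed() <= timeout)
}

fn main() {
    // With the defaults from `StalledWritesRetries::new`:
    // 50 ms, 75 ms, 112.5 ms, 168.75 ms, ... (capped at 2 s).
    let first_intervals: Vec<_> = backoff_intervals(
        Duration::from_millis(50),
        Duration::from_secs(2),
        1.5,
        Duration::from_secs(30),
    )
    .take(4)
    .collect();
    println!("{first_intervals:?}");
}
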
@@ -240,7 +251,7 @@ impl StalledWritesRetries {
}

/// [`RocksDB`] options.
#[derive(Debug, Clone, Copy, Default)]
#[derive(Debug, Clone, Copy)]
pub struct RocksDBOptions {
/// Byte capacity of the block cache (the main RocksDB cache for reads). If not set, default RocksDB
/// cache options will be used.
Expand All @@ -250,6 +261,19 @@ pub struct RocksDBOptions {
/// Setting this to a reasonably large value (order of 512 MiB) is helpful for large DBs that experience
/// write stalls. If not set, large CFs will not be configured specially.
pub large_memtable_capacity: Option<usize>,
/// Timeout to wait for the database to run compaction on stalled writes during startup or
/// when the corresponding RocksDB error is encountered.
pub stalled_writes_retries: StalledWritesRetries,
}

impl Default for RocksDBOptions {
fn default() -> Self {
Self {
block_cache_capacity: None,
large_memtable_capacity: None,
stalled_writes_retries: StalledWritesRetries::new(Duration::from_secs(10)),
}
}
}

/// Thin wrapper around a RocksDB instance.
@@ -336,11 +360,11 @@ impl<CF: NamedColumnFamily> RocksDB<CF> {
path.display()
);

inner.wait_for_writes_to_resume();
inner.wait_for_writes_to_resume(&options.stalled_writes_retries);
Self {
inner,
sync_writes: false,
stalled_writes_retries: StalledWritesRetries::default(),
stalled_writes_retries: options.stalled_writes_retries,
_cf: PhantomData,
}
}
@@ -423,24 +447,24 @@ impl<CF: NamedColumnFamily> RocksDB<CF> {
}

let raw_batch_bytes = raw_batch.data().to_vec();
let mut retry_count = 0;
let mut retries = self.stalled_writes_retries.intervals();
loop {
match self.write_inner(raw_batch) {
Ok(()) => return Ok(()),
Err(err) => {
let is_stalled_write = StalledWritesRetries::is_write_stall_error(&err);
if is_stalled_write {
METRICS.report_stalled_write(CF::DB_NAME);
} else {
return Err(err);
}

if is_stalled_write && retry_count < retries.retry_count {
let retry_interval = retries.interval(retry_count);
if let Some(retry_interval) = retries.next() {
tracing::warn!(
"Writes stalled when writing to DB `{}`; will retry after {retry_interval:?}",
CF::DB_NAME
);
thread::sleep(retry_interval);
retry_count += 1;
raw_batch = rocksdb::WriteBatch::from_data(&raw_batch_bytes);
} else {
return Err(err);
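
Since the removed and added lines are interleaved above, the gist of the new loop: the counter-based check (`retry_count < retries.retry_count`) is gone; instead the loop drains the `intervals()` iterator, and once it runs dry (i.e. the stalled-writes timeout has elapsed) the stall error is returned to the caller. The batch is rebuilt from its serialized bytes on every retry because `write_inner` consumes the `WriteBatch`. A generic, self-contained sketch of the same retry pattern (names are illustrative, not from the codebase):

use std::{iter, thread, time::Duration};

// Retry an operation on retriable errors, sleeping for the next backoff
// interval between attempts; give up when the interval iterator is exhausted.
fn retry_with_backoff<T, E>(
    mut operation: impl FnMut() -> Result<T, E>,
    mut is_retriable: impl FnMut(&E) -> bool,
    intervals: impl IntoIterator<Item = Duration>,
) -> Result<T, E> {
    let mut intervals = intervals.into_iter();
    loop {
        match operation() {
            Ok(value) => return Ok(value),
            Err(err) if is_retriable(&err) => match intervals.next() {
                Some(delay) => thread::sleep(delay),
                None => return Err(err), // budget exhausted; surface the error
            },
            Err(err) => return Err(err), // non-retriable; bail out immediately
        }
    }
}

fn main() {
    let mut attempts = 0;
    let result: Result<(), &str> = retry_with_backoff(
        || {
            attempts += 1;
            if attempts < 3 { Err("stalled") } else { Ok(()) }
        },
        |err| *err == "stalled",
        iter::repeat(Duration::from_millis(1)).take(5),
    );
    assert!(result.is_ok());
}
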
@@ -565,10 +589,23 @@ mod tests {

#[test]
fn retry_interval_computation() {
let retries = StalledWritesRetries::default();
assert_close(retries.interval(0), Duration::from_millis(50));
assert_close(retries.interval(1), Duration::from_millis(75));
assert_close(retries.interval(2), Duration::from_micros(112_500));
let retries = StalledWritesRetries::new(Duration::from_secs(10));
let intervals: Vec<_> = retries.intervals().take(20).collect();
assert_close(intervals[0], Duration::from_millis(50));
assert_close(intervals[1], Duration::from_millis(75));
assert_close(intervals[2], Duration::from_micros(112_500));
assert_close(intervals[19], retries.max_interval);
}

#[test]
fn retries_iterator_is_finite() {
let retries = StalledWritesRetries::new(Duration::from_millis(10));
let mut retry_count = 0;
for _ in retries.intervals() {
thread::sleep(Duration::from_millis(5));
retry_count += 1;
}
assert!(retry_count <= 2);
}

fn assert_close(lhs: Duration, rhs: Duration) {
2 changes: 1 addition & 1 deletion core/lib/storage/src/lib.rs
@@ -1,5 +1,5 @@
pub mod db;
mod metrics;

pub use db::{RocksDB, RocksDBOptions};
pub use db::{RocksDB, RocksDBOptions, StalledWritesRetries};
pub use rocksdb;
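
With `StalledWritesRetries` re-exported from the crate root, downstream crates can tune stalled-write handling without reaching into `zksync_storage::db`. A minimal usage sketch (the 45-second value is arbitrary):

use std::time::Duration;
use zksync_storage::{RocksDBOptions, StalledWritesRetries};

fn custom_rocksdb_options() -> RocksDBOptions {
    RocksDBOptions {
        stalled_writes_retries: StalledWritesRetries::new(Duration::from_secs(45)),
        // Keep the default block cache and memtable settings.
        ..RocksDBOptions::default()
    }
}
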
18 changes: 14 additions & 4 deletions core/lib/zksync_core/src/metadata_calculator/helpers.rs
@@ -18,7 +18,7 @@ use zksync_merkle_tree::{
domain::{TreeMetadata, ZkSyncTree, ZkSyncTreeReader},
Key, MerkleTreeColumnFamily, NoVersionError, TreeEntryWithProof,
};
use zksync_storage::{RocksDB, RocksDBOptions};
use zksync_storage::{RocksDB, RocksDBOptions, StalledWritesRetries};
use zksync_types::{block::L1BatchHeader, L1BatchNumber, StorageLog, H256};

use super::metrics::{LoadChangesStage, TreeUpdateStage, METRICS};
@@ -62,15 +62,22 @@ impl AsyncTree {
multi_get_chunk_size: usize,
block_cache_capacity: usize,
memtable_capacity: usize,
stalled_writes_timeout: Duration,
) -> Self {
tracing::info!(
"Initializing Merkle tree at `{db_path}` with {multi_get_chunk_size} multi-get chunk size, \
{block_cache_capacity}B block cache, {memtable_capacity}B memtable capacity",
{block_cache_capacity}B block cache, {memtable_capacity}B memtable capacity, \
{stalled_writes_timeout:?} stalled writes timeout",
db_path = db_path.display()
);

let mut tree = tokio::task::spawn_blocking(move || {
let db = Self::create_db(&db_path, block_cache_capacity, memtable_capacity);
let db = Self::create_db(
&db_path,
block_cache_capacity,
memtable_capacity,
stalled_writes_timeout,
);
match mode {
MerkleTreeMode::Full => ZkSyncTree::new(db),
MerkleTreeMode::Lightweight => ZkSyncTree::new_lightweight(db),
@@ -90,12 +97,14 @@ impl AsyncTree {
path: &Path,
block_cache_capacity: usize,
memtable_capacity: usize,
stalled_writes_timeout: Duration,
) -> RocksDB<MerkleTreeColumnFamily> {
let db = RocksDB::with_options(
path,
RocksDBOptions {
block_cache_capacity: Some(block_cache_capacity),
large_memtable_capacity: Some(memtable_capacity),
stalled_writes_retries: StalledWritesRetries::new(stalled_writes_timeout),
},
);
if cfg!(test) {
@@ -462,7 +471,8 @@ mod tests {
MerkleTreeMode::Full,
500,
0,
16 << 20, // 16 MiB
16 << 20, // 16 MiB,
Duration::ZERO, // writes should never be stalled in tests
)
.await
}