Skip to content

Commit

Permalink
refactor(storage): refactor compact iter recreate stream (#15919)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Mar 27, 2024
1 parent 97a2e17 commit 5726cf8
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 27 deletions.
9 changes: 7 additions & 2 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ pub struct StorageConfig {

#[serde(default = "default::storage::compactor_max_sst_key_count")]
pub compactor_max_sst_key_count: u64,
// DEPRECATED: This config will be deprecated in the future version, use `storage.compactor_iter_max_io_retry_times` instead.
#[serde(default = "default::storage::compact_iter_recreate_timeout_ms")]
pub compact_iter_recreate_timeout_ms: u64,
#[serde(default = "default::storage::compactor_max_sst_size")]
Expand All @@ -698,12 +699,12 @@ pub struct StorageConfig {
pub check_compaction_result: bool,
#[serde(default = "default::storage::max_preload_io_retry_times")]
pub max_preload_io_retry_times: usize,

#[serde(default = "default::storage::compactor_fast_max_compact_delete_ratio")]
pub compactor_fast_max_compact_delete_ratio: u32,

#[serde(default = "default::storage::compactor_fast_max_compact_task_size")]
pub compactor_fast_max_compact_task_size: u64,
#[serde(default = "default::storage::compactor_iter_max_io_retry_times")]
pub compactor_iter_max_io_retry_times: usize,

#[serde(default, flatten)]
#[config_doc(omitted)]
Expand Down Expand Up @@ -1329,6 +1330,10 @@ pub mod default {
10 * 60 * 1000
}

pub fn compactor_iter_max_io_retry_times() -> usize {
8
}

pub fn compactor_max_sst_size() -> u64 {
512 * 1024 * 1024 // 512m
}
Expand Down
1 change: 1 addition & 0 deletions src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ This page is automatically generated by `./risedev generate-example-config`
| compact_iter_recreate_timeout_ms | | 600000 |
| compactor_fast_max_compact_delete_ratio | | 40 |
| compactor_fast_max_compact_task_size | | 2147483648 |
| compactor_iter_max_io_retry_times | | 8 |
| compactor_max_sst_key_count | | 2097152 |
| compactor_max_sst_size | | 536870912 |
| compactor_max_task_multiplier | Compactor calculates the maximum number of tasks that can be executed on the node based on `worker_num` and `compactor_max_task_multiplier`. `max_pull_task_count` = `worker_num` * `compactor_max_task_multiplier` | 2.5 |
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ check_compaction_result = false
max_preload_io_retry_times = 3
compactor_fast_max_compact_delete_ratio = 40
compactor_fast_max_compact_task_size = 2147483648
compactor_iter_max_io_retry_times = 8
mem_table_spill_threshold = 4194304

[storage.cache.block_cache_eviction]
Expand Down
7 changes: 3 additions & 4 deletions src/storage/src/hummock/compactor/compaction_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,6 @@ pub async fn check_compaction_result(

let mut table_iters = Vec::new();
let mut del_iter = ForwardMergeRangeIterator::default();
let compact_io_retry_time = context.storage_opts.compact_iter_recreate_timeout_ms;
for level in &compact_task.input_ssts {
if level.table_infos.is_empty() {
continue;
Expand All @@ -365,7 +364,7 @@ pub async fn check_compaction_result(
KeyRange::inf(),
context.sstable_store.clone(),
Arc::new(TaskProgress::default()),
compact_io_retry_time,
context.storage_opts.compactor_iter_max_io_retry_times,
));
} else {
let mut stats = StoreLocalStatistic::default();
Expand All @@ -381,7 +380,7 @@ pub async fn check_compaction_result(
KeyRange::inf(),
context.sstable_store.clone(),
Arc::new(TaskProgress::default()),
compact_io_retry_time,
context.storage_opts.compactor_iter_max_io_retry_times,
));
}
}
Expand All @@ -401,7 +400,7 @@ pub async fn check_compaction_result(
KeyRange::inf(),
context.sstable_store.clone(),
Arc::new(TaskProgress::default()),
compact_io_retry_time,
context.storage_opts.compactor_iter_max_io_retry_times,
);
let right_iter = UserIterator::new(
SkipWatermarkIterator::from_safe_epoch_watermarks(iter, &compact_task.table_watermarks),
Expand Down
16 changes: 8 additions & 8 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,6 @@ impl CompactorRunner {
task_progress: Arc<TaskProgress>,
) -> HummockResult<impl HummockIterator<Direction = Forward>> {
let mut table_iters = Vec::new();
let compact_io_retry_time = self
.compactor
.context
.storage_opts
.compact_iter_recreate_timeout_ms;

for level in &self.compact_task.input_ssts {
if level.table_infos.is_empty() {
continue;
Expand Down Expand Up @@ -189,7 +183,10 @@ impl CompactorRunner {
self.compactor.task_config.key_range.clone(),
self.sstable_store.clone(),
task_progress.clone(),
compact_io_retry_time,
self.compactor
.context
.storage_opts
.compactor_iter_max_io_retry_times,
));
} else {
for table_info in &level.table_infos {
Expand All @@ -209,7 +206,10 @@ impl CompactorRunner {
self.compactor.task_config.key_range.clone(),
self.sstable_store.clone(),
task_progress.clone(),
compact_io_retry_time,
self.compactor
.context
.storage_opts
.compactor_iter_max_io_retry_times,
));
}
}
Expand Down
28 changes: 15 additions & 13 deletions src/storage/src/hummock/compactor/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ pub struct SstableStreamIterator {
sstable_info: SstableInfo,
existing_table_ids: HashSet<StateTableId>,
task_progress: Arc<TaskProgress>,
io_retry_timeout_ms: u64,
create_time: Instant,
io_retry_times: usize,
max_io_retry_times: usize,
}

impl SstableStreamIterator {
Expand All @@ -82,7 +82,7 @@ impl SstableStreamIterator {
stats: &StoreLocalStatistic,
task_progress: Arc<TaskProgress>,
sstable_store: SstableStoreRef,
io_retry_timeout_ms: u64,
max_io_retry_times: usize,
) -> Self {
Self {
block_stream: None,
Expand All @@ -92,10 +92,10 @@ impl SstableStreamIterator {
stats_ptr: stats.remote_io_time.clone(),
existing_table_ids,
sstable_info,
create_time: Instant::now(),
sstable_store,
task_progress,
io_retry_timeout_ms,
io_retry_times: 0,
max_io_retry_times,
}
}

Expand Down Expand Up @@ -178,13 +178,11 @@ impl SstableStreamIterator {
}
Ok(None) => break,
Err(e) => {
if !e.is_object_error()
|| self.create_time.elapsed().as_millis() as u64
> self.io_retry_timeout_ms
{
if !e.is_object_error() || !self.need_recreate_io_stream() {
return Err(e);
}
self.block_stream.take();
self.io_retry_times += 1;
fail_point!("create_stream_err");
}
}
Expand Down Expand Up @@ -250,6 +248,10 @@ impl SstableStreamIterator {
self.sstable_info.table_ids
)
}

fn need_recreate_io_stream(&self) -> bool {
self.io_retry_times < self.max_io_retry_times
}
}

impl Drop for SstableStreamIterator {
Expand Down Expand Up @@ -280,7 +282,7 @@ pub struct ConcatSstableIterator {

stats: StoreLocalStatistic,
task_progress: Arc<TaskProgress>,
io_retry_timeout_ms: u64,
max_io_retry_times: usize,
}

impl ConcatSstableIterator {
Expand All @@ -293,7 +295,7 @@ impl ConcatSstableIterator {
key_range: KeyRange,
sstable_store: SstableStoreRef,
task_progress: Arc<TaskProgress>,
io_retry_timeout_ms: u64,
max_io_retry_times: usize,
) -> Self {
Self {
key_range,
Expand All @@ -304,7 +306,7 @@ impl ConcatSstableIterator {
sstable_store,
task_progress,
stats: StoreLocalStatistic::default(),
io_retry_timeout_ms,
max_io_retry_times,
}
}

Expand Down Expand Up @@ -405,7 +407,7 @@ impl ConcatSstableIterator {
&self.stats,
self.task_progress.clone(),
self.sstable_store.clone(),
self.io_retry_timeout_ms,
self.max_io_retry_times,
);
sstable_iter.seek(seek_key).await?;

Expand Down
3 changes: 3 additions & 0 deletions src/storage/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub struct StorageOpts {
/// Capacity of sstable meta cache.
pub compactor_memory_limit_mb: usize,
/// compactor streaming iterator recreate timeout.
/// deprecated
pub compact_iter_recreate_timeout_ms: u64,
/// Number of SST ids fetched from meta per RPC
pub sstable_id_remote_fetch_number: u32,
Expand All @@ -77,6 +78,7 @@ pub struct StorageOpts {
pub max_sub_compaction: u32,
pub max_concurrent_compaction_task_number: u64,
pub max_version_pinning_duration_sec: u64,
pub compactor_iter_max_io_retry_times: usize,

pub data_file_cache_dir: String,
pub data_file_cache_capacity_mb: usize,
Expand Down Expand Up @@ -269,6 +271,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
.storage
.compactor_fast_max_compact_delete_ratio,
compactor_fast_max_compact_task_size: c.storage.compactor_fast_max_compact_task_size,
compactor_iter_max_io_retry_times: c.storage.compactor_iter_max_io_retry_times,
}
}
}

0 comments on commit 5726cf8

Please sign in to comment.