Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): refactor compact iter recreate stream #15919

Merged
merged 6 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ pub struct StorageConfig {

#[serde(default = "default::storage::compactor_max_sst_key_count")]
pub compactor_max_sst_key_count: u64,
// DEPRECATED: This config will be removed in a future version; use `storage.compactor_iter_max_io_retry_times` instead.
#[serde(default = "default::storage::compact_iter_recreate_timeout_ms")]
pub compact_iter_recreate_timeout_ms: u64,
#[serde(default = "default::storage::compactor_max_sst_size")]
Expand All @@ -694,12 +695,12 @@ pub struct StorageConfig {
pub check_compaction_result: bool,
#[serde(default = "default::storage::max_preload_io_retry_times")]
pub max_preload_io_retry_times: usize,

#[serde(default = "default::storage::compactor_fast_max_compact_delete_ratio")]
pub compactor_fast_max_compact_delete_ratio: u32,

#[serde(default = "default::storage::compactor_fast_max_compact_task_size")]
pub compactor_fast_max_compact_task_size: u64,
#[serde(default = "default::storage::compactor_iter_max_io_retry_times")]
pub compactor_iter_max_io_retry_times: usize,

#[serde(default, flatten)]
#[config_doc(omitted)]
Expand Down Expand Up @@ -1325,6 +1326,10 @@ pub mod default {
10 * 60 * 1000
}

/// Default value for the `compactor_iter_max_io_retry_times` storage option.
pub fn compactor_iter_max_io_retry_times() -> usize {
    const DEFAULT_MAX_IO_RETRY_TIMES: usize = 8;
    DEFAULT_MAX_IO_RETRY_TIMES
}

/// Default value for the `compactor_max_sst_size` storage option: 512 MiB, in bytes.
pub fn compactor_max_sst_size() -> u64 {
    const MIB: u64 = 1024 * 1024;
    512 * MIB
}
Expand Down
1 change: 1 addition & 0 deletions src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ This page is automatically generated by `./risedev generate-example-config`
| compact_iter_recreate_timeout_ms | | 600000 |
| compactor_fast_max_compact_delete_ratio | | 40 |
| compactor_fast_max_compact_task_size | | 2147483648 |
| compactor_iter_max_io_retry_times | | 8 |
| compactor_max_sst_key_count | | 2097152 |
| compactor_max_sst_size | | 536870912 |
| compactor_max_task_multiplier | Compactor calculates the maximum number of tasks that can be executed on the node based on `worker_num` and `compactor_max_task_multiplier`. `max_pull_task_count` = `worker_num` * `compactor_max_task_multiplier` | 2.5 |
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ check_compaction_result = false
max_preload_io_retry_times = 3
compactor_fast_max_compact_delete_ratio = 40
compactor_fast_max_compact_task_size = 2147483648
compactor_iter_max_io_retry_times = 8
mem_table_spill_threshold = 4194304

[storage.cache.block_cache_eviction]
Expand Down
7 changes: 3 additions & 4 deletions src/storage/src/hummock/compactor/compaction_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,6 @@ pub async fn check_compaction_result(

let mut table_iters = Vec::new();
let mut del_iter = ForwardMergeRangeIterator::default();
let compact_io_retry_time = context.storage_opts.compact_iter_recreate_timeout_ms;
for level in &compact_task.input_ssts {
if level.table_infos.is_empty() {
continue;
Expand All @@ -365,7 +364,7 @@ pub async fn check_compaction_result(
KeyRange::inf(),
context.sstable_store.clone(),
Arc::new(TaskProgress::default()),
compact_io_retry_time,
context.storage_opts.compactor_iter_max_io_retry_times,
));
} else {
let mut stats = StoreLocalStatistic::default();
Expand All @@ -381,7 +380,7 @@ pub async fn check_compaction_result(
KeyRange::inf(),
context.sstable_store.clone(),
Arc::new(TaskProgress::default()),
compact_io_retry_time,
context.storage_opts.compactor_iter_max_io_retry_times,
));
}
}
Expand All @@ -401,7 +400,7 @@ pub async fn check_compaction_result(
KeyRange::inf(),
context.sstable_store.clone(),
Arc::new(TaskProgress::default()),
compact_io_retry_time,
context.storage_opts.compactor_iter_max_io_retry_times,
);
let right_iter = UserIterator::new(
SkipWatermarkIterator::from_safe_epoch_watermarks(iter, &compact_task.table_watermarks),
Expand Down
16 changes: 8 additions & 8 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,6 @@ impl CompactorRunner {
task_progress: Arc<TaskProgress>,
) -> HummockResult<impl HummockIterator<Direction = Forward>> {
let mut table_iters = Vec::new();
let compact_io_retry_time = self
.compactor
.context
.storage_opts
.compact_iter_recreate_timeout_ms;

for level in &self.compact_task.input_ssts {
if level.table_infos.is_empty() {
continue;
Expand Down Expand Up @@ -189,7 +183,10 @@ impl CompactorRunner {
self.compactor.task_config.key_range.clone(),
self.sstable_store.clone(),
task_progress.clone(),
compact_io_retry_time,
self.compactor
.context
.storage_opts
.compactor_iter_max_io_retry_times,
));
} else {
for table_info in &level.table_infos {
Expand All @@ -209,7 +206,10 @@ impl CompactorRunner {
self.compactor.task_config.key_range.clone(),
self.sstable_store.clone(),
task_progress.clone(),
compact_io_retry_time,
self.compactor
.context
.storage_opts
.compactor_iter_max_io_retry_times,
));
}
}
Expand Down
28 changes: 15 additions & 13 deletions src/storage/src/hummock/compactor/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ pub struct SstableStreamIterator {
sstable_info: SstableInfo,
existing_table_ids: HashSet<StateTableId>,
task_progress: Arc<TaskProgress>,
io_retry_timeout_ms: u64,
create_time: Instant,
io_retry_times: usize,
max_io_retry_times: usize,
}

impl SstableStreamIterator {
Expand All @@ -82,7 +82,7 @@ impl SstableStreamIterator {
stats: &StoreLocalStatistic,
task_progress: Arc<TaskProgress>,
sstable_store: SstableStoreRef,
io_retry_timeout_ms: u64,
max_io_retry_times: usize,
) -> Self {
Self {
block_stream: None,
Expand All @@ -92,10 +92,10 @@ impl SstableStreamIterator {
stats_ptr: stats.remote_io_time.clone(),
existing_table_ids,
sstable_info,
create_time: Instant::now(),
sstable_store,
task_progress,
io_retry_timeout_ms,
io_retry_times: 0,
max_io_retry_times,
}
}

Expand Down Expand Up @@ -178,13 +178,11 @@ impl SstableStreamIterator {
}
Ok(None) => break,
Err(e) => {
if !e.is_object_error()
|| self.create_time.elapsed().as_millis() as u64
> self.io_retry_timeout_ms
{
if !e.is_object_error() || !self.need_recreate_io_stream() {
return Err(e);
}
self.block_stream.take();
self.io_retry_times += 1;
fail_point!("create_stream_err");
}
}
Expand Down Expand Up @@ -250,6 +248,10 @@ impl SstableStreamIterator {
self.sstable_info.table_ids
)
}

/// Returns `true` while the number of I/O retries performed so far
/// (`io_retry_times`) is still strictly below `max_io_retry_times`.
fn need_recreate_io_stream(&self) -> bool {
    self.max_io_retry_times > self.io_retry_times
}
}

impl Drop for SstableStreamIterator {
Expand Down Expand Up @@ -280,7 +282,7 @@ pub struct ConcatSstableIterator {

stats: StoreLocalStatistic,
task_progress: Arc<TaskProgress>,
io_retry_timeout_ms: u64,
max_io_retry_times: usize,
}

impl ConcatSstableIterator {
Expand All @@ -293,7 +295,7 @@ impl ConcatSstableIterator {
key_range: KeyRange,
sstable_store: SstableStoreRef,
task_progress: Arc<TaskProgress>,
io_retry_timeout_ms: u64,
max_io_retry_times: usize,
) -> Self {
Self {
key_range,
Expand All @@ -304,7 +306,7 @@ impl ConcatSstableIterator {
sstable_store,
task_progress,
stats: StoreLocalStatistic::default(),
io_retry_timeout_ms,
max_io_retry_times,
}
}

Expand Down Expand Up @@ -405,7 +407,7 @@ impl ConcatSstableIterator {
&self.stats,
self.task_progress.clone(),
self.sstable_store.clone(),
self.io_retry_timeout_ms,
self.max_io_retry_times,
);
sstable_iter.seek(seek_key).await?;

Expand Down
3 changes: 3 additions & 0 deletions src/storage/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub struct StorageOpts {
/// Capacity of sstable meta cache.
pub compactor_memory_limit_mb: usize,
/// Compactor streaming iterator recreate timeout, in milliseconds.
/// Deprecated: use `compactor_iter_max_io_retry_times` instead.
pub compact_iter_recreate_timeout_ms: u64,
/// Number of SST ids fetched from meta per RPC
pub sstable_id_remote_fetch_number: u32,
Expand All @@ -77,6 +78,7 @@ pub struct StorageOpts {
pub max_sub_compaction: u32,
pub max_concurrent_compaction_task_number: u64,
pub max_version_pinning_duration_sec: u64,
pub compactor_iter_max_io_retry_times: usize,

pub data_file_cache_dir: String,
pub data_file_cache_capacity_mb: usize,
Expand Down Expand Up @@ -269,6 +271,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
.storage
.compactor_fast_max_compact_delete_ratio,
compactor_fast_max_compact_task_size: c.storage.compactor_fast_max_compact_task_size,
compactor_iter_max_io_retry_times: c.storage.compactor_iter_max_io_retry_times,
}
}
}
Loading