Skip to content

Commit

Permalink
feat(storage): Introduce TtlReclaimSelector and refactor the trigger …
Browse files Browse the repository at this point in the history
…logic of Scheduler (#7937)

Its part of #6918

Improve and introduce TtlReclaimSelector, and introduce TtlReclaimTrigger to periodically initiate compaction against ttl for LastLevel to ensure that data can be reclaimed in a timely manner.  As more triggers are introduced, try to refactor the Scheduler's Trigger logic to ensure the maintainability of the code.
- Stream to simplify the code of the Scheduler trigger
- Replace the last_index policy with key_range to ensure that the compaction runs correctly

Approved-By: zwang28
Approved-By: Little-Wallace

Co-Authored-By: Li0k <yuli@singularity-data.com>
Co-Authored-By: Runji Wang <wangrunji0408@163.com>
  • Loading branch information
Li0k and wangrunji0408 committed Feb 21, 2023
1 parent 0759ad6 commit 672aad5
Show file tree
Hide file tree
Showing 26 changed files with 1,214 additions and 177 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -119,6 +119,7 @@ incremental = false
[patch.crates-io]
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "a819877" }
getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "cc95ee3" }
tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710" }
tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" }
tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" }
postgres-types = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" }
10 changes: 10 additions & 0 deletions dashboard/proto/gen/hummock.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions proto/hummock.proto
Expand Up @@ -19,6 +19,8 @@ message SstableInfo {
uint64 total_key_count = 7;
// When a SST is divided, its divide_version will increase one.
uint64 divide_version = 8;
uint64 min_epoch = 9;
uint64 max_epoch = 10;
}

enum LevelType {
Expand Down
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Expand Up @@ -159,6 +159,10 @@ pub struct MetaConfig {
/// Schedule space_reclaim compaction for all compaction groups with this interval.
#[serde(default = "default::meta::periodic_space_reclaim_compaction_interval_sec")]
pub periodic_space_reclaim_compaction_interval_sec: u64,

/// Schedule ttl_reclaim compaction for all compaction groups with this interval.
#[serde(default = "default::meta::periodic_ttl_reclaim_compaction_interval_sec")]
pub periodic_ttl_reclaim_compaction_interval_sec: u64,
}

impl Default for MetaConfig {
Expand Down Expand Up @@ -507,6 +511,10 @@ mod default {
pub fn periodic_space_reclaim_compaction_interval_sec() -> u64 {
3600 // 60min
}

pub fn periodic_ttl_reclaim_compaction_interval_sec() -> u64 {
1800 // 30mi
}
}

pub mod server {
Expand Down
25 changes: 22 additions & 3 deletions src/meta/src/hummock/compaction/level_selector.rs
Expand Up @@ -19,6 +19,7 @@ use std::collections::HashMap;
// (found in the LICENSE.Apache file in the root directory).
use std::sync::Arc;

use risingwave_common::catalog::TableOption;
use risingwave_hummock_sdk::HummockCompactionTaskId;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{compact_task, CompactionConfig};
Expand Down Expand Up @@ -50,6 +51,7 @@ pub trait LevelSelector: Sync + Send {
levels: &Levels,
level_handlers: &mut [LevelHandler],
selector_stats: &mut LocalSelectorStatistic,
table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactionTask>;

fn report_statistic_metrics(&self, _metrics: &MetaMetrics) {}
Expand Down Expand Up @@ -246,6 +248,7 @@ impl LevelSelector for DynamicLevelSelector {
levels: &Levels,
level_handlers: &mut [LevelHandler],
selector_stats: &mut LocalSelectorStatistic,
_table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactionTask> {
let dynamic_level_core =
DynamicLevelSelectorCore::new(compaction_group.compaction_config.clone());
Expand Down Expand Up @@ -305,6 +308,7 @@ impl LevelSelector for ManualCompactionSelector {
levels: &Levels,
level_handlers: &mut [LevelHandler],
_selector_stats: &mut LocalSelectorStatistic,
_table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactionTask> {
let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone());
let overlap_strategy = create_overlap_strategy(group.compaction_config.compaction_mode());
Expand Down Expand Up @@ -360,6 +364,7 @@ impl LevelSelector for SpaceReclaimCompactionSelector {
levels: &Levels,
level_handlers: &mut [LevelHandler],
_selector_stats: &mut LocalSelectorStatistic,
_table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactionTask> {
let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone());
let mut picker = SpaceReclaimCompactionPicker::new(
Expand Down Expand Up @@ -404,11 +409,14 @@ impl LevelSelector for TtlCompactionSelector {
levels: &Levels,
level_handlers: &mut [LevelHandler],
_selector_stats: &mut LocalSelectorStatistic,
table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactionTask> {
let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone());
let ctx = dynamic_level_core.calculate_level_base_size(levels);
let picker =
TtlReclaimCompactionPicker::new(group.compaction_config.max_space_reclaim_bytes);
let picker = TtlReclaimCompactionPicker::new(
group.compaction_config.max_space_reclaim_bytes,
table_id_to_options,
);
let state = self
.state
.entry(group.group_id)
Expand Down Expand Up @@ -506,16 +514,21 @@ pub mod tests {
stale_key_count: 0,
total_key_count: 0,
divide_version: 0,
min_epoch: 0,
max_epoch: 0,
}
}

pub fn generate_table_with_table_ids(
#[allow(clippy::too_many_arguments)]
pub fn generate_table_with_ids_and_epochs(
id: u64,
table_prefix: u64,
left: usize,
right: usize,
epoch: u64,
table_ids: Vec<u32>,
min_epoch: u64,
max_epoch: u64,
) -> SstableInfo {
SstableInfo {
id,
Expand All @@ -530,6 +543,8 @@ pub mod tests {
stale_key_count: 0,
total_key_count: 0,
divide_version: 0,
min_epoch,
max_epoch,
}
}

Expand Down Expand Up @@ -726,6 +741,7 @@ pub mod tests {
&levels,
&mut levels_handlers,
&mut local_stats,
HashMap::default(),
)
.unwrap();
// trivial move.
Expand Down Expand Up @@ -753,6 +769,7 @@ pub mod tests {
&levels,
&mut levels_handlers,
&mut local_stats,
HashMap::default(),
)
.unwrap();
assert_compaction_task(&compaction, &levels_handlers);
Expand All @@ -771,6 +788,7 @@ pub mod tests {
&levels,
&mut levels_handlers,
&mut local_stats,
HashMap::default(),
)
.unwrap();
assert_compaction_task(&compaction, &levels_handlers);
Expand All @@ -791,6 +809,7 @@ pub mod tests {
&levels,
&mut levels_handlers,
&mut local_stats,
HashMap::default(),
);
assert!(compaction.is_none());
}
Expand Down
12 changes: 10 additions & 2 deletions src/meta/src/hummock/compaction/mod.rs
Expand Up @@ -15,6 +15,7 @@
pub mod compaction_config;
mod level_selector;
mod overlap_strategy;
use risingwave_common::catalog::TableOption;
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_pb::hummock::compact_task::{self, TaskStatus};

Expand Down Expand Up @@ -121,12 +122,19 @@ impl CompactStatus {
group: &CompactionGroup,
stats: &mut LocalSelectorStatistic,
selector: &mut Box<dyn LevelSelector>,
table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactTask> {
// When we compact the files, we must make the result of compaction meet the following
// conditions, for any user key, the epoch of it in the file existing in the lower
// layer must be larger.
let ret =
selector.pick_compaction(task_id, group, levels, &mut self.level_handlers, stats)?;
let ret = selector.pick_compaction(
task_id,
group,
levels,
&mut self.level_handlers,
stats,
table_id_to_options,
)?;
let target_level_id = ret.input.target_level;

let compression_algorithm = match ret.compression_algorithm.as_str() {
Expand Down
Expand Up @@ -319,7 +319,7 @@ impl CompactionPicker for ManualCompactionPicker {

#[cfg(test)]
pub mod tests {
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use risingwave_pb::hummock::compact_task;
pub use risingwave_pb::hummock::{KeyRange, Level, LevelType};
Expand Down Expand Up @@ -1196,6 +1196,7 @@ pub mod tests {
&levels,
&mut levels_handler,
&mut local_stats,
HashMap::default(),
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
Expand Down Expand Up @@ -1231,6 +1232,7 @@ pub mod tests {
&levels,
&mut levels_handler,
&mut local_stats,
HashMap::default(),
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
Expand Down Expand Up @@ -1303,6 +1305,7 @@ pub mod tests {
&levels,
&mut levels_handler,
&mut local_stats,
HashMap::default(),
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
Expand Down Expand Up @@ -1340,6 +1343,7 @@ pub mod tests {
&levels,
&mut levels_handler,
&mut local_stats,
HashMap::default(),
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
Expand Down

0 comments on commit 672aad5

Please sign in to comment.