diff --git a/Cargo.lock b/Cargo.lock index 5b87392d090e..fd5a01346334 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7612,12 +7612,11 @@ dependencies = [ [[package]] name = "tokio-stream" version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" +source = "git+https://github.com/madsim-rs/tokio.git?rev=0c25710#0c25710a9d4e415a47820a4bdff11bd30e377572" dependencies = [ "futures-core", + "madsim-tokio", "pin-project-lite", - "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7fb3e579492f..a033e61dab5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -119,6 +119,7 @@ incremental = false [patch.crates-io] quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "a819877" } getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "cc95ee3" } +tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710" } tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" } tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" } postgres-types = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" } diff --git a/dashboard/proto/gen/hummock.ts b/dashboard/proto/gen/hummock.ts index 2deaff7c6e9a..d63dc2e133f7 100644 --- a/dashboard/proto/gen/hummock.ts +++ b/dashboard/proto/gen/hummock.ts @@ -56,6 +56,8 @@ export interface SstableInfo { totalKeyCount: number; /** When a SST is divided, its divide_version will increase one. */ divideVersion: number; + minEpoch: number; + maxEpoch: number; } export interface OverlappingLevel { @@ -840,6 +842,8 @@ function createBaseSstableInfo(): SstableInfo { staleKeyCount: 0, totalKeyCount: 0, divideVersion: 0, + minEpoch: 0, + maxEpoch: 0, }; } @@ -854,6 +858,8 @@ export const SstableInfo = { staleKeyCount: isSet(object.staleKeyCount) ? Number(object.staleKeyCount) : 0, totalKeyCount: isSet(object.totalKeyCount) ? Number(object.totalKeyCount) : 0, divideVersion: isSet(object.divideVersion) ? Number(object.divideVersion) : 0, + minEpoch: isSet(object.minEpoch) ? Number(object.minEpoch) : 0, + maxEpoch: isSet(object.maxEpoch) ? Number(object.maxEpoch) : 0, }; }, @@ -871,6 +877,8 @@ export const SstableInfo = { message.staleKeyCount !== undefined && (obj.staleKeyCount = Math.round(message.staleKeyCount)); message.totalKeyCount !== undefined && (obj.totalKeyCount = Math.round(message.totalKeyCount)); message.divideVersion !== undefined && (obj.divideVersion = Math.round(message.divideVersion)); + message.minEpoch !== undefined && (obj.minEpoch = Math.round(message.minEpoch)); + message.maxEpoch !== undefined && (obj.maxEpoch = Math.round(message.maxEpoch)); return obj; }, @@ -886,6 +894,8 @@ export const SstableInfo = { message.staleKeyCount = object.staleKeyCount ?? 0; message.totalKeyCount = object.totalKeyCount ?? 0; message.divideVersion = object.divideVersion ?? 0; + message.minEpoch = object.minEpoch ?? 0; + message.maxEpoch = object.maxEpoch ?? 0; return message; }, }; diff --git a/proto/hummock.proto b/proto/hummock.proto index e7f84b6ac727..bbc2af71f421 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -19,6 +19,8 @@ message SstableInfo { uint64 total_key_count = 7; // When a SST is divided, its divide_version will increase one. 
uint64 divide_version = 8; + uint64 min_epoch = 9; + uint64 max_epoch = 10; } enum LevelType { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index ff434c809e50..3535b3e07488 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -159,6 +159,10 @@ pub struct MetaConfig { /// Schedule space_reclaim compaction for all compaction groups with this interval. #[serde(default = "default::meta::periodic_space_reclaim_compaction_interval_sec")] pub periodic_space_reclaim_compaction_interval_sec: u64, + + /// Schedule ttl_reclaim compaction for all compaction groups with this interval. + #[serde(default = "default::meta::periodic_ttl_reclaim_compaction_interval_sec")] + pub periodic_ttl_reclaim_compaction_interval_sec: u64, } impl Default for MetaConfig { @@ -507,6 +511,10 @@ mod default { pub fn periodic_space_reclaim_compaction_interval_sec() -> u64 { 3600 // 60min } + + pub fn periodic_ttl_reclaim_compaction_interval_sec() -> u64 { + 1800 // 30min + } } pub mod server { diff --git a/src/meta/src/hummock/compaction/level_selector.rs b/src/meta/src/hummock/compaction/level_selector.rs index 71885539f015..2f423dacd7be 100644 --- a/src/meta/src/hummock/compaction/level_selector.rs +++ b/src/meta/src/hummock/compaction/level_selector.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; // (found in the LICENSE.Apache file in the root directory). use std::sync::Arc; +use risingwave_common::catalog::TableOption; use risingwave_hummock_sdk::HummockCompactionTaskId; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{compact_task, CompactionConfig}; @@ -50,6 +51,7 @@ pub trait LevelSelector: Sync + Send { levels: &Levels, level_handlers: &mut [LevelHandler], selector_stats: &mut LocalSelectorStatistic, + table_id_to_options: HashMap<u32, TableOption>, ) -> Option<CompactionTask>; fn report_statistic_metrics(&self, _metrics: &MetaMetrics) {} @@ -246,6 +248,7 @@ impl LevelSelector for DynamicLevelSelector { levels: &Levels, level_handlers: &mut [LevelHandler], selector_stats: &mut LocalSelectorStatistic, + _table_id_to_options: HashMap<u32, TableOption>, ) -> Option<CompactionTask> { let dynamic_level_core = DynamicLevelSelectorCore::new(compaction_group.compaction_config.clone()); @@ -305,6 +308,7 @@ impl LevelSelector for ManualCompactionSelector { levels: &Levels, level_handlers: &mut [LevelHandler], _selector_stats: &mut LocalSelectorStatistic, + _table_id_to_options: HashMap<u32, TableOption>, ) -> Option<CompactionTask> { let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone()); let overlap_strategy = create_overlap_strategy(group.compaction_config.compaction_mode()); @@ -360,6 +364,7 @@ impl LevelSelector for SpaceReclaimCompactionSelector { levels: &Levels, level_handlers: &mut [LevelHandler], _selector_stats: &mut LocalSelectorStatistic, + _table_id_to_options: HashMap<u32, TableOption>, ) -> Option<CompactionTask> { let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone()); let mut picker = SpaceReclaimCompactionPicker::new( @@ -404,11 +409,14 @@ impl LevelSelector for TtlCompactionSelector { levels: &Levels, level_handlers: &mut [LevelHandler], _selector_stats: &mut LocalSelectorStatistic, + table_id_to_options: HashMap<u32, TableOption>, ) -> Option<CompactionTask> { let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone()); let ctx = dynamic_level_core.calculate_level_base_size(levels); - let picker = - TtlReclaimCompactionPicker::new(group.compaction_config.max_space_reclaim_bytes); + let picker = TtlReclaimCompactionPicker::new( + group.compaction_config.max_space_reclaim_bytes, + table_id_to_options, + );
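Downstream of this selector change, `TtlReclaimCompactionPicker::new` (further below) immediately distills the options map down to the tables that actually carry a retention setting. A minimal, self-contained sketch of that step, using a stand-in `TableOption` type rather than the real `risingwave_common::catalog::TableOption`:

```rust
use std::collections::HashMap;

// Stand-in for risingwave_common::catalog::TableOption; only the field the
// picker cares about is modelled here.
#[derive(Clone, Copy)]
struct TableOption {
    retention_seconds: Option<u32>,
}

// Keep only tables with a concrete retention; a table without a TTL can never
// be reclaimed by TTL compaction, so the picker drops it up front.
fn distill_ttl(table_id_to_options: &HashMap<u32, TableOption>) -> HashMap<u32, u32> {
    table_id_to_options
        .iter()
        .filter_map(|(id, opt)| opt.retention_seconds.map(|ttl| (*id, ttl)))
        .collect()
}

fn main() {
    let opts = HashMap::from([
        (1, TableOption { retention_seconds: Some(5) }),
        (2, TableOption { retention_seconds: None }),
    ]);
    let ttl = distill_ttl(&opts);
    assert_eq!(ttl.get(&1), Some(&5));
    assert!(!ttl.contains_key(&2)); // no retention -> invisible to TTL reclaim
}
```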
let state = self .state .entry(group.group_id) @@ -506,16 +514,21 @@ pub mod tests { stale_key_count: 0, total_key_count: 0, divide_version: 0, + min_epoch: 0, + max_epoch: 0, } } - pub fn generate_table_with_table_ids( + #[allow(clippy::too_many_arguments)] + pub fn generate_table_with_ids_and_epochs( id: u64, table_prefix: u64, left: usize, right: usize, epoch: u64, table_ids: Vec, + min_epoch: u64, + max_epoch: u64, ) -> SstableInfo { SstableInfo { id, @@ -530,6 +543,8 @@ pub mod tests { stale_key_count: 0, total_key_count: 0, divide_version: 0, + min_epoch, + max_epoch, } } @@ -726,6 +741,7 @@ pub mod tests { &levels, &mut levels_handlers, &mut local_stats, + HashMap::default(), ) .unwrap(); // trivial move. @@ -753,6 +769,7 @@ pub mod tests { &levels, &mut levels_handlers, &mut local_stats, + HashMap::default(), ) .unwrap(); assert_compaction_task(&compaction, &levels_handlers); @@ -771,6 +788,7 @@ pub mod tests { &levels, &mut levels_handlers, &mut local_stats, + HashMap::default(), ) .unwrap(); assert_compaction_task(&compaction, &levels_handlers); @@ -791,6 +809,7 @@ pub mod tests { &levels, &mut levels_handlers, &mut local_stats, + HashMap::default(), ); assert!(compaction.is_none()); } diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index 2c015e44760a..4b90d0527606 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -15,6 +15,7 @@ pub mod compaction_config; mod level_selector; mod overlap_strategy; +use risingwave_common::catalog::TableOption; use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_pb::hummock::compact_task::{self, TaskStatus}; @@ -121,12 +122,19 @@ impl CompactStatus { group: &CompactionGroup, stats: &mut LocalSelectorStatistic, selector: &mut Box, + table_id_to_options: HashMap, ) -> Option { // When we compact the files, we must make the result of compaction meet the following // conditions, for any user key, the epoch of it in the file existing in the lower // layer must be larger. 
- let ret = - selector.pick_compaction(task_id, group, levels, &mut self.level_handlers, stats)?; + let ret = selector.pick_compaction( + task_id, + group, + levels, + &mut self.level_handlers, + stats, + table_id_to_options, + )?; let target_level_id = ret.input.target_level; let compression_algorithm = match ret.compression_algorithm.as_str() { diff --git a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs index a511dc30f6be..60b8c175c7c5 100644 --- a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs @@ -319,7 +319,7 @@ impl CompactionPicker for ManualCompactionPicker { #[cfg(test)] pub mod tests { - use std::collections::HashSet; + use std::collections::{HashMap, HashSet}; use risingwave_pb::hummock::compact_task; pub use risingwave_pb::hummock::{KeyRange, Level, LevelType}; @@ -1196,6 +1196,7 @@ pub mod tests { &levels, &mut levels_handler, &mut local_stats, + HashMap::default(), ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -1231,6 +1232,7 @@ pub mod tests { &levels, &mut levels_handler, &mut local_stats, + HashMap::default(), ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -1303,6 +1305,7 @@ pub mod tests { &levels, &mut levels_handler, &mut local_stats, + HashMap::default(), ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -1340,6 +1343,7 @@ pub mod tests { &levels, &mut levels_handler, &mut local_stats, + HashMap::default(), ) .unwrap(); assert_compaction_task(&task, &levels_handler); diff --git a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs index bdb4937ce4da..a193a05065d0 100644 --- a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs @@ -14,21 +14,56 @@ use std::collections::HashSet; +use risingwave_hummock_sdk::key_range::KeyRangeCommon; use risingwave_pb::hummock::hummock_version::Levels; -use risingwave_pb::hummock::{InputLevel, SstableInfo}; +use risingwave_pb::hummock::{InputLevel, KeyRange, SstableInfo}; use crate::hummock::compaction::CompactionInput; use crate::hummock::level_handler::LevelHandler; +// The execution model of SpaceReclaimCompactionPicker scans through the last level of files by +// key_range and selects the appropriate files to generate compaction pub struct SpaceReclaimCompactionPicker { // config pub max_space_reclaim_bytes: u64, + + // for filter pub all_table_ids: HashSet, } +// According to the execution model of SpaceReclaimCompactionPicker, SpaceReclaimPickerState is +// designed to record the state of each round of scanning #[derive(Default)] pub struct SpaceReclaimPickerState { - pub last_select_index: usize, + // Because of the right_exclusive, we use KeyRangeCommon to determine if the end_bounds + // overlap instead of directly comparing Vec. 
We don't need the start_bound in the + // filter, so it is set to -inf. + + // record the end_bound that has been scanned + pub last_select_end_bound: KeyRange, + + // record the end_bound of the current round of scanning + pub end_bound_in_round: KeyRange, +} + +impl SpaceReclaimPickerState { + pub fn valid(&self) -> bool { + !self.end_bound_in_round.right.is_empty() + } + + pub fn init(&mut self, key_range: KeyRange) { + self.last_select_end_bound = KeyRange { + left: vec![], + right: key_range.left.clone(), + right_exclusive: true, + }; + self.end_bound_in_round = key_range; + } + + pub fn clear(&mut self) { + self.end_bound_in_round = KeyRange::default(); + self.last_select_end_bound = KeyRange::default(); + } } impl SpaceReclaimCompactionPicker { @@ -60,16 +95,52 @@ impl SpaceReclaimCompactionPicker { let mut select_input_ssts = vec![]; let level_handler = &level_handlers[reclaimed_level.level_idx as usize]; - if state.last_select_index >= reclaimed_level.table_infos.len() { - state.last_select_index = 0; + if reclaimed_level.table_infos.is_empty() { + // no file to be picked + state.clear(); + return None; + } + + if state.valid() + && state + .last_select_end_bound + .compare_right_with(&state.end_bound_in_round.right) + == std::cmp::Ordering::Greater + { + // still in this round, but the selected end key has passed the round's end bound + // turn to the next round + state.clear(); + return None; + } + + if !state.valid() { + // a new round: init the key_range bounds from the level's table_infos + let first_sst = reclaimed_level.table_infos.first().unwrap(); + let last_sst = reclaimed_level.table_infos.last().unwrap(); + + let key_range_this_round = KeyRange { + left: first_sst.key_range.as_ref().unwrap().left.clone(), + right: last_sst.key_range.as_ref().unwrap().right.clone(), + right_exclusive: last_sst.key_range.as_ref().unwrap().right_exclusive, + }; + state.init(key_range_this_round); } - let start_indedx = state.last_select_index; let mut select_file_size = 0; + for sst in &reclaimed_level.table_infos { + let unmatched_sst = sst + .key_range + .as_ref() + .unwrap() + .sstable_overlap(&state.last_select_end_bound); + if unmatched_sst || (level_handler.is_pending_compact(&sst.id) || self.filter(sst)) { + if !select_input_ssts.is_empty() { + // Our goal is to pick as many complete layers of data as possible and keep the + // picked files contiguous to avoid overlapping key_ranges, so the strategy is + // to pick as many contiguous files as possible (at least one) + break; + } - for sst in &reclaimed_level.table_infos[start_indedx..]
{ - state.last_select_index += 1; - if level_handler.is_pending_compact(&sst.id) || self.filter(sst) { continue; } @@ -80,10 +151,19 @@ impl SpaceReclaimCompactionPicker { } } + // turn to next_round if select_input_ssts.is_empty() { + state.clear(); return None; } + let select_last_sst = select_input_ssts.last().unwrap(); + state.last_select_end_bound.full_key_extend(&KeyRange { + left: vec![], + right: select_last_sst.key_range.as_ref().unwrap().right.clone(), + right_exclusive: select_last_sst.key_range.as_ref().unwrap().right_exclusive, + }); + Some(CompactionInput { input_levels: vec![ InputLevel { @@ -106,6 +186,8 @@ impl SpaceReclaimCompactionPicker { #[cfg(test)] mod test { + use std::collections::HashMap; + use itertools::Itertools; use risingwave_pb::hummock::compact_task; pub use risingwave_pb::hummock::{KeyRange, Level, LevelType}; @@ -114,7 +196,7 @@ mod test { use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; use crate::hummock::compaction::level_selector::tests::{ assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level, - generate_table_with_table_ids, + generate_table_with_ids_and_epochs, }; use crate::hummock::compaction::level_selector::SpaceReclaimCompactionSelector; use crate::hummock::compaction::{LevelSelector, LocalSelectorStatistic}; @@ -131,34 +213,42 @@ mod test { let l0 = generate_l0_nonoverlapping_sublevels(vec![]); assert_eq!(l0.sub_levels.len(), 0); - let levels = vec![ + let mut levels = vec![ generate_level(1, vec![]), generate_level(2, vec![]), generate_level( 3, vec![ - generate_table_with_table_ids(0, 1, 150, 151, 1, vec![0]), - generate_table_with_table_ids(1, 1, 250, 251, 1, vec![1]), + generate_table_with_ids_and_epochs(0, 1, 150, 151, 1, vec![0], 0, 0), + generate_table_with_ids_and_epochs(1, 1, 250, 251, 1, vec![1], 0, 0), ], ), Level { level_idx: 4, level_type: LevelType::Nonoverlapping as i32, table_infos: vec![ - generate_table_with_table_ids(2, 1, 0, 100, 1, vec![2]), - generate_table_with_table_ids(3, 1, 101, 200, 1, vec![3]), - generate_table_with_table_ids(4, 1, 222, 300, 1, vec![4]), - generate_table_with_table_ids(5, 1, 333, 400, 1, vec![5]), - generate_table_with_table_ids(6, 1, 444, 500, 1, vec![6]), - generate_table_with_table_ids(7, 1, 555, 600, 1, vec![7]), - generate_table_with_table_ids(8, 1, 666, 700, 1, vec![8]), - generate_table_with_table_ids(9, 1, 777, 800, 1, vec![9]), - generate_table_with_table_ids(10, 1, 888, 900, 1, vec![10]), + generate_table_with_ids_and_epochs(2, 1, 0, 100, 1, vec![2], 0, 0), + generate_table_with_ids_and_epochs(3, 1, 101, 200, 1, vec![3], 0, 0), + generate_table_with_ids_and_epochs(4, 1, 222, 300, 1, vec![4], 0, 0), + generate_table_with_ids_and_epochs(5, 1, 333, 400, 1, vec![5], 0, 0), + generate_table_with_ids_and_epochs(6, 1, 444, 500, 1, vec![6], 0, 0), + generate_table_with_ids_and_epochs(7, 1, 555, 600, 1, vec![7], 0, 0), + generate_table_with_ids_and_epochs(8, 1, 666, 700, 1, vec![8], 0, 0), + generate_table_with_ids_and_epochs(9, 1, 777, 800, 1, vec![9], 0, 0), + generate_table_with_ids_and_epochs(10, 1, 888, 1600, 1, vec![10], 0, 0), + generate_table_with_ids_and_epochs(11, 1, 1600, 1800, 1, vec![10], 0, 0), ], total_file_size: 0, sub_level_id: 0, }, ]; + + { + let sst_10 = levels[3].table_infos.get_mut(8).unwrap(); + assert_eq!(10, sst_10.id); + sst_10.key_range.as_mut().unwrap().right_exclusive = true; + } + assert_eq!(levels.len(), 4); let mut levels = Levels { levels, @@ -170,6 +260,8 @@ mod test { let mut selector = 
SpaceReclaimCompactionSelector::default(); { + // test max_pick_files limit + // pick space reclaim let task = selector .pick_compaction( @@ -178,6 +270,7 @@ mod test { &levels, &mut levels_handler, &mut local_stats, + HashMap::default(), ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -209,6 +302,7 @@ mod test { } { + // test state for level_handler in &mut levels_handler { for pending_task_id in &level_handler.pending_tasks_ids() { level_handler.remove_task(*pending_task_id); @@ -223,18 +317,15 @@ mod test { &levels, &mut levels_handler, &mut local_stats, + HashMap::default(), ) .unwrap(); assert_compaction_task(&task, &levels_handler); assert_eq!(task.input.input_levels.len(), 2); assert_eq!(task.input.input_levels[0].level_idx, 4); - // test select index, picker will select file from last_select_index - let all_file_count = levels.get_levels().last().unwrap().get_table_infos().len(); - assert_eq!( - task.input.input_levels[0].table_infos.len(), - all_file_count - 5 - ); + // test select index, picker will select file from state + assert_eq!(task.input.input_levels[0].table_infos.len(), 4,); let mut start_id = 7; for sst in &task.input.input_levels[0].table_infos { @@ -249,9 +340,49 @@ mod test { task.compaction_task_type, compact_task::TaskType::SpaceReclaim )); + + // test pick key_range right exclusive + let task = selector + .pick_compaction( + 1, + &group_config, + &levels, + &mut levels_handler, + &mut local_stats, + HashMap::default(), + ) + .unwrap(); + assert_compaction_task(&task, &levels_handler); + assert_eq!(task.input.input_levels.len(), 2); + assert_eq!(task.input.input_levels[0].level_idx, 4); + assert_eq!(task.input.input_levels[0].table_infos.len(), 1); + assert_eq!(task.input.input_levels[1].level_idx, 4); + assert_eq!(task.input.input_levels[1].table_infos.len(), 0); + assert_eq!(task.input.target_level, 4); + assert!(matches!( + task.compaction_task_type, + compact_task::TaskType::SpaceReclaim + )); + for sst in &task.input.input_levels[0].table_infos { + assert_eq!(start_id, sst.id); + start_id += 1; + } + + assert!(selector + .pick_compaction( + 1, + &group_config, + &levels, + &mut levels_handler, + &mut local_stats, + HashMap::default(), + ) + .is_none()) } { + // test state, after above 2 + for level_handler in &mut levels_handler { for pending_task_id in &level_handler.pending_tasks_ids() { level_handler.remove_task(*pending_task_id); @@ -266,6 +397,7 @@ mod test { &levels, &mut levels_handler, &mut local_stats, + HashMap::default(), ); assert!(task.is_none()); } @@ -286,6 +418,7 @@ mod test { &levels, &mut levels_handler, &mut local_stats, + HashMap::default(), ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -308,5 +441,109 @@ mod test { compact_task::TaskType::SpaceReclaim )); } + + { + // test continuous file selection + for level_handler in &mut levels_handler { + for pending_task_id in &level_handler.pending_tasks_ids() { + level_handler.remove_task(*pending_task_id); + } + } + + // rebuild selector + selector = SpaceReclaimCompactionSelector::default(); + // cut range [3,4] [6] [8,9,10] + levels.member_table_ids = vec![2, 5, 7]; + let expect_task_file_count = vec![2, 1, 3]; + let expect_task_sst_id_range = vec![vec![3, 4], vec![6], vec![8, 9, 10]]; + for (index, x) in expect_task_file_count.iter().enumerate() { + // // pick space reclaim + let task = selector + .pick_compaction( + 1, + &group_config, + &levels, + &mut levels_handler, + &mut local_stats, + HashMap::default(), + ) + .unwrap(); + + 
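Why the `cut range [3,4] [6] [8,9,10]` expectation holds: with live tables `{2, 5, 7}`, a file whose tables are all still alive is not reclaimable and therefore breaks the contiguous run, so each round picks one run. A self-contained sketch of just the grouping (sst ids paired with table ids as built above); the picker's byte budget, omitted here, is what additionally stops the third task after sst 10:

```rust
use std::collections::HashSet;

/// Group reclaimable files (those referencing a dropped table) into
/// contiguous runs; a file whose table is still live breaks the run.
fn reclaimable_runs(files: &[(u64, u64)], live: &HashSet<u64>) -> Vec<Vec<u64>> {
    let mut runs: Vec<Vec<u64>> = vec![];
    let mut current: Vec<u64> = vec![];
    for &(sst_id, table_id) in files {
        if live.contains(&table_id) {
            if !current.is_empty() {
                runs.push(std::mem::take(&mut current));
            }
        } else {
            current.push(sst_id);
        }
    }
    if !current.is_empty() {
        runs.push(current);
    }
    runs
}

fn main() {
    // (sst id, table id) for the bottom level built in the test above.
    let files = [
        (2, 2), (3, 3), (4, 4), (5, 5), (6, 6),
        (7, 7), (8, 8), (9, 9), (10, 10), (11, 10),
    ];
    let live: HashSet<u64> = [2, 5, 7].into_iter().collect();
    // Grouping alone yields [3,4], [6], [8,9,10,11]; the byte budget (per the
    // expected output above) then cuts the last run after sst 10.
    assert_eq!(
        reclaimable_runs(&files, &live),
        vec![vec![3, 4], vec![6], vec![8, 9, 10, 11]]
    );
}
```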
assert_compaction_task(&task, &levels_handler); + assert_eq!(task.input.input_levels.len(), 2); + assert_eq!(task.input.input_levels[0].level_idx, 4); + + assert_eq!(task.input.input_levels[0].table_infos.len(), *x); + let select_sst = &task.input.input_levels[0] + .table_infos + .iter() + .map(|sst| sst.id) + .collect_vec(); + assert!(select_sst.is_sorted()); + assert_eq!(expect_task_sst_id_range[index], *select_sst); + + assert_eq!(task.input.input_levels[1].level_idx, 4); + assert_eq!(task.input.input_levels[1].table_infos.len(), 0); + assert_eq!(task.input.target_level, 4); + assert!(matches!( + task.compaction_task_type, + compact_task::TaskType::SpaceReclaim + )); + } + } + + { + // test continuous file selection with filter change + for level_handler in &mut levels_handler { + for pending_task_id in &level_handler.pending_tasks_ids() { + level_handler.remove_task(*pending_task_id); + } + } + + // rebuild selector + selector = SpaceReclaimCompactionSelector::default(); + // cut range [3,4] [6] [8,9,10] + levels.member_table_ids = vec![2, 5, 7]; + let expect_task_file_count = vec![2, 1, 4]; + let expect_task_sst_id_range = vec![vec![3, 4], vec![6], vec![7, 8, 9, 10]]; + for (index, x) in expect_task_file_count.iter().enumerate() { + if index == expect_task_file_count.len() - 1 { + levels.member_table_ids = vec![2, 5]; + } + + // // pick space reclaim + let task = selector + .pick_compaction( + 1, + &group_config, + &levels, + &mut levels_handler, + &mut local_stats, + HashMap::default(), + ) + .unwrap(); + + assert_compaction_task(&task, &levels_handler); + assert_eq!(task.input.input_levels.len(), 2); + assert_eq!(task.input.input_levels[0].level_idx, 4); + + assert_eq!(task.input.input_levels[0].table_infos.len(), *x); + let select_sst = &task.input.input_levels[0] + .table_infos + .iter() + .map(|sst| sst.id) + .collect_vec(); + assert!(select_sst.is_sorted()); + assert_eq!(expect_task_sst_id_range[index], *select_sst); + + assert_eq!(task.input.input_levels[1].level_idx, 4); + assert_eq!(task.input.input_levels[1].table_infos.len(), 0); + assert_eq!(task.input.target_level, 4); + assert!(matches!( + task.compaction_task_type, + compact_task::TaskType::SpaceReclaim + )); + } + } } } diff --git a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs index d04a934a7cda..d9cf605da7e0 100644 --- a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs @@ -12,28 +12,101 @@ // See the License for the specific language governing permissions and // limitations under the License. 
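Both reclaim pickers replace the old `last_select_index` cursor with the same round-based scan state (it reappears below as `TtlPickerState`). A self-contained model of one scan round, with key ranges reduced to plain integers — the real code compares `KeyRange.right` via `compare_right_with` and honours `right_exclusive`, so treat the names and comparisons here as illustrative:

```rust
// Each file is reduced to its id plus an integer key range.
#[derive(Clone, Copy)]
struct File {
    id: u64,
    left: u64,
    right: u64,
}

#[derive(Default)]
struct PickerState {
    last_select_end_bound: Option<u64>, // right bound consumed so far in this round
    end_bound_in_round: Option<u64>,    // right-most bound when the round began
}

impl PickerState {
    fn valid(&self) -> bool {
        self.end_bound_in_round.is_some()
    }
    fn init(&mut self, first_left: u64, last_right: u64) {
        // Start "just before" the first file; the round ends at the last file.
        self.last_select_end_bound = Some(first_left);
        self.end_bound_in_round = Some(last_right);
    }
    fn clear(&mut self) {
        *self = Self::default();
    }
}

// One pick call: take a contiguous run (at least one file) of unvisited,
// eligible files, then remember how far we got. `None` ends the round.
fn pick(state: &mut PickerState, files: &[File], eligible: impl Fn(&File) -> bool) -> Option<Vec<u64>> {
    if files.is_empty() {
        state.clear();
        return None;
    }
    if state.valid() && state.last_select_end_bound >= state.end_bound_in_round {
        state.clear(); // round exhausted; the next call starts a fresh round
        return None;
    }
    if !state.valid() {
        state.init(files[0].left, files[files.len() - 1].right);
    }
    let mut picked = vec![];
    for f in files {
        let visited = Some(f.right) <= state.last_select_end_bound;
        if visited || !eligible(f) {
            if !picked.is_empty() {
                break; // keep the selection contiguous
            }
            continue;
        }
        picked.push(f.id);
        state.last_select_end_bound = Some(f.right);
    }
    if picked.is_empty() {
        state.clear();
        return None;
    }
    Some(picked)
}

fn main() {
    let files: Vec<File> = (0..5)
        .map(|i| File { id: i, left: i * 10, right: i * 10 + 9 })
        .collect();
    let eligible = |f: &File| f.id != 2; // e.g. file 2 is pending compaction
    let mut state = PickerState::default();
    assert_eq!(pick(&mut state, &files, eligible), Some(vec![0, 1]));
    assert_eq!(pick(&mut state, &files, eligible), Some(vec![3, 4]));
    assert_eq!(pick(&mut state, &files, eligible), None); // round over
}
```

The "at least one file" rule is what makes repeated calls walk the level in bounded, non-overlapping slices instead of restarting from an index that may have been invalidated by concurrent compactions.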
+use std::collections::{HashMap, HashSet}; + +use risingwave_common::catalog::TableOption; +use risingwave_common::constants::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND; +use risingwave_common::util::epoch::Epoch; +use risingwave_hummock_sdk::compaction_group::StateTableId; +use risingwave_hummock_sdk::key_range::KeyRangeCommon; use risingwave_pb::hummock::hummock_version::Levels; -use risingwave_pb::hummock::InputLevel; +use risingwave_pb::hummock::{InputLevel, KeyRange, SstableInfo}; use crate::hummock::compaction::CompactionInput; use crate::hummock::level_handler::LevelHandler; +const MIN_TTL_EXPIRE_INTERVAL_MS: u64 = 60 * 60 * 1000; // 1h + #[derive(Default)] pub struct TtlPickerState { - last_select_index: usize, + // Because of right_exclusive, we use KeyRangeCommon to determine whether the end_bounds + // overlap, instead of directly comparing Vec<u8>. We don't need the start_bound in the + // filter, so it is set to -inf. + + // record the end_bound that has been scanned + pub last_select_end_bound: KeyRange, + + // record the end_bound of the current round of scanning + pub end_bound_in_round: KeyRange, +} + +impl TtlPickerState { + pub fn valid(&self) -> bool { + !self.end_bound_in_round.right.is_empty() + } + + pub fn init(&mut self, key_range: KeyRange) { + self.last_select_end_bound = KeyRange { + left: vec![], + right: key_range.left.clone(), + right_exclusive: true, + }; + self.end_bound_in_round = key_range; + } + + pub fn clear(&mut self) { + self.end_bound_in_round = KeyRange::default(); + self.last_select_end_bound = KeyRange::default(); + } } pub struct TtlReclaimCompactionPicker { max_ttl_reclaim_bytes: u64, - // todo: filter table option + table_id_to_ttl: HashMap<u32, u32>, } impl TtlReclaimCompactionPicker { - pub fn new(max_ttl_reclaim_bytes: u64) -> Self { + pub fn new( + max_ttl_reclaim_bytes: u64, + table_id_to_options: HashMap<u32, TableOption>, + ) -> Self { + let table_id_to_ttl: HashMap<u32, u32> = table_id_to_options + .iter() + .filter(|id_to_option| { + let table_option = id_to_option.1; + table_option.retention_seconds.is_some() + }) + .map(|id_to_option| (*id_to_option.0, id_to_option.1.retention_seconds.unwrap())) + .collect(); + Self { max_ttl_reclaim_bytes, + table_id_to_ttl, } } + + fn filter(&self, sst: &SstableInfo, current_epoch_time: u64) -> bool { + let table_id_in_sst = sst.table_ids.iter().cloned().collect::<HashSet<StateTableId>>(); + let expire_epoch = + Epoch::from_physical_time(current_epoch_time - MIN_TTL_EXPIRE_INTERVAL_MS); + + for table_id in table_id_in_sst { + match self.table_id_to_ttl.get(&table_id) { + Some(ttl_second_u32) => { + assert!(*ttl_second_u32 != TABLE_OPTION_DUMMY_RETENTION_SECOND); + // push the expiry horizon back by this table's TTL + let ttl_mill = (*ttl_second_u32 * 1000) as u64; + let min_epoch = expire_epoch.subtract_ms(ttl_mill); + if Epoch(sst.min_epoch) <= min_epoch { + return false; + } + } + None => continue, + } + } + + true + } } impl TtlReclaimCompactionPicker { @@ -48,30 +121,83 @@ impl TtlReclaimCompactionPicker { let mut select_input_ssts = vec![]; let level_handler = &level_handlers[reclaimed_level.level_idx as usize]; - if state.last_select_index >= reclaimed_level.table_infos.len() { - state.last_select_index = 0; + if reclaimed_level.table_infos.is_empty() { + // no file to be picked + state.clear(); + return None; } - let start_indedx = state.last_select_index; + if state.valid() + && state + .last_select_end_bound + .compare_right_with(&state.end_bound_in_round.right) + == std::cmp::Ordering::Greater + { + // still in this round, but the selected end key has passed the round's end bound + // turn to the next round + state.clear(); + return None; + } + + if !state.valid() { + // a new round: init the key_range bounds from the level's table_infos + let first_sst = reclaimed_level.table_infos.first().unwrap(); + let last_sst = reclaimed_level.table_infos.last().unwrap(); + + let key_range_this_round = KeyRange { + left: first_sst.key_range.as_ref().unwrap().left.clone(), + right: last_sst.key_range.as_ref().unwrap().right.clone(), + right_exclusive: last_sst.key_range.as_ref().unwrap().right_exclusive, + }; + + state.init(key_range_this_round); + } + + let current_epoch_time = Epoch::now().0; let mut select_file_size = 0; - for sst in &reclaimed_level.table_infos[start_indedx..] { - state.last_select_index += 1; - if level_handler.is_pending_compact(&sst.id) { + for sst in &reclaimed_level.table_infos { + let unmatched_sst = sst + .key_range + .as_ref() + .unwrap() + .sstable_overlap(&state.last_select_end_bound); + + if unmatched_sst + || level_handler.is_pending_compact(&sst.id) + || self.filter(sst, current_epoch_time) + { + if !select_input_ssts.is_empty() { + // Our goal is to pick as many complete layers of data as possible and keep the + // picked files contiguous to avoid overlapping key_ranges, so the strategy is + // to pick as many contiguous files as possible (at least one) + break; + } + continue; } select_input_ssts.push(sst.clone()); select_file_size += sst.file_size; + if select_file_size > self.max_ttl_reclaim_bytes { break; } } + // turn to the next round if select_input_ssts.is_empty() { + state.clear(); return None; } + let select_last_sst = select_input_ssts.last().unwrap(); + state.last_select_end_bound.full_key_extend(&KeyRange { + left: vec![], + right: select_last_sst.key_range.as_ref().unwrap().right.clone(), + right_exclusive: select_last_sst.key_range.as_ref().unwrap().right_exclusive, + }); + Some(CompactionInput { input_levels: vec![ InputLevel { @@ -93,7 +219,6 @@ impl TtlReclaimCompactionPicker { #[cfg(test)] mod test { - use itertools::Itertools; use risingwave_pb::hummock::compact_task; pub use risingwave_pb::hummock::{KeyRange, Level, LevelType}; @@ -102,7 +227,7 @@ use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; use crate::hummock::compaction::level_selector::tests::{ assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level, - generate_table, + generate_table_with_ids_and_epochs, }; use crate::hummock::compaction::level_selector::{LevelSelector, TtlCompactionSelector}; use crate::hummock::compaction::LocalSelectorStatistic; @@ -117,34 +242,129 @@ let group_config = CompactionGroup::new(1, config); let l0 = generate_l0_nonoverlapping_sublevels(vec![]); assert_eq!(l0.sub_levels.len(), 0); - let levels = vec![ + + let current_epoch_time = Epoch::now().0; + let expired_epoch = Epoch::from_physical_time( + current_epoch_time - MIN_TTL_EXPIRE_INTERVAL_MS - (1000 * 1000), + ) + .0; + let mut levels = vec![ generate_level(1, vec![]), generate_level(2, vec![]), generate_level( 3, vec![ - generate_table(0, 1, 150, 151, 1), - generate_table(1, 1, 250, 251, 1), + generate_table_with_ids_and_epochs(0, 1, 150, 151, 1, vec![0], 0, 0), + generate_table_with_ids_and_epochs(1, 1, 250, 251, 1, vec![1], 0, 0), + ], + ), Level { level_idx: 4,
level_type: LevelType::Nonoverlapping as i32, table_infos: vec![ - generate_table(2, 1, 0, 100, 1), - generate_table(3, 1, 101, 200, 1), - generate_table(4, 1, 222, 300, 1), - generate_table(5, 1, 333, 400, 1), - generate_table(6, 1, 444, 500, 1), - generate_table(7, 1, 555, 600, 1), - generate_table(8, 1, 666, 700, 1), - generate_table(9, 1, 777, 800, 1), - generate_table(10, 1, 888, 900, 1), + generate_table_with_ids_and_epochs(2, 1, 0, 100, 1, vec![2], expired_epoch, 0), + generate_table_with_ids_and_epochs( + 3, + 1, + 101, + 200, + 1, + vec![3], + expired_epoch, + 0, + ), + generate_table_with_ids_and_epochs( + 4, + 1, + 222, + 300, + 1, + vec![4], + expired_epoch, + u64::MAX, + ), + generate_table_with_ids_and_epochs( + 5, + 1, + 333, + 400, + 1, + vec![5], + expired_epoch, + u64::MAX, + ), + generate_table_with_ids_and_epochs( + 6, + 1, + 444, + 500, + 1, + vec![6], + expired_epoch, + u64::MAX, + ), + generate_table_with_ids_and_epochs( + 7, + 1, + 555, + 600, + 1, + vec![7], + expired_epoch, + u64::MAX, + ), + generate_table_with_ids_and_epochs( + 8, + 1, + 666, + 700, + 1, + vec![8], + expired_epoch, + u64::MAX, + ), + generate_table_with_ids_and_epochs( + 9, + 1, + 777, + 800, + 1, + vec![9], + expired_epoch, + u64::MAX, + ), + generate_table_with_ids_and_epochs( + 10, + 1, + 888, + 1600, + 1, + vec![10], + expired_epoch, + u64::MAX, + ), + generate_table_with_ids_and_epochs( + 11, + 1, + 1600, + 1800, + 1, + vec![10], + expired_epoch, + u64::MAX, + ), ], total_file_size: 0, sub_level_id: 0, }, ]; + + { + let sst_10 = levels[3].table_infos.get_mut(8).unwrap(); + assert_eq!(10, sst_10.id); + sst_10.key_range.as_mut().unwrap().right_exclusive = true; + } + assert_eq!(levels.len(), 4); let levels = Levels { levels, @@ -155,7 +375,17 @@ mod test { let mut local_stats = LocalSelectorStatistic::default(); let mut selector = TtlCompactionSelector::default(); { - // pick space reclaim + let table_id_to_options: HashMap = (2..=10) + .map(|table_id| { + ( + table_id as u32, + TableOption { + retention_seconds: Some(5_u32), + }, + ) + }) + .collect(); + // pick ttl reclaim let task = selector .pick_compaction( 1, @@ -163,6 +393,7 @@ mod test { &levels, &mut levels_handler, &mut local_stats, + table_id_to_options, ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -192,7 +423,18 @@ mod test { } } - // pick space reclaim + let table_id_to_options: HashMap = (2..=10) + .map(|table_id| { + ( + table_id as u32, + TableOption { + retention_seconds: Some(5_u32), + }, + ) + }) + .collect(); + + // pick ttl reclaim let task = selector .pick_compaction( 1, @@ -200,18 +442,15 @@ mod test { &levels, &mut levels_handler, &mut local_stats, + table_id_to_options.clone(), ) .unwrap(); assert_compaction_task(&task, &levels_handler); assert_eq!(task.input.input_levels.len(), 2); assert_eq!(task.input.input_levels[0].level_idx, 4); - // test select index, picker will select file from last_select_index - let all_file_count = levels.get_levels().last().unwrap().get_table_infos().len(); - assert_eq!( - task.input.input_levels[0].table_infos.len(), - all_file_count - 5 - ); + // test select index, picker will select file from state + assert_eq!(task.input.input_levels[0].table_infos.len(), 4); let mut start_id = 7; for sst in &task.input.input_levels[0].table_infos { @@ -226,6 +465,299 @@ mod test { task.compaction_task_type, compact_task::TaskType::Ttl )); + + // test pick key_range right exclusive + let task = selector + .pick_compaction( + 1, + &group_config, + &levels, + &mut levels_handler, + 
&mut local_stats, + table_id_to_options.clone(), + ) + .unwrap(); + assert_compaction_task(&task, &levels_handler); + assert_eq!(task.input.input_levels.len(), 2); + assert_eq!(task.input.input_levels[0].level_idx, 4); + assert_eq!(task.input.input_levels[0].table_infos.len(), 1); + assert_eq!(task.input.input_levels[1].level_idx, 4); + assert_eq!(task.input.input_levels[1].table_infos.len(), 0); + assert_eq!(task.input.target_level, 4); + assert!(matches!( + task.compaction_task_type, + compact_task::TaskType::Ttl + )); + for sst in &task.input.input_levels[0].table_infos { + assert_eq!(start_id, sst.id); + start_id += 1; + } + + assert!(selector + .pick_compaction( + 1, + &group_config, + &levels, + &mut levels_handler, + &mut local_stats, + table_id_to_options, + ) + .is_none()) + } + + { + for level_handler in &mut levels_handler { + for pending_task_id in &level_handler.pending_tasks_ids() { + level_handler.remove_task(*pending_task_id); + } + } + + // rebuild selector + selector = TtlCompactionSelector::default(); + let mut table_id_to_options: HashMap = (2..=10) + .map(|table_id| { + ( + table_id as u32, + TableOption { + retention_seconds: Some(7200), + }, + ) + }) + .collect(); + + table_id_to_options.insert( + 5, + TableOption { + retention_seconds: Some(5), + }, + ); + + // // pick ttl reclaim + let task = selector + .pick_compaction( + 1, + &group_config, + &levels, + &mut levels_handler, + &mut local_stats, + table_id_to_options, + ) + .unwrap(); + assert_compaction_task(&task, &levels_handler); + assert_eq!(task.input.input_levels.len(), 2); + assert_eq!(task.input.input_levels[0].level_idx, 4); + + // test table_option_filter + assert_eq!(task.input.input_levels[0].table_infos.len(), 1); + let select_sst = &task.input.input_levels[0].table_infos.first().unwrap(); + assert_eq!(select_sst.id, 5); + + assert_eq!(task.input.input_levels[1].level_idx, 4); + assert_eq!(task.input.input_levels[1].table_infos.len(), 0); + assert_eq!(task.input.target_level, 4); + assert!(matches!( + task.compaction_task_type, + compact_task::TaskType::Ttl + )); + } + + { + // test empty table_option filter + + for level_handler in &mut levels_handler { + for pending_task_id in &level_handler.pending_tasks_ids() { + level_handler.remove_task(*pending_task_id); + } + } + + // rebuild selector + selector = TtlCompactionSelector::default(); + + // // pick ttl reclaim + let task = selector.pick_compaction( + 1, + &group_config, + &levels, + &mut levels_handler, + &mut local_stats, + HashMap::default(), + ); + + // empty table_options does not select any files + assert!(task.is_none()); + } + + { + // test continuous file selection + for level_handler in &mut levels_handler { + for pending_task_id in &level_handler.pending_tasks_ids() { + level_handler.remove_task(*pending_task_id); + } + } + + // rebuild selector + selector = TtlCompactionSelector::default(); + let mut table_id_to_options: HashMap = (2..=10) + .map(|table_id| { + ( + table_id as u32, + TableOption { + retention_seconds: Some(5_u32), + }, + ) + }) + .collect(); + + // cut range [2,3,4] [6,7] [10] + table_id_to_options.insert( + 5, + TableOption { + retention_seconds: Some(7200_u32), + }, + ); + + table_id_to_options.insert( + 8, + TableOption { + retention_seconds: Some(7200_u32), + }, + ); + + table_id_to_options.insert( + 9, + TableOption { + retention_seconds: Some(7200_u32), + }, + ); + + let expect_task_file_count = vec![3, 2, 1]; + let expect_task_sst_id_range = vec![vec![2, 3, 4], vec![6, 7], vec![10]]; + for (index, x) in 
expect_task_file_count.iter().enumerate() { + // // pick ttl reclaim + let task = selector + .pick_compaction( + 1, + &group_config, + &levels, + &mut levels_handler, + &mut local_stats, + table_id_to_options.clone(), + ) + .unwrap(); + + assert_compaction_task(&task, &levels_handler); + assert_eq!(task.input.input_levels.len(), 2); + assert_eq!(task.input.input_levels[0].level_idx, 4); + + // test table_option_filter + assert_eq!(task.input.input_levels[0].table_infos.len(), *x); + let select_sst = &task.input.input_levels[0] + .table_infos + .iter() + .map(|sst| sst.id) + .collect_vec(); + assert!(select_sst.is_sorted()); + assert_eq!(expect_task_sst_id_range[index], *select_sst); + + assert_eq!(task.input.input_levels[1].level_idx, 4); + assert_eq!(task.input.input_levels[1].table_infos.len(), 0); + assert_eq!(task.input.target_level, 4); + assert!(matches!( + task.compaction_task_type, + compact_task::TaskType::Ttl + )); + } + } + + { + // test continuous file selection with filter change + for level_handler in &mut levels_handler { + for pending_task_id in &level_handler.pending_tasks_ids() { + level_handler.remove_task(*pending_task_id); + } + } + + // rebuild selector + selector = TtlCompactionSelector::default(); + let mut table_id_to_options: HashMap = (2..=10) + .map(|table_id| { + ( + table_id as u32, + TableOption { + retention_seconds: Some(5_u32), + }, + ) + }) + .collect(); + + // cut range [2,3,4] [6,7] [10] + table_id_to_options.insert( + 5, + TableOption { + retention_seconds: Some(7200_u32), + }, + ); + + table_id_to_options.insert( + 8, + TableOption { + retention_seconds: Some(7200_u32), + }, + ); + + table_id_to_options.insert( + 9, + TableOption { + retention_seconds: Some(7200_u32), + }, + ); + + let expect_task_file_count = vec![3, 3]; + let expect_task_sst_id_range = vec![vec![2, 3, 4], vec![5, 6, 7]]; + for (index, x) in expect_task_file_count.iter().enumerate() { + if index == expect_task_file_count.len() - 1 { + table_id_to_options.insert( + 5, + TableOption { + retention_seconds: Some(5_u32), + }, + ); + } + + // // pick ttl reclaim + let task = selector + .pick_compaction( + 1, + &group_config, + &levels, + &mut levels_handler, + &mut local_stats, + table_id_to_options.clone(), + ) + .unwrap(); + + assert_compaction_task(&task, &levels_handler); + assert_eq!(task.input.input_levels.len(), 2); + assert_eq!(task.input.input_levels[0].level_idx, 4); + + // test table_option_filter + assert_eq!(task.input.input_levels[0].table_infos.len(), *x); + let select_sst = &task.input.input_levels[0] + .table_infos + .iter() + .map(|sst| sst.id) + .collect_vec(); + assert!(select_sst.is_sorted()); + assert_eq!(expect_task_sst_id_range[index], *select_sst); + + assert_eq!(task.input.input_levels[1].level_idx, 4); + assert_eq!(task.input.input_levels[1].table_infos.len(), 0); + assert_eq!(task.input.target_level, 4); + assert!(matches!( + task.compaction_task_type, + compact_task::TaskType::Ttl + )); + } } } } diff --git a/src/meta/src/hummock/compaction_schedule_policy.rs b/src/meta/src/hummock/compaction_schedule_policy.rs index f49f88e31b38..e5f191a6f3ca 100644 --- a/src/meta/src/hummock/compaction_schedule_policy.rs +++ b/src/meta/src/hummock/compaction_schedule_policy.rs @@ -422,6 +422,8 @@ mod tests { stale_key_count: 0, total_key_count: 0, divide_version: 0, + min_epoch: 0, + max_epoch: 0, }], }], splits: vec![], diff --git a/src/meta/src/hummock/compaction_scheduler.rs b/src/meta/src/hummock/compaction_scheduler.rs index 0beacea7168e..a02230bf71c6 100644 --- 
a/src/meta/src/hummock/compaction_scheduler.rs +++ b/src/meta/src/hummock/compaction_scheduler.rs @@ -16,6 +16,9 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; +use futures::future::{Either, Shared}; +use futures::stream::select; +use futures::{FutureExt, Stream, StreamExt}; use parking_lot::Mutex; use risingwave_hummock_sdk::compact::compact_task_to_string; use risingwave_hummock_sdk::CompactionGroupId; @@ -23,9 +26,10 @@ use risingwave_pb::hummock::compact_task::{self, TaskStatus}; use risingwave_pb::hummock::subscribe_compact_tasks_response::Task; use risingwave_pb::hummock::CompactTask; use tokio::sync::mpsc::error::SendError; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot::Receiver; use tokio::sync::Notify; +use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream}; use super::Compactor; use crate::hummock::compaction::{ @@ -45,7 +49,7 @@ type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType); /// compaction groups. pub struct CompactionRequestChannel { request_tx: UnboundedSender, - scheduled: Mutex>, + scheduled: Mutex>, } #[derive(Debug, PartialEq)] @@ -72,16 +76,21 @@ impl CompactionRequestChannel { task_type: compact_task::TaskType, ) -> Result> { let mut guard = self.scheduled.lock(); - if guard.contains(&compaction_group) { + let key = (compaction_group, task_type); + if guard.contains(&key) { return Ok(false); } - self.request_tx.send((compaction_group, task_type))?; - guard.insert(compaction_group); + self.request_tx.send(key)?; + guard.insert(key); Ok(true) } - pub fn unschedule(&self, compaction_group: CompactionGroupId) { - self.scheduled.lock().remove(&compaction_group); + pub fn unschedule( + &self, + compaction_group: CompactionGroupId, + task_type: compact_task::TaskType, + ) { + self.scheduled.lock().remove(&(compaction_group, task_type)); } } @@ -118,8 +127,8 @@ where } } - pub async fn start(&self, mut shutdown_rx: Receiver<()>) { - let (sched_tx, mut sched_rx) = + pub async fn start(&self, shutdown_rx: Receiver<()>) { + let (sched_tx, sched_rx) = tokio::sync::mpsc::unbounded_channel::(); let sched_channel = Arc::new(CompactionRequestChannel::new(sched_tx)); @@ -129,86 +138,22 @@ where ); tracing::info!("Start compaction scheduler."); - let mut min_trigger_interval = tokio::time::interval(Duration::from_secs( - self.env.opts.periodic_compaction_interval_sec, - )); - min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - let mut min_space_reclaim_trigger_interval = tokio::time::interval(Duration::from_secs( + let compaction_selectors = Self::init_selectors(); + let shutdown_rx = shutdown_rx.shared(); + let schedule_event_stream = Self::scheduler_event_stream( + sched_rx, self.env.opts.periodic_space_reclaim_compaction_interval_sec, - )); - min_space_reclaim_trigger_interval - .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - let mut compaction_selectors = Self::init_selectors(); - - loop { - let (compaction_group, task_type) = tokio::select! { - recv = sched_rx.recv() => { - match recv { - Some((compaction_group, task_type)) => (compaction_group, task_type), - None => { - tracing::warn!("Compactor Scheduler: The Hummock manager has dropped the connection, - it means it has either died or started a new session. Exiting."); - return; - } - } - }, - - _ = min_trigger_interval.tick() => { - // Disable periodic trigger for compaction_deterministic_test. 
- if self.env.opts.compaction_deterministic_test { - continue; - } - // Periodically trigger compaction for all compaction groups. - for cg_id in self.hummock_manager.compaction_group_ids().await { - if let Err(e) = sched_channel.try_sched_compaction(cg_id, compact_task::TaskType::Dynamic) { - tracing::warn!("Failed to schedule base compaction for compaction group {}. {}", cg_id, e); - } - } - continue; - }, - - _ = min_space_reclaim_trigger_interval.tick() => { - // Disable periodic trigger for compaction_deterministic_test. - if self.env.opts.compaction_deterministic_test { - continue; - } - // Periodically trigger space_reclaim compaction for all compaction groups. - for cg_id in self.hummock_manager.compaction_group_ids().await { - if let Err(e) = sched_channel.try_sched_compaction(cg_id, compact_task::TaskType::SpaceReclaim) { - tracing::warn!("Failed to schedule base compaction for compaction group {}. {}", cg_id, e); - } - } - continue; - } - - // Shutdown compactor scheduler - _ = &mut shutdown_rx => { - break; - } - }; - - sync_point::sync_point!("BEFORE_SCHEDULE_COMPACTION_TASK"); - sched_channel.unschedule(compaction_group); - - // Wait for a compactor to become available. - let compactor = loop { - if let Some(compactor) = self.hummock_manager.get_idle_compactor().await { - break compactor; - } else { - tracing::debug!("No available compactor, pausing compaction."); - tokio::select! { - _ = self.compaction_resume_notifier.notified() => {}, - _ = &mut shutdown_rx => { - return; - } - } - } - }; - let selector = compaction_selectors.get_mut(&task_type).unwrap(); - self.pick_and_assign(compaction_group, compactor, sched_channel.clone(), selector) - .await; - } + self.env.opts.periodic_ttl_reclaim_compaction_interval_sec, + self.env.opts.periodic_compaction_interval_sec, + ); + self.schedule_loop( + sched_channel.clone(), + shutdown_rx, + compaction_selectors, + schedule_event_stream, + ) + .await; } fn init_selectors() -> HashMap> { @@ -318,7 +263,12 @@ where ); } Err(err) => { - tracing::warn!("Failed to assign compaction task to compactor: {:#?}", err); + tracing::warn!( + "Failed to assign {:?} compaction task to compactor {} : {:#?}", + compact_task.task_type().as_str_name(), + compactor.context_id(), + err + ); match err { Error::CompactionTaskAlreadyAssigned(_, _) => { panic!("Compaction scheduler is the only tokio task that can assign task."); @@ -368,6 +318,207 @@ where } ScheduleStatus::Ok } + + async fn schedule_loop( + &self, + sched_channel: Arc, + shutdown_rx: Shared>, + mut compaction_selectors: HashMap>, + event_stream: impl Stream, + ) { + use futures::pin_mut; + pin_mut!(event_stream); + + loop { + let item = futures::future::select(event_stream.next(), shutdown_rx.clone()).await; + match item { + Either::Left((event, _)) => { + if let Some(event) = event { + match event { + SchedulerEvent::Channel((compaction_group, task_type)) => { + // recv + if !self + .on_handle_compact( + compaction_group, + &mut compaction_selectors, + task_type, + sched_channel.clone(), + shutdown_rx.clone(), + ) + .await + { + break; + } + } + SchedulerEvent::DynamicTrigger => { + // Disable periodic trigger for compaction_deterministic_test. + if self.env.opts.compaction_deterministic_test { + continue; + } + // Periodically trigger compaction for all compaction groups. 
+ self.on_handle_trigger_multi_group( + sched_channel.clone(), + compact_task::TaskType::Dynamic, + ) + .await; + continue; + } + SchedulerEvent::SpaceReclaimTrigger => { + // Disable periodic trigger for compaction_deterministic_test. + if self.env.opts.compaction_deterministic_test { + continue; + } + // Periodically trigger space_reclaim compaction for all compaction groups. + self.on_handle_trigger_multi_group( + sched_channel.clone(), + compact_task::TaskType::SpaceReclaim, + ) + .await; + continue; + } + SchedulerEvent::TtlReclaimTrigger => { + // Disable periodic trigger for compaction_deterministic_test. + if self.env.opts.compaction_deterministic_test { + continue; + } + // Periodically trigger ttl_reclaim compaction for all compaction groups. + self.on_handle_trigger_multi_group( + sched_channel.clone(), + compact_task::TaskType::Ttl, + ) + .await; + continue; + } + } + } + } + + Either::Right((_, _shutdown)) => { + break; + } + } + } + } + + async fn on_handle_compact( + &self, + compaction_group: CompactionGroupId, + compaction_selectors: &mut HashMap<compact_task::TaskType, Box<dyn LevelSelector>>, + task_type: compact_task::TaskType, + sched_channel: Arc<CompactionRequestChannel>, + shutdown_rx: Shared<Receiver<()>>, + ) -> bool { + sync_point::sync_point!("BEFORE_SCHEDULE_COMPACTION_TASK"); + sched_channel.unschedule(compaction_group, task_type); + + self.task_dispatch( + compaction_group, + task_type, + compaction_selectors, + sched_channel, + shutdown_rx, + ) + .await + } + + async fn on_handle_trigger_multi_group( + &self, + sched_channel: Arc<CompactionRequestChannel>, + task_type: compact_task::TaskType, + ) { + for cg_id in self.hummock_manager.compaction_group_ids().await { + if let Err(e) = sched_channel.try_sched_compaction(cg_id, task_type) { + tracing::warn!( + "Failed to schedule {:?} compaction for compaction group {}. {}", + task_type, + cg_id, + e + ); + } + } + } + + async fn task_dispatch( + &self, + compaction_group: CompactionGroupId, + task_type: compact_task::TaskType, + compaction_selectors: &mut HashMap<compact_task::TaskType, Box<dyn LevelSelector>>, + sched_channel: Arc<CompactionRequestChannel>, + mut shutdown_rx: Shared<Receiver<()>>, + ) -> bool { + // Wait for a compactor to become available. + let compactor = loop { + if let Some(compactor) = self.hummock_manager.get_idle_compactor().await { + break compactor; + } else { + tracing::debug!("No available compactor, pausing compaction."); + tokio::select!
{ + _ = self.compaction_resume_notifier.notified() => {}, + _ = &mut shutdown_rx => { + return false; + } + } + } + }; + let selector = compaction_selectors.get_mut(&task_type).unwrap(); + self.pick_and_assign(compaction_group, compactor, sched_channel.clone(), selector) + .await; + + true + } +} + +enum SchedulerEvent { + Channel((CompactionGroupId, compact_task::TaskType)), + DynamicTrigger, + SpaceReclaimTrigger, + TtlReclaimTrigger, +} + +impl CompactionScheduler +where + S: MetaStore, +{ + fn scheduler_event_stream( + sched_rx: UnboundedReceiver<(CompactionGroupId, compact_task::TaskType)>, + periodic_space_reclaim_compaction_interval_sec: u64, + periodic_ttl_reclaim_compaction_interval_sec: u64, + periodic_compaction_interval_sec: u64, + ) -> impl Stream { + let dynamic_channel_trigger = + UnboundedReceiverStream::new(sched_rx).map(SchedulerEvent::Channel); + + let mut min_trigger_interval = + tokio::time::interval(Duration::from_secs(periodic_compaction_interval_sec)); + min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + let dynamic_tick_trigger = + IntervalStream::new(min_trigger_interval).map(|_| SchedulerEvent::DynamicTrigger); + + let mut min_space_reclaim_trigger_interval = tokio::time::interval(Duration::from_secs( + periodic_space_reclaim_compaction_interval_sec, + )); + + min_space_reclaim_trigger_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + let space_reclaim_trigger = IntervalStream::new(min_space_reclaim_trigger_interval) + .map(|_| SchedulerEvent::SpaceReclaimTrigger); + + let mut min_ttl_reclaim_trigger_interval = tokio::time::interval(Duration::from_secs( + periodic_ttl_reclaim_compaction_interval_sec, + )); + min_ttl_reclaim_trigger_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + let ttl_reclaim_trigger = IntervalStream::new(min_ttl_reclaim_trigger_interval) + .map(|_| SchedulerEvent::TtlReclaimTrigger); + + select( + dynamic_channel_trigger, + select( + dynamic_tick_trigger, + select(space_reclaim_trigger, ttl_reclaim_trigger), + ), + ) + } } #[cfg(test)] diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 73faf60da8f9..07fd5c8889e6 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -44,6 +44,7 @@ use risingwave_pb::hummock::{ version_update_payload, CompactTask, CompactTaskAssignment, CompactionConfig, GroupDelta, HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersion, HummockVersionDelta, HummockVersionDeltas, HummockVersionStats, IntraLevelDelta, LevelType, + TableOption, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use tokio::sync::oneshot::Sender; @@ -732,6 +733,11 @@ where compaction_group_id: CompactionGroupId, selector: &mut Box, ) -> Result> { + // TODO: `get_all_table_options` will hold catalog_manager async lock, to avoid holding the + // lock in compaction_guard, take out all table_options in advance there may be a + // waste of resources here, need to add a more efficient filter in catalog_manager + let all_table_id_to_option = self.catalog_manager.get_all_table_options().await; + let mut compaction_guard = write_lock!(self, compaction).await; let compaction = compaction_guard.deref_mut(); let compaction_statuses = &mut compaction.compaction_statuses; @@ -777,12 +783,21 @@ where let can_trivial_move = matches!(selector.task_type(), compact_task::TaskType::Dynamic); let mut stats = LocalSelectorStatistic::default(); + let 
member_table_ids = &current_version + .get_compaction_group_levels(compaction_group_id) + .member_table_ids; + let table_id_to_option: HashMap<u32, TableOption> = all_table_id_to_option + .into_iter() + .filter(|(table_id, _)| member_table_ids.contains(table_id)) + .collect(); + let compact_task = compact_status.get_compact_task( current_version.get_compaction_group_levels(compaction_group_id), task_id as HummockCompactionTaskId, &group_config, &mut stats, selector, + table_id_to_option.clone(), ); stats.report_to_metrics(compaction_group_id, self.metrics.as_ref()); let mut compact_task = match compact_task { @@ -828,12 +843,15 @@ } } - compact_task.table_options = self - .catalog_manager - .get_table_options(&compact_task.existing_table_ids) - .await - .iter() - .map(|(k, v)| (*k, v.into())) + compact_task.table_options = table_id_to_option + .into_iter() + .filter_map(|(table_id, table_option)| { + if compact_task.existing_table_ids.contains(&table_id) { + return Some((table_id, TableOption::from(&table_option))); + } + + None + }) .collect(); compact_task.current_epoch_time = Epoch::now().0; compact_task.compaction_filter_mask = @@ -1266,9 +1284,10 @@ if !deterministic_mode && matches!(compact_task.task_type(), compact_task::TaskType::Dynamic) { + // only try to send Dynamic compaction requests here self.try_send_compaction_request( compact_task.compaction_group_id, - compact_task.task_type(), + compact_task::TaskType::Dynamic, ); } diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index c7b7c279aa05..4b5327f5e0d8 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -202,7 +202,7 @@ impl HummockMetaClient for MockHummockMetaClient { let (task_tx, task_rx) = tokio::sync::mpsc::unbounded_channel(); let handle = tokio::spawn(async move { while let Some((group, task_type)) = sched_rx.recv().await { - sched_channel.unschedule(group); + sched_channel.unschedule(group, task_type); let mut selector: Box<dyn LevelSelector> = match task_type { compact_task::TaskType::Dynamic => default_level_selector(), diff --git a/src/meta/src/hummock/mod.rs b/src/meta/src/hummock/mod.rs index 8801012eb27c..77c5854cc2ad 100644 --- a/src/meta/src/hummock/mod.rs +++ b/src/meta/src/hummock/mod.rs @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License.
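Back to the `compaction_scheduler.rs` rework above: the hand-rolled `tokio::select!` loop becomes one merged event stream, which is also why the madsim fork of `tokio-stream` (with its `IntervalStream`/`UnboundedReceiverStream` wrappers) enters `Cargo.toml`. A self-contained sketch of the same merging pattern, with stand-in event and id types, assuming the `futures`, `tokio`, and `tokio-stream` crates:

```rust
use std::time::Duration;

use futures::stream::{select, Stream, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream};

#[derive(Debug)]
enum Event {
    Channel(u64), // stand-in for (CompactionGroupId, TaskType)
    DynamicTrigger,
    SpaceReclaimTrigger,
    TtlReclaimTrigger,
}

// Merge the on-demand channel with the periodic tickers into one stream,
// nesting futures::stream::select the same way scheduler_event_stream does.
fn event_stream(
    rx: mpsc::UnboundedReceiver<u64>,
    dynamic_sec: u64,
    space_reclaim_sec: u64,
    ttl_sec: u64,
) -> impl Stream<Item = Event> {
    let channel = UnboundedReceiverStream::new(rx).map(Event::Channel);

    let mut dynamic = tokio::time::interval(Duration::from_secs(dynamic_sec));
    dynamic.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    let dynamic = IntervalStream::new(dynamic).map(|_| Event::DynamicTrigger);

    let mut space = tokio::time::interval(Duration::from_secs(space_reclaim_sec));
    space.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    let space = IntervalStream::new(space).map(|_| Event::SpaceReclaimTrigger);

    let mut ttl = tokio::time::interval(Duration::from_secs(ttl_sec));
    ttl.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    let ttl = IntervalStream::new(ttl).map(|_| Event::TtlReclaimTrigger);

    // `select` polls both sides fairly, so a busy request channel cannot
    // starve the periodic triggers (and vice versa).
    select(channel, select(dynamic, select(space, ttl)))
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    let mut events = Box::pin(event_stream(rx, 3600, 3600, 1800));
    tx.send(42).unwrap();
    // The first poll yields either the queued channel event or an immediate
    // first tick (tokio intervals fire once right away).
    println!("{:?}", events.next().await);
}
```

Driving one stream instead of a growing `select!` block is what lets `schedule_loop` treat a new trigger kind (here, TTL reclaim) as just another enum variant.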
-
 pub mod compaction;
 mod compaction_schedule_policy;
 mod compaction_scheduler;
diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs
index 0a74ded9267b..8400d0a74308 100644
--- a/src/meta/src/hummock/test_utils.rs
+++ b/src/meta/src/hummock/test_utils.rs
@@ -163,6 +163,8 @@ pub fn generate_test_tables(epoch: u64, sst_ids: Vec<HummockSstableId>) -> Vec<SstableInfo>
+            min_epoch: 0,
+            max_epoch: 0,
 Pin<Box<dyn Future<Output = ()> + Send>> {
             periodic_space_reclaim_compaction_interval_sec: config
                 .meta
                 .periodic_space_reclaim_compaction_interval_sec,
+            periodic_ttl_reclaim_compaction_interval_sec: config
+                .meta
+                .periodic_ttl_reclaim_compaction_interval_sec,
         },
     )
     .await
diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs
index 014b8f641b58..f720d6f0dc1d 100644
--- a/src/meta/src/manager/catalog/database.rs
+++ b/src/meta/src/manager/catalog/database.rs
@@ -200,15 +200,10 @@ impl DatabaseManager {
         self.tables.values().cloned().collect_vec()
     }
 
-    pub fn get_table_options(&self, table_ids: &[TableId]) -> HashMap<TableId, TableOption> {
+    pub fn get_all_table_options(&self) -> HashMap<TableId, TableOption> {
         self.tables
             .iter()
-            .filter_map(|(id, table)| {
-                if table_ids.contains(id) {
-                    return Some((*id, TableOption::build_table_option(&table.properties)));
-                }
-                None
-            })
+            .map(|(id, table)| (*id, TableOption::build_table_option(&table.properties)))
             .collect()
     }
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index e9faf9c8441c..5e5c19e0cea3 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -1459,8 +1459,8 @@ where
         self.core.lock().await.database.list_tables()
     }
 
-    pub async fn get_table_options(&self, table_ids: &[TableId]) -> HashMap<TableId, TableOption> {
-        self.core.lock().await.database.get_table_options(table_ids)
+    pub async fn get_all_table_options(&self) -> HashMap<TableId, TableOption> {
+        self.core.lock().await.database.get_all_table_options()
     }
 
     pub async fn list_table_ids(&self, schema_id: SchemaId) -> Vec<TableId> {
diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs
index 631581b96266..c46cbcfc6114 100644
--- a/src/meta/src/manager/env.rs
+++ b/src/meta/src/manager/env.rs
@@ -113,6 +113,9 @@ pub struct MetaOpts {
     /// Schedule space_reclaim_compaction for all compaction groups with this interval.
     pub periodic_space_reclaim_compaction_interval_sec: u64,
+
+    /// Schedule ttl_reclaim_compaction for all compaction groups with this interval.
+    pub periodic_ttl_reclaim_compaction_interval_sec: u64,
 }
 
 impl MetaOpts {
@@ -141,6 +144,7 @@ impl MetaOpts {
             state_store: None,
             data_directory: "hummock_001".to_string(),
             periodic_space_reclaim_compaction_interval_sec: 60,
+            periodic_ttl_reclaim_compaction_interval_sec: 60,
         }
     }
diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs
index b59b9a89daa5..ad441f0505b8 100644
--- a/src/storage/hummock_test/src/hummock_read_version_tests.rs
+++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs
@@ -158,6 +158,8 @@ async fn test_read_version_basic() {
                     stale_key_count: 1,
                     total_key_count: 1,
                     divide_version: 0,
+                    min_epoch: 0,
+                    max_epoch: 0,
                 }),
                 LocalSstableInfo::for_test(SstableInfo {
                     id: 2,
@@ -172,6 +174,8 @@
                     stale_key_count: 1,
                     total_key_count: 1,
                     divide_version: 0,
+                    min_epoch: 0,
+                    max_epoch: 0,
                 }),
             ],
             epoch_id_vec_for_clear,
diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs
index 31a8fbf14594..4459070df1b8 100644
--- a/src/storage/src/hummock/event_handler/uploader.rs
+++ b/src/storage/src/hummock/event_handler/uploader.rs
@@ -826,6 +826,8 @@ mod tests {
             stale_key_count: 0,
             total_key_count: 0,
             divide_version: 0,
+            min_epoch: 0,
+            max_epoch: 0,
         })]
     }
diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs
index 313c7fee698c..2c87f503f50e 100644
--- a/src/storage/src/hummock/sstable/builder.rs
+++ b/src/storage/src/hummock/sstable/builder.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::cmp;
 use std::collections::BTreeSet;
 use std::sync::Arc;
 
@@ -115,7 +116,11 @@ pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
     /// `last_table_stats` accumulates stats for `last_table_id` and finalizes it in `table_stats`
     /// by `finalize_last_table_stats`
     last_table_stats: TableStats,
+
     filter_builder: F,
+
+    min_epoch: u64,
+    max_epoch: u64,
 }
 
 impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
@@ -163,6 +168,8 @@
             total_key_count: 0,
             table_stats: Default::default(),
             last_table_stats: Default::default(),
+            min_epoch: u64::MAX,
+            max_epoch: u64::MIN,
         }
     }
 
@@ -256,6 +263,9 @@
         self.raw_key.clear();
         self.raw_value.clear();
 
+        self.min_epoch = cmp::min(self.min_epoch, full_key.epoch);
+        self.max_epoch = cmp::max(self.max_epoch, full_key.epoch);
+
         if self.block_builder.approximate_len() >= self.options.block_capacity {
             self.build_block().await?;
         }
@@ -333,6 +343,20 @@
             range_tombstone_list: self.range_tombstones,
         };
         meta.estimated_size = meta.encoded_size() as u32 + meta_offset as u32;
+
+        // Expand the epoch range of the whole SST to cover its range tombstones.
+        let (tombstone_min_epoch, tombstone_max_epoch) = {
+            let mut tombstone_min_epoch = u64::MAX;
+            let mut tombstone_max_epoch = u64::MIN;
+
+            for tombstone in &meta.range_tombstone_list {
+                tombstone_min_epoch = cmp::min(tombstone_min_epoch, tombstone.sequence);
+                tombstone_max_epoch = cmp::max(tombstone_max_epoch, tombstone.sequence);
+            }
+
+            (tombstone_min_epoch, tombstone_max_epoch)
+        };
+
         let sst_info = SstableInfo {
             id: self.sstable_id,
             key_range: Some(risingwave_pb::hummock::KeyRange {
@@ -346,12 +370,17 @@
             stale_key_count: self.stale_key_count,
             total_key_count: self.total_key_count,
             divide_version: 0,
+            min_epoch: cmp::min(self.min_epoch, tombstone_min_epoch),
+            max_epoch: cmp::max(self.max_epoch, tombstone_max_epoch),
         };
 
         tracing::trace!(
-            "meta_size {} bloom_filter_size {} add_key_counts {} ",
+            "meta_size {} bloom_filter_size {} add_key_counts {} stale_key_count {} min_epoch {} max_epoch {}",
             meta.encoded_size(),
             meta.bloom_filter.len(),
             self.total_key_count,
+            self.stale_key_count,
+            self.min_epoch,
+            self.max_epoch,
         );
         let bloom_filter_size = meta.bloom_filter.len();
         let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
diff --git a/src/storage/src/hummock/sstable/mod.rs b/src/storage/src/hummock/sstable/mod.rs
index d10574cb41db..208041815a08 100644
--- a/src/storage/src/hummock/sstable/mod.rs
+++ b/src/storage/src/hummock/sstable/mod.rs
@@ -211,6 +211,8 @@ impl Sstable {
             stale_key_count: 0,
             total_key_count: self.meta.key_count as u64,
             divide_version: 0,
+            min_epoch: 0,
+            max_epoch: 0,
         }
     }
 }
diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs
index dbe8530da2e5..edc733b488df 100644
--- a/src/storage/src/hummock/test_utils.rs
+++ b/src/storage/src/hummock/test_utils.rs
@@ -110,6 +110,8 @@ pub fn gen_dummy_sst_info(
         stale_key_count: 0,
         total_key_count: 0,
         divide_version: 0,
+        min_epoch: 0,
+        max_epoch: 0,
     }
 }
 
@@ -182,6 +184,8 @@ pub async fn put_sst(
         stale_key_count: 0,
         total_key_count: 0,
         divide_version: 0,
+        min_epoch: 0,
+        max_epoch: 0,
     };
     let writer_output = writer.finish(meta).await?;
     writer_output.await.unwrap()?;
diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml
index 853fdcc9cb91..14680804350d 100644
--- a/src/workspace-hack/Cargo.toml
+++ b/src/workspace-hack/Cargo.toml
@@ -92,7 +92,7 @@ stable_deref_trait = { version = "1" }
 strum = { version = "0.24", features = ["derive"] }
 time = { version = "0.3", features = ["formatting", "local-offset", "macros", "parsing"] }
 tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "parking_lot", "process", "rt-multi-thread", "signal", "stats", "sync", "time", "tracing"] }
-tokio-stream = { version = "0.1", features = ["net"] }
+tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710", features = ["net"] }
 tokio-util = { version = "0.7", features = ["codec", "io"] }
 tonic = { version = "0.8", features = ["gzip", "tls-webpki-roots"] }
 tower = { version = "0.4", features = ["balance", "buffer", "filter", "limit", "load-shed", "retry", "timeout", "util"] }
@@ -187,7 +187,7 @@ strum = { version = "0.24", features = ["derive"] }
 syn = { version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] }
 time = { version = "0.3", features = ["formatting", "local-offset", "macros", "parsing"] }
 tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "parking_lot", "process", "rt-multi-thread", "signal", "stats", "sync", "time", "tracing"] }
-tokio-stream = { version = "0.1", features = ["net"] }
+tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710", features = ["net"] }
 tokio-util = { version = "0.7", features = ["codec", "io"] }
 tonic = { version = "0.8", features = ["gzip", "tls-webpki-roots"] }
 tonic-build = { version = "0.8" }
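
A note on how the pieces of this patch fit together: `SstableBuilder` now tracks the `[min_epoch, max_epoch]` range of every key and range tombstone it writes, and `SstableInfo` carries that range to the meta node, where the TTL selector combines it with each table's retention `TableOption`. The sketch below is an illustration of that idea, not code from this patch; `EPOCH_PHYSICAL_SHIFT`, `SstEpochRange`, `watermark_epoch_for_ttl`, and `may_contain_expired_data` are hypothetical names, and the shift value of 16 is an assumption about the epoch layout rather than the real constant.

// Illustrative sketch only -- not part of this diff. It shows why SSTs now
// carry `min_epoch`/`max_epoch`: a TTL-reclaim picker can decide from the
// epoch range alone whether a file may still hold expired data, without
// opening the file. All names below are hypothetical stand-ins.

/// Assumption: the physical timestamp (milliseconds) lives in the high bits
/// of an epoch; 16 is used here purely for illustration.
const EPOCH_PHYSICAL_SHIFT: u64 = 16;

/// Mirrors the new `SstableInfo::{min_epoch, max_epoch}` fields.
struct SstEpochRange {
    min_epoch: u64,
    max_epoch: u64,
}

/// Epoch watermark: data written below this epoch has outlived `retention_seconds`.
fn watermark_epoch_for_ttl(now_epoch: u64, retention_seconds: u64) -> u64 {
    let now_ms = now_epoch >> EPOCH_PHYSICAL_SHIFT;
    let watermark_ms = now_ms.saturating_sub(retention_seconds * 1000);
    watermark_ms << EPOCH_PHYSICAL_SHIFT
}

/// An SST is a TTL-reclaim candidate only if its oldest data already fell
/// below the watermark; a file whose `min_epoch` is newer can be skipped.
fn may_contain_expired_data(sst: &SstEpochRange, watermark_epoch: u64) -> bool {
    sst.min_epoch < watermark_epoch
}

fn main() {
    // Pretend "now" is physical time 1_000_000 ms and the table TTL is 600 s.
    let now_epoch = 1_000_000u64 << EPOCH_PHYSICAL_SHIFT;
    let watermark = watermark_epoch_for_ttl(now_epoch, 600);

    let old_sst = SstEpochRange {
        min_epoch: 0,
        max_epoch: 100 << EPOCH_PHYSICAL_SHIFT,
    };
    let fresh_sst = SstEpochRange {
        min_epoch: 999_000u64 << EPOCH_PHYSICAL_SHIFT,
        max_epoch: now_epoch,
    };

    assert!(may_contain_expired_data(&old_sst, watermark)); // worth rewriting to drop expired keys
    assert!(!may_contain_expired_data(&fresh_sst, watermark)); // safe to skip entirely
}

This also suggests why the manager change above re-sends only `TaskType::Dynamic` requests on task completion: Ttl and SpaceReclaim tasks are presumably re-triggered by their own periodic intervals in `scheduler_event_stream` rather than by completion of a previous task.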