Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): refactor emergency picker #15954

Merged
merged 15 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactTask, CompactionConfig, LevelType};
pub use selector::CompactionSelector;

use self::selector::LocalSelectorStatistic;
use self::selector::{EmergencySelector, LocalSelectorStatistic};
use super::check_cg_write_limit;
use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy};
use crate::hummock::compaction::picker::CompactionInput;
use crate::hummock::level_handler::LevelHandler;
Expand Down Expand Up @@ -101,15 +102,34 @@ impl CompactStatus {
// When we compact the files, we must make the result of compaction meet the following
// conditions, for any user key, the epoch of it in the file existing in the lower
// layer must be larger.
selector.pick_compaction(
if let Some(task) = selector.pick_compaction(
task_id,
group,
levels,
&mut self.level_handlers,
stats,
table_id_to_options,
developer_config,
)
table_id_to_options.clone(),
developer_config.clone(),
) {
return Some(task);
} else {
let compaction_group_config = &group.compaction_config;
if check_cg_write_limit(levels, compaction_group_config.as_ref()).is_write_stop()
&& compaction_group_config.enable_emergency_picker
{
return EmergencySelector::default().pick_compaction(
task_id,
group,
levels,
&mut self.level_handlers,
stats,
table_id_to_options,
developer_config,
);
}
}

None
}

pub fn is_trivial_move_task(task: &CompactTask) -> bool {
Expand Down
43 changes: 42 additions & 1 deletion src/meta/src/hummock/manager/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ use futures::future::Shared;
use itertools::Itertools;
use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
self, Event as RequestEvent, PullTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::{
Event as ResponseEvent, PullTaskAck,
};
use risingwave_pb::hummock::{CompactStatus as PbCompactStatus, CompactTaskAssignment};
use risingwave_pb::hummock::{
CompactStatus as PbCompactStatus, CompactTaskAssignment, CompactionConfig,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::oneshot::Receiver as OneShotReceiver;
Expand Down Expand Up @@ -257,3 +260,41 @@ impl HummockManager {
}
}
}

pub fn check_cg_write_limit(
levels: &Levels,
compaction_config: &CompactionConfig,
) -> WriteLimitType {
let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
let l0_sub_level_number = levels.l0.as_ref().unwrap().sub_levels.len();
if threshold < l0_sub_level_number {
return WriteLimitType::WriteStop(l0_sub_level_number, threshold);
}

WriteLimitType::Unlimited
}

pub enum WriteLimitType {
Unlimited,

// (l0_level_count, threshold)
WriteStop(usize, usize),
}

impl WriteLimitType {
pub fn as_str(&self) -> String {
match self {
Self::Unlimited => "Unlimited".to_string(),
Self::WriteStop(l0_level_count, threshold) => {
format!(
"WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
l0_level_count, threshold
)
}
}
}

pub fn is_write_stop(&self) -> bool {
matches!(self, Self::WriteStop(_, _))
}
}
34 changes: 3 additions & 31 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ mod utils;
mod worker;

use compaction::*;
pub use compaction::{check_cg_write_limit, WriteLimitType};
pub(crate) use utils::*;

type Snapshot = ArcSwap<HummockSnapshot>;
Expand Down Expand Up @@ -2871,6 +2872,7 @@ impl HummockManager {
*self.group_to_table_vnode_partition.write() = group_to_table_vnode_partition;
}

/// dedicated event runtime for CPU/IO bound event
pub fn compaction_event_loop(
hummock_manager: Arc<Self>,
mut compactor_streams_change_rx: UnboundedReceiver<(
Expand Down Expand Up @@ -3078,39 +3080,14 @@ impl HummockManager {

/// This method will return all compaction group id in a random order and task type. If there are any group block by `write_limit`, it will return a single array with `TaskType::Emergency`.
/// If these groups get different task-type, it will return all group id with `TaskType::Dynamic` if the first group get `TaskType::Dynamic`, otherwise it will return the single group with other task type.
#[named]
pub async fn auto_pick_compaction_groups_and_type(
&self,
) -> (Vec<CompactionGroupId>, compact_task::TaskType) {
let versioning_guard = read_lock!(self, versioning).await;
let versioning = versioning_guard.deref();
let mut compaction_group_ids =
get_compaction_group_ids(&versioning.current_version).collect_vec();
let mut compaction_group_ids = self.compaction_group_ids().await;
compaction_group_ids.shuffle(&mut thread_rng());

let mut normal_groups = vec![];
for cg_id in compaction_group_ids {
if versioning.write_limit.contains_key(&cg_id) {
Li0k marked this conversation as resolved.
Show resolved Hide resolved
let enable_emergency_picker = match self
.compaction_group_manager
.read()
.await
.try_get_compaction_group_config(cg_id)
{
Some(config) => config.compaction_config.enable_emergency_picker,
None => {
unreachable!("compaction-group {} not exist", cg_id)
}
};

if enable_emergency_picker {
if normal_groups.is_empty() {
return (vec![cg_id], TaskType::Emergency);
} else {
break;
}
}
}
if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
if pick_type == TaskType::Dynamic {
normal_groups.push(cg_id);
Expand Down Expand Up @@ -3491,10 +3468,6 @@ fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelecto
compact_task::TaskType::Tombstone,
Box::<TombstoneCompactionSelector>::default(),
);
compaction_selectors.insert(
compact_task::TaskType::Emergency,
Box::<EmergencySelector>::default(),
);
compaction_selectors
}

Expand All @@ -3503,7 +3476,6 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::HummockVersion;
use tokio::sync::mpsc::error::SendError;

use super::compaction::selector::EmergencySelector;
use super::compaction::CompactionSelector;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::sequence::next_sstable_object_id;
Expand Down
19 changes: 7 additions & 12 deletions src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use risingwave_pb::hummock::{
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};

use super::check_cg_write_limit;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::worker::{HummockManagerEvent, HummockManagerEventSender};
Expand Down Expand Up @@ -321,20 +322,14 @@ pub(super) fn calc_new_write_limits(
}
Some(levels) => levels,
};
// Add write limit conditions here.
let threshold = config
.compaction_config
.level0_stop_write_threshold_sub_level_number as usize;
let l0_sub_level_number = levels.l0.as_ref().unwrap().sub_levels.len();
if threshold < l0_sub_level_number {

let write_limit_type = check_cg_write_limit(levels, config.compaction_config.as_ref());
if write_limit_type.is_write_stop() {
new_write_limits.insert(
*id,
WriteLimit {
table_ids: levels.member_table_ids.clone(),
reason: format!(
"too many L0 sub levels: {} > {}",
l0_sub_level_number, threshold
),
reason: write_limit_type.as_str(),
},
);
continue;
Expand Down Expand Up @@ -519,7 +514,7 @@ mod tests {
);
assert_eq!(
new_write_limits.get(&1).as_ref().unwrap().reason,
"too many L0 sub levels: 11 > 10"
"WriteStop(l0_level_count: 11, threshold: 10) too many L0 sub levels"
);
assert_eq!(new_write_limits.len(), 2);

Expand All @@ -540,7 +535,7 @@ mod tests {
);
assert_eq!(
new_write_limits.get(&1).as_ref().unwrap().reason,
"too many L0 sub levels: 11 > 5"
"WriteStop(l0_level_count: 11, threshold: 5) too many L0 sub levels"
);
}

Expand Down
Loading