Skip to content

Commit

Permalink
Merge branch 'main' into yiming/iceberg-ci
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] committed Feb 21, 2023
2 parents 1fa8354 + 672aad5 commit 762fe8a
Show file tree
Hide file tree
Showing 29 changed files with 1,268 additions and 220 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -119,6 +119,7 @@ incremental = false
[patch.crates-io]
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "a819877" }
getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "cc95ee3" }
tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710" }
tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" }
tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" }
postgres-types = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" }
10 changes: 10 additions & 0 deletions dashboard/proto/gen/hummock.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 3 additions & 7 deletions grafana/risingwave-dashboard.dashboard.py
Expand Up @@ -2499,15 +2499,11 @@ def section_memory_manager(outer_panels):
],
),
panels.timeseries_ms(
"LRU manager watermark_time and physical_now",
"",
"LRU manager diff between watermark_time and now (ms)",
"watermark_time is the current lower watermark of cached data. physical_now is the current time of the machine. The diff (physical_now - watermark_time) shows how much data is cached.",
[
panels.target(
f"{metric('lru_current_watermark_time_ms')}",
"",
),
panels.target(
f"{metric('lru_physical_now_ms')}",
f"{metric('lru_physical_now_ms')} - {metric('lru_current_watermark_time_ms')}",
"",
),
],
Expand Down
2 changes: 2 additions & 0 deletions proto/hummock.proto
Expand Up @@ -19,6 +19,8 @@ message SstableInfo {
uint64 total_key_count = 7;
// When a SST is divided, its divide_version will increase one.
uint64 divide_version = 8;
uint64 min_epoch = 9;
uint64 max_epoch = 10;
}

enum LevelType {
Expand Down
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Expand Up @@ -159,6 +159,10 @@ pub struct MetaConfig {
/// Schedule space_reclaim compaction for all compaction groups with this interval.
#[serde(default = "default::meta::periodic_space_reclaim_compaction_interval_sec")]
pub periodic_space_reclaim_compaction_interval_sec: u64,

/// Schedule ttl_reclaim compaction for all compaction groups with this interval.
#[serde(default = "default::meta::periodic_ttl_reclaim_compaction_interval_sec")]
pub periodic_ttl_reclaim_compaction_interval_sec: u64,
}

impl Default for MetaConfig {
Expand Down Expand Up @@ -507,6 +511,10 @@ mod default {
pub fn periodic_space_reclaim_compaction_interval_sec() -> u64 {
3600 // 60min
}

pub fn periodic_ttl_reclaim_compaction_interval_sec() -> u64 {
1800 // 30mi
}
}

pub mod server {
Expand Down
25 changes: 22 additions & 3 deletions src/meta/src/hummock/compaction/level_selector.rs
Expand Up @@ -19,6 +19,7 @@ use std::collections::HashMap;
// (found in the LICENSE.Apache file in the root directory).
use std::sync::Arc;

use risingwave_common::catalog::TableOption;
use risingwave_hummock_sdk::HummockCompactionTaskId;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{compact_task, CompactionConfig};
Expand Down Expand Up @@ -50,6 +51,7 @@ pub trait LevelSelector: Sync + Send {
levels: &Levels,
level_handlers: &mut [LevelHandler],
selector_stats: &mut LocalSelectorStatistic,
table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactionTask>;

fn report_statistic_metrics(&self, _metrics: &MetaMetrics) {}
Expand Down Expand Up @@ -246,6 +248,7 @@ impl LevelSelector for DynamicLevelSelector {
levels: &Levels,
level_handlers: &mut [LevelHandler],
selector_stats: &mut LocalSelectorStatistic,
_table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactionTask> {
let dynamic_level_core =
DynamicLevelSelectorCore::new(compaction_group.compaction_config.clone());
Expand Down Expand Up @@ -305,6 +308,7 @@ impl LevelSelector for ManualCompactionSelector {
levels: &Levels,
level_handlers: &mut [LevelHandler],
_selector_stats: &mut LocalSelectorStatistic,
_table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactionTask> {
let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone());
let overlap_strategy = create_overlap_strategy(group.compaction_config.compaction_mode());
Expand Down Expand Up @@ -360,6 +364,7 @@ impl LevelSelector for SpaceReclaimCompactionSelector {
levels: &Levels,
level_handlers: &mut [LevelHandler],
_selector_stats: &mut LocalSelectorStatistic,
_table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactionTask> {
let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone());
let mut picker = SpaceReclaimCompactionPicker::new(
Expand Down Expand Up @@ -404,11 +409,14 @@ impl LevelSelector for TtlCompactionSelector {
levels: &Levels,
level_handlers: &mut [LevelHandler],
_selector_stats: &mut LocalSelectorStatistic,
table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactionTask> {
let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone());
let ctx = dynamic_level_core.calculate_level_base_size(levels);
let picker =
TtlReclaimCompactionPicker::new(group.compaction_config.max_space_reclaim_bytes);
let picker = TtlReclaimCompactionPicker::new(
group.compaction_config.max_space_reclaim_bytes,
table_id_to_options,
);
let state = self
.state
.entry(group.group_id)
Expand Down Expand Up @@ -506,16 +514,21 @@ pub mod tests {
stale_key_count: 0,
total_key_count: 0,
divide_version: 0,
min_epoch: 0,
max_epoch: 0,
}
}

pub fn generate_table_with_table_ids(
#[allow(clippy::too_many_arguments)]
pub fn generate_table_with_ids_and_epochs(
id: u64,
table_prefix: u64,
left: usize,
right: usize,
epoch: u64,
table_ids: Vec<u32>,
min_epoch: u64,
max_epoch: u64,
) -> SstableInfo {
SstableInfo {
id,
Expand All @@ -530,6 +543,8 @@ pub mod tests {
stale_key_count: 0,
total_key_count: 0,
divide_version: 0,
min_epoch,
max_epoch,
}
}

Expand Down Expand Up @@ -726,6 +741,7 @@ pub mod tests {
&levels,
&mut levels_handlers,
&mut local_stats,
HashMap::default(),
)
.unwrap();
// trivial move.
Expand Down Expand Up @@ -753,6 +769,7 @@ pub mod tests {
&levels,
&mut levels_handlers,
&mut local_stats,
HashMap::default(),
)
.unwrap();
assert_compaction_task(&compaction, &levels_handlers);
Expand All @@ -771,6 +788,7 @@ pub mod tests {
&levels,
&mut levels_handlers,
&mut local_stats,
HashMap::default(),
)
.unwrap();
assert_compaction_task(&compaction, &levels_handlers);
Expand All @@ -791,6 +809,7 @@ pub mod tests {
&levels,
&mut levels_handlers,
&mut local_stats,
HashMap::default(),
);
assert!(compaction.is_none());
}
Expand Down
12 changes: 10 additions & 2 deletions src/meta/src/hummock/compaction/mod.rs
Expand Up @@ -15,6 +15,7 @@
pub mod compaction_config;
mod level_selector;
mod overlap_strategy;
use risingwave_common::catalog::TableOption;
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_pb::hummock::compact_task::{self, TaskStatus};

Expand Down Expand Up @@ -121,12 +122,19 @@ impl CompactStatus {
group: &CompactionGroup,
stats: &mut LocalSelectorStatistic,
selector: &mut Box<dyn LevelSelector>,
table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactTask> {
// When we compact the files, we must make the result of compaction meet the following
// conditions, for any user key, the epoch of it in the file existing in the lower
// layer must be larger.
let ret =
selector.pick_compaction(task_id, group, levels, &mut self.level_handlers, stats)?;
let ret = selector.pick_compaction(
task_id,
group,
levels,
&mut self.level_handlers,
stats,
table_id_to_options,
)?;
let target_level_id = ret.input.target_level;

let compression_algorithm = match ret.compression_algorithm.as_str() {
Expand Down
Expand Up @@ -319,7 +319,7 @@ impl CompactionPicker for ManualCompactionPicker {

#[cfg(test)]
pub mod tests {
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use risingwave_pb::hummock::compact_task;
pub use risingwave_pb::hummock::{KeyRange, Level, LevelType};
Expand Down Expand Up @@ -1196,6 +1196,7 @@ pub mod tests {
&levels,
&mut levels_handler,
&mut local_stats,
HashMap::default(),
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
Expand Down Expand Up @@ -1231,6 +1232,7 @@ pub mod tests {
&levels,
&mut levels_handler,
&mut local_stats,
HashMap::default(),
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
Expand Down Expand Up @@ -1303,6 +1305,7 @@ pub mod tests {
&levels,
&mut levels_handler,
&mut local_stats,
HashMap::default(),
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
Expand Down Expand Up @@ -1340,6 +1343,7 @@ pub mod tests {
&levels,
&mut levels_handler,
&mut local_stats,
HashMap::default(),
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
Expand Down

0 comments on commit 762fe8a

Please sign in to comment.