diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs
index 20e9cf21966f..a8f184af245a 100644
--- a/pageserver/compaction/src/compact_tiered.rs
+++ b/pageserver/compaction/src/compact_tiered.rs
@@ -530,8 +530,6 @@ where
         // If we have accumulated only a narrow band of keyspace, create an
         // image layer. Otherwise write a delta layer.
 
-        // FIXME: deal with the case of lots of values for same key
-
         // FIXME: we are ignoring images here. Did we already divide the work
         // so that we won't encounter them here?
 
@@ -550,39 +548,94 @@ where
         let mut new_jobs = Vec::new();
 
         // Slide a window through the keyspace
-        let mut key_accum = std::pin::pin!(accum_key_values(key_value_stream));
+        let mut key_accum =
+            std::pin::pin!(accum_key_values(key_value_stream, self.target_file_size));
         let mut all_in_window: bool = false;
         let mut window = Window::new();
+
+        // Helper function to create a job for a new delta layer with the given
+        // key-lsn rectangle.
+        let create_delta_job = |key_range, lsn_range: &Range<Lsn>, new_jobs: &mut Vec<_>| {
+            // The inputs for the job are all the input layers of the original job that
+            // overlap with the rectangle.
+            let batch_layers: Vec<LayerId> = job
+                .input_layers
+                .iter()
+                .filter(|layer_id| {
+                    overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
+                })
+                .cloned()
+                .collect();
+            assert!(!batch_layers.is_empty());
+            new_jobs.push(CompactionJob {
+                key_range,
+                lsn_range: lsn_range.clone(),
+                strategy: CompactionStrategy::CreateDelta,
+                input_layers: batch_layers,
+                completed: false,
+            });
+        };
+
         loop {
-            if all_in_window && window.elems.is_empty() {
+            if all_in_window && window.is_empty() {
                 // All done!
                 break;
            }
+
+            // If we now have enough keyspace in the window for the next delta
+            // layer, create a new delta layer.
             if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window)
             {
-                let batch_layers: Vec<LayerId> = job
-                    .input_layers
-                    .iter()
-                    .filter(|layer_id| {
-                        overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
-                    })
-                    .cloned()
-                    .collect();
-                assert!(!batch_layers.is_empty());
-                new_jobs.push(CompactionJob {
-                    key_range,
-                    lsn_range: job.lsn_range.clone(),
-                    strategy: CompactionStrategy::CreateDelta,
-                    input_layers: batch_layers,
-                    completed: false,
-                });
-            } else {
-                assert!(!all_in_window);
-                if let Some(next_key) = key_accum.next().await.transpose()? {
-                    window.feed(next_key.key, next_key.size);
-                } else {
+                create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
+                continue;
+            }
+            assert!(!all_in_window);
+
+            // Process the next key in the keyspace
+            match key_accum.next().await.transpose()? {
+                None => {
                     all_in_window = true;
                 }
+                Some(next_key) if next_key.partition_lsns.is_empty() => {
+                    // Normal case: extend the window by the key
+                    window.feed(next_key.key, next_key.size);
+                }
+                Some(next_key) => {
+                    // A key whose size impact is too large for a single delta layer.
+                    // This case occurs if you make a huge number of updates to a
+                    // single key.
+                    //
+                    // Drain the window with has_more = false to make a clean cut before
+                    // the key, and then make dedicated delta layers for the single key.
+                    //
+                    // We cannot cluster the key with the others, because we don't want
+                    // layer files to overlap with each other in the lsn,key space (no
+                    // overlaps for the rectangles).
+                    let key = next_key.key;
+                    debug!("key {key} with size impact larger than the layer size");
+                    while !window.is_empty() {
+                        let has_more = false;
+                        let key_range = window.choose_next_delta(self.target_file_size, has_more)
+                            .expect("with has_more==false, choose_next_delta always returns something for a non-empty Window");
+                        create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
+                    }
+
+                    // Not strictly required, but here for future resilience: we make
+                    // a "gap" here, so any structure the window holds should probably
+                    // be reset.
+                    window = Window::new();
+
+                    let mut prior_lsn = job.lsn_range.start;
+                    let mut lsn_ranges = Vec::new();
+                    for (lsn, _size) in next_key.partition_lsns.iter() {
+                        lsn_ranges.push(prior_lsn..*lsn);
+                        prior_lsn = *lsn;
+                    }
+                    lsn_ranges.push(prior_lsn..job.lsn_range.end);
+                    for lsn_range in lsn_ranges {
+                        let key_range = key..key.next();
+                        create_delta_job(key_range, &lsn_range, &mut new_jobs);
+                    }
+                }
+            }
         }
@@ -803,6 +856,10 @@ where
         self.elems.front().unwrap().accum_size - self.splitoff_size
     }
 
+    fn is_empty(&self) -> bool {
+        self.elems.is_empty()
+    }
+
     fn commit_upto(&mut self, mut upto: usize) {
         while upto > 1 {
             let popped = self.elems.pop_front().unwrap();
diff --git a/pageserver/compaction/src/helpers.rs b/pageserver/compaction/src/helpers.rs
index 06454ee1d050..2c922b0a4990 100644
--- a/pageserver/compaction/src/helpers.rs
+++ b/pageserver/compaction/src/helpers.rs
@@ -235,9 +235,14 @@ pub struct KeySize<K> {
     pub key: K,
     pub num_values: u64,
     pub size: u64,
+    /// The lsns to partition at (if empty then no per-lsn partitioning)
+    pub partition_lsns: Vec<(Lsn, u64)>,
 }
 
-pub fn accum_key_values<'a, I, K, D, E>(input: I) -> impl Stream<Item = Result<KeySize<K>, E>>
+pub fn accum_key_values<'a, I, K, D, E>(
+    input: I,
+    target_size: u64,
+) -> impl Stream<Item = Result<KeySize<K>, E>>
 where
     K: Eq + PartialOrd + Display + Copy,
     I: Stream<Item = Result<D, E>>,
@@ -249,25 +254,35 @@ where
 
         if let Some(first) = input.next().await {
             let first = first?;
+            let mut part_size = first.size();
             let mut accum: KeySize<K> = KeySize {
                 key: first.key(),
                 num_values: 1,
-                size: first.size(),
+                size: part_size,
+                partition_lsns: Vec::new(),
             };
             let mut last_key = accum.key;
             while let Some(this) = input.next().await {
                 let this = this?;
                 if this.key() == accum.key {
-                    accum.size += this.size();
+                    let add_size = this.size();
+                    if part_size + add_size > target_size {
+                        accum.partition_lsns.push((this.lsn(), part_size));
+                        part_size = 0;
+                    }
+                    part_size += add_size;
+                    accum.size += add_size;
                     accum.num_values += 1;
                 } else {
                     assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key);
                     last_key = accum.key;
                     yield accum;
+                    part_size = this.size();
                     accum = KeySize {
                         key: this.key(),
                         num_values: 1,
-                        size: this.size(),
+                        size: part_size,
+                        partition_lsns: Vec::new(),
                     };
                 }
             }
diff --git a/pageserver/compaction/src/identify_levels.rs b/pageserver/compaction/src/identify_levels.rs
index 98dd46925ca9..1853afffdd9d 100644
--- a/pageserver/compaction/src/identify_levels.rs
+++ b/pageserver/compaction/src/identify_levels.rs
@@ -184,6 +184,12 @@ impl Level {
         }
         let mut events: Vec<Event<K>> = Vec::new();
         for (idx, l) in self.layers.iter().enumerate() {
+            let key_range = l.key_range();
+            if key_range.end == key_range.start.next() && l.is_delta() {
+                // Ignore single-key delta layers: they can be stacked on top of
+                // each other, as that is the only way to cut them further.
+                continue;
+            }
             events.push(Event {
                 key: l.key_range().start,
                 layer_idx: idx,
diff --git a/pageserver/compaction/tests/tests.rs b/pageserver/compaction/tests/tests.rs
index 7aa20e6863b4..bd8b54a286d8 100644
--- a/pageserver/compaction/tests/tests.rs
+++ b/pageserver/compaction/tests/tests.rs
@@ -20,10 +20,6 @@ pub(crate) fn setup_logging() {
 /// even if we produce an extremely narrow delta layer, spanning just that one
 /// key, we still have too many records to fit in the target file size. We need
 /// to split in the LSN dimension too in that case.
-///
-/// TODO: The code to avoid this problem has not been implemented yet! So the
-/// assertion currently fails, but we need to make it not fail.
-#[ignore]
 #[tokio::test]
 async fn test_many_updates_for_single_key() {
     setup_logging();
@@ -43,9 +39,9 @@ async fn test_many_updates_for_single_key() {
     }
     for l in executor.live_layers.iter() {
         assert!(l.file_size() < executor.target_file_size * 2);
-        // sanity check that none of the delta layers are stupidly small either
+        // Sanity check that none of the delta layers are empty either.
         if l.is_delta() {
-            assert!(l.file_size() > executor.target_file_size / 2);
+            assert!(l.file_size() > 0);
         }
     }
 }
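
For intuition, here is a minimal standalone sketch of the splitting scheme this patch implements: the accumulator records a cut LSN whenever the running partition size for a single key would exceed the target, and the job-planning loop turns those cuts into disjoint LSN ranges, one single-key delta job per range, so the resulting layers stack in the LSN dimension instead of overlapping. This is illustrative only, not the crate's API: `Lsn` is a plain u64 stand-in for pageserver's Lsn type, the delta stream is a plain slice instead of the async `CompactionDeltaEntry` stream, and `lsn_ranges` is a hypothetical helper name; only the `partition_lsns` logic mirrors the patch.

use std::ops::Range;

// Illustrative stand-in for pageserver's Lsn type.
type Lsn = u64;

/// Mirrors the accumulation loop in `accum_key_values`: whenever adding the
/// next delta for the same key would push the current partition past
/// `target_size`, record a cut point (that delta's LSN and the size
/// accumulated so far) and start a new partition with this delta.
fn partition_lsns(deltas: &[(Lsn, u64)], target_size: u64) -> Vec<(Lsn, u64)> {
    let mut cuts = Vec::new();
    let mut part_size = 0;
    for &(lsn, size) in deltas {
        if part_size > 0 && part_size + size > target_size {
            cuts.push((lsn, part_size));
            part_size = 0;
        }
        part_size += size;
    }
    cuts
}

/// Mirrors the job-planning loop in `compact_tiered.rs`: turn the cut points
/// into disjoint LSN ranges covering the job's whole LSN range; each range
/// becomes one single-key CreateDelta job.
fn lsn_ranges(cuts: &[(Lsn, u64)], job_lsn_range: Range<Lsn>) -> Vec<Range<Lsn>> {
    let mut prior_lsn = job_lsn_range.start;
    let mut ranges = Vec::new();
    for &(lsn, _size) in cuts {
        ranges.push(prior_lsn..lsn);
        prior_lsn = lsn;
    }
    ranges.push(prior_lsn..job_lsn_range.end);
    ranges
}

fn main() {
    // Ten 100-byte updates to a single key, with a 250-byte target file size.
    let deltas: Vec<(Lsn, u64)> = (1..=10).map(|lsn| (lsn, 100)).collect();
    let cuts = partition_lsns(&deltas, 250);
    assert_eq!(cuts, vec![(3, 200), (5, 200), (7, 200), (9, 200)]);
    // Five stacked single-key delta layers, each holding two 100-byte deltas.
    assert_eq!(lsn_ranges(&cuts, 1..11), vec![1..3, 3..5, 5..7, 7..9, 9..11]);
}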