Skip to content

Commit

Permalink
coprocessor: avoid unnecessary vec allocation in collect_column_stats (
Browse files Browse the repository at this point in the history
…#14280)

ref #14231

When collect_column_stats handles each row, reuse column_vals and collation_key_vals to avoid allocating many small objects.

Signed-off-by: xuyifan <675434007@qq.com>
  • Loading branch information
xuyifangreeneyes committed Feb 26, 2023
1 parent f0af6ff commit 5f5bb76
Showing 1 changed file with 32 additions and 35 deletions.
67 changes: 32 additions & 35 deletions src/coprocessor/statistics/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,41 +391,36 @@ impl<S: Snapshot, F: KvFormat> RowSampleBuilder<S, F> {
is_drained = result.is_drained?;

let columns_slice = result.physical_columns.as_slice();

let mut column_vals: Vec<Vec<u8>> = vec![vec![]; self.columns_info.len()];
let mut collation_key_vals: Vec<Vec<u8>> = vec![vec![]; self.columns_info.len()];
for logical_row in &result.logical_rows {
let mut column_vals: Vec<Vec<u8>> = Vec::new();
let mut collation_key_vals: Vec<Vec<u8>> = Vec::new();
for i in 0..self.columns_info.len() {
let mut val = vec![];
column_vals[i].clear();
collation_key_vals[i].clear();
columns_slice[i].encode(
*logical_row,
&self.columns_info[i],
&mut EvalContext::default(),
&mut val,
&mut column_vals[i],
)?;
if self.columns_info[i].as_accessor().is_string_like() {
let sorted_val = match_template_collator! {
match_template_collator! {
TT, match self.columns_info[i].as_accessor().collation()? {
Collation::TT => {
let mut mut_val = &val[..];
let mut mut_val = &column_vals[i][..];
let decoded_val = table::decode_col_value(&mut mut_val, &mut EvalContext::default(), &self.columns_info[i])?;
if decoded_val == Datum::Null {
val.clone()
collation_key_vals[i].clone_from(&column_vals[i]);
} else {
// Only if the `decoded_val` is Datum::Null, `decoded_val` is a Ok(None).
// So it is safe the unwrap the Ok value.
let decoded_sorted_val = TT::sort_key(&decoded_val.as_string()?.unwrap())?;
decoded_sorted_val
TT::write_sort_key(&mut collation_key_vals[i], &decoded_val.as_string()?.unwrap())?;
}
}
}
};
collation_key_vals.push(sorted_val);
} else {
collation_key_vals.push(Vec::new());
}
read_size += val.len();
column_vals.push(val);
read_size += column_vals[i].len();
}
collector.mut_base().count += 1;
collector.collect_column_group(
Expand All @@ -434,7 +429,7 @@ impl<S: Snapshot, F: KvFormat> RowSampleBuilder<S, F> {
&self.columns_info,
&self.column_groups,
);
collector.collect_column(column_vals, collation_key_vals, &self.columns_info);
collector.collect_column(&column_vals, &collation_key_vals, &self.columns_info);
}
}

Expand Down Expand Up @@ -470,11 +465,11 @@ trait RowSampleCollector: Send {
);
fn collect_column(
&mut self,
columns_val: Vec<Vec<u8>>,
collation_keys_val: Vec<Vec<u8>>,
columns_val: &[Vec<u8>],
collation_keys_val: &[Vec<u8>],
columns_info: &[tipb::ColumnInfo],
);
fn sampling(&mut self, data: Vec<Vec<u8>>);
fn sampling(&mut self, data: &[Vec<u8>]);
fn to_proto(&mut self) -> tipb::RowSampleCollector;
fn get_reported_memory_usage(&mut self) -> usize {
self.mut_base().reported_memory_usage
Expand Down Expand Up @@ -662,22 +657,23 @@ impl RowSampleCollector for BernoulliRowSampleCollector {
}
fn collect_column(
&mut self,
columns_val: Vec<Vec<u8>>,
collation_keys_val: Vec<Vec<u8>>,
columns_val: &[Vec<u8>],
collation_keys_val: &[Vec<u8>],
columns_info: &[tipb::ColumnInfo],
) {
self.base
.collect_column(&columns_val, &collation_keys_val, columns_info);
.collect_column(columns_val, collation_keys_val, columns_info);
self.sampling(columns_val);
}
fn sampling(&mut self, data: Vec<Vec<u8>>) {
fn sampling(&mut self, data: &[Vec<u8>]) {
let cur_rng = self.base.rng.gen_range(0.0, 1.0);
if cur_rng >= self.sample_rate {
return;
}
self.base.memory_usage += data.iter().map(|x| x.capacity()).sum::<usize>();
let sample = data.to_vec();
self.base.memory_usage += sample.iter().map(|x| x.capacity()).sum::<usize>();
self.base.report_memory_usage(false);
self.samples.push(data);
self.samples.push(sample);
}
fn to_proto(&mut self) -> tipb::RowSampleCollector {
self.base.memory_usage = 0;
Expand Down Expand Up @@ -739,16 +735,16 @@ impl RowSampleCollector for ReservoirRowSampleCollector {

fn collect_column(
&mut self,
columns_val: Vec<Vec<u8>>,
collation_keys_val: Vec<Vec<u8>>,
columns_val: &[Vec<u8>],
collation_keys_val: &[Vec<u8>],
columns_info: &[tipb::ColumnInfo],
) {
self.base
.collect_column(&columns_val, &collation_keys_val, columns_info);
.collect_column(columns_val, collation_keys_val, columns_info);
self.sampling(columns_val);
}

fn sampling(&mut self, data: Vec<Vec<u8>>) {
fn sampling(&mut self, data: &[Vec<u8>]) {
// We should tolerate the abnormal case => `self.max_sample_size == 0`.
if self.max_sample_size == 0 {
return;
Expand All @@ -764,9 +760,10 @@ impl RowSampleCollector for ReservoirRowSampleCollector {
}

if need_push {
self.base.memory_usage += data.iter().map(|x| x.capacity()).sum::<usize>();
self.samples.push(Reverse((cur_rng, data)));
let sample = data.to_vec();
self.base.memory_usage += sample.iter().map(|x| x.capacity()).sum::<usize>();
self.base.report_memory_usage(false);
self.samples.push(Reverse((cur_rng, sample)));
}
}

Expand Down Expand Up @@ -1255,7 +1252,7 @@ mod tests {
for loop_i in 0..loop_cnt {
let mut collector = ReservoirRowSampleCollector::new(sample_num, 1000, 1);
for row in &nums {
collector.sampling([row.clone()].to_vec());
collector.sampling(&[row.clone()]);
}
assert_eq!(collector.samples.len(), sample_num);
for sample in &collector.samples {
Expand Down Expand Up @@ -1304,7 +1301,7 @@ mod tests {
let mut collector =
BernoulliRowSampleCollector::new(sample_num as f64 / row_num as f64, 1000, 1);
for row in &nums {
collector.sampling([row.clone()].to_vec());
collector.sampling(&[row.clone()]);
}
for sample in &collector.samples {
*item_cnt.entry(sample[0].clone()).or_insert(0) += 1;
Expand Down Expand Up @@ -1350,7 +1347,7 @@ mod tests {
// Test for ReservoirRowSampleCollector
let mut collector = ReservoirRowSampleCollector::new(sample_num, 1000, 1);
for row in &nums {
collector.sampling([row.clone()].to_vec());
collector.sampling(&[row.clone()]);
}
assert_eq!(collector.samples.len(), 0);
}
Expand All @@ -1359,7 +1356,7 @@ mod tests {
let mut collector =
BernoulliRowSampleCollector::new(sample_num as f64 / row_num as f64, 1000, 1);
for row in &nums {
collector.sampling([row.clone()].to_vec());
collector.sampling(&[row.clone()]);
}
assert_eq!(collector.samples.len(), 0);
}
Expand Down

0 comments on commit 5f5bb76

Please sign in to comment.