perf(common): Optimize null_bitmap with Set64 #8941

Merged on Apr 6, 2023 · 28 commits (diff below shows changes from 26 commits)
12 changes: 12 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

10 changes: 2 additions & 8 deletions src/batch/src/executor/join/hash_join.rs
@@ -18,7 +18,6 @@ use std::iter::empty;
 use std::marker::PhantomData;
 use std::sync::Arc;
 
-use fixedbitset::FixedBitSet;
 use futures_async_stream::try_stream;
 use itertools::Itertools;
 use risingwave_common::array::{Array, DataChunk, RowRef};
@@ -232,13 +231,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
             JoinHashMap::with_capacity_and_hasher(build_row_count, PrecomputedBuildHasher);
         let mut next_build_row_with_same_key =
             ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
-        let null_matched = {
-            let mut null_matched = FixedBitSet::with_capacity(self.null_matched.len());
-            for (idx, col_null_matched) in self.null_matched.into_iter().enumerate() {
-                null_matched.set(idx, col_null_matched);
-            }
-            null_matched
-        };
+
+        let null_matched = self.null_matched.into();
 
         // Build hash map
         for (build_chunk_id, build_chunk) in build_side.iter().enumerate() {
11 changes: 2 additions & 9 deletions src/batch/src/executor/join/lookup_join_base.rs
@@ -14,14 +14,13 @@
 
 use std::marker::PhantomData;
 
-use fixedbitset::FixedBitSet;
 use futures::StreamExt;
 use futures_async_stream::try_stream;
 use itertools::Itertools;
 use risingwave_common::array::DataChunk;
 use risingwave_common::catalog::Schema;
 use risingwave_common::error::RwError;
-use risingwave_common::hash::{HashKey, PrecomputedBuildHasher};
+use risingwave_common::hash::{HashKey, NullBitmap, PrecomputedBuildHasher};
 use risingwave_common::row::Row;
 use risingwave_common::types::{DataType, ToOwnedDatum};
 use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
@@ -67,13 +66,7 @@ impl<K: HashKey> LookupJoinBase<K> {
     pub async fn do_execute(mut self: Box<Self>) {
         let outer_side_schema = self.outer_side_input.schema().clone();
 
-        let null_matched = {
-            let mut null_matched = FixedBitSet::with_capacity(self.null_safe.len());
-            for (idx, col_null_matched) in self.null_safe.iter().copied().enumerate() {
-                null_matched.set(idx, col_null_matched);
-            }
-            null_matched
-        };
+        let null_matched: NullBitmap = self.null_safe.into();
 
         let mut outer_side_batch_read_stream: BoxedDataChunkListStream =
             utils::batch_read(self.outer_side_input.execute(), AT_LEAST_OUTER_SIDE_ROWS);
1 change: 1 addition & 0 deletions src/common/Cargo.toml
@@ -65,6 +65,7 @@ serde = { version = "1", features = ["derive"] }
 serde_default = "0.1"
 serde_json = "1"
 serde_with = "2"
+smallbitset = "0.6.1"
 static_assertions = "1"
 strum = "0.24"
 strum_macros = "0.24"
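For context: `Set64` from the `smallbitset` crate stores membership of the integers 0..64 inline in a single machine word, so it never heap-allocates. Below is a minimal sketch of the operations the new `NullBitmap` relies on — only methods that actually appear in the `key.rs` diff further down are used, though the exact signatures in smallbitset 0.6 should be treated as an assumption:

use smallbitset::Set64;

fn main() {
    // Build {1, 3} incrementally, the way NullBitmap::set_true does.
    let mut nulls = Set64::empty();
    nulls.add_inplace(1);
    nulls.add_inplace(3);

    assert!(nulls.contains(3));
    assert!(!nulls.contains(0));
    assert_eq!(nulls.len(), 2); // number of set bits

    // contains_all checks that one set covers another;
    // NullBitmap::is_subset delegates to it.
    let mut null_safe = Set64::empty();
    null_safe.add_inplace(1);
    null_safe.add_inplace(3);
    null_safe.add_inplace(5);
    assert!(null_safe.contains_all(nulls));
}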
8 changes: 4 additions & 4 deletions src/common/benches/bench_hash_key_encoding.rs
@@ -50,10 +50,10 @@ impl HashKeyDispatcher for HashKeyBenchCaseBuilder {
         for null_ratio in NULL_RATIOS {
             for chunk_size in CHUNK_SIZES {
                 let id = format!(
-                    "{} {:?}, {} rows, Pr[null]={}",
+                    "{} rows, {} {:?}, Pr[null]={}",
+                    chunk_size,
                     self.describe,
                     calc_hash_key_kind(self.data_types()),
-                    chunk_size,
                     null_ratio
                 );
                 let input_chunk = gen_chunk(self.data_types(), *chunk_size, SEED, *null_ratio);
@@ -194,11 +194,11 @@ fn case_builders() -> Vec<HashKeyBenchCaseBuilder> {
         },
         HashKeyBenchCaseBuilder {
             data_types: vec![DataType::Int32, DataType::Int32, DataType::Int32],
-            describe: "composite fixed".to_string(),
+            describe: "composite fixed, case 1".to_string(),
         },
         HashKeyBenchCaseBuilder {
             data_types: vec![DataType::Int32, DataType::Int64, DataType::Int32],
-            describe: "composite fixed".to_string(),
+            describe: "composite fixed, case 2".to_string(),
         },
         HashKeyBenchCaseBuilder {
             data_types: vec![DataType::Int32, DataType::Varchar],
93 changes: 77 additions & 16 deletions src/common/src/hash/key.rs
@@ -28,7 +28,7 @@ use std::hash::{BuildHasher, Hash, Hasher};
 use std::io::{Cursor, Read};
 
 use chrono::{Datelike, Timelike};
-use fixedbitset::FixedBitSet;
+use smallbitset::Set64;
 
 use crate::array::serial_array::Serial;
 use crate::array::{
@@ -42,6 +42,63 @@ use crate::util::hash_util::Crc32FastBuilder;
 use crate::util::iter_util::ZipEqFast;
 use crate::util::value_encoding::{deserialize_datum, serialize_datum_into};
 
+pub static MAX_GROUP_KEYS: usize = 64;
+
+/// Bitmap for null values in a key.
+/// This is specialized for keys, since they usually have few group keys.
+#[repr(transparent)]
+#[derive(Clone, Debug, PartialEq)]
+pub struct NullBitmap {
+    inner: Set64,
+}
+
+impl NullBitmap {
+    fn empty() -> Self {
+        NullBitmap {
+            inner: Set64::empty(),
+        }
+    }
+
+    fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    fn set_true(&mut self, idx: usize) {
+        self.inner.add_inplace(idx);
+    }
+
+    fn len(&self) -> usize {
+        self.inner.len()
+    }
+
+    fn contains(&self, x: usize) -> bool {
+        self.inner.contains(x)
+    }
+
+    pub fn is_subset(&self, other: &NullBitmap) -> bool {
+        other.inner.contains_all(self.inner)
+    }
+}
+
+impl EstimateSize for NullBitmap {
+    fn estimated_heap_size(&self) -> usize {
+        0
+    }
+}
+
+impl<T: AsRef<[bool]> + IntoIterator<Item = bool>> From<T> for NullBitmap {
+    fn from(value: T) -> Self {
+        let mut bitmap = NullBitmap::empty();
+        for (idx, is_true) in value.into_iter().enumerate() {
+            if is_true {
+                bitmap.set_true(idx);
+            }
+        }
+        bitmap
+    }
+}
+
 /// A wrapper for u64 hash result.
 #[derive(Default, Clone, Copy, Debug, PartialEq)]
 pub struct HashCode(pub u64);
@@ -143,10 +200,10 @@ pub trait HashKey:
     ) -> ArrayResult<()>;
 
     fn has_null(&self) -> bool {
-        !self.null_bitmap().is_clear()
+        !self.null_bitmap().is_empty()
     }
 
-    fn null_bitmap(&self) -> &FixedBitSet;
+    fn null_bitmap(&self) -> &NullBitmap;
 }
 
 /// Designed for hash keys with at most `N` serialized bytes.
@@ -156,7 +213,7 @@
 pub struct FixedSizeKey<const N: usize> {
     key: [u8; N],
     hash_code: u64,
-    null_bitmap: FixedBitSet,
+    null_bitmap: NullBitmap,
 }
 
 /// Designed for hash keys which can't be represented by [`FixedSizeKey`].
@@ -167,7 +224,7 @@ pub struct SerializedKey {
     // Key encoding.
     key: Vec<u8>,
     hash_code: u64,
-    null_bitmap: FixedBitSet,
+    null_bitmap: NullBitmap,
 }
 
 impl<const N: usize> EstimateSize for FixedSizeKey<N> {
@@ -491,7 +548,7 @@ impl<'a> HashKeySerDe<'a> for ListRef<'a> {
 
 pub struct FixedSizeKeySerializer<const N: usize> {
     buffer: [u8; N],
-    null_bitmap: FixedBitSet,
+    null_bitmap: NullBitmap,
     null_bitmap_idx: usize,
     data_len: usize,
     hash_code: u64,
@@ -511,7 +568,7 @@ impl<const N: usize> HashKeySerializer for FixedSizeKeySerializer<N> {
     fn from_hash_code(hash_code: HashCode, _estimated_key_size: usize) -> Self {
         Self {
             buffer: [0u8; N],
-            null_bitmap: FixedBitSet::with_capacity(u8::BITS as usize),
+            null_bitmap: NullBitmap::empty(),
             null_bitmap_idx: 0,
             data_len: 0,
             hash_code: hash_code.0,
@@ -528,7 +585,9 @@
                 self.buffer[self.data_len..(self.data_len + ret.len())].copy_from_slice(ret);
                 self.data_len += ret.len();
             }
-            None => self.null_bitmap.insert(self.null_bitmap_idx),
+            None => {
+                self.null_bitmap.set_true(self.null_bitmap_idx);
+            }
         };
         self.null_bitmap_idx += 1;
     }
@@ -544,7 +603,7 @@
 
 pub struct FixedSizeKeyDeserializer<const N: usize> {
     cursor: Cursor<[u8; N]>,
-    null_bitmap: FixedBitSet,
+    null_bitmap: NullBitmap,
     null_bitmap_idx: usize,
 }
 
@@ -575,7 +634,8 @@ impl<const N: usize> HashKeyDeserializer for FixedSizeKeyDeserializer<N> {
 pub struct SerializedKeySerializer {
     buffer: Vec<u8>,
     hash_code: u64,
-    null_bitmap: FixedBitSet,
+    null_bitmap: NullBitmap,
+    null_bitmap_idx: usize,
 }
 
 impl HashKeySerializer for SerializedKeySerializer {
@@ -585,22 +645,23 @@ impl HashKeySerializer for SerializedKeySerializer {
         Self {
             buffer: Vec::with_capacity(estimated_value_encoding_size),
             hash_code: hash_code.0,
-            null_bitmap: FixedBitSet::new(),
+            null_bitmap: NullBitmap::empty(),
+            null_bitmap_idx: 0,
         }
     }
 
     fn append<'a, D: HashKeySerDe<'a>>(&mut self, data: Option<D>) {
-        let len_bitmap = self.null_bitmap.len();
-        self.null_bitmap.grow(len_bitmap + 1);
+        let _len_bitmap = self.null_bitmap.len();
         match data {
             Some(v) => {
                 serialize_datum_into(&Some(v.to_owned_scalar().into()), &mut self.buffer);
             }
             None => {
                 serialize_datum_into(&None, &mut self.buffer);
-                self.null_bitmap.insert(len_bitmap);
+                self.null_bitmap.set_true(self.null_bitmap_idx);
             }
         }
+        self.null_bitmap_idx += 1;
     }
 
     fn into_hash_key(self) -> SerializedKey {
@@ -696,7 +757,7 @@ impl<const N: usize> HashKey for FixedSizeKey<N> {
         Ok(())
     }
 
-    fn null_bitmap(&self) -> &FixedBitSet {
+    fn null_bitmap(&self) -> &NullBitmap {
         &self.null_bitmap
    }
 }
@@ -726,7 +787,7 @@ impl HashKey for SerializedKey {
         Ok(())
     }
 
-    fn null_bitmap(&self) -> &FixedBitSet {
+    fn null_bitmap(&self) -> &NullBitmap {
        &self.null_bitmap
    }
 }
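To see the new type end to end, here is a hedged usage sketch of `NullBitmap` as defined above: built from per-column boolean flags through the blanket `From` impl, with `is_subset` expressing "every NULL column of the key is a null-safe column". The join framing is illustrative, not code from this PR:

use risingwave_common::hash::NullBitmap;

fn main() {
    // Null-safe (IS NOT DISTINCT FROM) flags for a 3-column join key:
    // columns 0 and 2 are null-safe, column 1 is not.
    let null_matched: NullBitmap = vec![true, false, true].into();

    // A probe key whose only NULL is in column 0 can still match.
    let key_nulls: NullBitmap = vec![true, false, false].into();
    assert!(key_nulls.is_subset(&null_matched));

    // A NULL in column 1 falls outside the null-safe set, so no match.
    let bad_key: NullBitmap = vec![false, true, false].into();
    assert!(!bad_key.is_subset(&null_matched));
}

Note also why `estimated_heap_size` is 0 above: the whole bitmap lives inline in the struct, unlike `FixedBitSet`, which owns a heap-allocated block of words.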
5 changes: 5 additions & 0 deletions src/frontend/planner_test/tests/testdata/agg.yaml
@@ -1247,3 +1247,8 @@
       BatchExchange { order: [idx_desc.a DESC], dist: Single }
       └─BatchSortAgg { group_key: [idx_desc.a], aggs: [count] }
         └─BatchScan { table: idx_desc, columns: [idx_desc.a], distribution: UpstreamHashShard(idx_desc.a) }
+- name: max group keys exceed 64
+  sql: |
+    create table t(v1 int, v2 int, v3 int, v4 int, v5 int, v6 int, v7 int, v8 int, v9 int, v10 int, v11 int, v12 int, v13 int, v14 int, v15 int, v16 int, v17 int, v18 int, v19 int, v20 int, v21 int, v22 int, v23 int, v24 int, v25 int, v26 int, v27 int, v28 int, v29 int, v30 int, v31 int, v32 int, v33 int, v34 int, v35 int, v36 int, v37 int, v38 int, v39 int, v40 int, v41 int, v42 int, v43 int, v44 int, v45 int, v46 int, v47 int, v48 int, v49 int, v50 int, v51 int, v52 int, v53 int, v54 int, v55 int, v56 int, v57 int, v58 int, v59 int, v60 int, v61 int, v62 int, v63 int, v64 int, v65 int);
+    select * from t group by v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v22, v23, v24, v25, v26, v27, v28, v29, v30, v31, v32, v33, v34, v35, v36, v37, v38, v39, v40, v41, v42, v43, v44, v45, v46, v47, v48, v49, v50, v51, v52, v53, v54, v55, v56, v57, v58, v59, v60, v61, v62, v63, v64, v65;
+  binder_error: 'Bind error: Number of Group Keys: 65, exceeded maximum: 64'
10 changes: 10 additions & 0 deletions src/frontend/src/binder/select.rs
@@ -16,7 +16,9 @@ use std::fmt::Debug;
 
 use itertools::Itertools;
 use risingwave_common::catalog::{Field, Schema, PG_CATALOG_SCHEMA_NAME};
+use risingwave_common::error::ErrorCode::BindError;
 use risingwave_common::error::{ErrorCode, Result, RwError};
+use risingwave_common::hash::MAX_GROUP_KEYS;
 use risingwave_common::types::DataType;
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_sqlparser::ast::{DataType as AstDataType, Distinct, Expr, Select, SelectItem};
@@ -178,6 +180,14 @@ impl Binder {
 
         // Bind GROUP BY clause.
         self.context.clause = Some(Clause::GroupBy);
+        let number_of_group_keys = select.group_by.len();
+        if number_of_group_keys > MAX_GROUP_KEYS {
+            return Err(BindError(format!(
+                "Number of Group Keys: {}, exceeded maximum: {}",
+                number_of_group_keys, MAX_GROUP_KEYS,
+            ))
+            .into());
+        }
         let group_by = select
             .group_by
             .into_iter()

Review thread on the new check:

Member: I'm not sure whether this can cover all cases. For example, the columns in OVER (PARTITION BY <cols>) may also be used as a group key.

Author: With the workaround listed below, I think we can remove this, since there would be no more limit on the number of group keys.
11 changes: 2 additions & 9 deletions src/stream/src/executor/hash_join.rs
@@ -17,14 +17,13 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use await_tree::InstrumentAwait;
-use fixedbitset::FixedBitSet;
 use futures::{pin_mut, StreamExt};
 use futures_async_stream::try_stream;
 use itertools::Itertools;
 use multimap::MultiMap;
 use risingwave_common::array::{Op, RowRef, StreamChunk};
 use risingwave_common::catalog::Schema;
-use risingwave_common::hash::HashKey;
+use risingwave_common::hash::{HashKey, NullBitmap};
 use risingwave_common::row::{OwnedRow, Row};
 use risingwave_common::types::{DataType, ToOwnedDatum};
 use risingwave_common::util::epoch::EpochPair;
@@ -538,13 +537,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
             .map(|&idx| original_schema[idx].clone())
             .collect();
 
-        let null_matched: FixedBitSet = {
-            let mut null_matched = FixedBitSet::with_capacity(null_safe.len());
-            for (idx, col_null_matched) in null_safe.into_iter().enumerate() {
-                null_matched.set(idx, col_null_matched);
-            }
-            null_matched
-        };
+        let null_matched: NullBitmap = null_safe.into();
 
         let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
         let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;
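As an illustration of why each executor keeps `null_matched` around — the gate below is an assumption about the call sites, not code from this diff — a probe key containing NULLs should only be looked up when every NULL position was declared null-safe, which is exactly the `is_subset` check from `key.rs`:

use risingwave_common::hash::{HashKey, NullBitmap};

/// Hypothetical probe-side gate: keys whose NULLs fall outside the
/// null-safe columns can never match under SQL equality, so skip them.
fn key_can_match<K: HashKey>(key: &K, null_matched: &NullBitmap) -> bool {
    !key.has_null() || key.null_bitmap().is_subset(null_matched)
}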