diff --git a/Cargo.lock b/Cargo.lock index 4b0ea408da2d..d6446d6ffd93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5903,6 +5903,7 @@ dependencies = [ "serde_default", "serde_json", "serde_with 2.3.1", + "smallbitset", "static_assertions", "strum", "strum_macros", @@ -7448,6 +7449,16 @@ dependencies = [ "futures-io", ] +[[package]] +name = "smallbitset" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d93fb6dc506bf9247d497b1cd9748015f444b9b16da6da79484f54b20df49a7c" +dependencies = [ + "num", + "paste", +] + [[package]] name = "smallvec" version = "1.10.0" @@ -8909,6 +8920,7 @@ dependencies = [ "memchr", "miniz_oxide", "multimap", + "num", "num-bigint", "num-integer", "num-traits", diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index bb4fc4ec5cc6..2b4957d428b8 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -18,7 +18,6 @@ use std::iter::empty; use std::marker::PhantomData; use std::sync::Arc; -use fixedbitset::FixedBitSet; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::{Array, DataChunk, RowRef}; @@ -232,13 +231,8 @@ impl HashJoinExecutor { JoinHashMap::with_capacity_and_hasher(build_row_count, PrecomputedBuildHasher); let mut next_build_row_with_same_key = ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?; - let null_matched = { - let mut null_matched = FixedBitSet::with_capacity(self.null_matched.len()); - for (idx, col_null_matched) in self.null_matched.into_iter().enumerate() { - null_matched.set(idx, col_null_matched); - } - null_matched - }; + + let null_matched = self.null_matched.into(); // Build hash map for (build_chunk_id, build_chunk) in build_side.iter().enumerate() { diff --git a/src/batch/src/executor/join/lookup_join_base.rs b/src/batch/src/executor/join/lookup_join_base.rs index 7dccc98a1eb7..83c14bb10bb4 100644 --- a/src/batch/src/executor/join/lookup_join_base.rs +++ b/src/batch/src/executor/join/lookup_join_base.rs @@ -14,14 +14,13 @@ use std::marker::PhantomData; -use fixedbitset::FixedBitSet; use futures::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::catalog::Schema; use risingwave_common::error::RwError; -use risingwave_common::hash::{HashKey, PrecomputedBuildHasher}; +use risingwave_common::hash::{HashKey, NullBitmap, PrecomputedBuildHasher}; use risingwave_common::row::Row; use risingwave_common::types::{DataType, ToOwnedDatum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; @@ -67,13 +66,7 @@ impl LookupJoinBase { pub async fn do_execute(mut self: Box) { let outer_side_schema = self.outer_side_input.schema().clone(); - let null_matched = { - let mut null_matched = FixedBitSet::with_capacity(self.null_safe.len()); - for (idx, col_null_matched) in self.null_safe.iter().copied().enumerate() { - null_matched.set(idx, col_null_matched); - } - null_matched - }; + let null_matched: NullBitmap = self.null_safe.into(); let mut outer_side_batch_read_stream: BoxedDataChunkListStream = utils::batch_read(self.outer_side_input.execute(), AT_LEAST_OUTER_SIDE_ROWS); diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 4c3bbf316b89..dc7b0189cdc4 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -65,6 +65,7 @@ serde = { version = "1", features = ["derive"] } serde_default = "0.1" serde_json = "1" serde_with = "2" +smallbitset = "0.6.1" static_assertions = "1" strum = "0.24" strum_macros = "0.24" diff --git a/src/common/benches/bench_hash_key_encoding.rs b/src/common/benches/bench_hash_key_encoding.rs index 104fc588161d..d75e17ce32ba 100644 --- a/src/common/benches/bench_hash_key_encoding.rs +++ b/src/common/benches/bench_hash_key_encoding.rs @@ -50,10 +50,10 @@ impl HashKeyDispatcher for HashKeyBenchCaseBuilder { for null_ratio in NULL_RATIOS { for chunk_size in CHUNK_SIZES { let id = format!( - "{} {:?}, {} rows, Pr[null]={}", + "{} rows, {} {:?}, Pr[null]={}", + chunk_size, self.describe, calc_hash_key_kind(self.data_types()), - chunk_size, null_ratio ); let input_chunk = gen_chunk(self.data_types(), *chunk_size, SEED, *null_ratio); @@ -194,11 +194,11 @@ fn case_builders() -> Vec { }, HashKeyBenchCaseBuilder { data_types: vec![DataType::Int32, DataType::Int32, DataType::Int32], - describe: "composite fixed".to_string(), + describe: "composite fixed, case 1".to_string(), }, HashKeyBenchCaseBuilder { data_types: vec![DataType::Int32, DataType::Int64, DataType::Int32], - describe: "composite fixed".to_string(), + describe: "composite fixed, case 2".to_string(), }, HashKeyBenchCaseBuilder { data_types: vec![DataType::Int32, DataType::Varchar], diff --git a/src/common/src/hash/key.rs b/src/common/src/hash/key.rs index 928cb3cbc487..608999fec600 100644 --- a/src/common/src/hash/key.rs +++ b/src/common/src/hash/key.rs @@ -28,7 +28,7 @@ use std::hash::{BuildHasher, Hash, Hasher}; use std::io::{Cursor, Read}; use chrono::{Datelike, Timelike}; -use fixedbitset::FixedBitSet; +use smallbitset::Set64; use crate::array::serial_array::Serial; use crate::array::{ @@ -42,6 +42,59 @@ use crate::util::hash_util::Crc32FastBuilder; use crate::util::iter_util::ZipEqFast; use crate::util::value_encoding::{deserialize_datum, serialize_datum_into}; +pub static MAX_GROUP_KEYS: usize = 64; + +/// Bitmap for null values in key. +/// This is specialized for key, +/// since it usually has few group keys. +#[repr(transparent)] +#[derive(Clone, Debug, PartialEq)] +pub struct NullBitmap { + inner: Set64, +} + +impl NullBitmap { + fn empty() -> Self { + NullBitmap { + inner: Set64::empty(), + } + } + + fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + fn set_true(&mut self, idx: usize) { + self.inner.add_inplace(idx); + } + + fn contains(&self, x: usize) -> bool { + self.inner.contains(x) + } + + pub fn is_subset(&self, other: &NullBitmap) -> bool { + other.inner.contains_all(self.inner) + } +} + +impl EstimateSize for NullBitmap { + fn estimated_heap_size(&self) -> usize { + 0 + } +} + +impl + IntoIterator> From for NullBitmap { + fn from(value: T) -> Self { + let mut bitmap = NullBitmap::empty(); + for (idx, is_true) in value.into_iter().enumerate() { + if is_true { + bitmap.set_true(idx); + } + } + bitmap + } +} + /// A wrapper for u64 hash result. #[derive(Default, Clone, Copy, Debug, PartialEq)] pub struct HashCode(pub u64); @@ -143,10 +196,10 @@ pub trait HashKey: ) -> ArrayResult<()>; fn has_null(&self) -> bool { - !self.null_bitmap().is_clear() + !self.null_bitmap().is_empty() } - fn null_bitmap(&self) -> &FixedBitSet; + fn null_bitmap(&self) -> &NullBitmap; } /// Designed for hash keys with at most `N` serialized bytes. @@ -156,7 +209,7 @@ pub trait HashKey: pub struct FixedSizeKey { key: [u8; N], hash_code: u64, - null_bitmap: FixedBitSet, + null_bitmap: NullBitmap, } /// Designed for hash keys which can't be represented by [`FixedSizeKey`]. @@ -167,7 +220,7 @@ pub struct SerializedKey { // Key encoding. key: Vec, hash_code: u64, - null_bitmap: FixedBitSet, + null_bitmap: NullBitmap, } impl EstimateSize for FixedSizeKey { @@ -491,7 +544,7 @@ impl<'a> HashKeySerDe<'a> for ListRef<'a> { pub struct FixedSizeKeySerializer { buffer: [u8; N], - null_bitmap: FixedBitSet, + null_bitmap: NullBitmap, null_bitmap_idx: usize, data_len: usize, hash_code: u64, @@ -511,7 +564,7 @@ impl HashKeySerializer for FixedSizeKeySerializer { fn from_hash_code(hash_code: HashCode, _estimated_key_size: usize) -> Self { Self { buffer: [0u8; N], - null_bitmap: FixedBitSet::with_capacity(u8::BITS as usize), + null_bitmap: NullBitmap::empty(), null_bitmap_idx: 0, data_len: 0, hash_code: hash_code.0, @@ -528,7 +581,9 @@ impl HashKeySerializer for FixedSizeKeySerializer { self.buffer[self.data_len..(self.data_len + ret.len())].copy_from_slice(ret); self.data_len += ret.len(); } - None => self.null_bitmap.insert(self.null_bitmap_idx), + None => { + self.null_bitmap.set_true(self.null_bitmap_idx); + } }; self.null_bitmap_idx += 1; } @@ -544,7 +599,7 @@ impl HashKeySerializer for FixedSizeKeySerializer { pub struct FixedSizeKeyDeserializer { cursor: Cursor<[u8; N]>, - null_bitmap: FixedBitSet, + null_bitmap: NullBitmap, null_bitmap_idx: usize, } @@ -575,7 +630,8 @@ impl HashKeyDeserializer for FixedSizeKeyDeserializer { pub struct SerializedKeySerializer { buffer: Vec, hash_code: u64, - null_bitmap: FixedBitSet, + null_bitmap: NullBitmap, + null_bitmap_idx: usize, } impl HashKeySerializer for SerializedKeySerializer { @@ -585,22 +641,22 @@ impl HashKeySerializer for SerializedKeySerializer { Self { buffer: Vec::with_capacity(estimated_value_encoding_size), hash_code: hash_code.0, - null_bitmap: FixedBitSet::new(), + null_bitmap: NullBitmap::empty(), + null_bitmap_idx: 0, } } fn append<'a, D: HashKeySerDe<'a>>(&mut self, data: Option) { - let len_bitmap = self.null_bitmap.len(); - self.null_bitmap.grow(len_bitmap + 1); match data { Some(v) => { serialize_datum_into(&Some(v.to_owned_scalar().into()), &mut self.buffer); } None => { serialize_datum_into(&None, &mut self.buffer); - self.null_bitmap.insert(len_bitmap); + self.null_bitmap.set_true(self.null_bitmap_idx); } } + self.null_bitmap_idx += 1; } fn into_hash_key(self) -> SerializedKey { @@ -696,7 +752,7 @@ impl HashKey for FixedSizeKey { Ok(()) } - fn null_bitmap(&self) -> &FixedBitSet { + fn null_bitmap(&self) -> &NullBitmap { &self.null_bitmap } } @@ -726,7 +782,7 @@ impl HashKey for SerializedKey { Ok(()) } - fn null_bitmap(&self) -> &FixedBitSet { + fn null_bitmap(&self) -> &NullBitmap { &self.null_bitmap } } diff --git a/src/frontend/planner_test/tests/testdata/agg.yaml b/src/frontend/planner_test/tests/testdata/agg.yaml index 3d2ff0f1f709..4c97695dcb6f 100644 --- a/src/frontend/planner_test/tests/testdata/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/agg.yaml @@ -1247,3 +1247,8 @@ BatchExchange { order: [idx_desc.a DESC], dist: Single } └─BatchSortAgg { group_key: [idx_desc.a], aggs: [count] } └─BatchScan { table: idx_desc, columns: [idx_desc.a], distribution: UpstreamHashShard(idx_desc.a) } +- name: max group keys exceed 64 + sql: | + create table t(v1 int, v2 int, v3 int, v4 int, v5 int, v6 int, v7 int, v8 int, v9 int, v10 int, v11 int, v12 int, v13 int, v14 int, v15 int, v16 int, v17 int, v18 int, v19 int, v20 int, v21 int, v22 int, v23 int, v24 int, v25 int, v26 int, v27 int, v28 int, v29 int, v30 int, v31 int, v32 int, v33 int, v34 int, v35 int, v36 int, v37 int, v38 int, v39 int, v40 int, v41 int, v42 int, v43 int, v44 int, v45 int, v46 int, v47 int, v48 int, v49 int, v50 int, v51 int, v52 int, v53 int, v54 int, v55 int, v56 int, v57 int, v58 int, v59 int, v60 int, v61 int, v62 int, v63 int, v64 int, v65 int); + select * from t group by v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v22, v23, v24, v25, v26, v27, v28, v29, v30, v31, v32, v33, v34, v35, v36, v37, v38, v39, v40, v41, v42, v43, v44, v45, v46, v47, v48, v49, v50, v51, v52, v53, v54, v55, v56, v57, v58, v59, v60, v61, v62, v63, v64, v65; + binder_error: 'Bind error: Number of Group Keys: 65, exceeded maximum: 64' diff --git a/src/frontend/src/binder/select.rs b/src/frontend/src/binder/select.rs index ad1f11356175..eacf17b501a3 100644 --- a/src/frontend/src/binder/select.rs +++ b/src/frontend/src/binder/select.rs @@ -16,7 +16,9 @@ use std::fmt::Debug; use itertools::Itertools; use risingwave_common::catalog::{Field, Schema, PG_CATALOG_SCHEMA_NAME}; +use risingwave_common::error::ErrorCode::BindError; use risingwave_common::error::{ErrorCode, Result, RwError}; +use risingwave_common::hash::MAX_GROUP_KEYS; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_sqlparser::ast::{DataType as AstDataType, Distinct, Expr, Select, SelectItem}; @@ -178,6 +180,14 @@ impl Binder { // Bind GROUP BY clause. self.context.clause = Some(Clause::GroupBy); + let number_of_group_keys = select.group_by.len(); + if number_of_group_keys > MAX_GROUP_KEYS { + return Err(BindError(format!( + "Number of Group Keys: {}, exceeded maximum: {}", + number_of_group_keys, MAX_GROUP_KEYS, + )) + .into()); + } let group_by = select .group_by .into_iter() diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 304edb989a94..8d6aba84a235 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -17,14 +17,13 @@ use std::sync::Arc; use std::time::Duration; use await_tree::InstrumentAwait; -use fixedbitset::FixedBitSet; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; use multimap::MultiMap; use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::Schema; -use risingwave_common::hash::HashKey; +use risingwave_common::hash::{HashKey, NullBitmap}; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, ToOwnedDatum}; use risingwave_common::util::epoch::EpochPair; @@ -538,13 +537,7 @@ impl HashJoinExecutor { /// Data types of the join key columns join_key_data_types: Vec, /// Null safe bitmap for each join pair - null_matched: FixedBitSet, + null_matched: NullBitmap, /// The memcomparable serializer of primary key. pk_serializer: OrderedRowSerde, /// State table. Contains the data from upstream. @@ -248,7 +247,7 @@ impl JoinHashMap { degree_all_data_types: Vec, degree_table: StateTable, degree_pk_indices: Vec, - null_matched: FixedBitSet, + null_matched: NullBitmap, need_degree_table: bool, pk_contained_in_jk: bool, metrics: Arc, @@ -562,7 +561,7 @@ impl JoinHashMap { self.inner.len() } - pub fn null_matched(&self) -> &FixedBitSet { + pub fn null_matched(&self) -> &NullBitmap { &self.null_matched } } diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index d1d57fd0ac90..d14210d3f116 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -64,6 +64,7 @@ madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "i memchr = { version = "2" } miniz_oxide = { version = "0.6", default-features = false, features = ["with-alloc"] } multimap = { version = "0.8" } +num = { version = "0.4" } num-bigint = { version = "0.4" } num-integer = { version = "0.1", features = ["i128"] } num-traits = { version = "0.2", features = ["i128", "libm"] } @@ -158,6 +159,7 @@ madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "i memchr = { version = "2" } miniz_oxide = { version = "0.6", default-features = false, features = ["with-alloc"] } multimap = { version = "0.8" } +num = { version = "0.4" } num-bigint = { version = "0.4" } num-integer = { version = "0.1", features = ["i128"] } num-traits = { version = "0.2", features = ["i128", "libm"] }