diff --git a/Cargo.lock b/Cargo.lock index d35714277565..71eef8ad0ea6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5648,6 +5648,7 @@ dependencies = [ "criterion", "darwin-libproc", "derivative", + "easy-ext", "fixedbitset", "futures", "futures-async-stream", diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index c6a3fe1393e4..567c9e8aa796 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -28,6 +28,7 @@ clap = { version = "3", features = ["derive"] } comfy-table = "6" crc32fast = "1" derivative = "2" +easy-ext = "1" fixedbitset = { version = "0.4", features = ["std"] } futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = "0.2" diff --git a/src/common/src/hash/consistent_hash/bitmap.rs b/src/common/src/hash/consistent_hash/bitmap.rs new file mode 100644 index 000000000000..c007138feb76 --- /dev/null +++ b/src/common/src/hash/consistent_hash/bitmap.rs @@ -0,0 +1,34 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::RangeInclusive; + +use crate::buffer::Bitmap; +use crate::hash::VirtualNode; + +/// An extension trait for `Bitmap` to support virtual node operations. +#[easy_ext::ext(VnodeBitmapExt)] +impl Bitmap { + /// Enumerates the virtual nodes set to 1 in the bitmap. + pub fn iter_vnodes(&self) -> impl Iterator + '_ { + self.iter_ones().map(VirtualNode::from_index) + } + + /// Returns an iterator which yields the position ranges of continuous virtual nodes set to 1 in + /// the bitmap. + pub fn vnode_ranges(&self) -> impl Iterator> + '_ { + self.high_ranges() + .map(|r| (VirtualNode::from_index(*r.start())..=VirtualNode::from_index(*r.end()))) + } +} diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index b9b9f467da3b..159c50cca880 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -22,6 +22,7 @@ use itertools::Itertools; use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping as ParallelUnitMappingProto}; use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto; +use super::bitmap::VnodeBitmapExt; use super::vnode::{ParallelUnitId, VirtualNode}; use crate::buffer::{Bitmap, BitmapBuilder}; use crate::util::compress::compress_data; @@ -122,9 +123,9 @@ impl VnodeMapping { /// Returns `None` if the no virtual node is set in the bitmap. pub fn get_matched(&self, bitmap: &Bitmap) -> Option { bitmap - .iter_ones() + .iter_vnodes() .next() // only need to check the first one - .map(|i| self.get(VirtualNode::from_index(i))) + .map(|v| self.get(v)) } /// Iterate over all items in this mapping, in the order of vnodes. diff --git a/src/common/src/hash/consistent_hash/mod.rs b/src/common/src/hash/consistent_hash/mod.rs index 85339419f6e4..02d077b077dc 100644 --- a/src/common/src/hash/consistent_hash/mod.rs +++ b/src/common/src/hash/consistent_hash/mod.rs @@ -12,5 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod bitmap; pub mod mapping; pub mod vnode; diff --git a/src/common/src/hash/mod.rs b/src/common/src/hash/mod.rs index 91f3e917b348..58dbada538a7 100644 --- a/src/common/src/hash/mod.rs +++ b/src/common/src/hash/mod.rs @@ -16,6 +16,7 @@ mod consistent_hash; // TODO: move this to a separate module mod dispatcher; mod key; +pub use consistent_hash::bitmap::*; pub use consistent_hash::mapping::*; pub use consistent_hash::vnode::*; pub use dispatcher::HashKeyDispatcher; diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index c9b0ab4e120e..b8227c72a788 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -28,7 +28,7 @@ use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{ get_dist_key_in_pk_indices, ColumnDesc, ColumnId, Schema, TableId, TableOption, }; -use risingwave_common::hash::VirtualNode; +use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common::row::{self, OwnedRow, Row, RowDeserializer, RowExt}; use risingwave_common::util::ordered::*; use risingwave_common::util::sort_util::OrderType; @@ -391,9 +391,9 @@ impl StorageTable { // distributed tables. assert_eq!(vnode_hint.unwrap_or(DEFAULT_VNODE), DEFAULT_VNODE); - Either::Left(self.vnodes.high_ranges().map(|r| { - let start = Included(VirtualNode::from_index(*r.start()).to_be_bytes().to_vec()); - let end = end_bound_of_prefix(&VirtualNode::from_index(*r.end()).to_be_bytes()); + Either::Left(self.vnodes.vnode_ranges().map(|r| { + let start = Included(r.start().to_be_bytes().to_vec()); + let end = end_bound_of_prefix(&r.end().to_be_bytes()); assert_matches!(end, Excluded(_) | Unbounded); (start, end) })) @@ -403,7 +403,7 @@ impl StorageTable { // If `vnode_hint` is set, we can only access this single vnode. Some(vnode) => Either::Left(std::iter::once(vnode)), // Otherwise, we need to access all vnodes of this table. - None => Either::Right(self.vnodes.iter_ones().map(VirtualNode::from_index)), + None => Either::Right(self.vnodes.iter_vnodes()), }; Either::Right( vnodes.map(|vnode| prefixed_range(encoded_key_range.clone(), &vnode.to_be_bytes())), diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 3837a8de9eb0..cbe47e123fd0 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -23,7 +23,7 @@ use itertools::{izip, Itertools}; use risingwave_common::array::{Op, StreamChunk, Vis}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{get_dist_key_in_pk_indices, ColumnDesc, TableId, TableOption}; -use risingwave_common::hash::VirtualNode; +use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common::row::{self, CompactedRow, OwnedRow, Row, RowDeserializer, RowExt}; use risingwave_common::types::ScalarImpl; use risingwave_common::util::epoch::EpochPair; @@ -812,7 +812,7 @@ impl StateTable { } else { vec![] }; - for vnode in self.vnodes.iter_ones() { + for vnode in self.vnodes.iter_vnodes() { let mut range_begin = vnode.to_be_bytes().to_vec(); let mut range_end = range_begin.clone(); range_begin.extend(&range_begin_suffix); diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index 1f83825522b2..a52b7f0bed4a 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -21,7 +21,7 @@ use risingwave_common::array::{Array, ArrayImpl, DataChunk, Op, StreamChunk}; use risingwave_common::bail; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::Schema; -use risingwave_common::hash::VirtualNode; +use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::row::{once, OwnedRow as RowData, Row}; use risingwave_common::types::{DataType, Datum, ScalarImpl, ToDatumRef, ToOwnedDatum}; use risingwave_common::util::iter_util::ZipEqDebug; @@ -386,11 +386,9 @@ impl DynamicFilterExecutor { let range = (Self::to_row_bound(range.0), Self::to_row_bound(range.1)); // TODO: prefetching for append-only case. - for vnode in self.left_table.vnodes().iter_ones() { - let row_stream = self - .left_table - .iter_with_pk_range(&range, VirtualNode::from_index(vnode)) - .await?; + for vnode in self.left_table.vnodes().iter_vnodes() { + let row_stream = + self.left_table.iter_with_pk_range(&range, vnode).await?; pin_mut!(row_stream); while let Some(res) = row_stream.next().await { let row = res?; diff --git a/src/stream/src/executor/sort.rs b/src/stream/src/executor/sort.rs index 753945f2a6d0..66e29f16045d 100644 --- a/src/stream/src/executor/sort.rs +++ b/src/stream/src/executor/sort.rs @@ -20,7 +20,7 @@ use futures_async_stream::try_stream; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; -use risingwave_common::hash::VirtualNode; +use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::row::{self, AscentOwnedRow, OwnedRow, Row, RowExt}; use risingwave_common::types::{ScalarImpl, ToOwnedDatum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; @@ -259,7 +259,7 @@ impl SortExecutor { curr_vnode_bitmap.to_owned() }; let mut values_per_vnode = Vec::new(); - for owned_vnode in newly_owned_vnodes.iter_ones() { + for owned_vnode in newly_owned_vnodes.iter_vnodes() { let value_iter = self .state_table .iter_with_pk_range( @@ -267,7 +267,7 @@ impl SortExecutor { Bound::::Unbounded, Bound::::Unbounded, ), - VirtualNode::from_index(owned_vnode), + owned_vnode, ) .await?; let value_iter = Box::pin(value_iter); diff --git a/src/stream/src/executor/sort_buffer.rs b/src/stream/src/executor/sort_buffer.rs index e7d96b8ffdb7..5b24422b009e 100644 --- a/src/stream/src/executor/sort_buffer.rs +++ b/src/stream/src/executor/sort_buffer.rs @@ -22,7 +22,7 @@ use futures_async_stream::for_await; use gen_iter::GenIter; use risingwave_common::array::{DataChunk, Op, StreamChunk}; use risingwave_common::catalog::Schema; -use risingwave_common::hash::VirtualNode; +use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::row::{self, AscentOwnedRow, OwnedRow, Row, RowExt}; use risingwave_common::types::{ScalarImpl, ToOwnedDatum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; @@ -77,11 +77,8 @@ impl SortBuffer { Bound::::Unbounded, Bound::::Unbounded, ); - let streams = stream::iter(vnodes.iter_ones()) - .map(|vnode| { - let vnode = VirtualNode::from_index(vnode); - state_table.iter_with_pk_range(&pk_range, vnode) - }) + let streams = stream::iter(vnodes.iter_vnodes()) + .map(|vnode| state_table.iter_with_pk_range(&pk_range, vnode)) .buffer_unordered(10) .try_collect::>() .await? diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index 9ecd56330b39..5c9be4aba4b9 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -18,7 +18,7 @@ use futures::future::join_all; use futures::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; -use risingwave_common::hash::VirtualNode; +use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::{bail, row}; @@ -212,8 +212,8 @@ impl WatermarkFilterExecutor { last_checkpoint_watermark = current_watermark.clone(); // Persist the watermark when checkpoint arrives. let vnodes = table.get_vnodes(); - for vnode in vnodes.iter_ones() { - let pk = Some(ScalarImpl::Int16(vnode as _)); + for vnode in vnodes.iter_vnodes() { + let pk = Some(ScalarImpl::Int16(vnode.to_scalar())); let row = [pk, Some(current_watermark.clone())]; // FIXME(yuhao): use upsert. table.insert(row);