feat(streaming): introduce dedup cache and append-only dedup executor #8874

Merged · 6 commits · Apr 3, 2023
Changes from 2 commits
281 changes: 281 additions & 0 deletions src/stream/src/executor/dedup/append_only_dedup.rs
@@ -0,0 +1,281 @@
use futures::{stream, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::catalog::Schema;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_storage::StateStore;

use super::cache::DedupCache;
use crate::common::table::state_table::StateTable;
use crate::executor::error::StreamExecutorError;
use crate::executor::{
expect_first_barrier, ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, Message,
PkIndices, PkIndicesRef, StreamExecutorResult,
};
use crate::task::AtomicU64Ref;

/// [`AppendOnlyDedupExecutor`] drops any message that has duplicate pk columns with previous
/// messages. It only accepts append-only input, and its output will be append-only as well.
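///
/// For example, with pk column 0, an append-only input of `+ (1, 1)` followed by `+ (1, 7)`
/// results in the `(1, 7)` row being marked invisible in the output chunk, since pk `1` has
/// already been seen.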
pub struct AppendOnlyDedupExecutor<S: StateStore> {
input: Option<BoxedExecutor>,
state_table: StateTable<S>,
cache: DedupCache<OwnedRow>,

pk_indices: PkIndices,
identity: String,
schema: Schema,
ctx: ActorContextRef,
}

impl<S: StateStore> AppendOnlyDedupExecutor<S> {
pub fn new(
input: BoxedExecutor,
state_table: StateTable<S>,
pk_indices: PkIndices,
executor_id: u64,
ctx: ActorContextRef,
watermark_epoch: AtomicU64Ref,
) -> Self {
let schema = input.schema().clone();
Self {
input: Some(input),
state_table,
cache: DedupCache::new(watermark_epoch),
pk_indices,
identity: format!("AppendOnlyDedupExecutor {:X}", executor_id),
schema,
ctx,
}
}

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn executor_inner(mut self) {
let mut input = self.input.take().unwrap().execute();

// Consume the first barrier message and initialize state table.
let barrier = expect_first_barrier(&mut input).await?;
self.state_table.init_epoch(barrier.epoch);

// The first barrier message should be propagated.
yield Message::Barrier(barrier);

#[for_await]
for msg in input {
match msg? {
Message::Chunk(chunk) => {
// Append-only dedup executor only receives INSERT messages.
debug_assert!(chunk.ops().iter().all(|&op| op == Op::Insert));

// Extract pk for all rows (regardless of visibility) in the chunk.
let keys = chunk
.data_chunk()
.rows_with_holes()
.map(|row_ref| {
row_ref.map(|row| row.project(self.pk_indices()).to_owned_row())
Member: Is it possible to make `populate_cache` directly take `impl Row`, so that we don't need to allocate here?

Contributor Author: We'll `dedup_insert` the key into the cache later, so it seems inevitable to make the key an owned value.

(See the sketch after `populate_cache` below.)
})
.collect_vec();

// Ensure that if a key for a visible row exists before, then it is in the
// cache, by querying the storage.
self.populate_cache(keys.iter().flatten()).await?;

// Now check for duplication and insert new keys into the cache.
let mut vis_builder = BitmapBuilder::with_capacity(chunk.capacity());
for key in keys {
match key {
Some(key) => {
if self.cache.dedup_insert(key) {
// The key has not been seen before. The row should be visible.
vis_builder.append(true);
} else {
// The key has been seen before. The row should not be visible.
vis_builder.append(false);
}
}
None => {
// The row is originally invisible.
vis_builder.append(false);
}
}
}

let vis = vis_builder.finish();
if vis.count_ones() > 0 {
// Construct the new chunk and write the data to state table.
let (ops, columns, _) = chunk.into_inner();
let chunk = StreamChunk::new(ops, columns, Some(vis));
self.state_table.write_chunk(chunk.clone());

yield Message::Chunk(chunk);
}
}

Message::Barrier(barrier) => {
if barrier.checkpoint {
self.state_table.commit(barrier.epoch).await?;
} else {
self.state_table.commit_no_data_expected(barrier.epoch);
}

if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) {
let (_prev_vnode_bitmap, cache_may_stale) =
self.state_table.update_vnode_bitmap(vnode_bitmap);
if cache_may_stale {
self.cache.clear();
}
}

self.cache.evict();

yield Message::Barrier(barrier);
}

Message::Watermark(watermark) => {
yield Message::Watermark(watermark);
}
}
}
}

/// Populate the cache with keys that already exist in storage.
pub async fn populate_cache<'a>(
&mut self,
keys: impl Iterator<Item = &'a OwnedRow>,
) -> StreamExecutorResult<()> {
let mut futures = vec![];
for key in keys {
if self.cache.contains(key) {
continue;
}

let table = &self.state_table;
futures.push(async move { (key, table.get_encoded_row(key).await) });
}

let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
while let Some(result) = buffered.next().await {
let (key, value) = result;
if value?.is_some() {
// Only insert into the cache when we have this key in storage.
self.cache.insert(key.to_owned());
}
}

Ok(())
}
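
// A minimal sketch (not part of this PR) of the alternative signature raised in the review
// discussion above: taking `impl Row` would avoid the up-front allocation, but since
// `dedup_insert` stores the key in the cache, an owned `OwnedRow` still has to be produced
// somewhere along the way.
//
// pub async fn populate_cache<'a>(
//     &mut self,
//     keys: impl Iterator<Item = impl Row + 'a>,
// ) -> StreamExecutorResult<()> {
//     // ... same lookup logic as above, converting each key with `.to_owned_row()` before
//     // inserting it into the cache ...
// }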
}

impl<S: StateStore> Executor for AppendOnlyDedupExecutor<S> {
fn execute(self: Box<Self>) -> BoxedMessageStream {
self.executor_inner().boxed()
}

fn schema(&self) -> &Schema {
&self.schema
}

fn pk_indices(&self) -> PkIndicesRef<'_> {
&self.pk_indices
}

fn identity(&self) -> &str {
&self.identity
}
}

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::test_prelude::StreamChunkTestExt;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_storage::memory::MemoryStateStore;

use super::*;
use crate::common::table::state_table::StateTable;
use crate::executor::test_utils::MockSource;
use crate::executor::ActorContext;

#[tokio::test]
async fn test_dedup_executor() {
let table_id = TableId::new(1);
let column_descs = vec![
ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
];
let schema = Schema::new(vec![
Field::unnamed(DataType::Int64),
Field::unnamed(DataType::Int64),
]);
let pk_indices = vec![0];
let order_types = vec![OrderType::ascending()];

let state_store = MemoryStateStore::new();
let state_table = StateTable::new_without_distribution(
state_store,
table_id,
column_descs,
order_types,
pk_indices.clone(),
)
.await;

let (mut tx, input) = MockSource::channel(schema, pk_indices.clone());
let mut dedup_executor = Box::new(AppendOnlyDedupExecutor::new(
Box::new(input),
state_table,
pk_indices,
1,
ActorContext::create(123),
Arc::new(AtomicU64::new(0)),
))
.execute();

tx.push_barrier(1, false);
dedup_executor.next().await.unwrap().unwrap();

let chunk = StreamChunk::from_pretty(
" I I
+ 1 1
+ 2 2 D
+ 1 7",
);
tx.push_chunk(chunk);
let msg = dedup_executor.next().await.unwrap().unwrap();
assert_eq!(
msg.into_chunk().unwrap(),
StreamChunk::from_pretty(
" I I
+ 1 1
+ 2 2 D
+ 1 7 D",
)
);

tx.push_barrier(2, false);
dedup_executor.next().await.unwrap().unwrap();

let chunk = StreamChunk::from_pretty(
" I I
+ 3 9
+ 2 5
+ 1 20",
);
tx.push_chunk(chunk);
let msg = dedup_executor.next().await.unwrap().unwrap();
assert_eq!(
msg.into_chunk().unwrap(),
StreamChunk::from_pretty(
" I I
+ 3 9
+ 2 5
+ 1 20 D",
)
);
}
}
73 changes: 73 additions & 0 deletions src/stream/src/executor/dedup/cache.rs
@@ -0,0 +1,73 @@
use std::hash::Hash;

use crate::cache::{new_unbounded, ExecutorCache};
use crate::task::AtomicU64Ref;

/// [`DedupCache`] is used for key deduplication. Currently, the cache behaves like a set that only
/// accepts a key without a value. This could be refined in the future to support k-v pairs.
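///
/// In `AppendOnlyDedupExecutor`, keys that already exist in storage are first loaded into the
/// cache via `insert`, and every incoming key then goes through `dedup_insert`, whose return
/// value decides the visibility of the corresponding row.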
pub struct DedupCache<K: Hash + Eq> {
inner: ExecutorCache<K, ()>,
}

impl<K: Hash + Eq> DedupCache<K> {
pub fn new(watermark_epoch: AtomicU64Ref) -> Self {
let cache = ExecutorCache::new(new_unbounded(watermark_epoch));
Self { inner: cache }
}

/// Insert a `key` into the cache only if it is not already present. Return whether the `key`
/// was successfully inserted.
pub fn dedup_insert(&mut self, key: K) -> bool {
if !self.inner.contains(&key) {
self.inner.push(key, ());
true
} else {
false
}
}

/// Insert a `key` into the cache without checking for duplication.
pub fn insert(&mut self, key: K) {
self.inner.push(key, ());
}

/// Check whether the given key is in the cache.
pub fn contains(&self, key: &K) -> bool {
self.inner.contains(key)
}

/// Evict the inner LRU cache according to the watermark epoch.
pub fn evict(&mut self) {
self.inner.evict()
}

/// Clear everything in the cache.
pub fn clear(&mut self) {
self.inner.clear()
}
}

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use super::DedupCache;

#[test]
fn test_dedup_cache() {
let mut cache = DedupCache::new(Arc::new(AtomicU64::new(10000)));

cache.insert(10);
assert!(cache.contains(&10));
assert!(!cache.dedup_insert(10));

assert!(cache.dedup_insert(20));
assert!(cache.contains(&20));
assert!(!cache.dedup_insert(20));

cache.clear();
assert!(!cache.contains(&10));
assert!(!cache.contains(&20));
}
}
4 changes: 4 additions & 0 deletions src/stream/src/executor/dedup/mod.rs
@@ -0,0 +1,4 @@
mod append_only_dedup;
mod cache;

pub use append_only_dedup::AppendOnlyDedupExecutor;
2 changes: 2 additions & 0 deletions src/stream/src/executor/mod.rs
@@ -59,6 +59,7 @@ pub mod aggregation;
mod barrier_recv;
mod batch_query;
mod chain;
mod dedup;
mod dispatch;
pub mod dml;
mod dynamic_filter;
@@ -106,6 +107,7 @@ pub use backfill::*;
pub use barrier_recv::BarrierRecvExecutor;
pub use batch_query::BatchQueryExecutor;
pub use chain::ChainExecutor;
pub use dedup::AppendOnlyDedupExecutor;
pub use dispatch::{DispatchExecutor, DispatcherImpl};
pub use dynamic_filter::DynamicFilterExecutor;
pub use error::{StreamExecutorError, StreamExecutorResult};