From 23bc699d1c1ffe54f4bc42bb4cfe13baf95545f8 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 15 Mar 2023 15:22:13 +0800 Subject: [PATCH 1/9] feat(sink): introduce log trait for sink --- src/stream/src/common/log_store/mod.rs | 270 +++++++++++++++++++++++++ src/stream/src/common/mod.rs | 1 + src/stream/src/executor/error.rs | 11 + src/stream/src/executor/sink.rs | 150 ++++++++++---- 4 files changed, 391 insertions(+), 41 deletions(-) create mode 100644 src/stream/src/common/log_store/mod.rs diff --git a/src/stream/src/common/log_store/mod.rs b/src/stream/src/common/log_store/mod.rs new file mode 100644 index 000000000000..6cbb3afde055 --- /dev/null +++ b/src/stream/src/common/log_store/mod.rs @@ -0,0 +1,270 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::future::Future; + +use risingwave_common::array::StreamChunk; +use risingwave_common::util::epoch::INVALID_EPOCH; +use tokio::sync::mpsc::{ + channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, +}; +use tokio::sync::oneshot; + +use crate::common::log_store::LogReaderEpochProgress::{AwaitingTruncate, Consuming}; + +#[derive(thiserror::Error, Debug)] +pub enum LogStoreError { + #[error("EndOfLogStream")] + EndOfLogStream, +} + +pub type LogStoreResult = Result; + +#[derive(Debug)] +pub enum LogStoreReadItem { + StreamChunk(StreamChunk), + Barrier { + next_epoch: u64, + is_checkpoint: bool, + }, +} + +pub trait LogWriter { + type InitFuture<'a>: Future> + Send + 'a + where + Self: 'a; + type WriteChunkFuture<'a>: Future> + Send + 'a + where + Self: 'a; + type FlushCurrentEpoch<'a>: Future> + Send + 'a + where + Self: 'a; + + fn init(&mut self, epoch: u64) -> Self::InitFuture<'_>; + fn write_chunk(&mut self, chunk: StreamChunk) -> Self::WriteChunkFuture<'_>; + fn flush_current_epoch( + &mut self, + next_epoch: u64, + is_checkpoint: bool, + ) -> Self::FlushCurrentEpoch<'_>; +} + +pub trait LogReader { + type InitFuture<'a>: Future> + Send + 'a + where + Self: 'a; + type NextItemFuture<'a>: Future> + Send + 'a + where + Self: 'a; + type TruncateFuture<'a>: Future> + Send + 'a + where + Self: 'a; + + fn init(&mut self) -> Self::InitFuture<'_>; + fn next_item(&mut self) -> Self::NextItemFuture<'_>; + fn truncate(&mut self) -> Self::TruncateFuture<'_>; +} + +pub trait LogStoreFactory { + type Reader: LogReader; + type Writer: LogWriter; + + fn build(self) -> (Self::Reader, Self::Writer); +} + +pub struct BoundedInMemLogStoreWriter { + curr_epoch: Option, + + init_epoch_tx: Option>, + item_tx: Sender, + truncated_epoch_rx: UnboundedReceiver, +} + +#[derive(Eq, PartialEq, Debug)] +enum LogReaderEpochProgress { + Consuming(u64), + AwaitingTruncate { sealed_epoch: u64, next_epoch: u64 }, +} + +const UNINITIALIZED: LogReaderEpochProgress = LogReaderEpochProgress::Consuming(INVALID_EPOCH); + +pub struct BoundedInMemLogStoreReader { + epoch_progress: LogReaderEpochProgress, + + init_epoch_rx: Option>, + item_rx: Receiver, + truncated_epoch_tx: UnboundedSender, +} + +pub struct BoundedInMemLogStoreFactory { + bound: usize, +} + +impl BoundedInMemLogStoreFactory { + pub fn new(bound: usize) -> Self { + Self { bound } + } +} + +impl LogStoreFactory for BoundedInMemLogStoreFactory { + type Reader = BoundedInMemLogStoreReader; + type Writer = BoundedInMemLogStoreWriter; + + fn build(self) -> (Self::Reader, Self::Writer) { + let (init_epoch_tx, init_epoch_rx) = oneshot::channel(); + let (item_tx, item_rx) = channel(self.bound); + let (truncated_epoch_tx, truncated_epoch_rx) = unbounded_channel(); + let reader = BoundedInMemLogStoreReader { + epoch_progress: UNINITIALIZED, + init_epoch_rx: Some(init_epoch_rx), + item_rx, + truncated_epoch_tx, + }; + let writer = BoundedInMemLogStoreWriter { + curr_epoch: None, + init_epoch_tx: Some(init_epoch_tx), + item_tx, + truncated_epoch_rx, + }; + (reader, writer) + } +} + +impl LogReader for BoundedInMemLogStoreReader { + type InitFuture<'a> = impl Future> + 'a; + type NextItemFuture<'a> = impl Future> + 'a; + type TruncateFuture<'a> = impl Future> + 'a; + + fn init(&mut self) -> Self::InitFuture<'_> { + async { + let init_epoch_rx = self + .init_epoch_rx + .take() + .expect("should not init for twice"); + // TODO: should return the error + let epoch = init_epoch_rx.await.expect("should be able to init"); + assert_eq!(self.epoch_progress, UNINITIALIZED); + self.epoch_progress = LogReaderEpochProgress::Consuming(epoch); + Ok(epoch) + } + } + + fn next_item(&mut self) -> Self::NextItemFuture<'_> { + async { + match self.item_rx.recv().await { + Some(item) => { + if let LogStoreReadItem::Barrier { + next_epoch, + is_checkpoint, + } = &item + { + match self.epoch_progress { + LogReaderEpochProgress::Consuming(current_epoch) => { + if *is_checkpoint { + self.epoch_progress = AwaitingTruncate { + next_epoch: *next_epoch, + sealed_epoch: current_epoch, + }; + } else { + self.epoch_progress = Consuming(*next_epoch); + } + } + LogReaderEpochProgress::AwaitingTruncate { .. } => { + unreachable!("should not be awaiting for when barrier comes") + } + } + } + Ok(item) + } + None => Err(LogStoreError::EndOfLogStream), + } + } + } + + fn truncate(&mut self) -> Self::TruncateFuture<'_> { + async move { + let sealed_epoch = match self.epoch_progress { + Consuming(_) => unreachable!("should be awaiting truncate"), + LogReaderEpochProgress::AwaitingTruncate { + sealed_epoch, + next_epoch, + } => { + self.epoch_progress = Consuming(next_epoch); + sealed_epoch + } + }; + // TODO: should return error + self.truncated_epoch_tx + .send(sealed_epoch) + .expect("should not error"); + Ok(()) + } + } +} + +impl LogWriter for BoundedInMemLogStoreWriter { + type FlushCurrentEpoch<'a> = impl Future> + 'a; + type InitFuture<'a> = impl Future> + 'a; + type WriteChunkFuture<'a> = impl Future> + 'a; + + fn init(&mut self, epoch: u64) -> Self::InitFuture<'_> { + let init_epoch_tx = self.init_epoch_tx.take().expect("cannot be init for twice"); + // TODO: return the error + init_epoch_tx.send(epoch).unwrap(); + self.curr_epoch = Some(epoch); + async { Ok(()) } + } + + fn write_chunk(&mut self, chunk: StreamChunk) -> Self::WriteChunkFuture<'_> { + async { + // TODO: return the sender error + self.item_tx + .send(LogStoreReadItem::StreamChunk(chunk)) + .await + .expect("should be able to send"); + Ok(()) + } + } + + fn flush_current_epoch( + &mut self, + next_epoch: u64, + is_checkpoint: bool, + ) -> Self::FlushCurrentEpoch<'_> { + async move { + // TODO: return the sender error + self.item_tx + .send(LogStoreReadItem::Barrier { + next_epoch, + is_checkpoint, + }) + .await + .expect("should be able to send"); + + let prev_epoch = self + .curr_epoch + .replace(next_epoch) + .expect("should have epoch"); + + if is_checkpoint { + // TODO: return err at None + let truncated_epoch = self.truncated_epoch_rx.recv().await.unwrap(); + assert_eq!(truncated_epoch, prev_epoch); + } + + Ok(()) + } + } +} diff --git a/src/stream/src/common/mod.rs b/src/stream/src/common/mod.rs index 026af3735399..d92e84639e91 100644 --- a/src/stream/src/common/mod.rs +++ b/src/stream/src/common/mod.rs @@ -19,4 +19,5 @@ pub use infallible_expr::*; mod builder; mod column_mapping; mod infallible_expr; +pub mod log_store; pub mod table; diff --git a/src/stream/src/executor/error.rs b/src/stream/src/executor/error.rs index eafb17e977ae..64c6ab638e1b 100644 --- a/src/stream/src/executor/error.rs +++ b/src/stream/src/executor/error.rs @@ -26,6 +26,7 @@ use risingwave_rpc_client::error::RpcError; use risingwave_storage::error::StorageError; use super::Barrier; +use crate::common::log_store::LogStoreError; #[derive(thiserror::Error, Debug)] enum Inner { @@ -36,6 +37,9 @@ enum Inner { StorageError, ), + #[error("Log store error: {0}")] + LogStoreError(LogStoreError), + #[error("Chunk operation error: {0}")] EvalError(Either), @@ -122,6 +126,13 @@ impl From for StreamExecutorError { } } +/// Log store error +impl From for StreamExecutorError { + fn from(e: LogStoreError) -> Self { + Inner::LogStoreError(e).into() + } +} + /// Chunk operation error. impl From for StreamExecutorError { fn from(e: ArrayError) -> Self { diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index f0c82b4e397c..623ee89c542f 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -15,8 +15,10 @@ use std::sync::Arc; use std::time::Instant; -use futures::StreamExt; +use futures::stream::select; +use futures::{FutureExt, Stream, StreamExt}; use futures_async_stream::try_stream; +use prometheus::Histogram; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::row::Row; @@ -28,8 +30,11 @@ use risingwave_connector::ConnectorParams; use super::error::{StreamExecutorError, StreamExecutorResult}; use super::{BoxedExecutor, Executor, Message}; +use crate::common::log_store::{ + BoundedInMemLogStoreFactory, LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, +}; use crate::executor::monitor::StreamingMetrics; -use crate::executor::PkIndices; +use crate::executor::{expect_first_barrier, Barrier, PkIndices}; pub struct SinkExecutor { input: BoxedExecutor, @@ -48,10 +53,8 @@ async fn build_sink( pk_indices: PkIndices, connector_params: ConnectorParams, sink_type: SinkType, -) -> StreamExecutorResult> { - Ok(Box::new( - SinkImpl::new(config, schema, pk_indices, connector_params, sink_type).await?, - )) +) -> StreamExecutorResult { + Ok(SinkImpl::new(config, schema, pk_indices, connector_params, sink_type).await?) } // Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT. @@ -93,32 +96,57 @@ impl SinkExecutor { } } - #[try_stream(ok = Message, error = StreamExecutorError)] - async fn execute_inner(self) { - // the flag is required because kafka transaction requires at least one - // message, so we should abort the transaction if the flag is true. - let mut empty_checkpoint_flag = true; - let mut in_transaction = false; - let mut epoch = 0; - let data_types = self.schema.data_types(); + fn execute_inner( + self, + log_store_factory: impl LogStoreFactory, + ) -> impl Stream> { + let config = self.config.clone(); + let schema = self.schema.clone(); + + let (log_reader, log_writer) = log_store_factory.build(); - let mut sink = build_sink( - self.config.clone(), - self.schema, + let metrics = self + .metrics + .sink_commit_duration + .with_label_values(&[self.identity.as_str(), self.config.get_connector()]); + let consume_log_stream = Self::execute_consume_log( + config, + schema, self.pk_indices, self.connector_params, self.sink_type, - ) - .await?; + log_reader, + metrics, + ); + + let write_log_stream = + Self::execute_write_log(self.input, log_writer, self.schema, self.sink_type); - let input = self.input.execute(); + select(consume_log_stream.into_stream(), write_log_stream) + } + + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn execute_write_log( + input: BoxedExecutor, + mut log_writer: impl LogWriter, + schema: Schema, + sink_type: SinkType, + ) { + let data_types = schema.data_types(); + let mut input = input.execute(); + + let Barrier { + epoch: epoch_pair, .. + } = expect_first_barrier(&mut input).await?; + + log_writer.init(epoch_pair.curr).await?; #[for_await] for msg in input { match msg? { Message::Watermark(w) => yield Message::Watermark(w), Message::Chunk(chunk) => { - let visible_chunk = if self.sink_type == SinkType::ForceAppendOnly { + let visible_chunk = if sink_type == SinkType::ForceAppendOnly { // Force append-only by dropping UPDATE/DELETE messages. We do this when the // user forces the sink to be append-only while it is actually not based on // the frontend derivation result. @@ -133,22 +161,65 @@ impl SinkExecutor { // At this point (instead of the point above when we receive the upstream // data chunk), we make sure that we do have data to send out, and we can // thus mark the txn as started. - if !in_transaction { - sink.begin_epoch(epoch).await?; - in_transaction = true; - } - - if let Err(e) = sink.write_batch(chunk.clone()).await { - sink.abort().await?; - return Err(e.into()); - } - empty_checkpoint_flag = false; + log_writer.write_chunk(chunk.clone()).await?; yield Message::Chunk(chunk); } } Message::Barrier(barrier) => { - if barrier.checkpoint { + log_writer + .flush_current_epoch(barrier.epoch.curr, barrier.checkpoint) + .await?; + yield Message::Barrier(barrier); + } + } + } + } + + async fn execute_consume_log( + config: SinkConfig, + schema: Schema, + pk_indices: Vec, + connector_params: ConnectorParams, + sink_type: SinkType, + mut log_reader: R, + sink_commit_duration_metrics: Histogram, + ) -> StreamExecutorResult { + let mut sink = build_sink(config, schema, pk_indices, connector_params, sink_type).await?; + + let mut epoch = log_reader.init().await?; + + // the flag is required because kafka transaction requires at least one + // message, so we should abort the transaction if the flag is true. + let mut empty_checkpoint_flag = true; + let mut in_transaction = false; + + loop { + let item: LogStoreReadItem = log_reader.next_item().await?; + match item { + LogStoreReadItem::StreamChunk(chunk) => { + // NOTE: We start the txn here because a force-append-only sink might + // receive a data chunk full of DELETE messages and then drop all of them. + // At this point (instead of the point above when we receive the upstream + // data chunk), we make sure that we do have data to send out, and we can + // thus mark the txn as started. + if !in_transaction { + sink.begin_epoch(epoch).await?; + in_transaction = true; + } + + if let Err(e) = sink.write_batch(chunk.clone()).await { + sink.abort().await?; + return Err(e.into()); + } + empty_checkpoint_flag = false; + } + LogStoreReadItem::Barrier { + next_epoch, + is_checkpoint, + } => { + assert!(next_epoch > epoch); + if is_checkpoint { if in_transaction { if empty_checkpoint_flag { sink.abort().await?; @@ -159,20 +230,15 @@ impl SinkExecutor { } else { let start_time = Instant::now(); sink.commit().await?; - self.metrics - .sink_commit_duration - .with_label_values(&[ - self.identity.as_str(), - self.config.get_connector(), - ]) + sink_commit_duration_metrics .observe(start_time.elapsed().as_millis() as f64); } } + log_reader.truncate().await?; in_transaction = false; empty_checkpoint_flag = true; } - epoch = barrier.epoch.curr; - yield Message::Barrier(barrier); + epoch = next_epoch; } } } @@ -181,7 +247,9 @@ impl SinkExecutor { impl Executor for SinkExecutor { fn execute(self: Box) -> super::BoxedMessageStream { - self.execute_inner().boxed() + // TODO: dispatch in enum + let bounded_log_store_factory = BoundedInMemLogStoreFactory::new(1); + self.execute_inner(bounded_log_store_factory).boxed() } fn schema(&self) -> &Schema { From b509dcdb8d12414f01b4fff1d27cce37f170be24 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 21 Mar 2023 11:50:43 +0800 Subject: [PATCH 2/9] fix unit test --- src/stream/src/executor/sink.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 623ee89c542f..372f82997bff 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -34,7 +34,7 @@ use crate::common::log_store::{ BoundedInMemLogStoreFactory, LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, }; use crate::executor::monitor::StreamingMetrics; -use crate::executor::{expect_first_barrier, Barrier, PkIndices}; +use crate::executor::{expect_first_barrier, PkIndices}; pub struct SinkExecutor { input: BoxedExecutor, @@ -135,12 +135,15 @@ impl SinkExecutor { let data_types = schema.data_types(); let mut input = input.execute(); - let Barrier { - epoch: epoch_pair, .. - } = expect_first_barrier(&mut input).await?; + let barrier = expect_first_barrier(&mut input).await?; + + let epoch_pair = barrier.epoch; log_writer.init(epoch_pair.curr).await?; + // Propagate the first barrier + yield Message::Barrier(barrier); + #[for_await] for msg in input { match msg? { @@ -355,11 +358,12 @@ mod test { schema.clone(), pk.clone(), vec![ + Message::Barrier(Barrier::new_test_barrier(1)), Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty( " I I + 3 2", ))), - Message::Barrier(Barrier::new_test_barrier(1)), + Message::Barrier(Barrier::new_test_barrier(2)), Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty( " I I U- 3 2 @@ -387,6 +391,9 @@ mod test { let mut executor = SinkExecutor::execute(Box::new(sink_executor)); + // Barrier message. + executor.next().await.unwrap().unwrap(); + let chunk_msg = executor.next().await.unwrap().unwrap(); assert_eq!( chunk_msg.into_chunk().unwrap(), From 8e8ee3e1fb32e92a4925e5aca2c414a5f8016871 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 28 Mar 2023 18:14:51 +0800 Subject: [PATCH 3/9] add doc and comment --- src/stream/src/common/log_store/mod.rs | 31 ++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/stream/src/common/log_store/mod.rs b/src/stream/src/common/log_store/mod.rs index 6cbb3afde055..cfc812680d05 100644 --- a/src/stream/src/common/log_store/mod.rs +++ b/src/stream/src/common/log_store/mod.rs @@ -52,8 +52,13 @@ pub trait LogWriter { where Self: 'a; + /// Initialized the log writer with an initialize fn init(&mut self, epoch: u64) -> Self::InitFuture<'_>; + + /// Write a stream chunk to the log writer fn write_chunk(&mut self, chunk: StreamChunk) -> Self::WriteChunkFuture<'_>; + + /// Mark current epoch as finished and sealed, and flush the unconsumed log data. fn flush_current_epoch( &mut self, next_epoch: u64, @@ -72,8 +77,14 @@ pub trait LogReader { where Self: 'a; + /// Initialize the log reader. Usually function as waiting for log writer to be initialized. fn init(&mut self) -> Self::InitFuture<'_>; + + /// Emit the next item. fn next_item(&mut self) -> Self::NextItemFuture<'_>; + + /// Mark that all items emitted so far have been consumed and it is safe to truncate the log + /// from the current offset. fn truncate(&mut self) -> Self::TruncateFuture<'_>; } @@ -84,27 +95,47 @@ pub trait LogStoreFactory { fn build(self) -> (Self::Reader, Self::Writer); } +/// An in-memory log store that can buffer a bounded amount of stream chunk in memory via bounded +/// mpsc channel. +/// +/// Since it is in-memory, when `flush_current_epoch` with checkpoint epoch, it should wait for the +/// reader to finish consuming all the data in current checkpoint epoch. pub struct BoundedInMemLogStoreWriter { + /// Current epoch. Should be `Some` after `init` curr_epoch: Option, + /// Holder of oneshot channel to send the initial epoch to the associated log reader. init_epoch_tx: Option>, + + /// Sending log store item to log reader item_tx: Sender, + + /// Receiver for the epoch consumed by log reader. truncated_epoch_rx: UnboundedReceiver, } #[derive(Eq, PartialEq, Debug)] enum LogReaderEpochProgress { + /// In progress of consuming data in current epoch. Consuming(u64), + /// Finished emitting the data in checkpoint epoch, and waiting for a call on `truncate`. AwaitingTruncate { sealed_epoch: u64, next_epoch: u64 }, } const UNINITIALIZED: LogReaderEpochProgress = LogReaderEpochProgress::Consuming(INVALID_EPOCH); pub struct BoundedInMemLogStoreReader { + /// Current progress of log reader. Can be either consuming an epoch, or has finished consuming + /// an epoch and waiting to be truncated. epoch_progress: LogReaderEpochProgress, + /// Holder for oneshot channel to receive the initial epoch init_epoch_rx: Option>, + + /// Receiver to fetch log store item item_rx: Receiver, + + /// Sender of consumed epoch to the log writer truncated_epoch_tx: UnboundedSender, } From 691d55afcaf0c04864993e300c0b07252c071ef2 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 28 Mar 2023 18:52:33 +0800 Subject: [PATCH 4/9] add update_vnode_bitmap in trait --- src/stream/src/common/log_store/mod.rs | 9 +++++++++ src/stream/src/executor/sink.rs | 21 ++++++++++++++++++--- src/stream/src/from_proto/sink.rs | 1 + 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/stream/src/common/log_store/mod.rs b/src/stream/src/common/log_store/mod.rs index cfc812680d05..680a574dc723 100644 --- a/src/stream/src/common/log_store/mod.rs +++ b/src/stream/src/common/log_store/mod.rs @@ -14,8 +14,10 @@ use std::fmt::Debug; use std::future::Future; +use std::sync::Arc; use risingwave_common::array::StreamChunk; +use risingwave_common::buffer::Bitmap; use risingwave_common::util::epoch::INVALID_EPOCH; use tokio::sync::mpsc::{ channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, @@ -64,6 +66,9 @@ pub trait LogWriter { next_epoch: u64, is_checkpoint: bool, ) -> Self::FlushCurrentEpoch<'_>; + + /// Update the vnode bitmap of the log writer + fn update_vnode_bitmap(&mut self, new_vnodes: Arc); } pub trait LogReader { @@ -298,4 +303,8 @@ impl LogWriter for BoundedInMemLogStoreWriter { Ok(()) } } + + fn update_vnode_bitmap(&mut self, _new_vnodes: Arc) { + // Since this is in memory, we don't need to handle the vnode bitmap + } } diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 905649b9b3ed..2a54d116c021 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -34,7 +34,7 @@ use crate::common::log_store::{ BoundedInMemLogStoreFactory, LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, }; use crate::executor::monitor::StreamingMetrics; -use crate::executor::{expect_first_barrier, PkIndices}; +use crate::executor::{expect_first_barrier, ActorContextRef, PkIndices}; pub struct SinkExecutor { input: BoxedExecutor, @@ -45,6 +45,7 @@ pub struct SinkExecutor { schema: Schema, pk_indices: Vec, sink_type: SinkType, + actor_context: ActorContextRef, } async fn build_sink( @@ -83,6 +84,7 @@ impl SinkExecutor { schema: Schema, pk_indices: Vec, sink_type: SinkType, + actor_context: ActorContextRef, ) -> Self { Self { input: materialize_executor, @@ -93,6 +95,7 @@ impl SinkExecutor { schema, connector_params, sink_type, + actor_context, } } @@ -119,8 +122,13 @@ impl SinkExecutor { metrics, ); - let write_log_stream = - Self::execute_write_log(self.input, log_writer, self.schema, self.sink_type); + let write_log_stream = Self::execute_write_log( + self.input, + log_writer, + self.schema, + self.sink_type, + self.actor_context, + ); select(consume_log_stream.into_stream(), write_log_stream) } @@ -131,6 +139,7 @@ impl SinkExecutor { mut log_writer: impl LogWriter, schema: Schema, sink_type: SinkType, + actor_context: ActorContextRef, ) { let data_types = schema.data_types(); let mut input = input.execute(); @@ -173,6 +182,9 @@ impl SinkExecutor { log_writer .flush_current_epoch(barrier.epoch.curr, barrier.checkpoint) .await?; + if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) { + log_writer.update_vnode_bitmap(vnode_bitmap); + } yield Message::Barrier(barrier); } } @@ -272,6 +284,7 @@ impl Executor for SinkExecutor { mod test { use super::*; use crate::executor::test_utils::*; + use crate::executor::ActorContext; #[ignore] #[tokio::test] @@ -324,6 +337,7 @@ mod test { schema.clone(), pk.clone(), SinkType::AppendOnly, + ActorContext::create(0), ); let mut executor = SinkExecutor::execute(Box::new(sink_executor)); @@ -387,6 +401,7 @@ mod test { schema.clone(), pk.clone(), SinkType::ForceAppendOnly, + ActorContext::create(0), ); let mut executor = SinkExecutor::execute(Box::new(sink_executor)); diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index e524963187f1..1da199a51a43 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -59,6 +59,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { schema, pk_indices, sink_type, + params.actor_context, ))) } } From 63652edab31d20d084f8c2c3dc4a54f9c98f40d8 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 29 Mar 2023 16:41:46 +0800 Subject: [PATCH 5/9] make factory build async and add log store generic on sink executor --- src/stream/src/common/log_store/mod.rs | 48 +++++++++++++++----------- src/stream/src/executor/sink.rs | 42 ++++++++++++---------- src/stream/src/from_proto/sink.rs | 27 +++++++++------ 3 files changed, 66 insertions(+), 51 deletions(-) diff --git a/src/stream/src/common/log_store/mod.rs b/src/stream/src/common/log_store/mod.rs index 680a574dc723..f1577dce7066 100644 --- a/src/stream/src/common/log_store/mod.rs +++ b/src/stream/src/common/log_store/mod.rs @@ -93,11 +93,13 @@ pub trait LogReader { fn truncate(&mut self) -> Self::TruncateFuture<'_>; } -pub trait LogStoreFactory { - type Reader: LogReader; - type Writer: LogWriter; +pub trait LogStoreFactory: 'static { + type Reader: LogReader + Send + 'static; + type Writer: LogWriter + Send + 'static; - fn build(self) -> (Self::Reader, Self::Writer); + type BuildFuture: Future + Send; + + fn build(self) -> Self::BuildFuture; } /// An in-memory log store that can buffer a bounded amount of stream chunk in memory via bounded @@ -158,23 +160,27 @@ impl LogStoreFactory for BoundedInMemLogStoreFactory { type Reader = BoundedInMemLogStoreReader; type Writer = BoundedInMemLogStoreWriter; - fn build(self) -> (Self::Reader, Self::Writer) { - let (init_epoch_tx, init_epoch_rx) = oneshot::channel(); - let (item_tx, item_rx) = channel(self.bound); - let (truncated_epoch_tx, truncated_epoch_rx) = unbounded_channel(); - let reader = BoundedInMemLogStoreReader { - epoch_progress: UNINITIALIZED, - init_epoch_rx: Some(init_epoch_rx), - item_rx, - truncated_epoch_tx, - }; - let writer = BoundedInMemLogStoreWriter { - curr_epoch: None, - init_epoch_tx: Some(init_epoch_tx), - item_tx, - truncated_epoch_rx, - }; - (reader, writer) + type BuildFuture = impl Future; + + fn build(self) -> Self::BuildFuture { + async move { + let (init_epoch_tx, init_epoch_rx) = oneshot::channel(); + let (item_tx, item_rx) = channel(self.bound); + let (truncated_epoch_tx, truncated_epoch_rx) = unbounded_channel(); + let reader = BoundedInMemLogStoreReader { + epoch_progress: UNINITIALIZED, + init_epoch_rx: Some(init_epoch_rx), + item_rx, + truncated_epoch_tx, + }; + let writer = BoundedInMemLogStoreWriter { + curr_epoch: None, + init_epoch_tx: Some(init_epoch_tx), + item_tx, + truncated_epoch_rx, + }; + (reader, writer) + } } } diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 2a54d116c021..b6f84c26e525 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -30,13 +30,11 @@ use risingwave_connector::ConnectorParams; use super::error::{StreamExecutorError, StreamExecutorResult}; use super::{BoxedExecutor, Executor, Message}; -use crate::common::log_store::{ - BoundedInMemLogStoreFactory, LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, -}; +use crate::common::log_store::{LogReader, LogStoreFactory, LogStoreReadItem, LogWriter}; use crate::executor::monitor::StreamingMetrics; use crate::executor::{expect_first_barrier, ActorContextRef, PkIndices}; -pub struct SinkExecutor { +pub struct SinkExecutor { input: BoxedExecutor, metrics: Arc, config: SinkConfig, @@ -46,6 +44,8 @@ pub struct SinkExecutor { pk_indices: Vec, sink_type: SinkType, actor_context: ActorContextRef, + log_reader: F::Reader, + log_writer: F::Writer, } async fn build_sink( @@ -73,9 +73,9 @@ fn force_append_only(chunk: StreamChunk, data_types: Vec) -> Option SinkExecutor { #[allow(clippy::too_many_arguments)] - pub fn new( + pub async fn new( materialize_executor: BoxedExecutor, metrics: Arc, config: SinkConfig, @@ -85,7 +85,9 @@ impl SinkExecutor { pk_indices: Vec, sink_type: SinkType, actor_context: ActorContextRef, + log_store_factory: F, ) -> Self { + let (log_reader, log_writer) = log_store_factory.build().await; Self { input: materialize_executor, metrics, @@ -96,18 +98,15 @@ impl SinkExecutor { connector_params, sink_type, actor_context, + log_reader, + log_writer, } } - fn execute_inner( - self, - log_store_factory: impl LogStoreFactory, - ) -> impl Stream> { + fn execute_inner(self) -> impl Stream> { let config = self.config.clone(); let schema = self.schema.clone(); - let (log_reader, log_writer) = log_store_factory.build(); - let metrics = self .metrics .sink_commit_duration @@ -118,13 +117,13 @@ impl SinkExecutor { self.pk_indices, self.connector_params, self.sink_type, - log_reader, + self.log_reader, metrics, ); let write_log_stream = Self::execute_write_log( self.input, - log_writer, + self.log_writer, self.schema, self.sink_type, self.actor_context, @@ -260,11 +259,10 @@ impl SinkExecutor { } } -impl Executor for SinkExecutor { +impl Executor for SinkExecutor { fn execute(self: Box) -> super::BoxedMessageStream { // TODO: dispatch in enum - let bounded_log_store_factory = BoundedInMemLogStoreFactory::new(1); - self.execute_inner(bounded_log_store_factory).boxed() + self.execute_inner().boxed() } fn schema(&self) -> &Schema { @@ -283,6 +281,7 @@ impl Executor for SinkExecutor { #[cfg(test)] mod test { use super::*; + use crate::common::log_store::BoundedInMemLogStoreFactory; use crate::executor::test_utils::*; use crate::executor::ActorContext; @@ -328,6 +327,7 @@ mod test { ); let config = SinkConfig::from_hashmap(properties).unwrap(); + let bounded_log_store_factory = BoundedInMemLogStoreFactory::new(1); let sink_executor = SinkExecutor::new( Box::new(mock), Arc::new(StreamingMetrics::unused()), @@ -338,7 +338,9 @@ mod test { pk.clone(), SinkType::AppendOnly, ActorContext::create(0), - ); + bounded_log_store_factory, + ) + .await; let mut executor = SinkExecutor::execute(Box::new(sink_executor)); @@ -402,7 +404,9 @@ mod test { pk.clone(), SinkType::ForceAppendOnly, ActorContext::create(0), - ); + BoundedInMemLogStoreFactory::new(1), + ) + .await; let mut executor = SinkExecutor::execute(Box::new(sink_executor)); diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 1da199a51a43..2233cf44a39a 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -17,6 +17,7 @@ use risingwave_connector::sink::SinkConfig; use risingwave_pb::stream_plan::SinkNode; use super::*; +use crate::common::log_store::BoundedInMemLogStoreFactory; use crate::executor::{SinkExecutor, StreamExecutorError}; pub struct SinkExecutorBuilder; @@ -50,16 +51,20 @@ impl ExecutorBuilder for SinkExecutorBuilder { ); let config = SinkConfig::from_hashmap(properties).map_err(StreamExecutorError::from)?; - Ok(Box::new(SinkExecutor::new( - materialize_executor, - stream.streaming_metrics.clone(), - config, - params.executor_id, - params.env.connector_params(), - schema, - pk_indices, - sink_type, - params.actor_context, - ))) + Ok(Box::new( + SinkExecutor::new( + materialize_executor, + stream.streaming_metrics.clone(), + config, + params.executor_id, + params.env.connector_params(), + schema, + pk_indices, + sink_type, + params.actor_context, + BoundedInMemLogStoreFactory::new(1), + ) + .await, + )) } } From 3f60de3dac0781a3f99332c1960416b4b40da0a6 Mon Sep 17 00:00:00 2001 From: William Wen Date: Sat, 8 Apr 2023 15:34:53 +0200 Subject: [PATCH 6/9] fix comment and add unit test --- src/stream/src/common/log_store/mod.rs | 78 +++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/src/stream/src/common/log_store/mod.rs b/src/stream/src/common/log_store/mod.rs index f1577dce7066..133d2fc7573f 100644 --- a/src/stream/src/common/log_store/mod.rs +++ b/src/stream/src/common/log_store/mod.rs @@ -54,7 +54,7 @@ pub trait LogWriter { where Self: 'a; - /// Initialized the log writer with an initialize + /// Initialize the log writer with an epoch fn init(&mut self, epoch: u64) -> Self::InitFuture<'_>; /// Write a stream chunk to the log writer @@ -314,3 +314,79 @@ impl LogWriter for BoundedInMemLogStoreWriter { // Since this is in memory, we don't need to handle the vnode bitmap } } + +#[cfg(test)] +mod tests { + use risingwave_common::array::{Op, StreamChunk}; + use risingwave_common::row::OwnedRow; + use risingwave_common::types::{DataType, ScalarImpl}; + use risingwave_common::util::chunk_coalesce::DataChunkBuilder; + + use crate::common::log_store::{BoundedInMemLogStoreFactory, LogReader, LogStoreFactory, LogStoreReadItem, LogWriter}; + + #[tokio::test] + async fn test_in_memory_log_store() { + let factory = BoundedInMemLogStoreFactory::new(4); + let (mut reader, mut writer) = factory.build().await; + + let init_epoch = 233; + let epoch1 = init_epoch + 1; + let epoch2 = init_epoch + 2; + + + + let ops = vec![Op::Insert, Op::Delete, Op::UpdateInsert, Op::UpdateDelete]; + let mut builder = DataChunkBuilder::new(vec![DataType::Int64, DataType::Varchar], 10000); + for i in 0..ops.len() { + assert!(builder + .append_one_row(OwnedRow::new(vec![ + Some(ScalarImpl::Int64(i as i64)), + Some(ScalarImpl::Utf8(format!("name_{}", i).into_boxed_str())) + ])) + .is_none()); + } + let data_chunk = builder.consume_all().unwrap(); + let stream_chunk = StreamChunk::from_parts(ops, data_chunk); + let stream_chunk_clone = stream_chunk.clone(); + + tokio::spawn(async move { + writer.init(init_epoch).await.unwrap(); + writer.write_chunk(stream_chunk_clone.clone()).await.unwrap(); + writer.flush_current_epoch(epoch1, false).await.unwrap(); + writer.write_chunk(stream_chunk_clone).await.unwrap(); + writer.flush_current_epoch(epoch2, true).await.unwrap(); + }); + + + assert_eq!(init_epoch, reader.init().await.unwrap()); + match reader.next_item().await.unwrap() { + LogStoreReadItem::StreamChunk(chunk) => { + assert_eq!(&chunk, &stream_chunk); + } + LogStoreReadItem::Barrier { .. } => unreachable!() + } + + match reader.next_item().await.unwrap() { + LogStoreReadItem::StreamChunk(_) => unreachable!(), + LogStoreReadItem::Barrier { is_checkpoint, next_epoch } => { + assert!(!is_checkpoint); + assert_eq!(next_epoch, epoch1); + } + } + + match reader.next_item().await.unwrap() { + LogStoreReadItem::StreamChunk(chunk) => { + assert_eq!(&chunk, &stream_chunk); + } + LogStoreReadItem::Barrier { .. } => unreachable!() + } + + match reader.next_item().await.unwrap() { + LogStoreReadItem::StreamChunk(_) => unreachable!(), + LogStoreReadItem::Barrier { is_checkpoint, next_epoch } => { + assert!(is_checkpoint); + assert_eq!(next_epoch, epoch2); + } + } + } +} From 27673f22b4dc3bf68c880be7655a059465a1feac Mon Sep 17 00:00:00 2001 From: William Wen Date: Sat, 8 Apr 2023 15:36:40 +0200 Subject: [PATCH 7/9] fmt --- src/stream/src/common/log_store/mod.rs | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/stream/src/common/log_store/mod.rs b/src/stream/src/common/log_store/mod.rs index 133d2fc7573f..7619e7b3b179 100644 --- a/src/stream/src/common/log_store/mod.rs +++ b/src/stream/src/common/log_store/mod.rs @@ -322,7 +322,9 @@ mod tests { use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; - use crate::common::log_store::{BoundedInMemLogStoreFactory, LogReader, LogStoreFactory, LogStoreReadItem, LogWriter}; + use crate::common::log_store::{ + BoundedInMemLogStoreFactory, LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, + }; #[tokio::test] async fn test_in_memory_log_store() { @@ -333,8 +335,6 @@ mod tests { let epoch1 = init_epoch + 1; let epoch2 = init_epoch + 2; - - let ops = vec![Op::Insert, Op::Delete, Op::UpdateInsert, Op::UpdateDelete]; let mut builder = DataChunkBuilder::new(vec![DataType::Int64, DataType::Varchar], 10000); for i in 0..ops.len() { @@ -351,24 +351,29 @@ mod tests { tokio::spawn(async move { writer.init(init_epoch).await.unwrap(); - writer.write_chunk(stream_chunk_clone.clone()).await.unwrap(); + writer + .write_chunk(stream_chunk_clone.clone()) + .await + .unwrap(); writer.flush_current_epoch(epoch1, false).await.unwrap(); writer.write_chunk(stream_chunk_clone).await.unwrap(); writer.flush_current_epoch(epoch2, true).await.unwrap(); }); - assert_eq!(init_epoch, reader.init().await.unwrap()); match reader.next_item().await.unwrap() { LogStoreReadItem::StreamChunk(chunk) => { assert_eq!(&chunk, &stream_chunk); } - LogStoreReadItem::Barrier { .. } => unreachable!() + LogStoreReadItem::Barrier { .. } => unreachable!(), } match reader.next_item().await.unwrap() { LogStoreReadItem::StreamChunk(_) => unreachable!(), - LogStoreReadItem::Barrier { is_checkpoint, next_epoch } => { + LogStoreReadItem::Barrier { + is_checkpoint, + next_epoch, + } => { assert!(!is_checkpoint); assert_eq!(next_epoch, epoch1); } @@ -378,12 +383,15 @@ mod tests { LogStoreReadItem::StreamChunk(chunk) => { assert_eq!(&chunk, &stream_chunk); } - LogStoreReadItem::Barrier { .. } => unreachable!() + LogStoreReadItem::Barrier { .. } => unreachable!(), } match reader.next_item().await.unwrap() { LogStoreReadItem::StreamChunk(_) => unreachable!(), - LogStoreReadItem::Barrier { is_checkpoint, next_epoch } => { + LogStoreReadItem::Barrier { + is_checkpoint, + next_epoch, + } => { assert!(is_checkpoint); assert_eq!(next_epoch, epoch2); } From 51713147015086428fcc2aaa552833dc6da0d671 Mon Sep 17 00:00:00 2001 From: William Wen Date: Sat, 8 Apr 2023 16:12:22 +0200 Subject: [PATCH 8/9] wait writer finish in unit test --- src/stream/src/common/log_store/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/stream/src/common/log_store/mod.rs b/src/stream/src/common/log_store/mod.rs index 7619e7b3b179..4df99419f802 100644 --- a/src/stream/src/common/log_store/mod.rs +++ b/src/stream/src/common/log_store/mod.rs @@ -349,7 +349,7 @@ mod tests { let stream_chunk = StreamChunk::from_parts(ops, data_chunk); let stream_chunk_clone = stream_chunk.clone(); - tokio::spawn(async move { + let join_handle = tokio::spawn(async move { writer.init(init_epoch).await.unwrap(); writer .write_chunk(stream_chunk_clone.clone()) @@ -396,5 +396,7 @@ mod tests { assert_eq!(next_epoch, epoch2); } } + + join_handle.await.unwrap(); } } From 4701e2cdd28f2f761204eee834f443fd2a976f55 Mon Sep 17 00:00:00 2001 From: William Wen Date: Sun, 9 Apr 2023 00:56:50 +0200 Subject: [PATCH 9/9] add truncate in unit test --- src/stream/src/common/log_store/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stream/src/common/log_store/mod.rs b/src/stream/src/common/log_store/mod.rs index 4df99419f802..13af6722e554 100644 --- a/src/stream/src/common/log_store/mod.rs +++ b/src/stream/src/common/log_store/mod.rs @@ -397,6 +397,7 @@ mod tests { } } + reader.truncate().await.unwrap(); join_handle.await.unwrap(); } }