Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdc): introduce with option to configure cdc snapshot #16426

Merged
merged 24 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
3766731
wip: optimize cdc backfill
StrikeW Apr 11, 2024
b74f43c
WIP: encapsulate snapshot read full table in a function
StrikeW Apr 15, 2024
5c6e8cf
refactor to start snapshot in a fixed interval
StrikeW Apr 15, 2024
adff2d4
WIP: test the upstream buffered events
StrikeW Apr 18, 2024
279fdd1
Merge remote-tracking branch 'origin/main' into siyuan/optimize-cdc-b…
StrikeW Apr 19, 2024
65df138
refine ut
StrikeW Apr 20, 2024
3faa90c
clean code
StrikeW Apr 21, 2024
d8f1d97
refactor cdc snapshot options
StrikeW Apr 21, 2024
2931d9c
minor
StrikeW Apr 22, 2024
4e3445c
Merge branch 'siyuan/optimize-cdc-backfill-new' into siyuan/cdc-snaps…
StrikeW Apr 22, 2024
3269095
set interval to 5 for test
StrikeW Apr 22, 2024
792ce11
solved mysql scan uncomplete problem
StrikeW Apr 25, 2024
b587a8b
fix mysql incomplete scan problem
StrikeW Apr 25, 2024
c03c3c1
Merge remote-tracking branch 'origin/main' into siyuan/optimize-cdc-b…
StrikeW Apr 25, 2024
833a8be
fix state commit
StrikeW Apr 25, 2024
3f35356
minor
StrikeW Apr 25, 2024
d5b5a9f
minor
StrikeW Apr 25, 2024
3d5be28
Merge remote-tracking branch 'origin/siyuan/optimize-cdc-backfill-new…
StrikeW Apr 26, 2024
0bf584a
set interval to 1 for test
StrikeW Apr 26, 2024
a9177e4
To test cdc-commit-offset
StrikeW Apr 26, 2024
e13726a
Merge remote-tracking branch 'origin/main' into siyuan/cdc-snapshot-o…
StrikeW Apr 28, 2024
9637b2b
minor
StrikeW Apr 28, 2024
a9996d7
backward compatibility
StrikeW Apr 28, 2024
afe3bd1
minor
StrikeW Apr 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ time.precision.mode=connect
# handle conflicts in the mview operator, thus we don't need to obey the above
# instructions. So we decrease the wait time here to reclaim jvm thread faster.
debezium.embedded.shutdown.pause.before.interrupt.ms=1
offset.flush.interval.ms=60000
13 changes: 13 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,16 @@ message StreamScanNode {
catalog.Table arrangement_table = 10;
}

// Config options for CDC backfill.
// Carried on `StreamCdcScanNode.options`; the frontend fills it from the
// user's WITH options (`snapshot`, `snapshot.interval`, `snapshot.batch_size`).
message StreamCdcScanOptions {
// Whether to skip the backfill and only consume from upstream.
bool disable_backfill = 1;

// Value of the `snapshot.interval` WITH option (frontend default: 1).
// NOTE(review): presumably the number of barriers the backfill executor
// counts before it may start a new snapshot read - confirm against the executor.
uint32 snapshot_barrier_interval = 2;

// Value of the `snapshot.batch_size` WITH option (frontend default: 1000).
// NOTE(review): presumably the row limit of a single snapshot read against
// the upstream table - confirm against the executor.
uint32 snapshot_batch_size = 3;
}

message StreamCdcScanNode {
uint32 table_id = 1;

Expand All @@ -581,7 +591,10 @@ message StreamCdcScanNode {
optional uint32 rate_limit = 6;

// Whether to skip the backfill and only consume from upstream.
// Kept for backward compatibility; new stream plans use `options.disable_backfill`.
bool disable_backfill = 7;

StreamCdcScanOptions options = 8;
}

// BatchPlanNode is used for mv on mv snapshot read.
Expand Down
11 changes: 7 additions & 4 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::executor::test_utils::MockSource;
use risingwave_stream::executor::{
expect_first_barrier, ActorContext, AddMutation, Barrier, BoxedMessageStream,
CdcBackfillExecutor, Execute, Executor as StreamExecutor, ExecutorInfo, ExternalStorageTable,
MaterializeExecutor, Message, Mutation, StreamExecutorError,
CdcBackfillExecutor, CdcScanOptions, Execute, Executor as StreamExecutor, ExecutorInfo,
ExternalStorageTable, MaterializeExecutor, Message, Mutation, StreamExecutorError,
};

// mock upstream binlog offset starting from "1.binlog, pos=0"
Expand Down Expand Up @@ -230,8 +230,11 @@ async fn test_cdc_backfill() -> StreamResult<()> {
state_table,
Some(4), // limit a snapshot chunk to have <= 4 rows by rate limit
false,
1,
4,
Some(CdcScanOptions {
disable_backfill: false,
snapshot_interval: 1,
snapshot_batch_size: 4,
}),
)
.boxed(),
);
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,6 @@ mod tests {
let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
let parser = MySqlExternalTableReader::get_cdc_offset_parser();
println!("parsed offset: {:?}", parser(off0_str).unwrap());

let table_name = SchemaTableName {
schema_name: "mytest".to_string(),
table_name: "t1".to_string(),
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";
// User can set snapshot='false' to disable cdc backfill
pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";
pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
// We enable transaction for shared cdc source by default
pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";

Expand Down
15 changes: 3 additions & 12 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::collections::{BTreeMap, HashMap};
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::anyhow;
Expand All @@ -32,7 +31,6 @@ use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_connector::source::cdc::external::{
DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::source::cdc::CDC_BACKFILL_ENABLE_KEY;
use risingwave_connector::{source, WithPropertiesExt};
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc};
Expand Down Expand Up @@ -62,7 +60,7 @@ use crate::handler::create_source::{
check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY,
};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind};
use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
Expand Down Expand Up @@ -850,20 +848,13 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(

tracing::debug!(?cdc_table_desc, "create cdc table");

// disable backfill if 'snapshot=false'
let disable_backfill = match context.with_options().get(CDC_BACKFILL_ENABLE_KEY) {
None => false,
Some(v) => {
!(bool::from_str(v)
.map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?)
}
};
let options = CdcScanOptions::from_with_options(context.with_options())?;

let logical_scan = LogicalCdcScan::create(
external_table_name,
Rc::new(cdc_table_desc),
context.clone(),
disable_backfill,
options,
);

let scan_node: PlanRef = logical_scan.into();
Expand Down
65 changes: 63 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,28 @@
// limitations under the License.

use std::rc::Rc;
use std::str::FromStr;

use anyhow::anyhow;
use educe::Educe;
use fixedbitset::FixedBitSet;
use pretty_xmlish::Pretty;
use risingwave_common::catalog::{CdcTableDesc, ColumnDesc, Field, Schema};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_connector::source::cdc::{
CDC_BACKFILL_ENABLE_KEY, CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY,
CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY,
};
use risingwave_pb::stream_plan::StreamCdcScanOptions;

use super::GenericPlanNode;
use crate::catalog::ColumnId;
use crate::error::Result;
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::WithOptions;

/// [`CdcScan`] reads rows of a table from an external upstream database
#[derive(Debug, Clone, Educe)]
Expand All @@ -40,7 +49,59 @@ pub struct CdcScan {
#[educe(Hash(ignore))]
pub ctx: OptimizerContextRef,

pub options: CdcScanOptions,
}

/// User-configurable options of a CDC table scan (backfill).
///
/// Every field is a plain `bool`/`u32`, so equality is total and the type
/// derives `Eq` alongside `PartialEq` and `Hash`; this lets it be used as a
/// key in hashed collections.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct CdcScanOptions {
    /// Whether to skip the backfill and only consume from upstream.
    pub disable_backfill: bool,
    /// Value of the `snapshot.interval` option.
    // NOTE(review): presumably counted in barriers between snapshot reads — confirm.
    pub snapshot_barrier_interval: u32,
    /// Value of the `snapshot.batch_size` option.
    // NOTE(review): presumably the row limit of one snapshot read — confirm.
    pub snapshot_batch_size: u32,
}

impl Default for CdcScanOptions {
fn default() -> Self {
Self {
disable_backfill: false,
snapshot_barrier_interval: 1,
snapshot_batch_size: 1000,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we follow the batch_size specified in the config file instead?

}
}
}

impl CdcScanOptions {
    /// Builds the scan options from the `WITH` options of a CDC table.
    ///
    /// Each key that is absent keeps the value from [`CdcScanOptions::default`]:
    /// - `CDC_BACKFILL_ENABLE_KEY`: parsed as `bool`; backfill is disabled when it is `false`.
    /// - `CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY`: parsed as `u32`.
    /// - `CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY`: parsed as `u32`.
    ///
    /// # Errors
    ///
    /// Returns `Invalid value for <key>` when a present value fails to parse.
    // NOTE(review): `0` is accepted for both numeric options; confirm the
    // executor tolerates a zero interval / zero batch size.
    pub fn from_with_options(with_options: &WithOptions) -> Result<Self> {
        let fallback = Self::default();

        // The user-facing switch is "enable snapshot", hence the negation.
        let disable_backfill = match with_options.get(CDC_BACKFILL_ENABLE_KEY) {
            Some(raw) => !(bool::from_str(raw)
                .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?),
            None => fallback.disable_backfill,
        };

        let snapshot_barrier_interval = match with_options.get(CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY)
        {
            Some(raw) => u32::from_str(raw)
                .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY))?,
            None => fallback.snapshot_barrier_interval,
        };

        let snapshot_batch_size = match with_options.get(CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY) {
            Some(raw) => u32::from_str(raw).map_err(|_| {
                anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY)
            })?,
            None => fallback.snapshot_batch_size,
        };

        Ok(Self {
            disable_backfill,
            snapshot_barrier_interval,
            snapshot_batch_size,
        })
    }

    /// Converts the options into their protobuf form, copying each field one-to-one.
    pub fn to_proto(&self) -> StreamCdcScanOptions {
        let Self {
            disable_backfill,
            snapshot_barrier_interval,
            snapshot_batch_size,
        } = self;

        StreamCdcScanOptions {
            disable_backfill: *disable_backfill,
            snapshot_barrier_interval: *snapshot_barrier_interval,
            snapshot_batch_size: *snapshot_batch_size,
        }
    }
}

impl CdcScan {
Expand Down Expand Up @@ -104,14 +165,14 @@ impl CdcScan {
output_col_idx: Vec<usize>, // the column index in the table
cdc_table_desc: Rc<CdcTableDesc>,
ctx: OptimizerContextRef,
disable_backfill: bool,
options: CdcScanOptions,
) -> Self {
Self {
table_name,
output_col_idx,
cdc_table_desc,
ctx,
disable_backfill,
options,
}
}

Expand Down
7 changes: 4 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_cdc_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::error::Result;
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::CdcScanOptions;
use crate::optimizer::plan_node::{
ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamCdcTableScan,
ToStreamContext,
Expand Down Expand Up @@ -60,14 +61,14 @@ impl LogicalCdcScan {
table_name: String, // explain-only
cdc_table_desc: Rc<CdcTableDesc>,
ctx: OptimizerContextRef,
disable_backfill: bool,
options: CdcScanOptions,
) -> Self {
generic::CdcScan::new(
table_name,
(0..cdc_table_desc.columns.len()).collect(),
cdc_table_desc,
ctx,
disable_backfill,
options,
)
.into()
}
Expand Down Expand Up @@ -96,7 +97,7 @@ impl LogicalCdcScan {
output_col_idx,
self.core.cdc_table_desc.clone(),
self.base.ctx().clone(),
self.core.disable_backfill,
self.core.options.clone(),
)
.into()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ impl StreamCdcTableScan {
"stream cdc table scan output indices"
);

let options = self.core.options.to_proto();
let stream_scan_body = PbNodeBody::StreamCdcScan(StreamCdcScanNode {
table_id: upstream_source_id,
upstream_column_ids,
Expand All @@ -255,7 +256,8 @@ impl StreamCdcTableScan {
state_table: Some(catalog),
cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
disable_backfill: self.core.disable_backfill,
disable_backfill: options.disable_backfill,
options: Some(options),
});

// plan: merge -> filter -> exchange(simple) -> stream_scan
Expand Down
36 changes: 18 additions & 18 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::executor::backfill::cdc::upstream_table::snapshot::{
use crate::executor::backfill::utils::{
get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk,
};
use crate::executor::backfill::CdcScanOptions;
use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;

Expand Down Expand Up @@ -70,12 +71,7 @@ pub struct CdcBackfillExecutor<S: StateStore> {
/// Rate limit in rows/s.
rate_limit_rps: Option<u32>,

disable_backfill: bool,

// TODO: make these options configurable
snapshot_interval: u32,

snapshot_read_limit: u32,
options: CdcScanOptions,
}

impl<S: StateStore> CdcBackfillExecutor<S> {
Expand All @@ -89,9 +85,8 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
metrics: Arc<StreamingMetrics>,
state_table: StateTable<S>,
rate_limit_rps: Option<u32>,
disable_backfill: bool,
snapshot_interval: u32,
snapshot_read_limit: u32,
disable_backfill: bool, // backward compatibility
scan_options: Option<CdcScanOptions>,
) -> Self {
let pk_in_output_indices = external_table.pk_in_output_indices().clone().unwrap();
let upstream_table_id = external_table.table_id().table_id;
Expand All @@ -101,6 +96,11 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
pk_in_output_indices.len() + METADATA_STATE_LEN,
);

let options = scan_options.unwrap_or(CdcScanOptions {
disable_backfill,
..Default::default()
});
Comment on lines +99 to +102
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest moving this into from_proto so that we can accept a single field of scan_options: CdcScanOptions in new.


Self {
actor_ctx,
external_table,
Expand All @@ -110,9 +110,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
progress,
metrics,
rate_limit_rps,
disable_backfill,
snapshot_interval,
snapshot_read_limit,
options,
}
}

Expand Down Expand Up @@ -176,7 +174,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
let state = state_impl.restore_state().await?;
current_pk_pos = state.current_pk_pos.clone();

let to_backfill = !self.disable_backfill && !state.is_finished;
let to_backfill = !self.options.disable_backfill && !state.is_finished;

// The first barrier message should be propagated.
yield Message::Barrier(first_barrier);
Expand All @@ -200,10 +198,12 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
initial_binlog_offset = ?last_binlog_offset,
?current_pk_pos,
is_finished = state.is_finished,
disable_backfill = self.disable_backfill,
snapshot_row_count = total_snapshot_row_count,
rate_limit = self.rate_limit_rps,
"start cdc backfill"
disable_backfill = self.options.disable_backfill,
snapshot_interval = self.options.snapshot_interval,
snapshot_batch_size = self.options.snapshot_batch_size,
"start cdc backfill",
);

// CDC Backfill Algorithm:
Expand Down Expand Up @@ -269,7 +269,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
);

let right_snapshot = pin!(upstream_table_reader
.snapshot_read_full_table(read_args, self.snapshot_read_limit)
.snapshot_read_full_table(read_args, self.options.snapshot_batch_size)
.map(Either::Right));

let (right_snapshot, valve) = pausable(right_snapshot);
Expand Down Expand Up @@ -298,7 +298,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
// increase the barrier count and check whether we need to start a new snapshot
barrier_count += 1;
let can_start_new_snapshot =
barrier_count == self.snapshot_interval;
barrier_count == self.options.snapshot_interval;

if let Some(mutation) = barrier.mutation.as_deref() {
use crate::executor::Mutation;
Expand Down Expand Up @@ -567,7 +567,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
state_impl.commit_state(pending_barrier.epoch).await?;
yield Message::Barrier(pending_barrier);
}
} else if self.disable_backfill {
} else if self.options.disable_backfill {
// If backfill is disabled, we just mark the backfill as finished
tracing::info!(
upstream_table_id,
Expand Down
Loading
Loading