Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdc): introduce with option to configure cdc snapshot #16426

Merged
merged 24 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
3766731
wip: optimize cdc backfill
StrikeW Apr 11, 2024
b74f43c
WIP: encapsulate snapshot read full table in a function
StrikeW Apr 15, 2024
5c6e8cf
refactor to start snapshot in a fixed interval
StrikeW Apr 15, 2024
adff2d4
WIP: test the upstream buffered events
StrikeW Apr 18, 2024
279fdd1
Merge remote-tracking branch 'origin/main' into siyuan/optimize-cdc-b…
StrikeW Apr 19, 2024
65df138
refine ut
StrikeW Apr 20, 2024
3faa90c
clean code
StrikeW Apr 21, 2024
d8f1d97
refactor cdc snapshot options
StrikeW Apr 21, 2024
2931d9c
minor
StrikeW Apr 22, 2024
4e3445c
Merge branch 'siyuan/optimize-cdc-backfill-new' into siyuan/cdc-snaps…
StrikeW Apr 22, 2024
3269095
set interval to 5 for test
StrikeW Apr 22, 2024
792ce11
solved mysql scan incomplete problem
StrikeW Apr 25, 2024
b587a8b
fix mysql incomplete scan problem
StrikeW Apr 25, 2024
c03c3c1
Merge remote-tracking branch 'origin/main' into siyuan/optimize-cdc-b…
StrikeW Apr 25, 2024
833a8be
fix state commit
StrikeW Apr 25, 2024
3f35356
minor
StrikeW Apr 25, 2024
d5b5a9f
minor
StrikeW Apr 25, 2024
3d5be28
Merge remote-tracking branch 'origin/siyuan/optimize-cdc-backfill-new…
StrikeW Apr 26, 2024
0bf584a
set interval to 1 for test
StrikeW Apr 26, 2024
a9177e4
To test cdc-commit-offset
StrikeW Apr 26, 2024
e13726a
Merge remote-tracking branch 'origin/main' into siyuan/cdc-snapshot-o…
StrikeW Apr 28, 2024
9637b2b
minor
StrikeW Apr 28, 2024
a9996d7
backward compatibility
StrikeW Apr 28, 2024
afe3bd1
minor
StrikeW Apr 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ time.precision.mode=connect
# handle conflicts in the mview operator, thus we don't need to obey the above
# instructions. So we decrease the wait time here to reclaim jvm thread faster.
debezium.embedded.shutdown.pause.before.interrupt.ms=1
offset.flush.interval.ms=60000
16 changes: 14 additions & 2 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,16 @@ message StreamScanNode {
catalog.Table arrangement_table = 10;
}

// Config options for CDC backfill
message StreamCdcScanOptions {
  // Whether to skip the backfill and only consume from upstream.
  bool disable_backfill = 1;

  // Number of barriers to wait before starting a new snapshot read epoch.
  uint32 snapshot_barrier_interval = 2;

  // Maximum number of rows fetched per upstream snapshot read query.
  uint32 snapshot_batch_size = 3;
}

message StreamCdcScanNode {
uint32 table_id = 1;

Expand All @@ -580,8 +590,10 @@ message StreamCdcScanNode {
// The rate limit for the stream cdc scan node.
optional uint32 rate_limit = 6;

// Whether skip the backfill and only consume from upstream.
bool disable_backfill = 7;
reserved 7;
reserved "disable_backfill";
StrikeW marked this conversation as resolved.
Show resolved Hide resolved

StreamCdcScanOptions options = 8;
}

// BatchPlanNode is used for mv on mv snapshot read.
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,6 @@ mod tests {
let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
let parser = MySqlExternalTableReader::get_cdc_offset_parser();
println!("parsed offset: {:?}", parser(off0_str).unwrap());

let table_name = SchemaTableName {
schema_name: "mytest".to_string(),
table_name: "t1".to_string(),
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";
// User can set snapshot='false' to disable cdc backfill
pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";
pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
// We enable transaction for shared cdc source by default
pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";

Expand Down
15 changes: 3 additions & 12 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::collections::{BTreeMap, HashMap};
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::anyhow;
Expand All @@ -32,7 +31,6 @@ use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_connector::source::cdc::external::{
DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::source::cdc::CDC_BACKFILL_ENABLE_KEY;
use risingwave_connector::{source, WithPropertiesExt};
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc};
Expand Down Expand Up @@ -62,7 +60,7 @@ use crate::handler::create_source::{
check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY,
};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind};
use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
Expand Down Expand Up @@ -850,20 +848,13 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(

tracing::debug!(?cdc_table_desc, "create cdc table");

// disable backfill if 'snapshot=false'
let disable_backfill = match context.with_options().get(CDC_BACKFILL_ENABLE_KEY) {
None => false,
Some(v) => {
!(bool::from_str(v)
.map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?)
}
};
let options = CdcScanOptions::from_with_options(context.with_options())?;

let logical_scan = LogicalCdcScan::create(
external_table_name,
Rc::new(cdc_table_desc),
context.clone(),
disable_backfill,
options,
);

let scan_node: PlanRef = logical_scan.into();
Expand Down
65 changes: 63 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,28 @@
// limitations under the License.

use std::rc::Rc;
use std::str::FromStr;

use anyhow::anyhow;
use educe::Educe;
use fixedbitset::FixedBitSet;
use pretty_xmlish::Pretty;
use risingwave_common::catalog::{CdcTableDesc, ColumnDesc, Field, Schema};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_connector::source::cdc::{
CDC_BACKFILL_ENABLE_KEY, CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY,
CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY,
};
use risingwave_pb::stream_plan::StreamCdcScanOptions;

use super::GenericPlanNode;
use crate::catalog::ColumnId;
use crate::error::Result;
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::WithOptions;

/// [`CdcScan`] reads rows of a table from an external upstream database
#[derive(Debug, Clone, Educe)]
Expand All @@ -40,7 +49,59 @@ pub struct CdcScan {
#[educe(Hash(ignore))]
pub ctx: OptimizerContextRef,

pub options: CdcScanOptions,
}

#[derive(Debug, Clone, Hash, PartialEq)]
/// User-configurable options for the CDC backfill scan, collected from the
/// `WITH` clause of `CREATE TABLE` (see `from_with_options`).
pub struct CdcScanOptions {
    /// Whether to skip the backfill and only consume from upstream.
    pub disable_backfill: bool,
    /// Number of barriers to wait before starting a new snapshot read.
    pub snapshot_barrier_interval: u32,
    /// Maximum number of rows fetched per snapshot read query.
    pub snapshot_batch_size: u32,
}

impl Default for CdcScanOptions {
fn default() -> Self {
Self {
disable_backfill: false,
snapshot_barrier_interval: 1,
snapshot_batch_size: 1000,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we follow the batch_size specified in the config file instead?

}
}
}

impl CdcScanOptions {
    /// Build the scan options from the statement's `WITH` clause.
    ///
    /// Any option not present in `with_options` keeps its default value.
    /// Returns an error if a provided value fails to parse.
    pub fn from_with_options(with_options: &WithOptions) -> Result<Self> {
        let mut opts = Self::default();

        // `snapshot = 'false'` disables the backfill phase entirely.
        if let Some(v) = with_options.get(CDC_BACKFILL_ENABLE_KEY) {
            let enabled = bool::from_str(v)
                .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_ENABLE_KEY))?;
            opts.disable_backfill = !enabled;
        }

        if let Some(v) = with_options.get(CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY) {
            opts.snapshot_barrier_interval = u32::from_str(v)
                .map_err(|_| anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY))?;
        }

        if let Some(v) = with_options.get(CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY) {
            opts.snapshot_batch_size = u32::from_str(v).map_err(|_| {
                anyhow!("Invalid value for {}", CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY)
            })?;
        }

        Ok(opts)
    }

    /// Convert into the protobuf representation carried by the stream plan.
    pub fn to_proto(&self) -> StreamCdcScanOptions {
        StreamCdcScanOptions {
            disable_backfill: self.disable_backfill,
            snapshot_barrier_interval: self.snapshot_barrier_interval,
            snapshot_batch_size: self.snapshot_batch_size,
        }
    }
}

impl CdcScan {
Expand Down Expand Up @@ -104,14 +165,14 @@ impl CdcScan {
output_col_idx: Vec<usize>, // the column index in the table
cdc_table_desc: Rc<CdcTableDesc>,
ctx: OptimizerContextRef,
disable_backfill: bool,
options: CdcScanOptions,
) -> Self {
Self {
table_name,
output_col_idx,
cdc_table_desc,
ctx,
disable_backfill,
options,
}
}

Expand Down
7 changes: 4 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_cdc_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::error::Result;
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::CdcScanOptions;
use crate::optimizer::plan_node::{
ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamCdcTableScan,
ToStreamContext,
Expand Down Expand Up @@ -60,14 +61,14 @@ impl LogicalCdcScan {
table_name: String, // explain-only
cdc_table_desc: Rc<CdcTableDesc>,
ctx: OptimizerContextRef,
disable_backfill: bool,
options: CdcScanOptions,
) -> Self {
generic::CdcScan::new(
table_name,
(0..cdc_table_desc.columns.len()).collect(),
cdc_table_desc,
ctx,
disable_backfill,
options,
)
.into()
}
Expand Down Expand Up @@ -96,7 +97,7 @@ impl LogicalCdcScan {
output_col_idx,
self.core.cdc_table_desc.clone(),
self.base.ctx().clone(),
self.core.disable_backfill,
self.core.options.clone(),
)
.into()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ impl StreamCdcTableScan {
"stream cdc table scan output indices"
);

let options = self.core.options.to_proto();
let stream_scan_body = PbNodeBody::StreamCdcScan(StreamCdcScanNode {
table_id: upstream_source_id,
upstream_column_ids,
Expand All @@ -255,7 +256,7 @@ impl StreamCdcTableScan {
state_table: Some(catalog),
cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
disable_backfill: self.core.disable_backfill,
options: Some(options),
});

// plan: merge -> filter -> exchange(simple) -> stream_scan
Expand Down
20 changes: 11 additions & 9 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ pub struct CdcBackfillExecutor<S: StateStore> {
/// Rate limit in rows/s.
rate_limit_rps: Option<u32>,

/// Whether to disable backfill
disable_backfill: bool,

// TODO: make these options configurable
/// Barrier interval to start a new snapshot read
snapshot_interval: u32,

snapshot_read_limit: u32,
/// Batch size for a snapshot read query
snapshot_batch_size: u32,
}

impl<S: StateStore> CdcBackfillExecutor<S> {
Expand All @@ -91,7 +91,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
rate_limit_rps: Option<u32>,
disable_backfill: bool,
snapshot_interval: u32,
snapshot_read_limit: u32,
snapshot_batch_size: u32,
) -> Self {
let pk_in_output_indices = external_table.pk_in_output_indices().clone().unwrap();
let upstream_table_id = external_table.table_id().table_id;
Expand All @@ -112,7 +112,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
rate_limit_rps,
disable_backfill,
snapshot_interval,
snapshot_read_limit,
snapshot_batch_size,
}
}

Expand Down Expand Up @@ -200,10 +200,12 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
initial_binlog_offset = ?last_binlog_offset,
?current_pk_pos,
is_finished = state.is_finished,
disable_backfill = self.disable_backfill,
snapshot_row_count = total_snapshot_row_count,
rate_limit = self.rate_limit_rps,
"start cdc backfill"
disable_backfill = self.disable_backfill,
snapshot_interval = self.snapshot_interval,
snapshot_batch_size = self.snapshot_batch_size,
"start cdc backfill",
);

// CDC Backfill Algorithm:
Expand Down Expand Up @@ -269,7 +271,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
);

let right_snapshot = pin!(upstream_table_reader
.snapshot_read_full_table(read_args, self.snapshot_read_limit)
.snapshot_read_full_table(read_args, self.snapshot_batch_size)
.map(Either::Right));

let (right_snapshot, valve) = pausable(right_snapshot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReader
use super::external::ExternalStorageTable;
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::backfill::utils::{get_new_pos, iter_chunks};
use crate::executor::{StreamExecutorError, StreamExecutorResult, INVALID_EPOCH};
use crate::executor::{StreamExecutorError, StreamExecutorResult};

pub trait UpstreamTableRead {
fn snapshot_read_full_table(
Expand All @@ -44,9 +44,7 @@ pub trait UpstreamTableRead {

#[derive(Debug, Clone, Default)]
pub struct SnapshotReadArgs {
pub epoch: u64,
pub current_pos: Option<OwnedRow>,
pub ordered: bool,
pub rate_limit_rps: Option<u32>,
pub pk_in_output_indices: Vec<usize>,
}
Expand All @@ -58,9 +56,7 @@ impl SnapshotReadArgs {
pk_in_output_indices: Vec<usize>,
) -> Self {
Self {
epoch: INVALID_EPOCH,
current_pos,
ordered: false,
rate_limit_rps,
pk_in_output_indices,
}
Expand Down
10 changes: 5 additions & 5 deletions src/stream/src/from_proto/stream_cdc_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_common::catalog::{Schema, TableId};
use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::source::cdc::external::{CdcTableType, SchemaTableName};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::stream_plan::StreamCdcScanNode;
use risingwave_pb::stream_plan::{StreamCdcScanNode, StreamCdcScanOptions};

use super::*;
use crate::common::table::state_table::StateTable;
Expand All @@ -44,7 +44,7 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
.collect_vec();

let table_desc: &ExternalTableDesc = node.get_cdc_table_desc()?;
let disable_backfill = node.disable_backfill;
let options: &StreamCdcScanOptions = node.get_options()?;

let table_schema: Schema = table_desc.columns.iter().map(Into::into).collect();
assert_eq!(output_indices, (0..table_schema.len()).collect_vec());
Expand Down Expand Up @@ -97,9 +97,9 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
params.executor_stats,
state_table,
node.rate_limit,
disable_backfill,
1,
1000,
options.disable_backfill,
options.snapshot_barrier_interval,
options.snapshot_batch_size,
);
Ok((params.info, exec).into())
}
Expand Down
Loading