diff --git a/Cargo.lock b/Cargo.lock index 5d73cdf712f5..eb2ae2ed948c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10639,12 +10639,14 @@ dependencies = [ "rustls-pemfile 2.1.1", "rustls-pki-types", "rw_futures_util", + "sea-schema", "serde", "serde_derive", "serde_json", "serde_with", "serde_yaml", "simd-json", + "sqlx", "strum 0.26.1", "strum_macros 0.26.1", "syn 1.0.109", @@ -12451,7 +12453,9 @@ checksum = "30d148608012d25222442d1ebbfafd1228dbc5221baf4ec35596494e27a2394e" dependencies = [ "futures", "sea-query", + "sea-query-binder", "sea-schema-derive", + "sqlx", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2380045fb0c5..8e64f7074c9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -177,6 +177,7 @@ sea-orm = { version = "0.12.14", features = [ "sqlx-sqlite", "runtime-tokio-native-tls", ] } +sqlx = "0.7" tokio-util = "0.7" tracing-opentelemetry = "0.22" rand = { version = "0.8", features = ["small_rng"] } diff --git a/e2e_test/source/cdc_inline/auto_schema_map_mysql.slt b/e2e_test/source/cdc_inline/auto_schema_map_mysql.slt new file mode 100644 index 000000000000..5d27c73765e8 --- /dev/null +++ b/e2e_test/source/cdc_inline/auto_schema_map_mysql.slt @@ -0,0 +1,118 @@ +control substitution on + +# test case need to cover all data types +system ok +mysql -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;" + +system ok +mysql --protocol=tcp -u root mytest -e " + DROP TABLE IF EXISTS mysql_types_test; + CREATE TABLE IF NOT EXISTS mysql_types_test( + c_boolean boolean, + c_bit bit, + c_tinyint tinyint, + c_smallint smallint, + c_mediumint mediumint, + c_integer integer, + c_Bigint bigint, + c_decimal decimal, + c_float float, + c_double double, + c_char_255 char(255), + c_varchar_10000 varchar(10000), + c_binary_255 binary(255), + c_varbinary_10000 varbinary(10000), + c_date date, + c_time time, + c_datetime datetime, + c_timestamp timestamp, + c_enum ENUM('happy','sad','ok'), + c_json JSON, + PRIMARY KEY (c_boolean,c_Bigint,c_date) + ); + INSERT INTO mysql_types_test VALUES ( False, 0, null, null, -8388608, -2147483647, 9223372036854775806, -10.0, -9999.999999, -10000.0, 'c', 'd', '', '', '1001-01-01', '-838:59:59.000000', '2000-01-01 00:00:00.000000', null, 'happy', '[1,2]'); + INSERT INTO mysql_types_test VALUES ( True, 1, -128, -32767, -8388608, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'a', 'b', '', '', '1001-01-01', '00:00:00', '1998-01-01 00:00:00.000000', '1970-01-01 00:00:01', 'sad', '[3,4]'); + " + + +statement ok +create source mysql_source with ( + connector = 'mysql-cdc', + hostname = '${MYSQL_HOST:localhost}', + port = '${MYSQL_TCP_PORT:8306}', + username = 'root', + password = '${MYSQL_PWD:}', + database.name = 'mytest', + server.id = '5601' +); + + +statement ok +create table rw_mysql_types_test (*) from mysql_source table 'mytest.mysql_types_test'; + +sleep 3s + +# Name, Type, Is Hidden, Description +query TTTT +describe rw_mysql_types_test; +---- +c_boolean smallint false NULL +c_bit boolean false NULL +c_tinyint smallint false NULL +c_smallint smallint false NULL +c_mediumint integer false NULL +c_integer integer false NULL +c_Bigint bigint false NULL +c_decimal numeric false NULL +c_float real false NULL +c_double double precision false NULL +c_char_255 character varying false NULL +c_varchar_10000 character varying false NULL +c_binary_255 bytea false NULL +c_varbinary_10000 bytea false NULL +c_date date false NULL +c_time time without time zone false NULL +c_datetime timestamp without time zone false NULL +c_timestamp timestamp with 
time zone false NULL +c_enum character varying false NULL +c_json jsonb false NULL +primary key c_boolean, c_Bigint, c_date NULL NULL +distribution key c_boolean, c_Bigint, c_date NULL NULL +table description rw_mysql_types_test NULL NULL + +query TTTTTTTTTTTTT +SELECT + c_boolean, + c_bit, + c_tinyint, + c_smallint, + c_mediumint, + c_integer, + "c_Bigint", + c_decimal, + c_float, + c_double, + c_char_255, + c_varchar_10000, + c_binary_255 +FROM rw_mysql_types_test order by c_boolean; +---- +0 NULL NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 +1 NULL -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 + +query TTTTTTTT +SELECT + c_varbinary_10000, + c_date, + c_time, + c_datetime, + c_timestamp, + c_enum, + c_json +FROM rw_mysql_types_test order by c_boolean; +---- +\x 1001-01-01 NULL 2000-01-01 00:00:00 NULL happy [1, 2] +\x 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00 sad [3, 4] + +statement ok +drop source mysql_source cascade; diff --git a/e2e_test/source/cdc_inline/auto_schema_map_pg.slt b/e2e_test/source/cdc_inline/auto_schema_map_pg.slt new file mode 100644 index 000000000000..8183a617293b --- /dev/null +++ b/e2e_test/source/cdc_inline/auto_schema_map_pg.slt @@ -0,0 +1,168 @@ +control substitution on + +# test case need to cover all data types +system ok +psql -c " + DROP TABLE IF EXISTS postgres_types_test; + CREATE TABLE IF NOT EXISTS postgres_types_test( + c_boolean boolean, + c_smallint smallint, + c_integer integer, + c_bigint bigint, + c_decimal decimal, + c_real real, + c_double_precision double precision, + c_varchar varchar, + c_bytea bytea, + c_date date, + c_time time, + c_timestamp timestamp, + c_timestamptz timestamptz, + c_interval interval, + c_jsonb jsonb, + c_uuid uuid, + c_enum mood, + c_boolean_array boolean[], + c_smallint_array smallint[], + c_integer_array integer[], + c_bigint_array bigint[], + c_decimal_array decimal[], + c_real_array real[], + c_double_precision_array double precision[], + c_varchar_array varchar[], + c_bytea_array bytea[], + c_date_array date[], + c_time_array time[], + c_timestamp_array timestamp[], + c_timestamptz_array timestamptz[], + c_interval_array interval[], + c_jsonb_array jsonb[], + c_uuid_array uuid[], + c_enum_array mood[], + PRIMARY KEY (c_boolean,c_bigint,c_date) + ); + INSERT INTO postgres_types_test VALUES ( False, 0, 0, 0, 0, 0, 0, '', '00'::bytea, '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, 
interval '0 second', '{}', null, 'sad', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], array[]::uuid[], array[]::mood[]); + INSERT INTO postgres_types_test VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'd', '00'::bytea, '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', 'bb488f9b-330d-4012-b849-12adeb49e57e', 'happy', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['2001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['2001-01-01 00:00:00-8'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], '{bb488f9b-330d-4012-b849-12adeb49e57e}', '{happy,ok,sad}'); + INSERT INTO postgres_types_test VALUES ( False, 1, 123, 1234567890, 123.45, 123.45, 123.456, 'a_varchar', 'DEADBEEF'::bytea, '0024-01-01', '12:34:56', '2024-05-19 12:34:56', '2024-05-19 12:34:56+00', INTERVAL '1 day', to_jsonb('hello'::text), '123e4567-e89b-12d3-a456-426614174000', 'happy', ARRAY[NULL, TRUE]::boolean[], ARRAY[NULL, 1::smallint], ARRAY[NULL, 123], ARRAY[NULL, 1234567890], ARRAY[NULL, 123.45::numeric], ARRAY[NULL, 123.45::real], ARRAY[NULL, 123.456], ARRAY[NULL, 'a_varchar'], ARRAY[NULL, 'DEADBEEF'::bytea], ARRAY[NULL, '2024-05-19'::date], ARRAY[NULL, '12:34:56'::time], ARRAY[NULL, '2024-05-19 12:34:56'::timestamp], ARRAY[NULL, '2024-05-19 12:34:56+00'::timestamptz], ARRAY[NULL, INTERVAL '1 day'], ARRAY[NULL, to_jsonb('hello'::text)], ARRAY[NULL, '123e4567-e89b-12d3-a456-426614174000'::uuid], ARRAY[NULL, 'happy'::mood]); + INSERT INTO postgres_types_test VALUES ( False, NULL, NULL, 1, NULL, NULL, NULL, NULL, NULL, '0024-05-19', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + " + +statement ok +create source pg_source with ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + slot.name = 'pg_slot' +); + + +statement ok +create table rw_postgres_types_test (*) from pg_source table 'public.postgres_types_test'; + +sleep 3s + +# Name, Type, Is Hidden, Description +query TTTT +describe rw_postgres_types_test; +---- +c_boolean boolean false NULL +c_smallint smallint false NULL +c_integer integer false NULL +c_bigint bigint false NULL +c_decimal numeric false NULL +c_real real false NULL +c_double_precision double precision false NULL +c_varchar character varying false NULL +c_bytea bytea false NULL +c_date date false NULL +c_time time without time zone false NULL +c_timestamp timestamp without time zone false NULL +c_timestamptz timestamp with time zone false NULL +c_interval interval false NULL +c_jsonb jsonb false NULL +c_uuid character varying false NULL +c_enum character 
varying false NULL +c_boolean_array boolean[] false NULL +c_smallint_array smallint[] false NULL +c_integer_array integer[] false NULL +c_bigint_array bigint[] false NULL +c_decimal_array numeric[] false NULL +c_real_array real[] false NULL +c_double_precision_array double precision[] false NULL +c_varchar_array character varying[] false NULL +c_bytea_array bytea[] false NULL +c_date_array date[] false NULL +c_time_array time without time zone[] false NULL +c_timestamp_array timestamp without time zone[] false NULL +c_timestamptz_array timestamp with time zone[] false NULL +c_interval_array interval[] false NULL +c_jsonb_array jsonb[] false NULL +c_uuid_array character varying[] false NULL +c_enum_array character varying[] false NULL +primary key c_boolean, c_bigint, c_date NULL NULL +distribution key c_boolean, c_bigint, c_date NULL NULL +table description rw_postgres_types_test NULL NULL + +query TTTTTTT +SELECT + c_boolean, + c_smallint, + c_integer, + c_bigint, + c_decimal, + c_real, + c_double_precision, + c_varchar, + c_bytea from rw_postgres_types_test where c_enum = 'happy' order by c_integer; +---- +f -32767 -2147483647 -9223372036854775807 -10.0 -10000 -10000 d \x3030 +f 1 123 1234567890 123.45 123.45 123.456 a_varchar \x4445414442454546 + +query TTTTT +SELECT + c_date, + c_time, + c_timestamp, + c_timestamptz, + c_interval from rw_postgres_types_test where c_enum = 'happy' order by c_integer; +---- +0001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 08:00:00+00:00 00:00:00 +0024-01-01 12:34:56 2024-05-19 12:34:56 2024-05-19 12:34:56+00:00 1 day + +query TTTTTTT +SELECT + c_jsonb, + c_uuid, + c_enum, + c_boolean_array, + c_smallint_array, + c_integer_array, + c_bigint_array from rw_postgres_types_test where c_enum = 'happy' order by c_integer; +---- +{} bb488f9b-330d-4012-b849-12adeb49e57e happy {f} {-32767} {-2147483647} {-9223372036854775807} +"hello" 123e4567-e89b-12d3-a456-426614174000 happy {NULL,t} {NULL,1} {NULL,123} {NULL,1234567890} + +query TTTTTTTTTTTTT +SELECT + c_decimal_array, + c_real_array, + c_double_precision_array, + c_varchar_array, + c_bytea_array, + c_date_array, + c_time_array, + c_timestamp_array, + c_timestamptz_array, + c_interval_array, + c_jsonb_array, + c_uuid_array, + c_enum_array from rw_postgres_types_test where c_enum = 'happy' order by c_integer; +---- +{-10.0} {-10000} {-10000} {""} {"\\x3030"} {0001-01-01} {00:00:00} {"2001-01-01 00:00:00"} {"2001-01-01 08:00:00+00:00"} NULL {"{}"} {bb488f9b-330d-4012-b849-12adeb49e57e} {happy,ok,sad} +{NULL,123.45} {NULL,123.45} {NULL,123.456} {NULL,a_varchar} {NULL,"\\x4445414442454546"} {NULL,2024-05-19} {NULL,12:34:56} {NULL,"2024-05-19 12:34:56"} {NULL,"2024-05-19 12:34:56+00:00"} NULL {NULL,"\"hello\""} {NULL,123e4567-e89b-12d3-a456-426614174000} NULL + +statement ok +drop source pg_source cascade; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index a130c7107499..a2f63a28bbd7 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -195,10 +195,9 @@ private void validateTableSchema() throws SQLException { var field = res.getString(1); var dataType = res.getString(2); var key = 
res.getString(3); - schema.put(field.toLowerCase(), dataType); + schema.put(field, dataType); if (key.equalsIgnoreCase("PRI")) { - // RisingWave always use lower case for column name - pkFields.add(field.toLowerCase()); + pkFields.add(field); } } @@ -208,7 +207,7 @@ private void validateTableSchema() throws SQLException { if (e.getKey().startsWith(ValidatorUtils.INTERNAL_COLUMN_PREFIX)) { continue; } - var dataType = schema.get(e.getKey().toLowerCase()); + var dataType = schema.get(e.getKey()); if (dataType == null) { throw ValidatorUtils.invalidArgument( "Column '" + e.getKey() + "' not found in the upstream database"); diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index 03045fb4e0f6..26f822bae558 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -33,8 +33,9 @@ use risingwave_common::types::{Datum, JsonbVal}; use risingwave_common::util::epoch::{test_epoch, EpochExt}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_connector::source::cdc::external::mock_external_table::MockExternalTableReader; +use risingwave_connector::source::cdc::external::mysql::MySqlOffset; use risingwave_connector::source::cdc::external::{ - DebeziumOffset, DebeziumSourceOffset, ExternalTableReaderImpl, MySqlOffset, SchemaTableName, + DebeziumOffset, DebeziumSourceOffset, ExternalTableReaderImpl, SchemaTableName, }; use risingwave_connector::source::cdc::DebeziumCdcSplit; use risingwave_connector::source::SplitImpl; diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 5ed9fd933c35..1464fcc215b3 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -124,11 +124,13 @@ rustls-native-certs = "0.7" rustls-pemfile = "2" rustls-pki-types = "1" rw_futures_util = { workspace = true } +sea-schema = { version = "0.14", features = ["default", "sqlx-postgres", "sqlx-mysql"] } serde = { version = "1", features = ["derive", "rc"] } serde_derive = "1" serde_json = "1" serde_with = { version = "3", features = ["json"] } simd-json = { version = "0.13.3", features = ["hints"] } +sqlx = { workspace = true } strum = "0.26" strum_macros = "0.26" tempfile = "3" diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index c3a438c16838..ab4b3e7bc37b 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -46,6 +46,7 @@ def_anyhow_newtype! { // Connector errors opendal::Error => transparent, // believed to be self-explanatory + sqlx::Error => transparent, // believed to be self-explanatory mysql_async::Error => "MySQL error", tokio_postgres::Error => "Postgres error", apache_avro::Error => "Avro error", diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index c964d99be7a4..48f4e16c747b 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -240,6 +240,7 @@ macro_rules! impl_cdc_source_type { } )* + #[derive(Clone)] pub enum CdcSourceType { $( $cdc_source_type, diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index aab3dfbee7df..65bd4adb4813 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -13,33 +13,34 @@ // limitations under the License. 
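
Note on the error.rs hunk above: registering `sqlx::Error => transparent` in `def_anyhow_newtype!` lets sqlx failures flow into `ConnectorError` through `?` with no extra message wrapping, which is what allows the new schema-discovery code to stay on `ConnectorResult` throughout. A minimal sketch of the resulting call-site ergonomics — `ping_upstream` and its URL argument are illustrative only, not part of this change:

    use sqlx::mysql::MySqlPoolOptions;

    use crate::error::ConnectorResult;

    // Any sqlx::Error raised below is lifted into ConnectorError by the
    // transparent newtype conversion, so no .context() call is needed.
    async fn ping_upstream(url: &str) -> ConnectorResult<()> {
        let pool = MySqlPoolOptions::new().connect(url).await?;
        sqlx::query("SELECT 1").execute(&pool).await?;
        Ok(())
    }
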
pub mod mock_external_table; -mod postgres; +pub mod postgres; #[cfg(not(madsim))] mod maybe_tls_connector; +pub mod mysql; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::fmt; -use anyhow::Context; +use anyhow::anyhow; +use futures::pin_mut; use futures::stream::BoxStream; -use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; -use itertools::Itertools; -use mysql_async::prelude::*; -use mysql_common::params::Params; -use mysql_common::value::Value; use risingwave_common::bail; -use risingwave_common::catalog::{Schema, OFFSET_COLUMN_NAME}; +use risingwave_common::catalog::{ColumnDesc, Schema}; use risingwave_common::row::OwnedRow; -use risingwave_common::types::DataType; -use risingwave_common::util::iter_util::ZipEqFast; use serde_derive::{Deserialize, Serialize}; use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::mysql_row_to_owned_row; use crate::source::cdc::external::mock_external_table::MockExternalTableReader; -use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset}; +use crate::source::cdc::external::mysql::{ + MySqlExternalTable, MySqlExternalTableReader, MySqlOffset, +}; +use crate::source::cdc::external::postgres::{ + PostgresExternalTable, PostgresExternalTableReader, PostgresOffset, +}; +use crate::source::cdc::CdcSourceType; use crate::WithPropertiesExt; #[derive(Debug)] @@ -120,18 +121,6 @@ impl SchemaTableName { } } -#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)] -pub struct MySqlOffset { - pub filename: String, - pub position: u64, -} - -impl MySqlOffset { - pub fn new(filename: String, position: u64) -> Self { - Self { filename, position } - } -} - #[derive(Debug, Clone, PartialEq, PartialOrd, Serialize, Deserialize)] pub enum CdcOffset { MySql(MySqlOffset), @@ -181,24 +170,6 @@ pub struct DebeziumSourceOffset { pub tx_usec: Option, } -impl MySqlOffset { - pub fn parse_debezium_offset(offset: &str) -> ConnectorResult { - let dbz_offset: DebeziumOffset = serde_json::from_str(offset) - .with_context(|| format!("invalid upstream offset: {}", offset))?; - - Ok(Self { - filename: dbz_offset - .source_offset - .file - .context("binlog file not found in offset")?, - position: dbz_offset - .source_offset - .pos - .context("binlog position not found in offset")?, - }) - } -} - pub type CdcOffsetParseFunc = Box ConnectorResult + Send>; pub trait ExternalTableReader { @@ -219,15 +190,10 @@ pub enum ExternalTableReaderImpl { Mock(MockExternalTableReader), } -pub struct MySqlExternalTableReader { - rw_schema: Schema, - field_names: String, - // use mutex to provide shared mutable access to the connection - conn: tokio::sync::Mutex, -} - #[derive(Debug, Clone, Deserialize)] pub struct ExternalTableConfig { + pub connector: String, + #[serde(rename = "hostname")] pub host: String, pub port: String, @@ -246,6 +212,16 @@ pub struct ExternalTableConfig { pub sslmode: SslMode, } +impl ExternalTableConfig { + pub fn try_from_btreemap( + connect_properties: BTreeMap, + ) -> ConnectorResult { + let json_value = serde_json::to_value(connect_properties)?; + let config = serde_json::from_value::(json_value)?; + Ok(config) + } +} + #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "lowercase")] pub enum SslMode { @@ -271,229 +247,6 @@ impl fmt::Display for SslMode { } } -impl ExternalTableReader for MySqlExternalTableReader { - async fn current_cdc_offset(&self) -> ConnectorResult { - let mut conn = self.conn.lock().await; - - let sql = "SHOW 
MASTER STATUS".to_string(); - let mut rs = conn.query::(sql).await?; - let row = rs - .iter_mut() - .exactly_one() - .ok() - .context("expect exactly one row when reading binlog offset")?; - - Ok(CdcOffset::MySql(MySqlOffset { - filename: row.take("File").unwrap(), - position: row.take("Position").unwrap(), - })) - } - - fn snapshot_read( - &self, - table_name: SchemaTableName, - start_pk: Option, - primary_keys: Vec, - limit: u32, - ) -> BoxStream<'_, ConnectorResult> { - self.snapshot_read_inner(table_name, start_pk, primary_keys, limit) - } -} - -impl MySqlExternalTableReader { - pub async fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult { - let mut opts_builder = mysql_async::OptsBuilder::default() - .user(Some(config.username)) - .pass(Some(config.password)) - .ip_or_hostname(config.host) - .tcp_port(config.port.parse::().unwrap()) - .db_name(Some(config.database)); - - opts_builder = match config.sslmode { - SslMode::Disabled | SslMode::Preferred => opts_builder.ssl_opts(None), - SslMode::Required => { - let ssl_without_verify = mysql_async::SslOpts::default() - .with_danger_accept_invalid_certs(true) - .with_danger_skip_domain_validation(true); - opts_builder.ssl_opts(Some(ssl_without_verify)) - } - }; - - let conn = mysql_async::Conn::new(mysql_async::Opts::from(opts_builder)).await?; - - let field_names = rw_schema - .fields - .iter() - .filter(|f| f.name != OFFSET_COLUMN_NAME) - .map(|f| Self::quote_column(f.name.as_str())) - .join(","); - - Ok(Self { - rw_schema, - field_names, - conn: tokio::sync::Mutex::new(conn), - }) - } - - pub fn get_normalized_table_name(table_name: &SchemaTableName) -> String { - // schema name is the database name in mysql - format!("`{}`.`{}`", table_name.schema_name, table_name.table_name) - } - - pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc { - Box::new(move |offset| { - Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset( - offset, - )?)) - }) - } - - #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] - async fn snapshot_read_inner( - &self, - table_name: SchemaTableName, - start_pk_row: Option, - primary_keys: Vec, - limit: u32, - ) { - let order_key = primary_keys - .iter() - .map(|col| Self::quote_column(col)) - .join(","); - let sql = if start_pk_row.is_none() { - format!( - "SELECT {} FROM {} ORDER BY {} LIMIT {limit}", - self.field_names, - Self::get_normalized_table_name(&table_name), - order_key, - ) - } else { - let filter_expr = Self::filter_expression(&primary_keys); - format!( - "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {limit}", - self.field_names, - Self::get_normalized_table_name(&table_name), - filter_expr, - order_key, - ) - }; - - tracing::debug!("snapshot sql: {}", sql); - let mut conn = self.conn.lock().await; - - // Set session timezone to UTC - conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?; - - if start_pk_row.is_none() { - let rs_stream = sql.stream::(&mut *conn).await?; - let row_stream = rs_stream.map(|row| { - // convert mysql row into OwnedRow - let mut row = row?; - Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema)) - }); - - pin_mut!(row_stream); - #[for_await] - for row in row_stream { - let row = row?; - yield row; - } - } else { - let field_map = self - .rw_schema - .fields - .iter() - .map(|f| (f.name.as_str(), f.data_type.clone())) - .collect::>(); - - // fill in start primary key params - let params: Vec<_> = primary_keys - .iter() - .zip_eq_fast(start_pk_row.unwrap().into_iter()) - .map(|(pk, datum)| { - if let Some(value) = datum { 
- let ty = field_map.get(pk.as_str()).unwrap(); - let val = match ty { - DataType::Boolean => Value::from(value.into_bool()), - DataType::Int16 => Value::from(value.into_int16()), - DataType::Int32 => Value::from(value.into_int32()), - DataType::Int64 => Value::from(value.into_int64()), - DataType::Float32 => Value::from(value.into_float32().into_inner()), - DataType::Float64 => Value::from(value.into_float64().into_inner()), - DataType::Varchar => Value::from(String::from(value.into_utf8())), - DataType::Date => Value::from(value.into_date().0), - DataType::Time => Value::from(value.into_time().0), - DataType::Timestamp => Value::from(value.into_timestamp().0), - _ => bail!("unsupported primary key data type: {}", ty), - }; - ConnectorResult::Ok((pk.clone(), val)) - } else { - bail!("primary key {} cannot be null", pk); - } - }) - .try_collect::<_, _, ConnectorError>()?; - - let rs_stream = sql - .with(Params::from(params)) - .stream::(&mut *conn) - .await?; - - let row_stream = rs_stream.map(|row| { - // convert mysql row into OwnedRow - let mut row = row?; - Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema)) - }); - - pin_mut!(row_stream); - #[for_await] - for row in row_stream { - let row = row?; - yield row; - } - }; - } - - // mysql cannot leverage the given key to narrow down the range of scan, - // we need to rewrite the comparison conditions by our own. - // (a, b) > (x, y) => (`a` > x) OR ((`a` = x) AND (`b` > y)) - fn filter_expression(columns: &[String]) -> String { - let mut conditions = vec![]; - // push the first condition - conditions.push(format!( - "({} > :{})", - Self::quote_column(&columns[0]), - columns[0] - )); - for i in 2..=columns.len() { - // '=' condition - let mut condition = String::new(); - for (j, col) in columns.iter().enumerate().take(i - 1) { - if j == 0 { - condition.push_str(&format!("({} = :{})", Self::quote_column(col), col)); - } else { - condition.push_str(&format!(" AND ({} = :{})", Self::quote_column(col), col)); - } - } - // '>' condition - condition.push_str(&format!( - " AND ({} > :{})", - Self::quote_column(&columns[i - 1]), - columns[i - 1] - )); - conditions.push(format!("({})", condition)); - } - if columns.len() > 1 { - conditions.join(" OR ") - } else { - conditions.join("") - } - } - - fn quote_column(column: &str) -> String { - format!("`{}`", column) - } -} - impl ExternalTableReader for ExternalTableReaderImpl { async fn current_cdc_offset(&self) -> ConnectorResult { match self { @@ -554,98 +307,36 @@ impl ExternalTableReaderImpl { } } -#[cfg(test)] -mod tests { - use std::collections::HashMap; - - use futures::pin_mut; - use futures_async_stream::for_await; - use maplit::{convert_args, hashmap}; - use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema}; - use risingwave_common::types::DataType; - - use crate::source::cdc::external::{ - CdcOffset, ExternalTableConfig, ExternalTableReader, MySqlExternalTableReader, MySqlOffset, - SchemaTableName, - }; - - #[test] - fn test_mysql_filter_expr() { - let cols = vec!["id".to_string()]; - let expr = MySqlExternalTableReader::filter_expression(&cols); - assert_eq!(expr, "(`id` > :id)"); - - let cols = vec!["aa".to_string(), "bb".to_string(), "cc".to_string()]; - let expr = MySqlExternalTableReader::filter_expression(&cols); - assert_eq!( - expr, - "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))" - ); - } +pub enum ExternalTableImpl { + MySql(MySqlExternalTable), + Postgres(PostgresExternalTable), +} - 
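
Note on the expansion asserted above: MySQL named parameters cannot bind a row comparison such as `(a, b) > (:a, :b)`, so the snapshot scan predicate is rewritten into a disjunction of prefix-equality clauses. A free-standing sketch of the same rewrite (illustrative; the real method also lives on the reader and, in the new `mysql.rs` below, lower-cases parameter names):

    // (a, b, c) > (:a, :b, :c) expands to:
    //   (`a` > :a) OR ((`a` = :a) AND (`b` > :b))
    //              OR ((`a` = :a) AND (`b` = :b) AND (`c` > :c))
    fn row_gt_predicate(columns: &[String]) -> String {
        let quote = |c: &str| format!("`{}`", c);
        let mut disjuncts = vec![format!("({} > :{})", quote(&columns[0]), columns[0])];
        for i in 1..columns.len() {
            // Equality on the first i columns, strict '>' on column i.
            let eqs = columns[..i]
                .iter()
                .map(|c| format!("({} = :{})", quote(c), c))
                .collect::<Vec<_>>()
                .join(" AND ");
            disjuncts.push(format!(
                "({} AND ({} > :{}))",
                eqs,
                quote(&columns[i]),
                columns[i]
            ));
        }
        disjuncts.join(" OR ")
    }

    #[test]
    fn row_gt_predicate_matches_reader_output() {
        let cols = vec!["aa".to_string(), "bb".to_string()];
        assert_eq!(
            row_gt_predicate(&cols),
            "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb))"
        );
    }
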
#[test] - fn test_mysql_binlog_offset() { - let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#; - let off1_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 1062363217, "snapshot": true }, "isHeartbeat": false }"#; - let off2_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 659687560, "snapshot": true }, "isHeartbeat": false }"#; - let off3_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#; - let off4_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#; - - let off0 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off0_str).unwrap()); - let off1 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off1_str).unwrap()); - let off2 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off2_str).unwrap()); - let off3 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off3_str).unwrap()); - let off4 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off4_str).unwrap()); - - assert!(off0 <= off1); - assert!(off1 > off2); - assert!(off2 < off3); - assert_eq!(off3, off4); +impl ExternalTableImpl { + pub async fn connect(config: ExternalTableConfig) -> ConnectorResult { + let cdc_source_type = CdcSourceType::from(config.connector.as_str()); + match cdc_source_type { + CdcSourceType::Mysql => Ok(ExternalTableImpl::MySql( + MySqlExternalTable::connect(config).await?, + )), + CdcSourceType::Postgres => Ok(ExternalTableImpl::Postgres( + PostgresExternalTable::connect(config).await?, + )), + _ => Err(anyhow!("Unsupported cdc connector type: {}", config.connector).into()), + } } - // manual test case - #[ignore] - #[tokio::test] - async fn test_mysql_table_reader() { - let columns = vec![ - ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32), - ColumnDesc::named("v2", ColumnId::new(2), DataType::Decimal), - ColumnDesc::named("v3", ColumnId::new(3), DataType::Varchar), - ColumnDesc::named("v4", ColumnId::new(4), DataType::Date), - ]; - let rw_schema = Schema { - fields: columns.iter().map(Field::from).collect(), - }; - let props: HashMap = convert_args!(hashmap!( - "hostname" => "localhost", - "port" => "8306", - "username" => "root", - "password" => "123456", - "database.name" => "mytest", - "table.name" => "t1")); - - let config = - serde_json::from_value::(serde_json::to_value(props).unwrap()) - .unwrap(); - let reader = MySqlExternalTableReader::new(config, rw_schema) - .await - .unwrap(); - let offset = reader.current_cdc_offset().await.unwrap(); - println!("BinlogOffset: {:?}", offset); - - let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#; - let parser = MySqlExternalTableReader::get_cdc_offset_parser(); - println!("parsed offset: {:?}", parser(off0_str).unwrap()); - let table_name = SchemaTableName { - schema_name: "mytest".to_string(), - table_name: "t1".to_string(), - }; + pub fn column_descs(&self) -> &Vec { + match self { + ExternalTableImpl::MySql(mysql) => mysql.column_descs(), + 
ExternalTableImpl::Postgres(postgres) => postgres.column_descs(), + } + } - let stream = reader.snapshot_read(table_name, None, vec!["v1".to_string()], 1000); - pin_mut!(stream); - #[for_await] - for row in stream { - println!("OwnedRow: {:?}", row); + pub fn pk_names(&self) -> &Vec { + match self { + ExternalTableImpl::MySql(mysql) => mysql.pk_names(), + ExternalTableImpl::Postgres(postgres) => postgres.pk_names(), } } } diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs new file mode 100644 index 000000000000..37d855a9513e --- /dev/null +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -0,0 +1,562 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use anyhow::{anyhow, Context}; +use futures::stream::BoxStream; +use futures::{pin_mut, StreamExt}; +use futures_async_stream::try_stream; +use itertools::Itertools; +use mysql_async::prelude::*; +use mysql_common::params::Params; +use mysql_common::value::Value; +use risingwave_common::bail; +use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, OFFSET_COLUMN_NAME}; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::DataType; +use risingwave_common::util::iter_util::ZipEqFast; +use sea_schema::mysql::def::{CharSet, Collation, ColumnKey, ColumnType, StorageEngine, TableInfo}; +use sea_schema::mysql::discovery::SchemaDiscovery; +use serde_derive::{Deserialize, Serialize}; +use sqlx::mysql::MySqlConnectOptions; +use sqlx::MySqlPool; + +use crate::error::{ConnectorError, ConnectorResult}; +use crate::source::cdc::external::{ + mysql_row_to_owned_row, CdcOffset, CdcOffsetParseFunc, DebeziumOffset, ExternalTableConfig, + ExternalTableReader, SchemaTableName, SslMode, +}; + +#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)] +pub struct MySqlOffset { + pub filename: String, + pub position: u64, +} + +impl MySqlOffset { + pub fn new(filename: String, position: u64) -> Self { + Self { filename, position } + } +} + +impl MySqlOffset { + pub fn parse_debezium_offset(offset: &str) -> ConnectorResult { + let dbz_offset: DebeziumOffset = serde_json::from_str(offset) + .with_context(|| format!("invalid upstream offset: {}", offset))?; + + Ok(Self { + filename: dbz_offset + .source_offset + .file + .context("binlog file not found in offset")?, + position: dbz_offset + .source_offset + .pos + .context("binlog position not found in offset")?, + }) + } +} + +pub struct MySqlExternalTable { + column_descs: Vec, + pk_names: Vec, +} + +impl MySqlExternalTable { + pub async fn connect(config: ExternalTableConfig) -> ConnectorResult { + tracing::debug!("connect to mysql"); + let options = MySqlConnectOptions::new() + .username(&config.username) + .password(&config.password) + .host(&config.host) + .port(config.port.parse::().unwrap()) + .database(&config.database) + .ssl_mode(match config.sslmode { + SslMode::Disabled | SslMode::Preferred => 
sqlx::mysql::MySqlSslMode::Disabled, + SslMode::Required => sqlx::mysql::MySqlSslMode::Required, + }); + + let connection = MySqlPool::connect_with(options).await?; + let schema_discovery = SchemaDiscovery::new(connection, config.database.as_str()); + + let table_schema = schema_discovery + .discover_table(TableInfo { + name: config.table.clone(), + engine: StorageEngine::InnoDb, + auto_increment: None, + char_set: CharSet::Utf8Mb4, + collation: Collation::Utf8Mb40900AiCi, + comment: "".to_string(), + }) + .await?; + + let mut column_descs = vec![]; + let mut pk_names = vec![]; + for col in &table_schema.columns { + let data_type = type_to_rw_type(&col.col_type)?; + column_descs.push(ColumnDesc::named( + col.name.clone(), + ColumnId::placeholder(), + data_type, + )); + if matches!(col.key, ColumnKey::Primary) { + pk_names.push(col.name.clone()); + } + } + + Ok(Self { + column_descs, + pk_names, + }) + } + + pub fn column_descs(&self) -> &Vec { + &self.column_descs + } + + pub fn pk_names(&self) -> &Vec { + &self.pk_names + } +} + +fn type_to_rw_type(col_type: &ColumnType) -> ConnectorResult { + let dtype = match col_type { + ColumnType::Serial => DataType::Int32, + ColumnType::Bit(attr) => { + if let Some(1) = attr.maximum { + DataType::Boolean + } else { + return Err( + anyhow!("BIT({}) type not supported", attr.maximum.unwrap_or(0)).into(), + ); + } + } + ColumnType::TinyInt(_) | ColumnType::SmallInt(_) => DataType::Int16, + ColumnType::Bool => DataType::Boolean, + ColumnType::MediumInt(_) => DataType::Int32, + ColumnType::Int(_) => DataType::Int32, + ColumnType::BigInt(_) => DataType::Int64, + ColumnType::Decimal(_) => DataType::Decimal, + ColumnType::Float(_) => DataType::Float32, + ColumnType::Double(_) => DataType::Float64, + ColumnType::Date => DataType::Date, + ColumnType::Time(_) => DataType::Time, + ColumnType::DateTime(_) => DataType::Timestamp, + ColumnType::Timestamp(_) => DataType::Timestamptz, + ColumnType::Year => DataType::Int32, + ColumnType::Char(_) + | ColumnType::NChar(_) + | ColumnType::Varchar(_) + | ColumnType::NVarchar(_) => DataType::Varchar, + ColumnType::Binary(_) | ColumnType::Varbinary(_) => DataType::Bytea, + ColumnType::Text(_) + | ColumnType::TinyText(_) + | ColumnType::MediumText(_) + | ColumnType::LongText(_) => DataType::Varchar, + ColumnType::Blob(_) + | ColumnType::TinyBlob + | ColumnType::MediumBlob + | ColumnType::LongBlob => DataType::Bytea, + ColumnType::Enum(_) => DataType::Varchar, + ColumnType::Json => DataType::Jsonb, + ColumnType::Set(_) => { + return Err(anyhow!("SET type not supported").into()); + } + ColumnType::Geometry(_) => { + return Err(anyhow!("GEOMETRY type not supported").into()); + } + ColumnType::Point(_) => { + return Err(anyhow!("POINT type not supported").into()); + } + ColumnType::LineString(_) => { + return Err(anyhow!("LINE string type not supported").into()); + } + ColumnType::Polygon(_) => { + return Err(anyhow!("POLYGON type not supported").into()); + } + ColumnType::MultiPoint(_) => { + return Err(anyhow!("MULTI POINT type not supported").into()); + } + ColumnType::MultiLineString(_) => { + return Err(anyhow!("MULTI LINE STRING type not supported").into()); + } + ColumnType::MultiPolygon(_) => { + return Err(anyhow!("MULTI POLYGON type not supported").into()); + } + ColumnType::GeometryCollection(_) => { + return Err(anyhow!("GEOMETRY COLLECTION type not supported").into()); + } + ColumnType::Unknown(_) => { + return Err(anyhow!("Unknown MySQL data type").into()); + } + }; + + Ok(dtype) +} + +pub struct 
MySqlExternalTableReader { + rw_schema: Schema, + field_names: String, + // use mutex to provide shared mutable access to the connection + conn: tokio::sync::Mutex, +} + +impl ExternalTableReader for MySqlExternalTableReader { + async fn current_cdc_offset(&self) -> ConnectorResult { + let mut conn = self.conn.lock().await; + + let sql = "SHOW MASTER STATUS".to_string(); + let mut rs = conn.query::(sql).await?; + let row = rs + .iter_mut() + .exactly_one() + .ok() + .context("expect exactly one row when reading binlog offset")?; + + Ok(CdcOffset::MySql(MySqlOffset { + filename: row.take("File").unwrap(), + position: row.take("Position").unwrap(), + })) + } + + fn snapshot_read( + &self, + table_name: SchemaTableName, + start_pk: Option, + primary_keys: Vec, + limit: u32, + ) -> BoxStream<'_, ConnectorResult> { + self.snapshot_read_inner(table_name, start_pk, primary_keys, limit) + } +} + +impl MySqlExternalTableReader { + pub async fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult { + let mut opts_builder = mysql_async::OptsBuilder::default() + .user(Some(config.username)) + .pass(Some(config.password)) + .ip_or_hostname(config.host) + .tcp_port(config.port.parse::().unwrap()) + .db_name(Some(config.database)); + + opts_builder = match config.sslmode { + SslMode::Disabled | SslMode::Preferred => opts_builder.ssl_opts(None), + SslMode::Required => { + let ssl_without_verify = mysql_async::SslOpts::default() + .with_danger_accept_invalid_certs(true) + .with_danger_skip_domain_validation(true); + opts_builder.ssl_opts(Some(ssl_without_verify)) + } + }; + + let conn = mysql_async::Conn::new(mysql_async::Opts::from(opts_builder)).await?; + + let field_names = rw_schema + .fields + .iter() + .filter(|f| f.name != OFFSET_COLUMN_NAME) + .map(|f| Self::quote_column(f.name.as_str())) + .join(","); + + Ok(Self { + rw_schema, + field_names, + conn: tokio::sync::Mutex::new(conn), + }) + } + + pub fn get_normalized_table_name(table_name: &SchemaTableName) -> String { + // schema name is the database name in mysql + format!("`{}`.`{}`", table_name.schema_name, table_name.table_name) + } + + pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc { + Box::new(move |offset| { + Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset( + offset, + )?)) + }) + } + + #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] + async fn snapshot_read_inner( + &self, + table_name: SchemaTableName, + start_pk_row: Option, + primary_keys: Vec, + limit: u32, + ) { + let order_key = primary_keys + .iter() + .map(|col| Self::quote_column(col)) + .join(","); + let sql = if start_pk_row.is_none() { + format!( + "SELECT {} FROM {} ORDER BY {} LIMIT {limit}", + self.field_names, + Self::get_normalized_table_name(&table_name), + order_key, + ) + } else { + let filter_expr = Self::filter_expression(&primary_keys); + format!( + "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {limit}", + self.field_names, + Self::get_normalized_table_name(&table_name), + filter_expr, + order_key, + ) + }; + + let mut conn = self.conn.lock().await; + + // Set session timezone to UTC + conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?; + + if start_pk_row.is_none() { + let rs_stream = sql.stream::(&mut *conn).await?; + let row_stream = rs_stream.map(|row| { + // convert mysql row into OwnedRow + let mut row = row?; + Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema)) + }); + + pin_mut!(row_stream); + #[for_await] + for row in row_stream { + let row = row?; + yield row; + } + } else { + let 
field_map = self + .rw_schema + .fields + .iter() + .map(|f| (f.name.as_str(), f.data_type.clone())) + .collect::>(); + + // fill in start primary key params + let params: Vec<_> = primary_keys + .iter() + .zip_eq_fast(start_pk_row.unwrap().into_iter()) + .map(|(pk, datum)| { + if let Some(value) = datum { + let ty = field_map.get(pk.as_str()).unwrap(); + let val = match ty { + DataType::Boolean => Value::from(value.into_bool()), + DataType::Int16 => Value::from(value.into_int16()), + DataType::Int32 => Value::from(value.into_int32()), + DataType::Int64 => Value::from(value.into_int64()), + DataType::Float32 => Value::from(value.into_float32().into_inner()), + DataType::Float64 => Value::from(value.into_float64().into_inner()), + DataType::Varchar => Value::from(String::from(value.into_utf8())), + DataType::Date => Value::from(value.into_date().0), + DataType::Time => Value::from(value.into_time().0), + DataType::Timestamp => Value::from(value.into_timestamp().0), + _ => bail!("unsupported primary key data type: {}", ty), + }; + ConnectorResult::Ok((pk.to_lowercase(), val)) + } else { + bail!("primary key {} cannot be null", pk); + } + }) + .try_collect::<_, _, ConnectorError>()?; + + tracing::debug!("snapshot read params: {:?}", ¶ms); + let rs_stream = sql + .with(Params::from(params)) + .stream::(&mut *conn) + .await?; + + let row_stream = rs_stream.map(|row| { + // convert mysql row into OwnedRow + let mut row = row?; + Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema)) + }); + + pin_mut!(row_stream); + #[for_await] + for row in row_stream { + let row = row?; + yield row; + } + }; + } + + // mysql cannot leverage the given key to narrow down the range of scan, + // we need to rewrite the comparison conditions by our own. + // (a, b) > (x, y) => (`a` > x) OR ((`a` = x) AND (`b` > y)) + fn filter_expression(columns: &[String]) -> String { + let mut conditions = vec![]; + // push the first condition + conditions.push(format!( + "({} > :{})", + Self::quote_column(&columns[0]), + columns[0].to_lowercase() + )); + for i in 2..=columns.len() { + // '=' condition + let mut condition = String::new(); + for (j, col) in columns.iter().enumerate().take(i - 1) { + if j == 0 { + condition.push_str(&format!( + "({} = :{})", + Self::quote_column(col), + col.to_lowercase() + )); + } else { + condition.push_str(&format!( + " AND ({} = :{})", + Self::quote_column(col), + col.to_lowercase() + )); + } + } + // '>' condition + condition.push_str(&format!( + " AND ({} > :{})", + Self::quote_column(&columns[i - 1]), + columns[i - 1].to_lowercase() + )); + conditions.push(format!("({})", condition)); + } + if columns.len() > 1 { + conditions.join(" OR ") + } else { + conditions.join("") + } + } + + fn quote_column(column: &str) -> String { + format!("`{}`", column) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use futures::pin_mut; + use futures_async_stream::for_await; + use maplit::{convert_args, hashmap}; + use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema}; + use risingwave_common::types::DataType; + + use crate::source::cdc::external::mysql::MySqlExternalTable; + use crate::source::cdc::external::{ + CdcOffset, ExternalTableConfig, ExternalTableReader, MySqlExternalTableReader, MySqlOffset, + SchemaTableName, + }; + + #[ignore] + #[tokio::test] + async fn test_mysql_schema() { + let config = ExternalTableConfig { + connector: "mysql-cdc".to_string(), + host: "localhost".to_string(), + port: "8306".to_string(), + username: 
"root".to_string(), + password: "123456".to_string(), + database: "mydb".to_string(), + schema: "".to_string(), + table: "part".to_string(), + sslmode: Default::default(), + }; + + let table = MySqlExternalTable::connect(config).await.unwrap(); + println!("columns: {:?}", &table.column_descs); + println!("primary keys: {:?}", &table.pk_names); + } + + #[test] + fn test_mysql_filter_expr() { + let cols = vec!["id".to_string()]; + let expr = MySqlExternalTableReader::filter_expression(&cols); + assert_eq!(expr, "(`id` > :id)"); + + let cols = vec!["aa".to_string(), "bb".to_string(), "cc".to_string()]; + let expr = MySqlExternalTableReader::filter_expression(&cols); + assert_eq!( + expr, + "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))" + ); + } + + #[test] + fn test_mysql_binlog_offset() { + let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#; + let off1_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 1062363217, "snapshot": true }, "isHeartbeat": false }"#; + let off2_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 659687560, "snapshot": true }, "isHeartbeat": false }"#; + let off3_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#; + let off4_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#; + + let off0 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off0_str).unwrap()); + let off1 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off1_str).unwrap()); + let off2 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off2_str).unwrap()); + let off3 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off3_str).unwrap()); + let off4 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off4_str).unwrap()); + + assert!(off0 <= off1); + assert!(off1 > off2); + assert!(off2 < off3); + assert_eq!(off3, off4); + } + + // manual test case + #[ignore] + #[tokio::test] + async fn test_mysql_table_reader() { + let columns = vec![ + ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32), + ColumnDesc::named("v2", ColumnId::new(2), DataType::Decimal), + ColumnDesc::named("v3", ColumnId::new(3), DataType::Varchar), + ColumnDesc::named("v4", ColumnId::new(4), DataType::Date), + ]; + let rw_schema = Schema { + fields: columns.iter().map(Field::from).collect(), + }; + let props: HashMap = convert_args!(hashmap!( + "hostname" => "localhost", + "port" => "8306", + "username" => "root", + "password" => "123456", + "database.name" => "mytest", + "table.name" => "t1")); + + let config = + serde_json::from_value::(serde_json::to_value(props).unwrap()) + .unwrap(); + let reader = MySqlExternalTableReader::new(config, rw_schema) + .await + .unwrap(); + let offset = reader.current_cdc_offset().await.unwrap(); + println!("BinlogOffset: {:?}", offset); + + let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#; + let parser = MySqlExternalTableReader::get_cdc_offset_parser(); + 
println!("parsed offset: {:?}", parser(off0_str).unwrap()); + let table_name = SchemaTableName { + schema_name: "mytest".to_string(), + table_name: "t1".to_string(), + }; + + let stream = reader.snapshot_read(table_name, None, vec!["v1".to_string()], 1000); + pin_mut!(stream); + #[for_await] + for row in stream { + println!("OwnedRow: {:?}", row); + } + } +} diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index 6b937b713ccd..3d827bafbccb 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -13,18 +13,24 @@ // limitations under the License. use std::cmp::Ordering; +use std::collections::HashMap; -use anyhow::Context; +use anyhow::{anyhow, Context}; use futures::stream::BoxStream; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; use postgres_openssl::MakeTlsConnector; -use risingwave_common::catalog::Schema; +use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema}; use risingwave_common::row::{OwnedRow, Row}; +use risingwave_common::types::{DataType, StructType}; use risingwave_common::util::iter_util::ZipEqFast; +use sea_schema::postgres::def::{ColumnType, TableInfo}; +use sea_schema::postgres::discovery::SchemaDiscovery; use serde_derive::{Deserialize, Serialize}; +use sqlx::postgres::{PgConnectOptions, PgSslMode}; +use sqlx::PgPool; use thiserror_ext::AsReport; use tokio_postgres::types::PgLsn; use tokio_postgres::{NoTls, Statement}; @@ -72,9 +78,158 @@ impl PostgresOffset { } } +pub struct PostgresExternalTable { + column_descs: Vec, + pk_names: Vec, +} + +impl PostgresExternalTable { + pub async fn connect(config: ExternalTableConfig) -> ConnectorResult { + tracing::debug!("connect to postgres external table"); + let options = PgConnectOptions::new() + .username(&config.username) + .password(&config.password) + .host(&config.host) + .port(config.port.parse::().unwrap()) + .database(&config.database) + .ssl_mode(match config.sslmode { + SslMode::Disabled => PgSslMode::Disable, + SslMode::Preferred => PgSslMode::Prefer, + SslMode::Required => PgSslMode::Require, + }); + + let connection = PgPool::connect_with(options).await?; + let schema_discovery = SchemaDiscovery::new(connection, config.schema.as_str()); + // fetch column schema and primary key + let empty_map = HashMap::new(); + let table_schema = schema_discovery + .discover_table( + TableInfo { + name: config.table.clone(), + of_type: None, + }, + &empty_map, + ) + .await?; + + let mut column_descs = vec![]; + for col in &table_schema.columns { + let data_type = type_to_rw_type(&col.col_type)?; + column_descs.push(ColumnDesc::named( + col.name.clone(), + ColumnId::placeholder(), + data_type, + )); + } + + if table_schema.primary_key_constraints.is_empty() { + return Err(anyhow!("Postgres table doesn't define the primary key").into()); + } + let mut pk_names = vec![]; + table_schema.primary_key_constraints.iter().for_each(|pk| { + pk_names.extend(pk.columns.clone()); + }); + + Ok(Self { + column_descs, + pk_names, + }) + } + + pub fn column_descs(&self) -> &Vec { + &self.column_descs + } + + pub fn pk_names(&self) -> &Vec { + &self.pk_names + } +} + +fn type_to_rw_type(col_type: &ColumnType) -> ConnectorResult { + let dtype = match col_type { + ColumnType::SmallInt | ColumnType::SmallSerial => DataType::Int16, + ColumnType::Integer | ColumnType::Serial => DataType::Int32, + 
ColumnType::BigInt | ColumnType::BigSerial => DataType::Int64, + ColumnType::Money | ColumnType::Decimal(_) | ColumnType::Numeric(_) => DataType::Decimal, + ColumnType::Real => DataType::Float32, + ColumnType::DoublePrecision => DataType::Float64, + ColumnType::Varchar(_) | ColumnType::Char(_) | ColumnType::Text => DataType::Varchar, + ColumnType::Bytea => DataType::Bytea, + ColumnType::Timestamp(_) => DataType::Timestamp, + ColumnType::TimestampWithTimeZone(_) => DataType::Timestamptz, + ColumnType::Date => DataType::Date, + ColumnType::Time(_) | ColumnType::TimeWithTimeZone(_) => DataType::Time, + ColumnType::Interval(_) => DataType::Interval, + ColumnType::Boolean => DataType::Boolean, + ColumnType::Point => DataType::Struct(StructType::new(vec![ + ("x", DataType::Float32), + ("y", DataType::Float32), + ])), + ColumnType::Uuid => DataType::Varchar, + ColumnType::Xml => DataType::Varchar, + ColumnType::Json => DataType::Jsonb, + ColumnType::JsonBinary => DataType::Jsonb, + ColumnType::Array(def) => { + let item_type = match def.col_type.as_ref() { + Some(ty) => type_to_rw_type(ty.as_ref())?, + None => { + return Err(anyhow!("ARRAY type missing element type").into()); + } + }; + + DataType::List(Box::new(item_type)) + } + ColumnType::PgLsn => DataType::Int64, + ColumnType::Cidr + | ColumnType::Inet + | ColumnType::MacAddr + | ColumnType::MacAddr8 + | ColumnType::Int4Range + | ColumnType::Int8Range + | ColumnType::NumRange + | ColumnType::TsRange + | ColumnType::TsTzRange + | ColumnType::DateRange + | ColumnType::Enum(_) => DataType::Varchar, + + ColumnType::Line => { + return Err(anyhow!("LINE type not supported").into()); + } + ColumnType::Lseg => { + return Err(anyhow!("LSEG type not supported").into()); + } + ColumnType::Box => { + return Err(anyhow!("BOX type not supported").into()); + } + ColumnType::Path => { + return Err(anyhow!("PATH type not supported").into()); + } + ColumnType::Polygon => { + return Err(anyhow!("POLYGON type not supported").into()); + } + ColumnType::Circle => { + return Err(anyhow!("CIRCLE type not supported").into()); + } + ColumnType::Bit(_) => { + return Err(anyhow!("BIT type not supported").into()); + } + ColumnType::TsVector => { + return Err(anyhow!("TSVECTOR type not supported").into()); + } + ColumnType::TsQuery => { + return Err(anyhow!("TSQUERY type not supported").into()); + } + ColumnType::Unknown(name) => { + // NOTES: user-defined enum type is classified as `Unknown` + tracing::warn!("Unknown Postgres data type: {name}, map to varchar"); + DataType::Varchar + } + }; + + Ok(dtype) +} + pub struct PostgresExternalTableReader { - #[expect(dead_code)] - config: ExternalTableConfig, rw_schema: Schema, field_names: String, prepared_scan_stmt: Statement, @@ -202,7 +357,6 @@ impl PostgresExternalTableReader { }; Ok(Self { - config, rw_schema, field_names, prepared_scan_stmt, @@ -298,6 +452,7 @@ impl PostgresExternalTableReader { #[cfg(test)] mod tests { + use std::collections::HashMap; use futures::pin_mut; @@ -307,9 +462,32 @@ mod tests { use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, ScalarImpl}; - use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset}; + use crate::source::cdc::external::postgres::{ + PostgresExternalTable, PostgresExternalTableReader, PostgresOffset, + }; use crate::source::cdc::external::{ExternalTableConfig, ExternalTableReader, SchemaTableName}; + #[ignore] + #[tokio::test] + async fn test_postgres_schema() { + let config = ExternalTableConfig { + connector: 
"postgres-cdc".to_string(), + host: "localhost".to_string(), + port: "8432".to_string(), + username: "myuser".to_string(), + password: "123456".to_string(), + database: "mydb".to_string(), + schema: "public".to_string(), + table: "mytest".to_string(), + sslmode: Default::default(), + }; + + let table = PostgresExternalTable::connect(config).await.unwrap(); + + println!("columns: {:?}", &table.column_descs); + println!("primary keys: {:?}", &table.pk_names); + } + #[test] fn test_postgres_offset() { let off1 = PostgresOffset { txid: 4, lsn: 2 }; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 991f0fd6210e..edeb4ae5441d 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -16,7 +16,7 @@ use std::collections::{BTreeMap, HashMap}; use std::rc::Rc; use std::sync::Arc; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use either::Either; use fixedbitset::FixedBitSet; use itertools::Itertools; @@ -30,7 +30,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_common::util::value_encoding::DatumToProtoExt; use risingwave_connector::source; use risingwave_connector::source::cdc::external::{ - DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY, + ExternalTableConfig, ExternalTableImpl, DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY, }; use risingwave_pb::catalog::{PbSource, PbTable, Table, WatermarkDesc}; use risingwave_pb::ddl_service::TableJobType; @@ -731,72 +731,37 @@ fn gen_table_plan_inner( } #[allow(clippy::too_many_arguments)] -pub(crate) fn gen_create_table_plan_for_cdc_source( - context: OptimizerContextRef, - source_name: ObjectName, - table_name: ObjectName, +pub(crate) fn gen_create_table_plan_for_cdc_table( + handler_args: HandlerArgs, + explain_options: ExplainOptions, + source: Arc, external_table_name: String, - column_defs: Vec, - constraints: Vec, + mut columns: Vec, + pk_names: Vec, + connect_properties: BTreeMap, mut col_id_gen: ColumnIdGenerator, on_conflict: Option, with_version_column: Option, include_column_options: IncludeOption, + resolved_table_name: String, + database_id: DatabaseId, + schema_id: SchemaId, ) -> Result<(PlanRef, PbTable)> { - // cdc table must have primary key constraint or primary key column - if !constraints.iter().any(|c| { - matches!( - c, - TableConstraint::Unique { - is_primary: true, - .. 
     #[test]
     fn test_postgres_offset() {
         let off1 = PostgresOffset { txid: 4, lsn: 2 };
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 991f0fd6210e..edeb4ae5441d 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -16,7 +16,7 @@ use std::collections::{BTreeMap, HashMap};
 use std::rc::Rc;
 use std::sync::Arc;
 
-use anyhow::anyhow;
+use anyhow::{anyhow, Context};
 use either::Either;
 use fixedbitset::FixedBitSet;
 use itertools::Itertools;
@@ -30,7 +30,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
 use risingwave_common::util::value_encoding::DatumToProtoExt;
 use risingwave_connector::source;
 use risingwave_connector::source::cdc::external::{
-    DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
+    ExternalTableConfig, ExternalTableImpl, DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
 };
 use risingwave_pb::catalog::{PbSource, PbTable, Table, WatermarkDesc};
 use risingwave_pb::ddl_service::TableJobType;
@@ -731,72 +731,37 @@ fn gen_table_plan_inner(
 }
 
 #[allow(clippy::too_many_arguments)]
-pub(crate) fn gen_create_table_plan_for_cdc_source(
-    context: OptimizerContextRef,
-    source_name: ObjectName,
-    table_name: ObjectName,
+pub(crate) fn gen_create_table_plan_for_cdc_table(
+    handler_args: HandlerArgs,
+    explain_options: ExplainOptions,
+    source: Arc<SourceCatalog>,
     external_table_name: String,
-    column_defs: Vec<ColumnDef>,
-    constraints: Vec<TableConstraint>,
+    mut columns: Vec<ColumnCatalog>,
+    pk_names: Vec<String>,
+    connect_properties: BTreeMap<String, String>,
     mut col_id_gen: ColumnIdGenerator,
     on_conflict: Option<OnConflict>,
     with_version_column: Option<String>,
     include_column_options: IncludeOption,
+    resolved_table_name: String,
+    database_id: DatabaseId,
+    schema_id: SchemaId,
 ) -> Result<(PlanRef, PbTable)> {
-    // cdc table must have primary key constraint or primary key column
-    if !constraints.iter().any(|c| {
-        matches!(
-            c,
-            TableConstraint::Unique {
-                is_primary: true,
-                ..
-            }
-        )
-    }) && !column_defs.iter().any(|col| {
-        col.options
-            .iter()
-            .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
-    }) {
-        return Err(ErrorCode::NotSupported(
-            "CDC table without primary key constraint is not supported".to_owned(),
-            "Please define a primary key".to_owned(),
-        )
-        .into());
-    }
-
+    let context: OptimizerContextRef = OptimizerContext::new(handler_args, explain_options).into();
     let session = context.session_ctx().clone();
-    let db_name = session.database();
-    let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
-    let (database_id, schema_id) =
-        session.get_database_and_schema_id_for_create(schema_name.clone())?;
 
-    // cdc table cannot be append-only
-    let append_only = false;
-    let (source_schema, source_name) = Binder::resolve_schema_qualified_name(db_name, source_name)?;
-
-    let source = {
-        let catalog_reader = session.env().catalog_reader().read_guard();
-        let schema_name = source_schema
-            .clone()
-            .unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
-        let (source, _) = catalog_reader.get_source_by_name(
-            db_name,
-            SchemaPath::Name(schema_name.as_str()),
-            source_name.as_str(),
-        )?;
-        source.clone()
-    };
-
-    let mut columns = bind_sql_columns(&column_defs)?;
-    let with_properties = source.with_properties.clone();
     // append additional columns to the end
-    handle_addition_columns(&with_properties, include_column_options, &mut columns, true)?;
+    handle_addition_columns(
+        &connect_properties,
+        include_column_options,
+        &mut columns,
+        true,
+    )?;
 
     for c in &mut columns {
         c.column_desc.column_id = col_id_gen.generate(c.name())
     }
 
-    let pk_names = bind_sql_pk_names(&column_defs, &constraints)?;
     let (columns, pk_column_ids, _row_id_index) = bind_pk_on_relation(columns, pk_names, true)?;
 
     let definition = context.normalized_sql().to_owned();
@@ -817,9 +782,6 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(
         .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
         .collect();
 
-    let connect_properties =
-        derive_connect_properties(source.as_ref(), external_table_name.clone())?;
-
     let cdc_table_desc = CdcTableDesc {
         table_id: TableId::placeholder(), // will be filled in meta node
         source_id: source.id.into(),      // id of cdc source streaming job
@@ -827,7 +789,7 @@
         pk: table_pk,
         columns: columns.iter().map(|c| c.column_desc.clone()).collect(),
         stream_key: pk_column_indices,
-        connect_properties,
+        connect_properties: connect_properties.into_iter().collect(),
     };
 
     tracing::debug!(?cdc_table_desc, "create cdc table");
@@ -853,12 +815,12 @@
 
     let materialize = plan_root.gen_table_plan(
         context,
-        name,
+        resolved_table_name,
         columns,
         definition,
         pk_column_ids,
         None,
-        append_only,
+        false,
         on_conflict,
         with_version_column,
         vec![],
@@ -875,13 +837,13 @@
 }
 
 fn derive_connect_properties(
-    source: &SourceCatalog,
+    source_with_properties: &BTreeMap<String, String>,
     external_table_name: String,
 ) -> Result<BTreeMap<String, String>> {
     use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR};
     // we should remove the prefix from `full_table_name`
-    let mut connect_properties = source.with_properties.clone();
-    if let Some(connector) = source.with_properties.get(UPSTREAM_SOURCE_KEY) {
+    let mut connect_properties = source_with_properties.clone();
+    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
         let table_name = match connector.as_str() {
             MYSQL_CDC_CONNECTOR => {
                 let db_name = connect_properties.get(DATABASE_NAME_KEY).ok_or_else(|| {
@@ -912,7 +874,7 @@ fn derive_connect_properties(
         };
         connect_properties.insert(TABLE_NAME_KEY.into(), table_name.into());
     }
-    Ok(connect_properties.into_iter().collect())
+    Ok(connect_properties)
 }
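
`derive_connect_properties` normalizes the user-supplied `external_table_name` before it is stored in the connector properties: for MySQL the configured database name is stripped off the front of the fully qualified name, while for Postgres the schema-qualified remainder is kept. A hedged sketch of that stripping rule as a free function — `strip_db_prefix` is a hypothetical helper for illustration, not the PR's code:

/// Strip the `db_name.` prefix from a fully qualified table name.
/// Returns `None` when the name does not belong to the configured database.
fn strip_db_prefix<'a>(full_table_name: &'a str, db_name: &str) -> Option<&'a str> {
    full_table_name
        .strip_prefix(db_name)
        .and_then(|rest| rest.strip_prefix('.'))
}

fn main() {
    assert_eq!(
        strip_db_prefix("mytest.mysql_types_test", "mytest"),
        Some("mysql_types_test")
    );
    // A mismatched database should be rejected rather than silently accepted.
    assert_eq!(strip_db_prefix("otherdb.t1", "mytest"), None);
}
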
 
 #[allow(clippy::too_many_arguments)]
@@ -978,52 +940,65 @@ pub(super) async fn handle_create_table_plan(
         }
         (None, Some(cdc_table)) => {
-            let context = OptimizerContext::new(handler_args, explain_options);
-            if append_only {
-                return Err(ErrorCode::NotSupported(
-                    "append only modifier on the table created from a CDC source".into(),
-                    "Remove the APPEND ONLY clause".into(),
-                )
-                .into());
-            }
+            sanity_check_for_cdc_table(
+                append_only,
+                &column_defs,
+                &wildcard_idx,
+                &constraints,
+                &source_watermarks,
+            )?;
 
-            if !source_watermarks.is_empty() {
-                return Err(ErrorCode::NotSupported(
-                    "watermark defined on the table created from a CDC source".into(),
-                    "Remove the Watermark definitions".into(),
-                )
-                .into());
-            }
-            if wildcard_idx.is_some() {
-                return Err(ErrorCode::NotSupported(
-                    "star(\"*\") defined on the table created from a CDC source".into(),
-                    "Remove the star(\"*\") in the column list".into(),
-                )
-                .into());
-            }
-            for c in &column_defs {
-                for op in &c.options {
-                    if let ColumnOption::GeneratedColumns(_) = op.option {
-                        return Err(ErrorCode::NotSupported(
-                            "generated column defined on the table created from a CDC source"
-                                .into(),
-                            "Remove the generated column in the column list".into(),
-                        )
-                        .into());
-                    }
-                }
-            }
-            let (plan, table) = gen_create_table_plan_for_cdc_source(
-                context.into(),
-                cdc_table.source_name.clone(),
-                table_name.clone(),
+            let session = &handler_args.session;
+            let db_name = session.database();
+            let (schema_name, resolved_table_name) =
+                Binder::resolve_schema_qualified_name(db_name, table_name)?;
+            let (database_id, schema_id) =
+                session.get_database_and_schema_id_for_create(schema_name.clone())?;
+
+            // cdc table cannot be append-only
+            let (source_schema, source_name) =
+                Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;
+
+            let source = {
+                let catalog_reader = session.env().catalog_reader().read_guard();
+                let schema_name = source_schema
+                    .clone()
+                    .unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
+                let (source, _) = catalog_reader.get_source_by_name(
+                    db_name,
+                    SchemaPath::Name(schema_name.as_str()),
+                    source_name.as_str(),
+                )?;
+                source.clone()
+            };
+            let connect_properties = derive_connect_properties(
+                &source.with_properties,
                 cdc_table.external_table_name.clone(),
-                column_defs,
-                constraints,
+            )?;
+
+            let (columns, pk_names) = derive_schema_for_cdc_table(
+                &column_defs,
+                &constraints,
+                connect_properties.clone(),
+                wildcard_idx.is_some(),
+            )
+            .await?;
+
+            let (plan, table) = gen_create_table_plan_for_cdc_table(
+                handler_args,
+                explain_options,
+                source,
+                cdc_table.external_table_name.clone(),
+                columns,
+                pk_names,
+                connect_properties,
                 col_id_gen,
                 on_conflict,
                 with_version_column,
                 include_column_options,
+                resolved_table_name,
+                database_id,
+                schema_id,
             )?;
 
             ((plan, None, table), TableJobType::SharedCdcSource)
@@ -1038,6 +1013,108 @@ pub(super) async fn handle_create_table_plan(
     Ok((plan, source, table, job_type))
 }
 
+fn sanity_check_for_cdc_table(
+    append_only: bool,
+    column_defs: &Vec<ColumnDef>,
+    wildcard_idx: &Option<usize>,
+    constraints: &Vec<TableConstraint>,
+    source_watermarks: &Vec<SourceWatermark>,
+) -> Result<()> {
+    for c in column_defs {
+        for op in &c.options {
+            if let ColumnOption::GeneratedColumns(_) = op.option {
+                return Err(ErrorCode::NotSupported(
+                    "generated column defined on the table created from a CDC source".into(),
+                    "Remove the generated column in the column list".into(),
+                )
+                .into());
+            }
+        }
+    }
+
+    // wildcard cannot be used together with explicit column definitions
+    if wildcard_idx.is_some() && !column_defs.is_empty() {
+        return Err(ErrorCode::NotSupported(
+            "wildcard(*) and column definitions cannot be used together".to_owned(),
+            "Remove the wildcard or column definitions".to_owned(),
+        )
+        .into());
+    }
+
+    // the cdc table must have a primary key constraint or a primary key column,
+    // unless the wildcard form derives the schema (and its key) from upstream
+    if wildcard_idx.is_none()
+        && !constraints.iter().any(|c| {
+            matches!(
+                c,
+                TableConstraint::Unique {
+                    is_primary: true,
+                    ..
+                }
+            )
+        })
+        && !column_defs.iter().any(|col| {
+            col.options
+                .iter()
+                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
+        })
+    {
+        return Err(ErrorCode::NotSupported(
+            "CDC table without primary key constraint is not supported".to_owned(),
+            "Please define a primary key".to_owned(),
+        )
+        .into());
+    }
+
+    if append_only {
+        return Err(ErrorCode::NotSupported(
+            "append only modifier on the table created from a CDC source".into(),
+            "Remove the APPEND ONLY clause".into(),
+        )
+        .into());
+    }
+
+    if !source_watermarks.is_empty() {
+        return Err(ErrorCode::NotSupported(
+            "watermark defined on the table created from a CDC source".into(),
+            "Remove the Watermark definitions".into(),
+        )
+        .into());
+    }
+
+    Ok(())
+}
+
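
Taken together, `sanity_check_for_cdc_table` enforces: no generated columns, wildcard and explicit columns are mutually exclusive, an explicit primary key is required unless the wildcard form defers the schema (and its key) to the upstream table, and neither `APPEND ONLY` nor watermarks are allowed. A condensed model of that acceptance rule — `Spec` is a hypothetical stand-in for the parsed DDL, not a type from the PR:

// Hypothetical condensed model of the checks above.
struct Spec {
    wildcard: bool,        // implies an empty column list per the mutual-exclusion check
    has_explicit_pk: bool,
    append_only: bool,
    has_watermark: bool,
    has_generated_column: bool,
}

fn accepts(spec: &Spec) -> bool {
    !spec.has_generated_column
        && !spec.append_only
        && !spec.has_watermark
        // With a wildcard, the primary key comes from the upstream table.
        && (spec.wildcard || spec.has_explicit_pk)
}

fn main() {
    // Wildcard form: pk is derived from the upstream table.
    assert!(accepts(&Spec {
        wildcard: true,
        has_explicit_pk: false,
        append_only: false,
        has_watermark: false,
        has_generated_column: false,
    }));
    // Explicit column list without a primary key is rejected.
    assert!(!accepts(&Spec {
        wildcard: false,
        has_explicit_pk: false,
        append_only: false,
        has_watermark: false,
        has_generated_column: false,
    }));
}
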
+async fn derive_schema_for_cdc_table(
+    column_defs: &Vec<ColumnDef>,
+    constraints: &Vec<TableConstraint>,
+    connect_properties: BTreeMap<String, String>,
+    need_auto_schema_map: bool,
+) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
+    // Read the CDC table schema from the external database, or parse it from the SQL definitions.
+    if need_auto_schema_map {
+        let config = ExternalTableConfig::try_from_btreemap(connect_properties)
+            .context("failed to extract external table config")?;
+
+        let table = ExternalTableImpl::connect(config).await?;
+        Ok((
+            table
+                .column_descs()
+                .iter()
+                .cloned()
+                .map(|column_desc| ColumnCatalog {
+                    column_desc,
+                    is_hidden: false,
+                })
+                .collect(),
+            table.pk_names().clone(),
+        ))
+    } else {
+        Ok((
+            bind_sql_columns(column_defs)?,
+            bind_sql_pk_names(column_defs, constraints)?,
+        ))
+    }
+}
+
 #[allow(clippy::too_many_arguments)]
 pub async fn handle_create_table(
     handler_args: HandlerArgs,
diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
index bc54772f498f..85c2f6b2ab17 100644
--- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
+++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
@@ -264,8 +264,9 @@ mod tests {
     use risingwave_common::row::OwnedRow;
     use risingwave_common::types::{DataType, ScalarImpl};
     use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
+    use risingwave_connector::source::cdc::external::mysql::MySqlExternalTableReader;
     use risingwave_connector::source::cdc::external::{
-        ExternalTableConfig, ExternalTableReader, MySqlExternalTableReader, SchemaTableName,
+        ExternalTableConfig, ExternalTableReader, SchemaTableName,
     };
 
     use crate::executor::backfill::utils::{get_new_pos, iter_chunks};
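
One last building block worth noting: `derive_schema_for_cdc_table` above relies on `ExternalTableConfig::try_from_btreemap` to turn the flat connector property map into a typed config before `ExternalTableImpl::connect` is called. A common way to implement that kind of conversion is a serde round-trip through `serde_json::Value`; the sketch below is a hypothetical illustration of the pattern, and the `TableConfig` type and its field set are assumptions, not the PR's actual definition:

// Hypothetical sketch of a `try_from_btreemap`-style conversion via serde.
use std::collections::BTreeMap;

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct TableConfig {
    #[serde(rename = "hostname")]
    host: String,
    port: String,
    #[serde(rename = "database.name")]
    database: String,
    #[serde(rename = "table.name")]
    table: String,
}

fn try_from_btreemap(props: BTreeMap<String, String>) -> Result<TableConfig, serde_json::Error> {
    // Serialize the string map to a JSON object, then deserialize into the typed config;
    // missing required keys surface as a descriptive serde error.
    let json = serde_json::to_value(props)?;
    serde_json::from_value(json)
}

fn main() {
    let props: BTreeMap<String, String> = [
        ("hostname", "localhost"),
        ("port", "8432"),
        ("database.name", "mydb"),
        ("table.name", "mytest"),
    ]
    .into_iter()
    .map(|(k, v)| (k.to_string(), v.to_string()))
    .collect();

    let config = try_from_btreemap(props).unwrap();
    println!("{config:?}");
}
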