Skip to content

Commit

Permalink
feat(cdc): map upstream table schema automatically for cdc table (#16986
Browse files Browse the repository at this point in the history
)
  • Loading branch information
StrikeW committed Jun 10, 2024
1 parent 9e137fc commit 45d3b76
Show file tree
Hide file tree
Showing 14 changed files with 1,280 additions and 476 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ sea-orm = { version = "0.12.14", features = [
"sqlx-sqlite",
"runtime-tokio-native-tls",
] }
sqlx = "0.7"
tokio-util = "0.7"
tracing-opentelemetry = "0.22"
rand = { version = "0.8", features = ["small_rng"] }
Expand Down
118 changes: 118 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_map_mysql.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
control substitution on

# This test case needs to cover all data types.
# Recreate the upstream `mytest` database so the test starts from a clean state.
# The connection flags (`--protocol=tcp -u root`) match the table-setup command
# that follows, so both commands reach the same server as the same user.
system ok
mysql --protocol=tcp -u root -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;"

# Create the upstream MySQL table `mysql_types_test` with one column per data
# type under test, then insert two rows.
# The composite primary key includes the mixed-case column `c_Bigint`.
system ok
mysql --protocol=tcp -u root mytest -e "
DROP TABLE IF EXISTS mysql_types_test;
CREATE TABLE IF NOT EXISTS mysql_types_test(
c_boolean boolean,
c_bit bit,
c_tinyint tinyint,
c_smallint smallint,
c_mediumint mediumint,
c_integer integer,
c_Bigint bigint,
c_decimal decimal,
c_float float,
c_double double,
c_char_255 char(255),
c_varchar_10000 varchar(10000),
c_binary_255 binary(255),
c_varbinary_10000 varbinary(10000),
c_date date,
c_time time,
c_datetime datetime,
c_timestamp timestamp,
c_enum ENUM('happy','sad','ok'),
c_json JSON,
PRIMARY KEY (c_boolean,c_Bigint,c_date)
);
INSERT INTO mysql_types_test VALUES ( False, 0, null, null, -8388608, -2147483647, 9223372036854775806, -10.0, -9999.999999, -10000.0, 'c', 'd', '', '', '1001-01-01', '-838:59:59.000000', '2000-01-01 00:00:00.000000', null, 'happy', '[1,2]');
INSERT INTO mysql_types_test VALUES ( True, 1, -128, -32767, -8388608, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'a', 'b', '', '', '1001-01-01', '00:00:00', '1998-01-01 00:00:00.000000', '1970-01-01 00:00:01', 'sad', '[3,4]');
"


# Create a MySQL CDC source pointing at the upstream `mytest` database.
# Host, port and password come from the environment, with fallbacks.
statement ok
CREATE SOURCE mysql_source WITH (
    connector = 'mysql-cdc',
    hostname = '${MYSQL_HOST:localhost}',
    port = '${MYSQL_TCP_PORT:8306}',
    username = 'root',
    password = '${MYSQL_PWD:}',
    database.name = 'mytest',
    server.id = '5601'
);


# Create the CDC table with `(*)` instead of an explicit column list, so the
# columns are taken from the upstream table.
statement ok
CREATE TABLE rw_mysql_types_test (*)
FROM mysql_source TABLE 'mytest.mysql_types_test';

# NOTE(review): presumably gives the upstream rows time to arrive before the
# queries below run — confirm.
sleep 3s

# Check the schema derived for the CDC table, including its primary key.
# Per the expected output, `c_boolean` maps to smallint and `c_bit` to boolean.
# Output columns: Name, Type, Is Hidden, Description
query TTTT
describe rw_mysql_types_test;
----
c_boolean smallint false NULL
c_bit boolean false NULL
c_tinyint smallint false NULL
c_smallint smallint false NULL
c_mediumint integer false NULL
c_integer integer false NULL
c_Bigint bigint false NULL
c_decimal numeric false NULL
c_float real false NULL
c_double double precision false NULL
c_char_255 character varying false NULL
c_varchar_10000 character varying false NULL
c_binary_255 bytea false NULL
c_varbinary_10000 bytea false NULL
c_date date false NULL
c_time time without time zone false NULL
c_datetime timestamp without time zone false NULL
c_timestamp timestamp with time zone false NULL
c_enum character varying false NULL
c_json jsonb false NULL
primary key c_boolean, c_Bigint, c_date NULL NULL
distribution key c_boolean, c_Bigint, c_date NULL NULL
table description rw_mysql_types_test NULL NULL

# First 13 columns of the mapped table; rows are ordered by c_boolean.
# `c_Bigint` is double-quoted because the column name is mixed-case.
query TTTTTTTTTTTTT
SELECT
    c_boolean,
    c_bit,
    c_tinyint,
    c_smallint,
    c_mediumint,
    c_integer,
    "c_Bigint",
    c_decimal,
    c_float,
    c_double,
    c_char_255,
    c_varchar_10000,
    c_binary_255
FROM rw_mysql_types_test
ORDER BY c_boolean;
----
0 NULL NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
1 NULL -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000

# Remaining 7 columns of the mapped table; rows are ordered by c_boolean.
# The type string has one `T` per selected column (7).
# The first row's upstream time '-838:59:59.000000' is expected to read back as NULL.
query TTTTTTT
SELECT
    c_varbinary_10000,
    c_date,
    c_time,
    c_datetime,
    c_timestamp,
    c_enum,
    c_json
FROM rw_mysql_types_test
ORDER BY c_boolean;
----
\x 1001-01-01 NULL 2000-01-01 00:00:00 NULL happy [1, 2]
\x 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00 sad [3, 4]

# Clean up: drop the source together with everything that depends on it.
statement ok
DROP SOURCE mysql_source CASCADE;
168 changes: 168 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_map_pg.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
control substitution on

# This test case needs to cover all data types.
# Create the upstream Postgres table `postgres_types_test` with scalar columns
# and a matching array column for each, then insert four rows.
# NOTE(review): the `mood` enum type used by `c_enum` / `c_enum_array` is not
# created in this file; presumably it already exists in the upstream database
# before this test runs — confirm.
system ok
psql -c "
DROP TABLE IF EXISTS postgres_types_test;
CREATE TABLE IF NOT EXISTS postgres_types_test(
c_boolean boolean,
c_smallint smallint,
c_integer integer,
c_bigint bigint,
c_decimal decimal,
c_real real,
c_double_precision double precision,
c_varchar varchar,
c_bytea bytea,
c_date date,
c_time time,
c_timestamp timestamp,
c_timestamptz timestamptz,
c_interval interval,
c_jsonb jsonb,
c_uuid uuid,
c_enum mood,
c_boolean_array boolean[],
c_smallint_array smallint[],
c_integer_array integer[],
c_bigint_array bigint[],
c_decimal_array decimal[],
c_real_array real[],
c_double_precision_array double precision[],
c_varchar_array varchar[],
c_bytea_array bytea[],
c_date_array date[],
c_time_array time[],
c_timestamp_array timestamp[],
c_timestamptz_array timestamptz[],
c_interval_array interval[],
c_jsonb_array jsonb[],
c_uuid_array uuid[],
c_enum_array mood[],
PRIMARY KEY (c_boolean,c_bigint,c_date)
);
INSERT INTO postgres_types_test VALUES ( False, 0, 0, 0, 0, 0, 0, '', '00'::bytea, '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', null, 'sad', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], array[]::uuid[], array[]::mood[]);
INSERT INTO postgres_types_test VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'd', '00'::bytea, '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', 'bb488f9b-330d-4012-b849-12adeb49e57e', 'happy', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['2001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['2001-01-01 00:00:00-8'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], '{bb488f9b-330d-4012-b849-12adeb49e57e}', '{happy,ok,sad}');
INSERT INTO postgres_types_test VALUES ( False, 1, 123, 1234567890, 123.45, 123.45, 123.456, 'a_varchar', 'DEADBEEF'::bytea, '0024-01-01', '12:34:56', '2024-05-19 12:34:56', '2024-05-19 12:34:56+00', INTERVAL '1 day', to_jsonb('hello'::text), '123e4567-e89b-12d3-a456-426614174000', 'happy', ARRAY[NULL, TRUE]::boolean[], ARRAY[NULL, 1::smallint], ARRAY[NULL, 123], ARRAY[NULL, 1234567890], ARRAY[NULL, 123.45::numeric], ARRAY[NULL, 123.45::real], ARRAY[NULL, 123.456], ARRAY[NULL, 'a_varchar'], ARRAY[NULL, 'DEADBEEF'::bytea], ARRAY[NULL, '2024-05-19'::date], ARRAY[NULL, '12:34:56'::time], ARRAY[NULL, '2024-05-19 12:34:56'::timestamp], ARRAY[NULL, '2024-05-19 12:34:56+00'::timestamptz], ARRAY[NULL, INTERVAL '1 day'], ARRAY[NULL, to_jsonb('hello'::text)], ARRAY[NULL, '123e4567-e89b-12d3-a456-426614174000'::uuid], ARRAY[NULL, 'happy'::mood]);
INSERT INTO postgres_types_test VALUES ( False, NULL, NULL, 1, NULL, NULL, NULL, NULL, NULL, '0024-05-19', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
"

# Create a Postgres CDC source.
# Connection settings come from the standard PG* environment variables, with fallbacks.
statement ok
CREATE SOURCE pg_source WITH (
    connector = 'postgres-cdc',
    hostname = '${PGHOST:localhost}',
    port = '${PGPORT:5432}',
    username = '${PGUSER:$USER}',
    password = '${PGPASSWORD:}',
    database.name = '${PGDATABASE:postgres}',
    slot.name = 'pg_slot'
);


# Create the CDC table with `(*)` instead of an explicit column list, so the
# columns are taken from the upstream table.
statement ok
CREATE TABLE rw_postgres_types_test (*)
FROM pg_source TABLE 'public.postgres_types_test';

# NOTE(review): presumably gives the upstream rows time to arrive before the
# queries below run — confirm.
sleep 3s

# Check the schema derived for the CDC table, including its primary key.
# Per the expected output, uuid and enum columns (and their arrays) map to
# character varying.
# Output columns: Name, Type, Is Hidden, Description
query TTTT
describe rw_postgres_types_test;
----
c_boolean boolean false NULL
c_smallint smallint false NULL
c_integer integer false NULL
c_bigint bigint false NULL
c_decimal numeric false NULL
c_real real false NULL
c_double_precision double precision false NULL
c_varchar character varying false NULL
c_bytea bytea false NULL
c_date date false NULL
c_time time without time zone false NULL
c_timestamp timestamp without time zone false NULL
c_timestamptz timestamp with time zone false NULL
c_interval interval false NULL
c_jsonb jsonb false NULL
c_uuid character varying false NULL
c_enum character varying false NULL
c_boolean_array boolean[] false NULL
c_smallint_array smallint[] false NULL
c_integer_array integer[] false NULL
c_bigint_array bigint[] false NULL
c_decimal_array numeric[] false NULL
c_real_array real[] false NULL
c_double_precision_array double precision[] false NULL
c_varchar_array character varying[] false NULL
c_bytea_array bytea[] false NULL
c_date_array date[] false NULL
c_time_array time without time zone[] false NULL
c_timestamp_array timestamp without time zone[] false NULL
c_timestamptz_array timestamp with time zone[] false NULL
c_interval_array interval[] false NULL
c_jsonb_array jsonb[] false NULL
c_uuid_array character varying[] false NULL
c_enum_array character varying[] false NULL
primary key c_boolean, c_bigint, c_date NULL NULL
distribution key c_boolean, c_bigint, c_date NULL NULL
table description rw_postgres_types_test NULL NULL

# Numeric, character and bytea columns of the two rows whose c_enum is 'happy',
# ordered by c_integer. The type string has one `T` per selected column (9).
query TTTTTTTTT
SELECT
    c_boolean,
    c_smallint,
    c_integer,
    c_bigint,
    c_decimal,
    c_real,
    c_double_precision,
    c_varchar,
    c_bytea
FROM rw_postgres_types_test
WHERE c_enum = 'happy'
ORDER BY c_integer;
----
f -32767 -2147483647 -9223372036854775807 -10.0 -10000 -10000 d \x3030
f 1 123 1234567890 123.45 123.45 123.456 a_varchar \x4445414442454546

# Date/time columns of the two rows whose c_enum is 'happy', ordered by c_integer.
# The upstream timestamptz written with a -8 offset is expected back as 08:00:00+00:00.
query TTTTT
SELECT
    c_date,
    c_time,
    c_timestamp,
    c_timestamptz,
    c_interval
FROM rw_postgres_types_test
WHERE c_enum = 'happy'
ORDER BY c_integer;
----
0001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 08:00:00+00:00 00:00:00
0024-01-01 12:34:56 2024-05-19 12:34:56 2024-05-19 12:34:56+00:00 1 day

# jsonb, uuid, enum and the boolean/integer array columns of the two rows whose
# c_enum is 'happy', ordered by c_integer.
query TTTTTTT
SELECT
    c_jsonb,
    c_uuid,
    c_enum,
    c_boolean_array,
    c_smallint_array,
    c_integer_array,
    c_bigint_array
FROM rw_postgres_types_test
WHERE c_enum = 'happy'
ORDER BY c_integer;
----
{} bb488f9b-330d-4012-b849-12adeb49e57e happy {f} {-32767} {-2147483647} {-9223372036854775807}
"hello" 123e4567-e89b-12d3-a456-426614174000 happy {NULL,t} {NULL,1} {NULL,123} {NULL,1234567890}

# Remaining 13 array columns of the two rows whose c_enum is 'happy', ordered by
# c_integer. The expected output has NULL for c_interval_array in both rows and
# for c_enum_array in the second row.
query TTTTTTTTTTTTT
SELECT
    c_decimal_array,
    c_real_array,
    c_double_precision_array,
    c_varchar_array,
    c_bytea_array,
    c_date_array,
    c_time_array,
    c_timestamp_array,
    c_timestamptz_array,
    c_interval_array,
    c_jsonb_array,
    c_uuid_array,
    c_enum_array
FROM rw_postgres_types_test
WHERE c_enum = 'happy'
ORDER BY c_integer;
----
{-10.0} {-10000} {-10000} {""} {"\\x3030"} {0001-01-01} {00:00:00} {"2001-01-01 00:00:00"} {"2001-01-01 08:00:00+00:00"} NULL {"{}"} {bb488f9b-330d-4012-b849-12adeb49e57e} {happy,ok,sad}
{NULL,123.45} {NULL,123.45} {NULL,123.456} {NULL,a_varchar} {NULL,"\\x4445414442454546"} {NULL,2024-05-19} {NULL,12:34:56} {NULL,"2024-05-19 12:34:56"} {NULL,"2024-05-19 12:34:56+00:00"} NULL {NULL,"\"hello\""} {NULL,123e4567-e89b-12d3-a456-426614174000} NULL

# Clean up: drop the source together with everything that depends on it.
statement ok
DROP SOURCE pg_source CASCADE;
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,9 @@ private void validateTableSchema() throws SQLException {
var field = res.getString(1);
var dataType = res.getString(2);
var key = res.getString(3);
schema.put(field.toLowerCase(), dataType);
schema.put(field, dataType);
if (key.equalsIgnoreCase("PRI")) {
// RisingWave always use lower case for column name
pkFields.add(field.toLowerCase());
pkFields.add(field);
}
}

Expand All @@ -208,7 +207,7 @@ private void validateTableSchema() throws SQLException {
if (e.getKey().startsWith(ValidatorUtils.INTERNAL_COLUMN_PREFIX)) {
continue;
}
var dataType = schema.get(e.getKey().toLowerCase());
var dataType = schema.get(e.getKey());
if (dataType == null) {
throw ValidatorUtils.invalidArgument(
"Column '" + e.getKey() + "' not found in the upstream database");
Expand Down
3 changes: 2 additions & 1 deletion src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ use risingwave_common::types::{Datum, JsonbVal};
use risingwave_common::util::epoch::{test_epoch, EpochExt};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::cdc::external::mock_external_table::MockExternalTableReader;
use risingwave_connector::source::cdc::external::mysql::MySqlOffset;
use risingwave_connector::source::cdc::external::{
DebeziumOffset, DebeziumSourceOffset, ExternalTableReaderImpl, MySqlOffset, SchemaTableName,
DebeziumOffset, DebeziumSourceOffset, ExternalTableReaderImpl, SchemaTableName,
};
use risingwave_connector::source::cdc::DebeziumCdcSplit;
use risingwave_connector::source::SplitImpl;
Expand Down
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,13 @@ rustls-native-certs = "0.7"
rustls-pemfile = "2"
rustls-pki-types = "1"
rw_futures_util = { workspace = true }
sea-schema = { version = "0.14", features = ["default", "sqlx-postgres", "sqlx-mysql"] }
serde = { version = "1", features = ["derive", "rc"] }
serde_derive = "1"
serde_json = "1"
serde_with = { version = "3", features = ["json"] }
simd-json = { version = "0.13.3", features = ["hints"] }
sqlx = { workspace = true }
strum = "0.26"
strum_macros = "0.26"
tempfile = "3"
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def_anyhow_newtype! {
// Connector errors
opendal::Error => transparent, // believed to be self-explanatory

sqlx::Error => transparent, // believed to be self-explanatory
mysql_async::Error => "MySQL error",
tokio_postgres::Error => "Postgres error",
apache_avro::Error => "Avro error",
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ macro_rules! impl_cdc_source_type {
}
)*

#[derive(Clone)]
pub enum CdcSourceType {
$(
$cdc_source_type,
Expand Down
Loading

0 comments on commit 45d3b76

Please sign in to comment.