Change Data Capture (CDC) source
that captures and streams change events from various databases.
Currently it supports MySQL
, PostgreSQL
, MongoDB
, Oracle
and SQL Server
databases.
Build upon Debezium Embedded Connector, the CDC Source
allows capturing and streaming database changes over different message binders such Apache Kafka, RabbitMQ and all Spring Cloud Stream supporter brokers.
All Debezium configuration properties are supported. Just prefix the properties with the cdc.config.
prefix. For example to set the connector.class
use the cdc.config.connector.class
instead.
For continence some of the most relevant properties are provided with springified
shortcuts to allow easy configuration and auto-completion features. For example instead of cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector
you can use the shorter cdc.connector=mysql
shortcut. Complete list of shortcuts and their Debezium counterparts are listed in the table below.
When both the Debezium (e.g. cdc.config.XXX
) and the shortcut are set for the same property then the cdc.config.XXX
has a precedence!
Also we have provided a new default BackingOffsetStore
based on the MetadataStore
service allowing to leverage the microservices friendly ways for storing the offset metadata.
- cdc.config
-
Spring pass-trough wrapper for the debezium configuration properties. All properties with 'cdc.config' prefix are converted into Debezium io.debezium.config.Configuration and the prefix is dropped. (Map<String, String>, default:
<none>
) - cdc.connector
-
Shortcut for the cdc.config.connector.class property. Either of those can be used as long as they do not contradict with each other. (ConnectorType, default:
<none>
, possible values:mysql
,postgres
,mongodb
,oracle
,sqlserver
) - cdc.flattering.add-source-fields
-
Fields from the change event’s source structure to add as metadata (prefixed with "__") to the flattened record (String, default:
<none>
) - cdc.flattering.delete-handling-mode
-
How to handle delete records. Options are: (1) none - records are passed, (2) drop - records are removed and (3) rewrite - adds '__deleted' field to the records. (DeleteHandlingMode, default:
<none>
, possible values:drop
,rewrite
,none
) - cdc.flattering.drop-tombstones
-
Debezium by default generates a tombstone record to enable Kafka compaction after a delete record was generated. This record is usually filtered out to avoid duplicates as a delete record is converted to a tombstone record, too. (Boolean, default:
true
) - cdc.flattering.enabled
-
Enable flattering the source record events (https://debezium.io/docs/configuration/event-flattening). (Boolean, default:
true
) - cdc.flattering.operation-header
-
The adds the event operation (as obtained from the op field of the original record) as a message header called cdc_operation (Boolean, default:
false
) - cdc.name
-
Unique name for this sourceConnector instance. (String, default:
<none>
) - cdc.offset.commit-timeout
-
Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt. (Duration, default:
5000ms
) - cdc.offset.flush-interval
-
Interval at which to try committing offsets. The default is 1 minute. (Duration, default:
60000ms
) - cdc.offset.policy
-
Offset storage commit policy. (OffsetPolicy, default:
<none>
) - cdc.offset.storage
-
When a Kafka Connect connector runs, it reads information from the source and periodically records "offsets" that define how much of that information it has processed. Should the connector be restarted, it will use the last recorded offset to know where in the source information it should resume reading. (OffsetStorageType, default:
<none>
, possible values:memory
,file
,kafka
,metadata
) - cdc.schema
-
If set then the value's schema is included as part of the the outbound message. (Boolean, default:
false
) - cdc.stream.header.offset
-
When true the source record's offset metadata is serialized into the outbound message header under cdc.offset. (Boolean, default:
false
) e == Database Support
The CDC Source
is based on Debezium, which currently support the following five datastores: MySQL
, PostgreSQL
, MongoDB
, Oracle
and SQL Server
databases.
In order to run the CdcSourceIntegrationTests
integration tests you need to configure source required source databases.
Instructions below explains how to run pre-configured test databases form Docker images.
Start the debezium/example-mysql
in a docker:
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0
Tip
|
(optional) Use docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz'; |
Use following properties to connect the CDC Source to the MySQL DB:
cdc.connector=mysql # (1)
cdc.name=my-sql-connector # (2)
cdc.config.database.server.id=85744 # (2)
cdc.config.database.server.name=my-app-connector # (2)
cdc.config.database.user=debezium # (3)
cdc.config.database.password=dbz # (3)
cdc.config.database.hostname=localhost # (3)
cdc.config.database.port=3306 # (3)
cdc.schema=true # (4)
cdc.flattering.enabled=true # (5)
-
Configures the CDC Source to use MySqlConnector. (equivalent to setting
cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector
). -
Metadata used to identify and dispatch the incoming events.
-
Connection to the MySQL server running on
localhost:3306
asdebezium
user. -
Includes the Change Event Value schema in the
SourceRecord
events. -
Enables the CDC Event Flattering.
You can run also the CdcSourceIntegrationTests#CdcMysqlTests
using this mysql configuration.
Start a pre-configured postgres server from the debezium/example-postgres:1.0
Docker image:
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.0
You can connect to this server like this:
psql -U postgres -h localhost -p 5432
Use following properties to connect the CDC Source to the PostgreSQL:
cdc.connector=postgres # (1)
cdc.offset.storage=memory #(2)
cdc.name=my-sql-connector # (3)
cdc.config.database.server.id=85744 # (3)
cdc.config.database.server.name=my-app-connector # (3)
cdc.config.database.user=postgres # (4)
cdc.config.database.password=postgres # (4)
cdc.config.database..dbname=postgres # (4)
cdc.config.database.hostname=localhost # (4)
cdc.config.database.port=5432 # (4)
cdc.schema=true # (5)
cdc.flattering.enabled=true # (6)
-
Configures
CDC Source
to use PostgresConnector. Equivalent for settingcdc.config.connector.class=io.debezium.connector.postgresql.PostgresConnector
. -
Configures the Debezium engine to use
memory
(e.g. `cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore) backing offset store. -
Metadata used to identify and dispatch the incoming events.
-
Connection to the PostgreSQL server running on
localhost:5432
aspostgres
user. -
Includes the Change Event Value schema in the
SourceRecord
events. -
Enables the CDC Event Flattering.
You can run also the CdcSourceIntegrationTests#CdcPostgresTests
using this mysql configuration.
Start a pre-configured mongodb from the debezium/example-mongodb:0.10
Docker image:
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:0.10
Initialize the inventory collections
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
In the mongodb
terminal output, search for a log entry like host: "3f95a8a6516e:27017"
:
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
Add 127.0.0.1 3f95a8a6516e
entry to your /etc/hosts
Use following properties to connect the CDC Source to the MongoDB:
cdc.connector=mongodb # (1)
cdc.offset.storage=memory #(2)
cdc.config.mongodb.hosts=rs0/localhost:27017 # (3)
cdc.config.mongodb.name=dbserver1 # (3)
cdc.config.mongodb.user=debezium # (3)
cdc.config.mongodb.password=dbz # (3)
cdc.config.database.whitelist=inventory # (3)
cdc.config.tasks.max=1 # (4)
cdc.schema=true # (5)
cdc.flattering.enabled=true # (6)
-
Configures
CDC Source
to use MongoDB Connector. This maps intocdc.config.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector
. -
Configures the Debezium engine to use
memory
(e.g. `cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore) backing offset store. -
Connection to the MongoDB running on
localhost:27017
asdebezium
user. -
Includes the Change Event Value schema in the
SourceRecord
events. -
Enables the CDC Event Flattering.
You can run also the CdcSourceIntegrationTests#CdcPostgresTests
using this mysql configuration.
Start a sqlserver
from the debezium/example-postgres:1.0
Docker image:
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
Populate with sample data form debezium’s sqlserver tutorial:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
Use following properties to connect the CDC Source to the SQLServer:
cdc.connector=sqlserver # (1)
cdc.offset.storage=memory #(2)
cdc.name=my-sql-connector # (3)
cdc.config.database.server.id=85744 # (3)
cdc.config.database.server.name=my-app-connector # (3)
cdc.config.database.user=sa # (4)
cdc.config.database.password=Password! # (4)
cdc.config.database..dbname=testDB # (4)
cdc.config.database.hostname=localhost # (4)
cdc.config.database.port=1433 # (4)
-
Configures
CDC Source
to use SqlServerConnector. Equivalent for settingcdc.config.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
. -
Configures the Debezium engine to use
memory
(e.g. `cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore) backing offset store. -
Metadata used to identify and dispatch the incoming events.
-
Connection to the SQL Server running on
localhost:1433
assa
user.
You can run also the CdcSourceIntegrationTests#CdcSqlServerTests
using this mysql configuration.
Start Oracle reachable from localhost and set up with the configuration, users and grants described in the Debezium Vagrant set-up
Populate with sample data form Debezium’s Oracle tutorial:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
Build involves two-stages. First build the apps and generate the binder specific app starters projects:
$ ./mvnw clean install -PgenerateApps
You can find the corresponding binder based projects in the apps
subfolder. You can then cd into the apps folder:
$ cd apps
and build all binder projects
$ ./mvnw clean package
java -jar cdc-debezium-source.jar --cdc.connector=mysql --cdc.name=my-sql-connector --cdc.config.database.server.id=85744 --cdc.config.database.server.name=my-app-connector --cdc.config.database.user=debezium --cdc.config.database.password=dbz --cdc.config.database.hostname=localhost --cdc.config.database.port=3306 --cdc.schema=true --cdc.flattering.enabled=true
Following table illustrates how the .Table Shortcut Properties Mapping
Shortcut | Original | Description |
---|---|---|
cdc.connector |
cdc.config.connector.class |
|
cdc.name |
cdc.config.name |
|
cdc.offset.flush-interval |
cdc.config.offset.flush.interval.ms |
|
cdc.offset.commit-timeout |
cdc.config.offset.flush.timeout.ms |
|
cdc.offset.policy |
cdc.config.offset.commit.policy |
|
cdc.offset.storage |
cdc.config.offset.storage |
|
cdc.flattering.drop-tombstones |
cdc.config.drop.tombstones |
|
cdc.flattering.delete-handling-mode |
cdc.config.delete.handling.mode |
|