This project captures data that has been added to, updated in, or deleted from an Oracle RDBMS using Change Data Capture (CDC).
Confluent’s Oracle CDC Source Connector is a plug-in for Kafka Connect, which (surprise) connects Oracle as a source into Kafka as a destination.
This setup uses Docker Compose to run the Kafka Connect worker and the other Kafka dependencies.
In docker-compose.yml I have already added both the Confluent Oracle CDC Source and S3 Sink connectors (change the versions if you need to).
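As a rough sketch of what that looks like (the connector versions here are illustrative; check docker-compose.yml for the ones actually used), the Kafka Connect container typically installs the plugins with confluent-hub when it starts:

# Illustrative only, run inside the kafka-connect container or image build
confluent-hub install --no-prompt confluentinc/kafka-connect-oracle-cdc:latest
confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest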
- Install Docker (for Kafka)
- A running Oracle database and its connection details: hostname, username, password, port, SID, database name, and so on
- Create the S3 bucket and make a note of the region (a minimal aws CLI example follows below)
- Obtain your AWS access key pair
- Set the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
# On Linux
export AWS_ACCESS_KEY_ID=xxxxxxxxxxxxxxxxxxx
export AWS_SECRET_ACCESS_KEY=yyyyyyyyyyyyyyyyyyyyyyy
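If you prefer to create the bucket from the command line, a minimal AWS CLI sketch (using the bucket name and region that appear later in this README; substitute your own) is:

aws s3 mb s3://kafkatestdata --region us-east-2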
- Clone this repo and bring the Docker Compose stack up
docker-compose up -d
- Make sure everything is up and running
$ docker-compose ps
Name Command State Ports
---------------------------------------------------------------------------------------------
broker /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp
kafka-connect bash -c # Up (healthy) 0.0.0.0:8083->8083/tcp, 9092/tcp
echo "Installing ...
schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp
zookeeper /etc/confluent/docker/run Up 2181/tcp, 2888/tcp, 3888/tcp
ksqldb /usr/bin/docker/run Up 0.0.0.0:8088->8088/tcp
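You can also confirm that the Oracle CDC Source and S3 Sink plugins are available on the worker:

curl -s http://localhost:8083/connector-plugins | jq '.[].class'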
- Create the Oracle CDC Source connector
curl -d '{
"name": "CDC-oracle",
"config": {
"connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
"name": "CDC-oracle",
"tasks.max":1,
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"confluent.topic.bootstrap.servers":"localhost:9092",
"oracle.server": "<hostname>",
"oracle.port": "1521",
"oracle.sid": "<sid>",
"oracle.username": "<username>",
"oracle.password": "<password>",
"start.from":"snapshot",
"redo.log.consumer.bootstrap.servers":"localhost:9092",
"table.inclusion.regex":"<databaseName>\\.<username>\\.<tableName>",
"table.topic.name.template": "${connectorName}-${tableName}",
"connection.pool.max.size": 20,
"confluent.topic.replication.factor":1,
"topic.creation.groups": "redo",
"topic.creation.redo.include": "oracle-redo-log-topic",
"topic.creation.redo.replication.factor": 1,
"topic.creation.redo.partitions": 1,
"topic.creation.redo.cleanup.policy": "delete",
"topic.creation.redo.retention.ms": 1209600000,
"topic.creation.default.replication.factor": 1,
"topic.creation.default.partitions": 1,
"topic.creation.default.cleanup.policy": "compact"
}
}' -H 'Content-Type: application/json' http://0.0.0.0:8083/connectors/
Customise the connector for your environment; refer to the Oracle CDC Source connector documentation for more information.
- Check that the connector loaded successfully (CDC-oracle is the connector name defined in the config above):
curl http://0.0.0.0:8083/connectors/CDC-oracle/status | jq .
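If the connector is healthy, the status should report a RUNNING state for the connector and its task, roughly like this (worker_id will differ in your environment):

{
  "name": "CDC-oracle",
  "connector": { "state": "RUNNING", "worker_id": "kafka-connect:8083" },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "kafka-connect:8083" } ],
  "type": "source"
}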
- Now check the Oracle data in the Kafka topic. Run the ksqlDB CLI:
docker exec -it ksqldb ksql http://ksqldb:8088
- To list the connectors:
SHOW CONNECTORS;
- To list the topics:
SHOW TOPICS;
- To print the data in the table topic:
PRINT '<connectorName>-<tableName>' FROM BEGINNING;
For more details, see the ksqlDB documentation.
- Now create the S3 Sink connector (Kafka topic to S3)
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-s3/config \
-d '
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "1",
"topics": "<connectorName-tableName>",
"s3.region": "us-east-2",
"s3.bucket.name": "kafkatestdata",
"flush.size": "3",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"schema.compatibility": "NONE"
}'
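As with the source connector, check that the sink started successfully:

curl -s http://localhost:8083/connectors/sink-s3/status | jq .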
Things to customise for your environment:
- topics: the source topic(s) you want to send to S3
- key.converter: match the serialisation of your source data
- value.converter: match the serialisation of your source data
- And many more
- Bravo 🎉🎉, all done! Now check the data in S3.
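A quick way to verify from the command line (assuming the AWS CLI is configured with the same credentials and the bucket from the sink config):

aws s3 ls s3://kafkatestdata/ --recursive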
References