Skip to content

Debezium Source Connector from SQL Server to Apache Kafka

Notifications You must be signed in to change notification settings

kayvansol/kafka-source

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

14 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Debezium source connector from SQL Server to Apache Kafka

alt text

Dockerfile for the Kafka Connect image, which installs the debezium-connector-sqlserver plugin (Dockerfile):

# Kafka Connect image with the JDBC and Debezium SQL Server connector plugins.
# The base image is pinned to a release tag instead of `latest`: the build
# command tags the result as ...cp-kafka-connect:latest, so a `latest` base
# would resolve to the previously built image on every rebuild.
FROM docker.arvancloud.ir/confluentinc/cp-kafka-connect:7.6.0

# Install both plugins from Confluent Hub without interactive prompts.
RUN confluent-hub install confluentinc/kafka-connect-jdbc:10.7.6 --no-prompt \
 && confluent-hub install debezium/debezium-connector-sqlserver:2.4.2 --no-prompt

Build the image:

# Build the image from the Dockerfile in the current directory. The tag is the
# same image name that the connect1 service in docker-compose.yml references.
docker build -t docker.arvancloud.ir/confluentinc/cp-kafka-connect:latest .

alt text

# Change into the repository directory (presumably where docker-compose.yml
# lives; confirm against your checkout).
cd kafka-source

Docker Compose file docker-compose.yml :

---
version: '3'

services:

  # Single ZooKeeper node; kafka1 connects to it through
  # KAFKA_ZOOKEEPER_CONNECT (zookeeper2:2181).
  zookeeper2:
    container_name: zookeeper2
    hostname: zookeeper2
    image: docker.arvancloud.ir/confluentinc/cp-zookeeper:latest
    networks:
      - kafka-network1
    ports:
      # ZooKeeper client port, also published on the host.
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: '2181'
      ZOOKEEPER_TICK_TIME: '2000'

  # Single Kafka broker running in ZooKeeper mode (see KAFKA_ZOOKEEPER_CONNECT).
  kafka1:
    # Pinned to a 7.x release: ZooKeeper mode was removed in Confluent
    # Platform 8.x, so the floating `latest` tag can pull a broker that no
    # longer accepts this configuration.
    image: docker.arvancloud.ir/confluentinc/cp-kafka:7.6.0
    hostname: kafka1
    container_name: kafka1
    depends_on:
      - zookeeper2
    ports:
      - '29092:29092'
      - '9092:9092'
    environment:
      KAFKA_BROKER_ID: '1'
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper2:2181'
      # Two plaintext listeners: PLAINTEXT is advertised as kafka1:29092 for
      # containers on kafka-network1, PLAINTEXT_HOST as localhost:9092 for
      # clients running on the Docker host.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092'
      # Replication factors and min ISR are 1 because there is only one broker.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0'
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1'
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 'kafka1:29092'
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: 'zookeeper2:2181'
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: '1'
      CONFLUENT_METRICS_ENABLE: 'false'
    networks:
      - kafka-network1

  # Kafka Connect worker. The image name matches the tag produced by the
  # `docker build` command, i.e. the image with the Debezium SQL Server and
  # JDBC connector plugins installed.
  connect1:
    image: docker.arvancloud.ir/confluentinc/cp-kafka-connect:latest
    hostname: connect1
    container_name: connect1
    # Only the broker is a direct dependency; kafka1 itself depends on
    # zookeeper2, so the start-up order is unchanged.
    depends_on:
      - kafka1
    ports:
      # Connect REST API.
      - '8083:8083'
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka1:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect1
      CONNECT_REST_PORT: '8083'
      CONNECT_GROUP_ID: compose-connect-group
      # Internal topics for connector configs, offsets and status; replication
      # factor 1 because the cluster has a single broker.
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: '10000'
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      # Plain JSON keys and values without the embedded schema envelope.
      # CONNECT_INTERNAL_KEY_CONVERTER / CONNECT_INTERNAL_VALUE_CONVERTER are
      # intentionally not set: the internal converter settings were removed
      # from Kafka Connect in Apache Kafka 3.0. CONNECT_ZOOKEEPER_CONNECT is
      # not set either, because the Connect worker only talks to the brokers.
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'false'
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'
      # CLASSPATH required due to CC-2422
      # NOTE(review): the jar version is hard-coded to 5.4.1 while the image
      # tag floats; confirm this path exists in the image that is actually used.
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.4.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: 'io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor'
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: 'io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor'
      # /usr/share/confluent-hub-components is listed so that plugins installed
      # with confluent-hub are discovered.
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components'
      CONNECT_LOG4J_LOGGERS: 'org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR'
    networks:
      - kafka-network1

  # AKHQ web UI, configured with one Kafka connection (kafka1) and one
  # Connect endpoint (connect1).
  akhq1:
    image: tchiotludo/akhq
    hostname: web-ui1
    container_name: web-ui1
    # Replaces the legacy `links` option: both services share kafka-network1,
    # so kafka1 already resolves by name and only the start-up ordering that
    # `links` implied needs to be kept.
    depends_on:
      - kafka1
    ports:
      # Quoted so the mapping is always read as a string.
      - '8080:8080'
    environment:
      # Inline AKHQ configuration passed to the container as a YAML document.
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            docker-kafka-server:
              properties:
                bootstrap.servers: "kafka1:29092"
              connect:
                - name: "connect"
                  url: "http://connect1:8083"
#             ksqldb:
#               - name: "ksqldb"
#                 url: "http://ksqldb-server:8088"

    networks:
      - kafka-network1

# Bridge network that all four services attach to. The explicit `name` gives
# the network a fixed name instead of a project-prefixed one.
networks:
  kafka-network1:
    name: kafka-network1
    driver: bridge
# Start every service defined in docker-compose.yml (runs in the foreground).
docker compose up

Docker Desktop :

alt text

Then check each container's logs to confirm that all services started without errors.

Configuring Microsoft SQL Server to Enable CDC :


-- Create the demo database in its own batch so that it exists before USE.
CREATE DATABASE Test_DB;
GO

USE Test_DB;

-- Enable change data capture (CDC) for the current database.
EXEC sys.sp_cdc_enable_db;

-- Also turn on change tracking, retaining change information for two days
-- with automatic cleanup.
ALTER DATABASE Test_DB
SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);

-- Sample table; ids are generated starting at 1001.
CREATE TABLE prospects (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);

-- Enable CDC for dbo.prospects: no gating role, net-changes support off.
EXEC sys.sp_cdc_enable_table
  @source_schema = 'dbo',
  @source_name = 'prospects',
  @role_name = NULL,
  @supports_net_changes = 0;
GO

Restart the SQL Server instance and the SQL Server Agent service (the CDC capture and cleanup jobs run under SQL Server Agent):

alt text

alt text

alt text

Then create the Kafka connector for SQL Server:

# Open a shell inside the Kafka Connect container; the curl command below is
# run from that shell, which is why it targets localhost:8083.
docker exec -it connect1 bash

# Register the Debezium SQL Server source connector through the Connect REST API.
# Only Debezium 2.x property names are used, matching the 2.4.2 connector that
# the image installs. The pre-2.0 names are ignored by that version:
#   table.include.list         replaces table.whitelist
#   database.names             replaces database.dbname
#   topic.prefix               replaces database.server.name
#   schema.history.internal.*  replaces database.history.*
# NOTE(review): the host address and the sa password are sample values; replace
# them for your environment and avoid committing real credentials.
curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  localhost:8083/connectors \
  -d '{
  "name": "debezium-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "192.168.1.4",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "ABCabc123456",
    "database.names": "Test_DB",
    "table.include.list": "dbo.prospects",
    "topic.prefix": "fullfillment",
    "errors.log.enable": "true",
    "schema.history.internal.kafka.bootstrap.servers": "kafka1:29092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "database.trustServerCertificate": "true"
  }
}'

alt text

alt text

After new rows are inserted into the SQL Server table, the connector publishes the changes to the corresponding Kafka topic:

alt text

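Then open the topic in AKHQ at http://localhost:8080/ui/docker-kafka-server/topic/usertopic/data?sort=Oldest&partition=All (replace `usertopic` with the name of the topic the connector created; with the connector configuration above it should be `fullfillment.Test_DB.dbo.prospects`).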

alt text

alt text

alt text