
Change Data Capture

Repository for the Change Data Capture with Debezium blog at startdataengineering.com.

Project Design

Run on codespaces

You can run this CDC data pipeline using GitHub codespaces. Follow the instructions below.

  1. Create a codespace by going to the change_data_capture repository, forking it, and then clicking on the Create codespace on main button.
  2. Wait for the codespace to start, then in the terminal type make up && sleep 60 && make connectors && sleep 60.
  3. Wait for the above to complete; it can take up to a couple of minutes.
  4. Go to the Ports tab and click on the link exposing port 9001 to access the Minio (open source S3 alternative) UI.
  5. In the Minio UI, use minio and minio123 as the username and password respectively. You will be able to see the paths commerce/debezium.commerce.products and commerce/debezium.commerce.users, which have JSON files in them. The JSON files contain data about the creates, updates, and deletes in the respective products and users tables.

NOTE: The screenshots below show the general process for starting codespaces; please follow the instructions above for this project.

[Screenshots: codespace start, codespace make up, codespace access UI]

Note: Make sure to switch off your codespaces instance; you only have limited free usage. See the docs here.

Prerequisites

  1. git version >= 2.37.1
  2. Docker version >= 20.10.17 and Docker compose v2 version >= v2.10.2. Make sure that Docker is running using docker ps.
  3. pgcli

Windows users: please set up WSL and a local Ubuntu virtual machine following the instructions here. Install the above prerequisites in your Ubuntu terminal; if you have trouble installing Docker, follow the steps here (only Step 1 is necessary). Please install the make command with sudo apt install make -y (if it's not already present).

Setup

All the commands shown below are to be run via the terminal (use the Ubuntu terminal for WSL users). We will use Docker to set up our containers. Clone and move into the lab repository, as shown below.

git clone https://github.com/josephmachado/change_data_capture.git
cd change_data_capture

We have some helpful make commands to make working with our systems easier. Shown below are the make commands and their definitions:

  1. make up: Spin up the docker containers for Postgres, the data generator, Kafka Connect, Kafka, & Minio (open source S3 alternative). Note that this also sets up Postgres tables and starts a Python script that creates, updates, and deletes rows in those tables.
  2. make connectors: Set up the Debezium connector to start recording changes from Postgres and another connector to push this data into Minio (see the sketch after this list).
  3. make down: Stop the docker containers.
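
For a sense of what make connectors does under the hood: Kafka Connect connectors are registered by POSTing a JSON config to Kafka Connect's REST API. Below is a minimal sketch of registering a Debezium Postgres source connector; the connector name, hostnames, port, and credentials are illustrative assumptions, and the real configs live in this repository.

# A sketch only: the real connector configs live in this repository.
import json
import urllib.request

source_connector = {
    "name": "pg-source-connector",  # hypothetical connector name
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",  # assumed compose service name
        "database.port": "5432",
        "database.user": "postgres",      # assumed credentials
        "database.password": "postgres",
        "database.dbname": "postgres",
        # the topic prefix yields topic names like debezium.commerce.products
        "topic.prefix": "debezium",
    },
}

# Kafka Connect's REST API conventionally listens on port 8083
request = urllib.request.Request(
    "http://localhost:8083/connectors",
    data=json.dumps(source_connector).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(request) as response:
    print(response.read().decode())

The Minio sink connector is registered the same way, with a different connector.class and its own config.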

You can see the commands in this Makefile. If your terminal does not support make commands, please use the commands in the Makefile directly. All the commands in this README assume that you have the docker containers running.

In your terminal, do the following:

# Make sure docker is running using docker ps
make up # starts the docker containers
sleep 60 # wait 1 minute for all the containers to set up
make connectors # Sets up the connectors
sleep 60 # wait 1 minute for some data to be pushed into minio
make minio-ui # opens localhost:9001

In the Minio UI, use minio and minio123 as the username and password respectively. You will be able to see the paths commerce/debezium.commerce.products and commerce/debezium.commerce.users, which have JSON files in them. The JSON files contain data about the creates, updates, and deletes in the respective products and users tables.
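
To get a feel for these files, here is a minimal sketch that reads the change events line by line (the files are newline-delimited JSON). It assumes the local minio/data folder mount described in the DuckDB section below, and the Debezium envelope (before, after, op, source) used in the query later in this README.

# Minimal sketch: inspect Debezium change events from the minio-mounted folder.
import glob
import json

# grab one of the newline-delimited JSON files for the products topic
files = glob.glob("minio/data/commerce/debezium.commerce.products/*/*/*.json")

with open(files[0]) as f:
    for line in f:
        envelope = json.loads(line).get("value")
        if envelope is None:
            continue  # tombstone records carry no value
        # op codes: c = create, u = update, d = delete, r = snapshot read
        print(envelope["op"], envelope["source"]["lsn"], envelope.get("after"))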

Analyze data with DuckDB

Access the data in Minio via the filesystem

We mount a local folder to the Minio container, which allows us to access the data in Minio via the filesystem. We can start a Python REPL to run DuckDB as shown below:

python

Now let's create an SCD2 (slowly changing dimension, type 2) table for products from the data we have in Minio. Note that we only look at rows that have updates or deletes in them (see the WHERE id IN filter in the query below).

import duckdb as d
d.sql("""
    -- flatten the Debezium change events for the products table
    WITH products_create_update_delete AS (
        SELECT
            COALESCE(CAST(json->'value'->'after'->'id' AS INT), CAST(json->'value'->'before'->'id' AS INT)) AS id,
            json->'value'->'before' AS before_row_value,
            json->'value'->'after' AS after_row_value,
            CASE
                WHEN CAST(json->'value'->'$.op' AS CHAR(1)) = '"c"' THEN 'CREATE'
                WHEN CAST(json->'value'->'$.op' AS CHAR(1)) = '"d"' THEN 'DELETE'
                WHEN CAST(json->'value'->'$.op' AS CHAR(1)) = '"u"' THEN 'UPDATE'
                WHEN CAST(json->'value'->'$.op' AS CHAR(1)) = '"r"' THEN 'SNAPSHOT'
                ELSE 'INVALID'
            END AS operation_type,
            CAST(json->'value'->'source'->'lsn' AS BIGINT) AS log_seq_num,
            epoch_ms(CAST(json->'value'->'source'->'ts_ms' AS BIGINT)) AS source_timestamp
        FROM
            read_ndjson_objects('minio/data/commerce/debezium.commerce.products/*/*/*.json')
        WHERE
            log_seq_num IS NOT NULL -- drop events without an LSN (e.g., tombstone records)
    )
    SELECT
        id,
        CAST(after_row_value->'name' AS VARCHAR(255)) AS name,
        CAST(after_row_value->'description' AS TEXT) AS description,
        CAST(after_row_value->'price' AS NUMERIC(10, 2)) AS price,
        source_timestamp AS row_valid_start_timestamp,
        -- a row version stays valid until the next change to the same id;
        -- the latest version gets a far-future expiration timestamp
        CASE
            WHEN LEAD(source_timestamp, 1) OVER lead_txn_timestamp IS NULL THEN CAST('9999-01-01' AS TIMESTAMP)
            ELSE LEAD(source_timestamp, 1) OVER lead_txn_timestamp
        END AS row_valid_expiration_timestamp
    FROM products_create_update_delete
    -- keep only ids that have more than one change event
    WHERE id IN (SELECT id FROM products_create_update_delete GROUP BY id HAVING COUNT(*) > 1)
    WINDOW lead_txn_timestamp AS (PARTITION BY id ORDER BY log_seq_num)
    ORDER BY id, row_valid_start_timestamp
    LIMIT 200;
    """).execute()

Access data via the S3 API

We can also access the data via the S3 API in DuckDB, as shown in this example SQL query.
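
A minimal sketch of that approach is shown below; it assumes Minio's S3 API is reachable at localhost:9000 (check the port your compose setup exposes) and uses the minio/minio123 credentials from above.

import duckdb as d

# httpfs adds S3 support to DuckDB; point it at the local Minio endpoint
d.sql("INSTALL httpfs; LOAD httpfs;")
d.sql("SET s3_endpoint='localhost:9000';")  # assumed Minio S3 API port
d.sql("SET s3_access_key_id='minio';")
d.sql("SET s3_secret_access_key='minio123';")
d.sql("SET s3_use_ssl=false;")
d.sql("SET s3_url_style='path';")  # Minio serves buckets path-style

# the same files as before, read through the S3 API instead of the filesystem
d.sql("""
    SELECT COUNT(*) AS num_change_events
    FROM read_ndjson_objects('s3://commerce/debezium.commerce.products/*/*/*.json');
""").show()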

References

  1. Debezium Postgres docs
  2. Redpanda CDC example
  3. DuckDB docs
  4. Kafka docs
  5. Minio DuckDB example