# Learn How to Move CDC Enabled ScyllaDB Tables into Hudi Datalakes 

# Prerequisites

### Docker compose file 
```
# Compose file format version.
version: '3'

services:
  # ScyllaDB node that the Python CDC reader in this guide connects to.
  scylla:
    image: scylladb/scylla:4.6.0
    # Port 9042 is exposed to other containers on the Compose network.
    expose:
      - "9042"
    # Port 9042 is also published on the host (host:container) so a client
    # running on the host can connect via 127.0.0.1.
    ports:
      - "9042:9042"
```


#### Create sample table
```
-- Create Keyspace if not already created
CREATE KEYSPACE IF NOT EXISTS my_keyspace
WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};

USE my_keyspace;

-- Create the users table with columns: user_id (primary key), name, age, city, state
-- CDC is enabled on the table via the `cdc` option below
CREATE TABLE IF NOT EXISTS users (
    user_id int PRIMARY KEY,
    name text,
    age int,
    city text,
    state text
) WITH cdc = {'enabled': true};  -- Enabling CDC

-- Insert three sample rows (every column of the table is supplied explicitly)
INSERT INTO users (user_id, name, age, city, state)
VALUES (1, 'Alice', 30, 'Seattle', 'WA');

INSERT INTO users (user_id, name, age, city, state)
VALUES (2, 'Bob', 25, 'San Francisco', 'CA');

INSERT INTO users (user_id, name, age, city, state)
VALUES (3, 'Charlie', 28, 'Austin', 'TX');

-- Select to verify inserted data
SELECT * FROM users;
```

### Install dependencies
```
# Install PySpark 3.4.0 and the Cassandra Python driver (imported later as `cassandra`)
# using a specific Python 3.9 interpreter path; adjust the path for your machine.
!/Library/Frameworks/Python.framework/Versions/3.9/bin/python3.9 -m pip install pyspark==3.4.0
!/Library/Frameworks/Python.framework/Versions/3.9/bin/python3.9 -m pip install cassandra-driver

```


In [1]:
! ls

[34mcheckpoints[m[m         [31mdocker-compose.yaml[m[m [31mlab1.py[m[m             [31mscyla_batch.py[m[m
[31mdemo.sql[m[m            [31mhudi-scylladb.ipynb[m[m [31mnotes[m[m               [31mspark_lab1.py[m[m


In [2]:
! docker-compose up --build -d

[1A[1B[0G[?25l[+] Building 0.0s (0/0)                                    docker:desktop-linux
[?25h[1A[1B[0G[?25l[+] Running 2/0
 [32m✔[0m Network sylladb_default     [32mCreated[0m                                     [34m0.0s [0m
 [32m✔[0m Container sylladb-scylla-1  [32mCreated[0m                                     [34m0.0s [0m
[?25h[1A[1A[1A[0G[?25l[34m[+] Running 2/2[0m
 [32m✔[0m Network sylladb_default     [32mCreated[0m                                     [34m0.0s [0m
 [32m✔[0m Container sylladb-scylla-1  [32mStarted[0m                                     [34m0.0s [0m
[?25h

# Create Spark Session

In [3]:
import os, sys
from pyspark.sql import SparkSession

# Versions used to pick the matching Hudi Spark bundle.
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

# Maven coordinates handed to spark-submit via --packages.
_PACKAGES = ",".join([
    f"org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION}",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.773",
])
SUBMIT_ARGS = f"--packages {_PACKAGES} pyspark-shell"

# Environment must be prepared before the session (and its JVM) is created.
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"  # set Java home if needed
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Session-level settings, applied to the builder in this order.
_SPARK_CONF = (
    ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
    ('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension'),
    ('className', 'org.apache.hudi'),
    ("fs.s3a.prefetch.enable", "false"),
    ("fs.s3a.experimental.fadvise", "random"),
    ('spark.sql.hive.convertMetastoreParquet', 'false'),
)

# Spark session
_builder = SparkSession.builder
for _conf_key, _conf_value in _SPARK_CONF:
    _builder = _builder.config(_conf_key, _conf_value)
spark = _builder.getOrCreate()


Ivy Default Cache set to: /Users/soumilshah/.ivy2/cache
The jars for the packages stored in: /Users/soumilshah/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cd51692b-95db-42e4-8485-efbdf61670a0;1.0
	confs: [default]


:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 in local-m2-cache
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in local-m2-cache
	found com.amazonaws#aws-java-sdk-bundle;1.12.773 in central
downloading file:/Users/soumilshah/.m2/repository/org/apache/hudi/hudi-spark3.4-bundle_2.12/0.14.0/hudi-spark3.4-bundle_2.12-0.14.0.jar ...
	[SUCCESSFUL ] org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0!hudi-spark3.4-bundle_2.12.jar (137ms)
downloading https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar ...
	[SUCCESSFUL ] org.apache.hadoop#hadoop-aws;3.3.4!hadoop-aws.jar (127ms)
downloading https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.773/aws-java-sdk-bundle-1.12.773.jar ...
	[SUCCESSFUL ] com.amazonaws#aws-java-sdk-bundle;1.12.773!aws-java-sdk-bundle.jar (6407ms)
downloading file:/Users/soumilshah/.m2/repository/org/wildfly/openssl/wildfly-openssl/1.0.7.Final/wildfly-o

In [4]:
spark

# Python helper class


In [5]:
try:
    import json, os
    from cassandra.cluster import Cluster
    from cassandra.query import dict_factory
    from datetime import datetime
    from datetime import datetime, timezone
    from uuid import UUID
except Exception as e:
    print("Library Missing ", e)


class ScyllaCDCReader:
    """Batch reader for a ScyllaDB CDC log table with a JSON file checkpoint.

    The reader connects to a cluster, queries the configured log table for
    rows whose ``cdc$time`` is greater than the last committed checkpoint and
    yields them as lists of dictionaries of at most ``batch_size`` rows.

    Args:
        host: Contact point passed to ``Cluster``.
        keyspace: Keyspace the session connects to.
        table_name: Name of the CDC log table to query.
        checkpoint_dir: Directory that holds the checkpoint file.
        batch_size: Maximum number of rows per yielded batch.
    """

    def __init__(self, host='127.0.0.1', keyspace='my_keyspace', table_name='users_scylla_cdc_log',
                 checkpoint_dir='checkpoints', batch_size=10):
        self.cluster = Cluster([host])
        try:
            self.session = self.cluster.connect(keyspace)
        except Exception:
            # Do not leave a half-initialised cluster object open on failure.
            self.cluster.shutdown()
            raise
        self.session.row_factory = dict_factory  # This will return rows as dictionaries
        self.table_name = table_name
        self.checkpoint_dir = checkpoint_dir
        self.checkpoint_file = os.path.join(checkpoint_dir, f'{table_name}_checkpoint.json')
        self.batch_size = batch_size

    def read_checkpoint(self):
        """Return the last committed ``cdc$time`` as a ``UUID``.

        When no checkpoint file exists, a fixed sentinel UUID whose timestamp
        field is zero is returned so that the first read is not restricted by
        a previous run.
        """
        if os.path.exists(self.checkpoint_file):
            with open(self.checkpoint_file, 'r') as f:
                checkpoint = json.load(f)
            return UUID(checkpoint['last_cdc_time'])
        return UUID('00000000-0000-1000-8080-808080808080')

    def commit_checkpoint(self, last_cdc_time):
        """Persist ``last_cdc_time`` (UUID or its string form) as the checkpoint.

        The value is written to a temporary file first and then moved over the
        checkpoint file, so an interrupted write cannot leave a truncated
        (unparseable) checkpoint behind.
        """
        os.makedirs(self.checkpoint_dir, exist_ok=True)
        tmp_file = f'{self.checkpoint_file}.tmp'
        with open(tmp_file, 'w') as f:
            json.dump({'last_cdc_time': str(last_cdc_time)}, f)
        os.replace(tmp_file, self.checkpoint_file)
        print(f"Checkpoint committed: {last_cdc_time}")

    def get_cdc_logs(self, start_time=None):
        """Query the log table, optionally only rows newer than ``start_time``.

        Args:
            start_time: ``UUID`` (or its string form) used as an exclusive lower
                bound on ``cdc$time``; ``None`` selects every row.

        Returns:
            Whatever ``session.execute`` returns for the query.

        Raises:
            ValueError: If ``start_time`` is not a valid UUID representation.
        """
        if start_time is None:
            return self.session.execute(f'SELECT * FROM {self.table_name}')

        # Coerce to UUID so the bound value is always a well-formed uuid
        # literal (string inputs keep working, malformed ones are rejected).
        if not isinstance(start_time, UUID):
            start_time = UUID(str(start_time))

        # The value is bound as a query parameter instead of being pasted into
        # the statement text; only the table name is interpolated.
        query = f"""
            SELECT *
            FROM {self.table_name}
            WHERE "cdc$time" > %s
            ALLOW FILTERING
            """
        return self.session.execute(query, (start_time,))

    def get_messages(self):
        """Yield rows newer than the checkpoint in lists of up to ``batch_size``.

        Empty batches are never yielded; the final batch may be shorter.
        """
        start_time = self.read_checkpoint()
        rows = self.get_cdc_logs(start_time)

        batch = []
        for row in rows:
            batch.append(row)
            if len(batch) >= self.batch_size:
                yield batch
                batch = []

        # Yield any remaining messages in the last batch
        if batch:
            yield batch

    def close(self):
        """Shut down the session and then the cluster.

        The cluster is shut down even if closing the session raises.
        """
        try:
            self.session.shutdown()
        finally:
            self.cluster.shutdown()


def create_cdc_reader(host, keyspace, table, checkpoint_dir, batch_size=10):
    """Build a ``ScyllaCDCReader`` for ``table`` in ``keyspace`` on ``host``.

    ``table`` is forwarded as the reader's ``table_name``; the remaining
    arguments are passed through under their own names.
    """
    reader_kwargs = {
        'host': host,
        'keyspace': keyspace,
        'table_name': table,
        'checkpoint_dir': checkpoint_dir,
        'batch_size': batch_size,
    }
    return ScyllaCDCReader(**reader_kwargs)


def sanitize_column_names(row):
    """Return a copy of ``row`` whose keys have every ``$`` replaced by ``_``.

    Values are carried over unchanged. If two keys collapse to the same
    name, the one that appears later in ``row`` wins.
    """
    cleaned = {}
    for column, value in row.items():
        cleaned[column.replace('$', '_')] = value
    return cleaned


def uuid_to_datetime(uuid_obj):
    """Convert a version-1 (time-based) UUID to a timezone-aware UTC datetime.

    ``UUID.time`` counts 100-nanosecond intervals since 1582-10-15 (the UUID
    epoch), not since the Unix epoch, so the offset between the two epochs
    must be subtracted before the value is treated as a Unix timestamp.

    Args:
        uuid_obj: A ``uuid.UUID`` carrying a version-1 timestamp.

    Returns:
        ``datetime`` in UTC for the instant encoded in the UUID.
    """
    # Number of 100-ns intervals between 1582-10-15 and 1970-01-01.
    uuid_epoch_offset = 0x01B21DD213814000
    timestamp = (uuid_obj.time - uuid_epoch_offset) / 1e7  # 100-ns intervals -> seconds
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


# Method to Write into Hudi table

In [8]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='RECORD_INDEX',
                  base_path='file:///Users/soumilshah/Desktop/hudi'
                  ):
    """Append ``spark_df`` to a Hudi table under ``base_path``.

    The table is stored at
    ``{base_path}/database={db_name}/table_name={table_name}`` and written in
    ``append`` mode with the ``PartialUpdateAvroPayload`` payload class.

    Args:
        spark_df: DataFrame to write (anything exposing ``.write``).
        table_name: Hudi table name; also part of the storage path.
        db_name: Database name; only used to build the storage path.
        method: Value for ``hoodie.datasource.write.operation``.
        table_type: Value for ``hoodie.datasource.write.table.type``.
        recordkey: Record key field name.
        precombine: Precombine field name.
        partition_fields: Partition path field name(s).
        index_type: Accepted for backward compatibility; it is currently not
            applied to the Hudi write options.
        base_path: Root location for Hudi tables. Defaults to the location
            that was previously hard-coded, so existing callers are unaffected.
    """
    path = f"{base_path}/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        'hoodie.datasource.write.partitionpath.field': partition_fields,
        'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.PartialUpdateAvroPayload',
    }

    spark_df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(path)

# Main Code

In [15]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType


reader = create_cdc_reader('127.0.0.1',
                           'my_keyspace',
                           'users_scylla_cdc_log',
                           './checkpoints',
                           batch_size=1000)

# Spark schema for the sanitized CDC rows ('$' in column names replaced by
# '_'), plus the derived readable_timestamp column added below.
schema = StructType([
    StructField("cdc_time", StringType(), True),
    StructField("cdc_batch_seq_no", IntegerType(), True),
    StructField("age", IntegerType(), True),
    StructField("cdc_deleted_age", IntegerType(), True),
    StructField("cdc_deleted_city", StringType(), True),
    StructField("cdc_deleted_name", StringType(), True),
    StructField("cdc_deleted_state", StringType(), True),
    StructField("cdc_end_of_batch", BooleanType(), True),
    StructField("cdc_operation", IntegerType(), True),
    StructField("cdc_ttl", StringType(), True),
    StructField("city", StringType(), True),
    StructField("name", StringType(), True),
    StructField("state", StringType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("readable_timestamp", StringType(), True)
])

# Newest cdc$time seen during this run, kept as a UUID so that its timestamp
# field can be compared. The rows come from a filtered scan of the whole log
# table, which is not guaranteed to return them in global cdc$time order, so
# the last row of a batch is not necessarily the newest one.
latest_cdc_time = None

try:
    for batch in reader.get_messages():
        sanitized_batch = [sanitize_column_names(row) for row in batch]
        for row in sanitized_batch:
            # The stream id is not part of the target schema.
            row.pop("cdc_stream_id", None)
            cdc_time = row['cdc_time']
            if latest_cdc_time is None or cdc_time.time > latest_cdc_time.time:
                latest_cdc_time = cdc_time
            row['readable_timestamp'] = uuid_to_datetime(cdc_time).isoformat()
            row['cdc_time'] = str(cdc_time)

        # Create the DataFrame for this batch and upsert it into Hudi.
        df = spark.createDataFrame(sanitized_batch, schema=schema)
        write_to_hudi(
            spark_df=df,
            db_name="default",
            table_name="users_scylla_cdc_log",
            recordkey="user_id",
            precombine="readable_timestamp",
            partition_fields=""
        )

    # Commit only after every batch of the scan was written: advancing the
    # checkpoint earlier could skip older rows that a later batch still
    # contains if the run is interrupted. A failed run is simply re-read from
    # the previous checkpoint, which the upsert makes safe to repeat.
    if latest_cdc_time is not None:
        reader.commit_checkpoint(str(latest_cdc_time))
finally:
    reader.close()

Checkpoint committed: 2cc3ea74-8da5-11ef-98e5-eef8b009db9e


# 

# Read from hudi table

In [14]:
# Location of the Hudi table written by the ingestion step.
path = "file:///Users/soumilshah/Desktop/hudi/database=default/table_name=users_scylla_cdc_log"

# Load the table and expose it to Spark SQL under the name "snapshot".
snapshot_df = spark.read.format("hudi").load(path)
snapshot_df.createOrReplaceTempView("snapshot")

# Show the user columns from the registered view.
spark.sql("select user_id,state,city,age from snapshot").show()

+-------+-----+-------------+---+
|user_id|state|         city|age|
+-------+-----+-------------+---+
|      2|   CA|San Francisco| 25|
|      1|   WA|      Seattle| 30|
|      3|   TX|       Austin| 28|
+-------+-----+-------------+---+

