# CDC - DLT Bronze Demo

**Summary:**
- Purpose:
  - Ingest incremental CDC files from S3 into a raw bronze table
- Tools: 
  - Auto Loader
- Input: 
  - CDC Parquet files in an S3 bucket
- Output: 
  - my_catalog.my_schema.cdc_bronze_orders
    - data types are all string


In this notebook, we demo how to build a **DLT** (Delta Live Tables) table using ***Auto Loader***. 
We will cover:
- CDC to Bronze raw
  - Support schema evolution

The appendix also includes commented-out code that defines the schema explicitly.

### CDC to Bronze raw
- CDC Parquet files are dumped into S3
- Auto Loader detects new files and appends them to cdc_bronze_orders

In [0]:
import dlt

# Define DLT table
@dlt.table(
    name="cdc_bronze_orders",
    comment="Bronze table to capture raw CDC data for orders."
)
def bronze_cdc_data():
    return (
        spark.readStream.format("cloudFiles")      # Auto Loader source for incremental ingestion
        .option("cloudFiles.format", "parquet")    # underlying file format
        # Replace with the path to your CDC volume; a managed volume is recommended
        .load("/Volumes/neo_zhou/client_demo_schema_evolution/cdc_volume_managed")
    )
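
The cell above sets no schema evolution option, so it relies on Auto Loader's defaults. The following is a minimal sketch of how the mode could be made explicit, assuming you want to control it yourself. `autoloader_options` and `read_cdc_stream` are hypothetical helper names introduced for illustration; `cloudFiles.schemaEvolutionMode` is an Auto Loader option, and the default path handling in your pipeline may differ.

```python
# Values accepted by the Auto Loader option cloudFiles.schemaEvolutionMode
VALID_EVOLUTION_MODES = {"addNewColumns", "rescue", "failOnNewColumns", "none"}


def autoloader_options(file_format="parquet", evolution_mode="addNewColumns"):
    """Build the option map for an Auto Loader (cloudFiles) stream.

    Hypothetical helper, not part of DLT or Auto Loader.
    """
    if evolution_mode not in VALID_EVOLUTION_MODES:
        raise ValueError(f"Unsupported schema evolution mode: {evolution_mode}")
    return {
        "cloudFiles.format": file_format,
        "cloudFiles.schemaEvolutionMode": evolution_mode,
    }


def read_cdc_stream(spark, path, **kwargs):
    """Return a streaming DataFrame over the CDC files at `path`.

    `spark` is the session Databricks provides in notebooks and pipelines.
    """
    return (
        spark.readStream.format("cloudFiles")
        .options(**autoloader_options(**kwargs))
        .load(path)
    )
```

Inside the `@dlt.table` function, the body could then be reduced to `return read_cdc_stream(spark, path)`, where `path` is the path to your CDC volume.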

#### Check the SQL Editor for generated results

After this step, the catalog will contain the table *neo_zhou.client_demo_schema_evolution.cdc_bronze_orders*.    

Run the following query in the SQL Editor to validate:  
`SELECT * FROM neo_zhou.client_demo_schema_evolution.cdc_bronze_orders`
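
The same check can be run from a notebook cell instead of the SQL Editor. The sketch below is one possible approach, not a required step: `build_validation_queries` and `run_validation` are hypothetical helper names, and the per-operation query assumes the bronze table has an `operation` column, as in the schema shown in the appendix.

```python
def build_validation_queries(table: str) -> dict:
    """Return simple sanity-check queries for a bronze CDC table."""
    return {
        # Total number of ingested CDC records
        "row_count": f"SELECT COUNT(*) AS row_count FROM {table}",
        # Records per CDC operation; assumes an `operation` column exists
        "by_operation": (
            f"SELECT operation, COUNT(*) AS row_count FROM {table} "
            "GROUP BY operation ORDER BY operation"
        ),
    }


def run_validation(spark, table: str) -> dict:
    """Run each query with spark.sql and collect the resulting rows."""
    return {
        name: spark.sql(query).collect()
        for name, query in build_validation_queries(table).items()
    }
```

In a Databricks notebook, where `spark` is already defined, this could be called as `run_validation(spark, "neo_zhou.client_demo_schema_evolution.cdc_bronze_orders")`.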


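## Appendix: define schema

The commented-out cell below defines the schema explicitly instead of relying on schema inference. Note that it reads JSON files from an S3 path, not Parquet files from the volume used earlier.
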
In [0]:

# import dlt
# from pyspark.sql.functions import *

# # Define the schema for CDC data to structure the data correctly
# cdc_schema = """schema STRING, 
#                 table STRING, 
#                 operation STRING, 
#                 timestamp STRING, 
                
#                 data STRUCT<order_id: INT, 
#                             customer_id: INT, 
#                             order_date: STRING, 
#                             amount: DOUBLE, 
#                             status: STRING>,

#                 old_data STRUCT<amount: DOUBLE, 
#                                 status: STRING>
#               """

# @dlt.table(
#     name="cdc_bronze_orders",
#     comment="Bronze table to capture raw CDC data for orders."
# )
# def bronze_cdc_data():
#     return (
#         spark.readStream.format("cloudFiles")           # Auto Loader source for incremental ingestion
#         .option("cloudFiles.format", "json")            # underlying file format
#         .schema(cdc_schema)                             # use the explicitly defined schema
#         .load("s3://neo-client-demo-tokyo/cdc_volume")  # replace with the path to your volume
#     )