In [0]:
%pip install pytest==8.4.2

# 🧾 Module 3: Incremental CDC Ingestion (Sales)

Building resilient pipelines with **Delta Lake MERGE** and **Auto Loader**

### Learning Objectives
In this notebook, learners will:
- Understand **Change Data Capture (CDC)** concepts in a Lakehouse.
- Simulate daily incremental data deliveries using generated CSVs.
- Apply **MERGE INTO** to synchronize changes (insert, update, delete).
- Manage **schema evolution** (new `region` column on day 4).
- Ensure **idempotent processing**: re-running a date shouldn't create duplicates.
- Validate the final Silver table.

### Scenario:
Your e-commerce system delivers **daily incremental sales extracts**:  
- Day 0 → Historical snapshot.  
- Days 1–7 → Daily delta files with inserts, updates, and deletes.  
- On day 4 → The schema evolves (a new column `region` is added).  

Our job is to build a resilient CDC pipeline capable of handling all these changes seamlessly.

## Step 0: Setup and Context

**TO DO:**

Define common variables and imports, such as:
- Volume locations: data, checkpoints, and schemas.
- Full table names (three-level namespace: `catalog.schema.table`).
- Python libraries, if needed (e.g., the `helpers.utils` package).

> **Optional:**  
> Validate the data in the volume you just created: `"/Volumes/capstone_dev/{{your_bronze_schema}}/raw_files/"`  
> You can use `dbutils.fs.ls` for that purpose.


In [0]:
from helpers import utils
from instructors.src.solutions.cdc_merge import CDCMerge

mod3 = CDCMerge()
catalog_name = utils.get_param("catalog", "capstone_dev")

base_user = utils.get_base_user_schema()
schema_bronze = f"{base_user}_bronze"
schema_silver = f"{base_user}_silver"
table_name = "sales"

# UC Volume path for governed data
volume_data = f"/Volumes/{catalog_name}/{schema_bronze}/raw_files/sales"
volume_check = f"/Volumes/{catalog_name}/{schema_bronze}/checkpoint_files/sales"
volume_schema = f"/Volumes/{catalog_name}/{schema_bronze}/schema_files/sales"
full_table_bronze = f"{catalog_name}.{schema_bronze}.{table_name}"
full_table_silver = f"{catalog_name}.{schema_silver}.{table_name}"

print(f"Source Volume Path: {volume_data}")
print(f"Source Volume Checkpoints: {volume_check}")
print(f"Source Volume Schema: {volume_schema}")
print(f"Bronze Table: {full_table_bronze}")
print(f"Silver Table: {full_table_silver}")


In [0]:
# Optional: list the raw files (8 are expected: day 0 plus days 1-7)
files = dbutils.fs.ls(volume_data)
print(f"Found {len(files)} files in {volume_data}:")
for f in files:
    print("-", f.name)

## Step 1: Ingest Daily Files into the Bronze Layer

**TO DO:**
- Create an empty table in the Bronze layer with the expected schema.
- Table creation is optional if you use Auto Loader and write with `toTable()`, which creates the table when it does not exist.
- The initial schema does not contain the `region` column.
- Copy the data from the input CSV files (data volume) into the **Bronze Delta table**.
- You can use either `COPY INTO` or Auto Loader to load the data; both ensure **idempotency**, since previously loaded files are not reloaded.
- Include the `_metadata` struct (available in Auto Loader and `COPY INTO`), which contains file-level information such as `_metadata.file_name`, `_metadata.file_modification_time`, and `_metadata.file_size`. We'll use that column in later steps. See https://docs.databricks.com/aws/en/ingestion/file-metadata-column

**TIPS:**
- Handle schema changes (evolution) on read with `FORMAT_OPTIONS` (e.g., `'mergeSchema' = 'true'`).
- Set `'mergeSchema' = 'true'` in `COPY_OPTIONS` on write so the target table gracefully picks up new columns.
- If you choose Auto Loader, use the corresponding `checkpoint_files` and `schema_files` volumes (for `checkpointLocation` and `cloudFiles.schemaLocation`, respectively).

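`mod3.copy_into_with_metadata` is an instructor helper whose body is not shown here. As a rough sketch of what such a helper might run, the hypothetical `build_copy_into_sql` below assembles a `COPY INTO` statement that selects `*` plus the `_metadata` struct and enables `mergeSchema` on both read and write. The `header` and `inferSchema` values are assumptions about the CSV files; inferred types may still need casting to match the Bronze table.

```python
def build_copy_into_sql(target_table: str, source_path: str) -> str:
    """Return a COPY INTO statement that loads CSV files and keeps `_metadata` (sketch)."""
    return f"""
COPY INTO {target_table}
FROM (SELECT *, _metadata FROM '{source_path}')
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true', 'mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""".strip()
```

Usage in the notebook would be `spark.sql(build_copy_into_sql(full_table_bronze, volume_data))`. Running it twice is safe: `COPY INTO` tracks which files it has already loaded and skips them.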
### 1.1: Create a Bronze Delta Table (OPTIONAL)

| COLUMN | DATA TYPE |
| :------- | :------: |
| sale_id | INT |
| product_id | INT |
| user_id | INT |
| qty | INT |
| price | DOUBLE |
| status | STRING |
| updated_at | DATE |

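A minimal sketch of DDL matching the table above, assuming a `CREATE TABLE IF NOT EXISTS ... USING DELTA` statement. `BRONZE_COLUMNS` and `build_bronze_ddl` are hypothetical names, and the actual `mod3.create_bronze_sales` helper may differ.

```python
# Bronze schema from the table above (no `region` column yet)
BRONZE_COLUMNS = [
    ("sale_id", "INT"),
    ("product_id", "INT"),
    ("user_id", "INT"),
    ("qty", "INT"),
    ("price", "DOUBLE"),
    ("status", "STRING"),
    ("updated_at", "DATE"),
]


def build_bronze_ddl(full_table: str) -> str:
    """Return CREATE TABLE DDL for the Bronze sales table (sketch)."""
    cols = ",\n  ".join(f"{name} {dtype}" for name, dtype in BRONZE_COLUMNS)
    return f"CREATE TABLE IF NOT EXISTS {full_table} (\n  {cols}\n) USING DELTA"
```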
In [0]:
display(mod3.create_bronze_sales(full_table_bronze))

### 1.2: Copy Data

In [0]:
# OPTION 1: COPY INTO
display(mod3.copy_into_with_metadata(full_table_bronze, volume_data))

In [0]:
# OPTION 2: Auto Loader (uncomment to use it instead of OPTION 1)
#display(mod3.auto_loader_with_metadata(full_table_bronze, volume_data, volume_schema, volume_check))

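For Option 2, here is a sketch of an Auto Loader ingestion. The names `autoloader_reader_options` and `run_autoloader_once` are hypothetical, `spark` is the Databricks session passed in explicitly, and the option values are assumptions for header-bearing CSV files. `trigger(availableNow=True)` makes the stream process the files currently in the volume and then stop, which fits this batch-style exercise.

```python
def autoloader_reader_options(schema_path: str) -> dict:
    """Reader options for the CSV drops (sketch; values are assumptions)."""
    return {
        "cloudFiles.format": "csv",
        "cloudFiles.schemaLocation": schema_path,           # where the inferred schema is tracked
        "cloudFiles.inferColumnTypes": "true",              # otherwise CSV columns are read as STRING
        "cloudFiles.schemaEvolutionMode": "addNewColumns",  # pick up `region` when it appears
        "header": "true",
    }


def run_autoloader_once(spark, source_path, schema_path, checkpoint_path, target_table):
    """Ingest every file available now into `target_table`, then stop."""
    stream = (
        spark.readStream.format("cloudFiles")
        .options(**autoloader_reader_options(schema_path))
        .load(source_path)
        .select("*", "_metadata")                         # keep file-level metadata
    )
    query = (
        stream.writeStream
        .option("checkpointLocation", checkpoint_path)    # remembers which files were ingested
        .option("mergeSchema", "true")                     # let the Delta table gain new columns
        .trigger(availableNow=True)
        .toTable(target_table)
    )
    query.awaitTermination()
    return query
```

Usage: `run_autoloader_once(spark, volume_data, volume_schema, volume_check, full_table_bronze)`. With `addNewColumns`, if `region` shows up after the schema was first inferred, Auto Loader stops the stream with an error and records the new column; running the cell again resumes with the evolved schema.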
### 1.3: Validate Data

In [0]:
query = f"SELECT * FROM {full_table_bronze}"
display(spark.sql(query))

In [0]:
query = f"DESCRIBE HISTORY {full_table_bronze}"
display(spark.sql(query))

In [0]:
query = f"DESCRIBE TABLE {full_table_bronze}"
display(spark.sql(query))


## Step 2: Apply CDC Merge Logic

We'll now implement the **Silver merge logic**, where each day's file is treated as an incremental CDC feed applied onto our existing table.

**TO DO:**
- Create a Silver table **sales**:
  - Add four additional columns: **_is_active**, **_created_at**, **_updated_at**, and **_file_name**.
  - The input columns **status** and **updated_at** are only needed during the merge and should not be stored in the Silver table.
  - Rename the column `qty` to `quantity`.
- Create a function named `cdc_merge` that applies a single day's CDC changes to the Silver table.
  - The function should receive three parameters: `full_table_bronze`, `full_table_silver`, and a string `filter_date` in `'YYYY-MM-DD'` format.
  - Filter the Bronze table on `updated_at = filter_date` to get a single day's increment, then perform a `MERGE INTO` operation into the Silver table. You can use an intermediate temporary view (or table) for that purpose.
- Execute the function once **per day**, in date order, to simulate the historical load followed by the incremental updates.

**MERGE INTO - DETAIL**  

- The status column identifies the operation to perform (Update, Delete, Insert).
- We want to keep all sales, even deleted ones, by using the **_is_active** column (soft delete).
- **INSERT:** new records.
  - The new columns **_created_at** and **_updated_at** should be populated with the **updated_at** value.
  - **_is_active** should be true at insertion.
  - **_file_name** should be extracted from the metadata struct (_metadata.file_name).
- **UPDATE:** modified columns (**quantity**, **price**, **region** and **_updated_at**).
- **DELETE:** Mark deleted records as inactive (`_is_active = false`) and set the new value of **_updated_at**.
- You should guarantee **idempotency**: if the merge is run again for dates that were already processed, the result should not change. You can use the **_updated_at** column for that purpose, e.g., by applying a matched change only when the incoming `updated_at` is later than the stored `_updated_at`.

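To make the semantics above concrete without a cluster, here is a pure-Python simulation (not Delta or Spark code) of one day's merge against an in-memory dict keyed by `sale_id`. The status values `'INSERT'`, `'UPDATE'`, and `'DELETE'`, the function name `apply_cdc_day`, and the sample rows are hypothetical; `region` and `_file_name` are omitted for brevity. The `updated_at <= _updated_at` guard is one way to use `_updated_at` for idempotency.

```python
import copy
from datetime import date


def apply_cdc_day(silver: dict, changes: list) -> dict:
    """Apply one day's CDC rows to an in-memory 'silver' dict keyed by sale_id.

    Hypothetical status values 'INSERT' / 'UPDATE' / 'DELETE' are assumed.
    The dict is modified in place and also returned.
    """
    for row in changes:
        current = silver.get(row["sale_id"])
        if current is None:
            # WHEN NOT MATCHED: insert a new, active record
            if row["status"] == "INSERT":
                silver[row["sale_id"]] = {
                    "quantity": row["qty"],  # qty is renamed to quantity
                    "price": row["price"],
                    "_is_active": True,
                    "_created_at": row["updated_at"],
                    "_updated_at": row["updated_at"],
                }
            continue
        # Idempotency guard: skip changes that are not newer than the stored version
        if row["updated_at"] <= current["_updated_at"]:
            continue
        if row["status"] == "UPDATE":
            current.update(quantity=row["qty"], price=row["price"], _updated_at=row["updated_at"])
        elif row["status"] == "DELETE":
            # Soft delete: keep the row, flag it as inactive
            current.update(_is_active=False, _updated_at=row["updated_at"])
    return silver


# Hypothetical rows: day 0 inserts two sales, day 1 updates one and deletes the other
d0, d1 = date(2024, 1, 1), date(2024, 1, 2)
day0 = [
    {"sale_id": 1, "qty": 2, "price": 10.0, "status": "INSERT", "updated_at": d0},
    {"sale_id": 2, "qty": 1, "price": 5.0, "status": "INSERT", "updated_at": d0},
]
day1 = [
    {"sale_id": 1, "qty": 3, "price": 10.0, "status": "UPDATE", "updated_at": d1},
    {"sale_id": 2, "qty": 1, "price": 5.0, "status": "DELETE", "updated_at": d1},
]

state = apply_cdc_day(apply_cdc_day({}, day0), day1)
snapshot = copy.deepcopy(state)

# Re-running already processed days leaves the state unchanged
apply_cdc_day(state, day1)
apply_cdc_day(state, day0)
print(state == snapshot)  # True
```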
### 2.1: Create a Silver Delta Table

| COLUMN | DATA TYPE |
| :------- | :------: |
| sale_id | INT |
| product_id | INT |
| user_id | INT |
| quantity | INT |
| price | DOUBLE |
| region | STRING |
| (new columns) ... | ... |

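A sketch of the Silver DDL, including the four technical columns. Their types are not specified in the text above, so `BOOLEAN` for `_is_active`, `DATE` for `_created_at` and `_updated_at` (they are populated from the `DATE` column `updated_at`), and `STRING` for `_file_name` are assumptions. `SILVER_COLUMNS` and `build_silver_ddl` are hypothetical names.

```python
# Silver schema: `qty` renamed, `status`/`updated_at` dropped, technical columns added
SILVER_COLUMNS = [
    ("sale_id", "INT"),
    ("product_id", "INT"),
    ("user_id", "INT"),
    ("quantity", "INT"),       # renamed from Bronze `qty`
    ("price", "DOUBLE"),
    ("region", "STRING"),      # only populated from day 4 onward
    ("_is_active", "BOOLEAN"),
    ("_created_at", "DATE"),
    ("_updated_at", "DATE"),
    ("_file_name", "STRING"),
]


def build_silver_ddl(full_table: str) -> str:
    """Return CREATE TABLE DDL for the Silver sales table (sketch)."""
    cols = ",\n  ".join(f"{name} {dtype}" for name, dtype in SILVER_COLUMNS)
    return f"CREATE TABLE IF NOT EXISTS {full_table} (\n  {cols}\n) USING DELTA"
```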
In [0]:
display(mod3.create_silver_sales(full_table_silver))

### 2.2: Create the CDC merge function

In [0]:
"""
def cdc_merge(self, full_table_bronze: str, full_table_silver: str, filter_date: str):
    pass
"""

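One possible body for `cdc_merge`, sketched as a standalone function that takes `spark` explicitly instead of the `self` method in the stub. It assumes hypothetical status values `'INSERT'`, `'UPDATE'`, and `'DELETE'` (check the real values in Bronze), that Bronze already has the `region` column (added through `mergeSchema`), and at most one change per `sale_id` per day. It uses an inline subquery rather than a temporary view; `build_merge_sql` is a hypothetical name.

```python
from datetime import date


def build_merge_sql(full_table_bronze: str, full_table_silver: str, filter_date: str) -> str:
    """Return a MERGE INTO statement that applies one day's CDC rows (sketch)."""
    date.fromisoformat(filter_date)  # raises ValueError unless filter_date is 'YYYY-MM-DD'
    # Matched rows are only modified when the change is newer than the stored _updated_at,
    # so re-running an already processed date changes nothing.
    return f"""
MERGE INTO {full_table_silver} AS t
USING (
  SELECT sale_id, product_id, user_id, qty AS quantity, price, region,
         status, updated_at, _metadata.file_name AS _file_name
  FROM {full_table_bronze}
  WHERE updated_at = DATE'{filter_date}'
) AS s
ON t.sale_id = s.sale_id
WHEN MATCHED AND s.status = 'DELETE' AND s.updated_at > t._updated_at THEN
  UPDATE SET t._is_active = false, t._updated_at = s.updated_at
WHEN MATCHED AND s.status = 'UPDATE' AND s.updated_at > t._updated_at THEN
  UPDATE SET t.quantity = s.quantity, t.price = s.price,
             t.region = s.region, t._updated_at = s.updated_at
WHEN NOT MATCHED AND s.status = 'INSERT' THEN
  INSERT (sale_id, product_id, user_id, quantity, price, region,
          _is_active, _created_at, _updated_at, _file_name)
  VALUES (s.sale_id, s.product_id, s.user_id, s.quantity, s.price, s.region,
          true, s.updated_at, s.updated_at, s._file_name)
""".strip()


def cdc_merge(spark, full_table_bronze: str, full_table_silver: str, filter_date: str):
    """Standalone variant of the stub: takes `spark` instead of `self`."""
    return spark.sql(build_merge_sql(full_table_bronze, full_table_silver, filter_date))
```

On a re-run, the `WHEN NOT MATCHED` branch finds nothing because the rows already exist, and the `updated_at > _updated_at` conditions keep matched rows untouched. Note that Delta's MERGE fails if several source rows for the same day match the same target row, so deduplicate per `sale_id` first if the files can contain that.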
### 2.3: Execute merge _iteratively_

In [0]:
query = f"SELECT DISTINCT CAST(updated_at AS STRING) AS updated_at FROM {full_table_bronze} ORDER BY updated_at"
dates = [row['updated_at'] for row in spark.sql(query).collect()] 
print(dates)

for date in dates:
    print("Performing CDC Merge:", date)
    mod3.cdc_merge(full_table_bronze, full_table_silver, date)


### 2.4: Validate Data

Let's validate our CDC logic and schema evolution:
- Visualize the resulting data.
- Check the number of active vs. deleted (inactive) rows.
- Validate the table history: there should be one MERGE operation per file/date.

In [0]:
query = f"SELECT * FROM {full_table_silver} ORDER BY sale_id"
display(spark.sql(query))

In [0]:
query = f"""
SELECT
  MIN(_file_name) AS min_file,
  MAX(_file_name) AS max_file,
  COUNT(*) AS total_rows,
  SUM(CASE WHEN _is_active THEN 1 ELSE 0 END) AS active_rows,
  SUM(CASE WHEN NOT _is_active THEN 1 ELSE 0 END) AS inactive_rows,
  COUNT(DISTINCT region) AS distinct_regions
FROM {full_table_silver}
"""
display(spark.sql(query))

In [0]:
query = f"DESCRIBE HISTORY {full_table_silver}"
display(spark.sql(query))

## Step 3: Optimize and Time Travel (OPTIONAL)

Finally, we apply performance and maintenance commands:
- `OPTIMIZE` with `ZORDER BY sale_id` to speed up queries that filter or join on `sale_id`.
- Demonstrate **time travel** by exploring previous table versions, e.g. the first historical load.

>**Key Note:**
>- `ZORDER BY sale_id` co-locates data with similar sale_ids, speeding up point lookups and merge operations.
>- Use the `DESCRIBE HISTORY` command to explore rollback scenarios.

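`VERSION AS OF 1` assumes version 0 is the table creation and version 1 the first MERGE (the day 0 load); re-running the notebook or replacing the table can shift that numbering. A small, hypothetical helper can instead look up the first MERGE version from `DESCRIBE HISTORY` output, which includes `version` and `operation` columns.

```python
def first_version_with_operation(history_rows, operation: str = "MERGE"):
    """Return the lowest table version whose `operation` equals `operation`, or None.

    `history_rows` can be the Row objects from DESCRIBE HISTORY (.collect())
    or plain dicts; both support row["version"] and row["operation"].
    """
    versions = [row["version"] for row in history_rows if row["operation"] == operation]
    return min(versions) if versions else None


# Usage in the notebook (Databricks `spark` session assumed):
# history = spark.sql(f"DESCRIBE HISTORY {full_table_silver}").collect()
# v = first_version_with_operation(history)
# display(spark.sql(f"SELECT * FROM {full_table_silver} VERSION AS OF {v}"))
# Rolling back would use: RESTORE TABLE <table> TO VERSION AS OF <v>
```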

In [0]:
query = f"OPTIMIZE {full_table_silver} ZORDER BY sale_id"
display(spark.sql(query))

In [0]:
query = f"SELECT * FROM {full_table_silver} VERSION AS OF 1"
display(spark.sql(query))

## Reflection & Discussion

1. **Soft vs. Hard Deletes:**  
   - How would your MERGE logic change if you wanted to *physically remove* deleted records rather than marking them inactive?
   - What would be the pros and cons in a data lake context?
2. **Reprocessing and Checkpoints:**  
   - If a new CDC file arrives late or you must re-run day 3, how could you ensure idempotency?  
   - What metadata or checkpointing strategies could you implement to track the “last successfully processed” date or file?
3. **Schema Evolution in Streaming Pipelines:**  
   - How can you adapt this logic to Auto Loader in *continuous mode*?  
   - What settings (`cloudFiles.schemaEvolutionMode`, `mergeSchema`) help keep the pipeline robust against schema drift?


# Tests

**TO DO:**

- Check whether any tests fail and investigate their root cause.

In [0]:
from helpers import test_runner
import os

# Resolve the current notebook path and pass its name to the test runner via NOTEBOOK_NAME
notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
os.environ["NOTEBOOK_NAME"] = notebook_path.split("/")[-1]

test_runner.run()