# Week 4 Lab: Bronze Layer

In this lab you will build the Bronze layer of our bookstore's medallion architecture.

You'll ingest four CSV files into Delta tables, adding audit columns and using MERGE INTO so the pipeline is idempotent — safe to run multiple times without creating duplicates.

**A note on our source data:**
- **Books and Stores** are static reference data — each is a single CSV file that won't change over time.
- **Online Orders and In-Store Orders** are transactional data. New order files can arrive in their directories at any time, so `read_files` reads the entire directory to pick up any new files on each run.

## Prerequisites

Before running this notebook, make sure the target Bronze tables exist. Run the DDL notebook at `ddl/bronze` to create them.
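
To confirm the prerequisite is met, a quick check along these lines can help. It assumes the DDL notebook creates the tables in a schema named `bronze`, as the table names used later in this lab (`bronze.stores`, `bronze.books`, and so on) suggest.

```sql
-- List the tables in the bronze schema; all four target tables should appear.
SHOW TABLES IN bronze;

-- Inspect one table's columns, including the audit columns the DDL defined.
DESCRIBE TABLE bronze.books;
```

If your notebook only displays the result of the last statement in a cell, run each statement in its own cell.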

## Step 1: Load CSV Files into Temporary Views

We read each source CSV into a temporary view. For books and stores, we point at a single static file. For orders, we point at a directory: `read_files` reads every CSV in it, including any files that have landed since the last run. We add `source_filename` in this step because it comes from the file's own metadata (`_metadata.file_path`), which is available as the file is read. The `ingestion_timestamp` is set later, at MERGE time, so that it reflects when the row was actually written to the table rather than when it was read from the CSV.
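
As a rough sketch of what a directory read picks up, the query below lists each file found in the online orders directory together with its row count. The `file_size` and `file_modification_time` fields of `_metadata` are not used elsewhere in this lab and appear here only for illustration.

```sql
-- One output row per CSV file that read_files found in the directory.
SELECT
  _metadata.file_path              AS source_filename,
  _metadata.file_size              AS size_bytes,
  _metadata.file_modification_time AS modified_at,
  COUNT(*)                         AS row_count
FROM read_files(
  '/FileStore/hwe-data/online_orders/',
  format => 'csv',
  header => true
)
GROUP BY
  _metadata.file_path,
  _metadata.file_size,
  _metadata.file_modification_time
ORDER BY modified_at
```

Running it again after a new file lands should show one additional row, which is the behavior the orders views rely on.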

Load the stores CSV file into a temporary view called `stores_raw`. Stores is a static data set with a single file. Select all columns plus `_metadata.file_path` as `source_filename`.

In [None]:
CREATE OR REPLACE TEMPORARY VIEW stores_raw AS
SELECT
  *,
  _metadata.file_path AS source_filename
FROM read_files(
  '/FileStore/hwe-data/stores/stores.csv',
  format => 'csv',
  header => true
)

Load the books CSV file into a temporary view called `books_raw`. Like stores, books is a static data set with only one file. Select all columns plus `_metadata.file_path` as `source_filename`.

In [None]:
CREATE OR REPLACE TEMPORARY VIEW books_raw AS
SELECT
  *,
  _metadata.file_path AS source_filename
FROM read_files(
  '/FileStore/hwe-data/books/books.csv',
  format => 'csv',
  header => true
)

Load all CSVs from the online orders directory into a temporary view called `online_orders_raw`. Unlike books and stores, new order files can arrive in this directory at any time — `read_files` reads the entire directory so we always pick up everything.

In [None]:
CREATE OR REPLACE TEMPORARY VIEW online_orders_raw AS
SELECT
  *,
  _metadata.file_path AS source_filename
FROM read_files(
  '/FileStore/hwe-data/online_orders/',
  format => 'csv',
  header => true
)

Load all CSVs from the in-store orders directory into a temporary view called `instore_orders_raw`. Same as online orders — new files can arrive at any time.

In [None]:
CREATE OR REPLACE TEMPORARY VIEW instore_orders_raw AS
SELECT
  *,
  _metadata.file_path AS source_filename
FROM read_files(
  '/FileStore/hwe-data/instore_orders/',
  format => 'csv',
  header => true
)
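
Before moving on, it can be worth sanity-checking the four views. The sketch below relies only on the views and the `source_filename` column created above. For the two static data sets, `file_count` would be expected to be 1.

```sql
-- Row count and number of distinct source files behind each temporary view.
SELECT 'stores_raw' AS view_name, COUNT(*) AS row_count,
       COUNT(DISTINCT source_filename) AS file_count
FROM stores_raw
UNION ALL
SELECT 'books_raw', COUNT(*), COUNT(DISTINCT source_filename)
FROM books_raw
UNION ALL
SELECT 'online_orders_raw', COUNT(*), COUNT(DISTINCT source_filename)
FROM online_orders_raw
UNION ALL
SELECT 'instore_orders_raw', COUNT(*), COUNT(DISTINCT source_filename)
FROM instore_orders_raw
```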

---
## Step 2: MERGE INTO Bronze Tables

Now we load data from the temporary views into the bronze tables using MERGE INTO.

MERGE INTO matches rows on a natural key:
- If a match is found → update the existing row
- If no match → insert a new row

This makes our pipeline **idempotent** — running it again with the same data won't create duplicates.

We wrap the source in a subquery to add `ingestion_timestamp` at write time.
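
One caveat is worth checking before the merges. Delta Lake generally rejects a MERGE when several source rows match the same target row, because it cannot tell which one should win. Since the order views read every file in a directory, the same `order_id` appearing in two files could trigger this. A minimal pre-check, using the `online_orders_raw` view from Step 1:

```sql
-- Keys that occur more than once in the source view.
-- An empty result means the MERGE on order_id is unambiguous.
SELECT
  order_id,
  COUNT(*)                        AS cnt,
  COUNT(DISTINCT source_filename) AS file_count
FROM online_orders_raw
GROUP BY order_id
HAVING COUNT(*) > 1
```

The same query works for `instore_orders_raw`. If it returns rows, the source needs deduplicating before the MERGE. How to pick the surviving row is a design decision this lab does not cover.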

Merge `stores_raw` into `bronze.stores`, matching on `store_nbr`.

In [None]:
MERGE INTO bronze.stores AS target
USING (
  SELECT *, current_timestamp() AS ingestion_timestamp
  FROM stores_raw
) AS source
ON target.store_nbr <=> source.store_nbr
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

Merge `books_raw` into `bronze.books`, matching on `isbn`.

In [None]:
MERGE INTO bronze.books AS target
USING (
  SELECT *, current_timestamp() AS ingestion_timestamp
  FROM books_raw
) AS source
ON target.isbn <=> source.isbn
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

Merge `online_orders_raw` into `bronze.online_orders`, matching on `order_id`.

In [None]:
MERGE INTO bronze.online_orders AS target
USING (
  SELECT *, current_timestamp() AS ingestion_timestamp
  FROM online_orders_raw
) AS source
ON target.order_id <=> source.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

Merge `instore_orders_raw` into `bronze.instore_orders`, matching on `order_id`.

In [None]:
MERGE INTO bronze.instore_orders AS target
USING (
  SELECT *, current_timestamp() AS ingestion_timestamp
  FROM instore_orders_raw
) AS source
ON target.order_id <=> source.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

---
## Step 3: Verify Idempotency

Now go back and **run all the cells above a second time**. Assuming no new order files have landed in the meantime, each MERGE should match every row and update it, so no new rows are inserted.

Let's verify.

Check the row counts across all four bronze tables. These should be the same before and after the second run.

In [None]:
SELECT 'bronze.stores' AS table_name, COUNT(*) AS row_count FROM bronze.stores
UNION ALL
SELECT 'bronze.books', COUNT(*) FROM bronze.books
UNION ALL
SELECT 'bronze.online_orders', COUNT(*) FROM bronze.online_orders
UNION ALL
SELECT 'bronze.instore_orders', COUNT(*) FROM bronze.instore_orders
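
Row counts show only the net effect. As a complementary sketch, the Delta table history records per-operation metrics. For a MERGE, the `operationMetrics` column typically includes entries such as `numTargetRowsInserted` and `numTargetRowsUpdated`. After the second run, the most recent MERGE would be expected to report zero inserted rows, assuming no new order files arrived between runs.

```sql
-- Show the two most recent operations on the table, newest first.
DESCRIBE HISTORY bronze.online_orders LIMIT 2
```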

Check for duplicate keys. This query should return **no rows**. If it returns anything, your MERGE key isn't working correctly.

In [None]:
-- Keys are cast to STRING so the UNION ALL branches share one type,
-- whatever types the Bronze DDL gave the individual key columns.
SELECT 'bronze.stores' AS table_name, CAST(store_nbr AS STRING) AS key, COUNT(*) AS cnt
FROM bronze.stores GROUP BY store_nbr HAVING COUNT(*) > 1
UNION ALL
SELECT 'bronze.books', CAST(isbn AS STRING), COUNT(*)
FROM bronze.books GROUP BY isbn HAVING COUNT(*) > 1
UNION ALL
SELECT 'bronze.online_orders', CAST(order_id AS STRING), COUNT(*)
FROM bronze.online_orders GROUP BY order_id HAVING COUNT(*) > 1
UNION ALL
SELECT 'bronze.instore_orders', CAST(order_id AS STRING), COUNT(*)
FROM bronze.instore_orders GROUP BY order_id HAVING COUNT(*) > 1

Finally, spot-check a few rows to confirm the audit columns look correct. The `ingestion_timestamp` should reflect when you last ran the pipeline.

In [None]:
SELECT isbn, title, ingestion_timestamp, source_filename
FROM bronze.books
LIMIT 5
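
A spot check only covers a handful of rows. The following sketch runs an aggregate check over the whole table, using only the audit columns already selected above.

```sql
-- Count rows with missing audit values and show the ingestion time range.
SELECT
  COUNT(*)                                                     AS total_rows,
  SUM(CASE WHEN ingestion_timestamp IS NULL THEN 1 ELSE 0 END) AS missing_timestamp,
  SUM(CASE WHEN source_filename IS NULL THEN 1 ELSE 0 END)     AS missing_filename,
  MIN(ingestion_timestamp)                                     AS earliest_ingestion,
  MAX(ingestion_timestamp)                                     AS latest_ingestion
FROM bronze.books
```

Both `missing_` counts should be zero. If every row was rewritten on the latest run, the earliest and latest timestamps should both fall within that run.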