# 🟫 Bronze Layer - Table Ingestion Template

This notebook is a standard template for ingesting raw data into the **Bronze layer** of our Lakehouse architecture using Databricks and Unity Catalog.

It handles:
- Extraction (full or incremental)
- Serialization and standardization of data types
- Quality validation (optional)
- Writing to a Delta table (first-time or partition overwrite)
- Metadata logging (record count, expectations, vacuum)
- Optional table documentation (tags, notebook registration)

---

## How to use this template

1. **Update the table name and path**
   - Edit the `table = ...` line to set the correct target table.
   - The storage `path` is derived automatically from the table name.

2. **Implement your extraction logic**
   - Fill in both branches: the full load runs when `w_first_write` is `true`, the incremental load otherwise.

3. **Use the schema helper**
   - When `w_debug` is `true`, the notebook prints a suggested schema for Silver typing.

4. **Optional: Add more quality rules**
   - You can extend the list of rules in the `Expectations` section.

5. **Test in `dev` or `stg` environments first**
   - Debug mode is automatically enabled in non-production.
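
Step 2 can be sketched as follows. This is a minimal illustration rather than the template's own logic: `build_extraction_query`, the source table `source_db.orders`, and the `updated_at` column are hypothetical names, and the sketch assumes the source can be filtered by a date column.

```python
from datetime import date


def build_extraction_query(source_table: str, first_write: bool,
                           reference_date: date,
                           date_column: str = "updated_at") -> str:
    """Return a SELECT statement for a full or an incremental load.

    All names here are hypothetical; adapt them to the real source.
    """
    query = f"SELECT * FROM {source_table}"
    if not first_write:
        # Incremental load: keep only the rows for the reference date.
        query += (
            f" WHERE CAST({date_column} AS DATE) = "
            f"DATE '{reference_date.isoformat()}'"
        )
    return query


full_query = build_extraction_query("source_db.orders", True, date(2024, 1, 15))
incremental_query = build_extraction_query("source_db.orders", False, date(2024, 1, 15))
```

In the notebook, each branch of the extraction cell could then pass the matching query to `spark.sql(...)` to produce `df`.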

---

## Widgets

| Widget            | Description                                 | Default     |
|-------------------|---------------------------------------------|-------------|
| `w_reference_date`| Reference date for incremental loads        | Today       |
| `w_first_write`   | Indicates full (true) or incremental (false)| false       |
| `w_documentation` | Whether to apply table tags                 | true        |
| `w_debug`         | Enables debug prints and disables logging   | auto (env)  |
| `w_quality`       | Enables application of quality rules        | true        |
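
Databricks widgets return their values as strings, so each one needs parsing before use. The notebook delegates this to `utils.get_bool_widget` and `utils.get_date_widget`, whose implementations are not shown here. The sketch below is only an assumption about what such parsing might look like; `parse_bool` and `parse_date` are hypothetical names.

```python
from datetime import date, datetime


def parse_bool(raw: str) -> bool:
    """Convert a dropdown value such as "true" or "false" to a bool."""
    value = raw.strip().lower()
    if value not in ("true", "false"):
        raise ValueError(f"Expected 'true' or 'false', got '{raw}'")
    return value == "true"


def parse_date(raw: str) -> date:
    """Parse a YYYY-MM-DD string; raises ValueError for any other format."""
    return datetime.strptime(raw.strip(), "%Y-%m-%d").date()


first_write = parse_bool("false")
reference_date = parse_date("2024-01-15")
```

Failing fast on an unexpected value keeps a mistyped widget from silently selecting the wrong load mode.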

---

## Table conventions

- Tables must be in the format: `catalog.schema.table`
- Bronze data is **typed as string** and **partitioned by `_created_at`**
- Source structures (array, map, struct) are **serialized as JSON**
- The first write overwrites the whole Delta table; incremental writes overwrite only the `_created_at` partition for the reference date (via `replaceWhere`)
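
To make the typing convention concrete, here is a plain-Python sketch of what one Bronze row looks like after serialization. It is an illustration only: the notebook itself applies `F.to_json` and `cast("string")` in Spark, whose string formatting differs from Python's `str` in places (booleans, for example). `to_bronze_value` and the sample record are hypothetical.

```python
import json


def to_bronze_value(value):
    """Serialize nested structures as JSON and everything else as a string."""
    if value is None:
        return None  # nulls stay null
    if isinstance(value, (dict, list)):
        # Compact separators, so the output has no spaces between elements.
        return json.dumps(value, separators=(",", ":"))
    return str(value)


record = {"id": 42, "amount": 19.9, "tags": ["a", "b"], "address": {"city": "Lisbon"}}
bronze_row = {key: to_bronze_value(value) for key, value in record.items()}
```

Every value in `bronze_row` is now a string, and the nested `tags` and `address` fields can be parsed back from JSON when the Silver layer applies real types.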

---

## Logging & Observability

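- When `w_quality` is enabled, `Expectations` quarantines failing records and logs the results (logs are not saved in debug mode)
- `RecordsQuantity` logs the number of rows written (not saved in debug mode)
- `VACUUM` runs after every write and retains 7 days (168 hours) of history
- In production, the notebook-to-table mapping is saved on the first write for lineage tracking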
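
The 7-day retention corresponds to the `RETAIN 168 HOURS` clause used in the write cell, because `VACUUM` takes its retention in hours. A small sketch of that conversion follows; `vacuum_statement` and the table name are hypothetical and not part of the template.

```python
def vacuum_statement(table: str, retention_days: int = 7) -> str:
    """Build a Delta VACUUM statement with the retention expressed in hours."""
    if retention_days < 0:
        raise ValueError("retention_days must be non-negative")
    return f"VACUUM {table} RETAIN {retention_days * 24} HOURS"


statement = vacuum_statement("prd.source_bronze.orders")
```

Delta's retention safety check rejects periods shorter than 168 hours unless that check is explicitly disabled, so lowering this value needs care.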



In [0]:
from datetime import datetime
import re

import pyspark.sql.functions as F
import pyspark.sql.types as T

import src.utils as utils
from src.notebook_context import notebook_context
from src.expectations import Expectations
from src.records_quantity import RecordsQuantity

In [0]:
# Notebook context
notebook_vars = notebook_context()
env = notebook_vars.get("env_folder")
catalog = env.split(".")[-1]
notebook_path = notebook_vars.get("notebook_path")

# Default parameters
today = datetime.today().strftime("%Y-%m-%d")
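# env_folder may carry a numeric prefix (e.g. "3.prd"), so compare the suffix held in `catalog`
default_debug = "true" if catalog in ("dev", "stg") else "false"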

# Widgets declaration
dbutils.widgets.text("w_reference_date", today, "Reference date (YYYY-MM-DD)")
dbutils.widgets.dropdown("w_first_write", "false", ["true", "false"], "First write?")
dbutils.widgets.dropdown("w_documentation", "true", ["true", "false"], "Apply documentation?")
dbutils.widgets.dropdown("w_debug", default_debug, ["true", "false"], "Debug mode?")
dbutils.widgets.dropdown("w_quality", "true", ["true", "false"], "Validate quality rules?")

# Widget values
w_reference_date = utils.get_date_widget("w_reference_date")
w_first_write = utils.get_bool_widget("w_first_write")
w_documentation = utils.get_bool_widget("w_documentation")
w_debug = utils.get_bool_widget("w_debug")
w_quality = utils.get_bool_widget("w_quality")

In [0]:
# TODO: Update with the correct table name in the format: <catalog>.<schema>.<table_name>
table = f"{catalog}.source_bronze.table"

# Parse table components
parts = table.split(".")
if len(parts) != 3:
    raise ValueError(f"[config] Invalid table format: '{table}'. Expected format: '<catalog>.<schema>.<table>'")

catalog, schema, table_name = parts
path = f"Volumes/{catalog}/{schema}/{table_name}/"

utils.formatted_print(f"[config] Table: {table}", debug=w_debug)
utils.formatted_print(f"[config] Path: {path}", debug=w_debug)

In [0]:
if w_first_write:
    # TODO: Implement full extraction logic
    utils.formatted_print("[extract] Full extraction triggered", debug=w_debug)
    df = ...  # full load
else:
    # TODO: Implement incremental extraction logic
    utils.formatted_print("[extract] Incremental extraction triggered", debug=w_debug)
    df = ...  # incremental load

utils.debug_df(df, debug=w_debug, label="Raw")

In [0]:
for col_name in df.columns:
    data_type = df.schema[col_name].dataType

    if isinstance(data_type, (T.ArrayType, T.MapType, T.StructType)):
        df = df.withColumn(col_name, F.to_json(F.col(col_name)))
        utils.formatted_print(f"[typing] Column '{col_name}' ({data_type}) -> serialized to JSON", debug=w_debug, force_debug_only=True)
    else:
        df = df.withColumn(col_name, F.col(col_name).cast("string"))
        utils.formatted_print(f"[typing] Column '{col_name}' ({data_type}) -> cast to string", debug=w_debug, force_debug_only=True)

utils.debug_df(df, debug=w_debug, label="Typed")

In [0]:
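# After the typing step every column is already a string, so this substitution
# normally leaves the schema unchanged; it normalizes any simple type that remains.
pattern = r'\b(?!Array|Map|Struct)(\w+Type\(\))'
modified_schema = re.sub(pattern, 'StringType()', str(df.schema))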

utils.formatted_print(f"\n[debug/schema]\n> Suggested schema for Silver typing (convert all simple types to StringType):\n{modified_schema}", debug=w_debug, force_debug_only=True)

In [0]:
df = df.withColumns({
    "_created_at": F.current_timestamp(),
    # "_source_file": F.input_file_name(),  # TODO: Uncomment if reading from files (e.g., CSV, Parquet)
})

utils.debug_df(df, debug=w_debug, label="Final")

In [0]:
if w_quality:
    rules = [
        {"scenario": "is_not_empty"}
    ]

    Expectations(
        df=df,
        table_name=table,
        debug=w_debug,
        save_log=not w_debug,
        raise_exception=True,
        quarantine=True
    ).apply(rules)

In [0]:
utils.formatted_print(f"[write] Writing table: {table} | Mode: {'full' if w_first_write else 'incremental'}", debug=w_debug)

if w_first_write:
    # Full write: create table with schema and path
    (df.write
        .mode("overwrite")
        .partitionBy("_created_at")
        .option("overwriteSchema", "true")
        .option("path", path)
        .saveAsTable(table)
    )
    utils.formatted_print(f"[write] Table created at path: {path}", debug=w_debug)

    # Register table <-> notebook (only in prod)
    if env == '3.prd':
        utils.register_table_notebook(table, notebook_path, debug=w_debug)

else:
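    # Incremental write: overwrite only the partition for the reference date.
    # NOTE: Delta rejects the write unless every row in df satisfies the replaceWhere
    # predicate, so verify that _created_at values match w_reference_date.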
    utils.formatted_print(f"[write] Overwriting partition: _created_at = {w_reference_date}", debug=w_debug)

    (df.write
        .mode("overwrite")
        .option("replaceWhere", f"_created_at = '{w_reference_date}'")
        .saveAsTable(table)
    )

# Collect number of records written
RecordsQuantity(table_name=table, debug=w_debug, save_log=not w_debug).save()

utils.formatted_print(f"[vacuum] Executing VACUUM on table {table} (168 hours retention)", debug=w_debug)
spark.sql(f"VACUUM {table} RETAIN 168 HOURS")

In [0]:
if w_documentation:
    utils.formatted_print("[documentation] Applying table tags...", debug=w_debug)

    tags = {
        "source": "",
        "domain": "",
        "owner": "",
        "criticality": "",
        "retention_policy": ""
    }

    utils.apply_table_tags(table=table, tags=tags, debug=w_debug)

    utils.formatted_print("[documentation] Tagging completed", debug=w_debug)

    if w_debug:
        tags_applied = (
            spark.sql(f"DESCRIBE TABLE EXTENDED {table}")
            .filter("col_name = 'Table Properties'")
            .select("data_type")
            .collect()
        )
        print("[debug] Applied table tags:")
        for row in tags_applied:
            print(row["data_type"])