In [3]:
# Ensure the local package is importable both locally and on Databricks
import sys
from pathlib import Path

# Local/Jupyter: search upwards for a 'src' folder and add to sys.path
for base in [Path.cwd(), *Path.cwd().parents]:
    candidate = base / "src"
    if candidate.exists():
        sys.path.insert(0, str(candidate))
        break

# Databricks: try to add the imported workspace folder to sys.path
try:
    from pyspark.dbutils import DBUtils  # type: ignore
    from pyspark.sql import SparkSession  # type: ignore

    spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()
    dbutils = DBUtils(spark)
    user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply("user")
    ws_base = Path(f"/Workspace/Users/{user}/bluebrick")
    for candidate in [ws_base / "src", ws_base]:
        if candidate.exists():
            sys.path.insert(0, str(candidate))
except Exception as exc:
    import logging

    logging.getLogger(__name__).debug("Skipping Databricks sys.path setup: %s", exc)

# BlueBrick Quickstart ETL

This notebook reads configuration, creates a tiny demo dataset, applies deterministic
transformations, then writes a Delta table to Unity Catalog as `catalog.schema.table`.
Run this on a UC-enabled workspace.

In [4]:
from IPython.display import display

from bluebrick.config import load_config
from bluebrick.io import ensure_uc_names, get_spark, write_delta
from bluebrick.transformations import clean_sales

In [5]:
# 1) Spark session
spark = get_spark("bluebrick-quickstart")

# 2) Load config (BLUEBRICK_ENV or Databricks widget "bluebrick_env")
cfg = load_config()
catalog, schema, table = ensure_uc_names(cfg["catalog"], cfg["schema"], cfg["table"])

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/04 10:58:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# 3) Create tiny demo dataset (acts as RAW)
src = spark.createDataFrame(
    [(1, " A ", "2024-01-01", 100.0), (2, "B", "2024-01-02", None)],
    "id INT, name STRING, tx_date STRING, amount DOUBLE",
)

# 4) Transform
df_clean = clean_sales(src)
df_clean.createOrReplaceTempView("sales_clean")

In [7]:
# 5) Ensure UC namespaces and write as managed Delta table
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}")

full_name = f"{catalog}.{schema}.{table}"
write_delta(df_clean, full_name)

ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near 'CATALOG'.(line 1, pos 7)

== SQL ==
CREATE CATALOG IF NOT EXISTS main
-------^^^


In [None]:
# 6) Simple verification queries
count_df = spark.table(full_name).groupBy().count()
schema_df = spark.table(full_name).limit(0)
display(count_df)
display(schema_df)