# Data Processing Notebook Template

Use this scaffold to build repeatable PySpark pipelines with logging, validations, and optional Delta rollbacks. Replace placeholders with your sources, business logic, and checks.


## Notebook guidelines

- Name notebooks in clear snake_case (e.g., `orders_enriched.ipynb` or `domain_orders_enriched.ipynb`); keep one primary table per notebook.
- Make runs idempotent: deterministic transforms, safe overwrites/merges, and repeatable partition logic so reruns do not create duplicates.
- Keep naming consistent and scopes clean: functions in snake_case, classes in PascalCase, constants in UPPER_SNAKE, modules/files in snake_case; avoid implicit globals beyond parameters.
- Encapsulate helpers in small functions inside the notebook when reusable; prefer pure functions and pass Spark/DataFrames explicitly.
- Document inputs/outputs near the top; each notebook should produce exactly one curated table or view.
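
The idempotency guideline can be illustrated without Spark. The sketch below is a plain-Python stand-in for a keyed merge; the helper name `upsert_by_key` and the in-memory `dict` target are illustrative assumptions, not part of `spark_fuse`. Applying the same batch twice leaves the target unchanged, which is the property a Delta merge or a partition overwrite should also give you.

```python
from typing import Dict, Iterable, Mapping


def upsert_by_key(
    target: Dict[str, dict],
    batch: Iterable[Mapping[str, object]],
    key: str = "order_id",
) -> Dict[str, dict]:
    """Return a new target where each batch row replaces the row with the same key."""
    merged = dict(target)  # copy so the caller's target is not mutated
    for row in batch:
        merged[str(row[key])] = dict(row)
    return merged


batch = [
    {"order_id": "O-1001", "order_total": 42.50},
    {"order_id": "O-1002", "order_total": 18.00},
]

first_run = upsert_by_key({}, batch)
second_run = upsert_by_key(first_run, batch)  # rerun with the same input

# A rerun must neither add rows nor change values.
assert second_run == first_run
assert len(second_run) == 2
```

An append-only write would fail the same check, because the second run would double the row count.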


In [11]:
from spark_fuse.spark import create_session
from spark_fuse.utils import change_tracking  # noqa: F401 (enables .change_tracking accessors)
from spark_fuse.utils.logging import (
    console,
    create_progress_tracker,
    enable_spark_logging,
    log_end as log_end_step,
    log_error as log_error_step,
    log_info as log_info_step,
)
from spark_fuse.utils.dataframe import ensure_columns, preview
from pyspark.sql import functions as F, types as T
import datetime as _dt

progress_tracker = create_progress_tracker(total_steps=10)
log = console()

def log_info(label: str, *, advance: int = 1) -> None:
    log_info_step(progress_tracker, log, label, advance=advance)


def log_error(label: str, *, advance: int = 1) -> None:
    log_error_step(progress_tracker, log, label, advance=advance)


def log_end(label: str, *, advance: int = 1) -> None:
    log_end_step(progress_tracker, log, label, advance=advance)

# Set any reusable parameters here
job_ts = _dt.datetime.now().replace(microsecond=0).isoformat()  # naive local time; use _dt.datetime.now(_dt.timezone.utc) for UTC


> Why `functions as F` and `types as T`? Aliasing keeps chained expressions concise, matches common Spark style, and avoids polluting the global namespace with hundreds of column functions and type classes.
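
`datetime.now()` called without a timezone returns naive local time, so `job_ts` in the setup cell depends on the driver's clock settings. If the run timestamp should be UTC, a minimal sketch using only the standard library follows; `utc_job_ts` is a hypothetical helper name, not part of the template.

```python
import datetime as _dt


def utc_job_ts() -> str:
    """Return the current UTC time as an ISO-8601 string, truncated to seconds."""
    return _dt.datetime.now(_dt.timezone.utc).replace(microsecond=0).isoformat()


job_ts_utc = utc_job_ts()

# The explicit offset suffix lets the value round-trip without guessing a timezone.
parsed = _dt.datetime.fromisoformat(job_ts_utc)
assert parsed.utcoffset() == _dt.timedelta(0)
```

Note that the resulting string carries a `+00:00` suffix, so downstream consumers that expect the shorter naive format would need adjusting.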


## Create a session

Adjust `app_name`, `master`, and configs for your environment.


In [12]:
log_info("Starting Spark session...", advance=0)
spark = create_session(
    app_name="data-processing-template",
    master="local[*]",
    extra_configs={"spark.some.credential": "value"},
)
log_info("Spark session ready")
spark


INFO: Spark session ready:  10%|█         | 1/10 [00:00<00:02,  3.24it/s, +0.31s, total 0.32s]

## Start logging

Set the Spark log level. `WARN` keeps driver output quiet for routine runs; switch to `INFO` or `DEBUG` while you iterate so shuffle and scheduler details show up in the driver logs.


In [13]:
enable_spark_logging(spark, level="WARN")
log_info("Spark logging enabled at WARN.", advance=0)
log_info("Logging configured")


INFO: Logging configured:  10%|█         | 1/10 [00:00<00:02,  3.24it/s, +0.00s, total 0.34s]            

## Load relevant data

Declare input locations and load dataframes. Swap formats and options for your sources.


In [14]:
log_info("Loading input data (dummy samples; replace with real sources)...", advance=0)

orders_schema = T.StructType(
    [
        T.StructField("order_id", T.StringType(), False),
        T.StructField("order_ts", T.StringType(), False),
        T.StructField("order_total", T.DoubleType(), False),
        T.StructField("customer_id", T.StringType(), False),
    ]
)
orders_data = [
    ("O-1001", "2024-01-05", 42.50, "C001"),
    ("O-1002", "2024-01-06", 18.00, "C002"),
    ("O-1003", "2024-01-06", 120.75, "C003"),
]
orders_df = spark.createDataFrame(orders_data, schema=orders_schema)

customers_schema = T.StructType(
    [
        T.StructField("customer_id", T.StringType(), False),
        T.StructField("segment", T.StringType(), True),
        T.StructField("country", T.StringType(), True),
    ]
)
customers_data = [
    ("C001", "retail", "US"),
    ("C002", "enterprise", "CA"),
    ("C003", "retail", "UK"),
]
customers_df = spark.createDataFrame(customers_data, schema=customers_schema)

log_info("Input data loaded")
log_info(f"Orders sample:\n{preview(orders_df)}", advance=0)


INFO: Input data loaded:  20%|██        | 2/10 [00:00<00:02,  3.24it/s, +0.04s, total 0.39s]                                               

                                                                                

INFO: Orders sample:
rows=[{'order_id': 'O-1001', 'order_ts': '2024-01-05', 'order_total': 42.5, 'customer_id': 'C001'}, {'order_id': 'O-1002', 'order_ts': '2024-01-06', 'order_total': 18.0, 'customer_id': 'C002'}, {'order_id': 'O-1003', 'order_ts': '2024-01-06', 'order_total': 120.75, 'customer_id': 'C003'}]

## Process data

Apply your business logic: filtering, casting, enrichment, and derived columns.


In [15]:
log_info("Curating datasets...", advance=0)
curated_orders_df = (
    orders_df
    .withColumn("order_date", F.to_date("order_ts"))
    .withColumn("order_month", F.date_format("order_date", "yyyy-MM"))
    .withColumn("processing_ts", F.lit(job_ts))
)

curated_customers_df = customers_df.select("customer_id", "segment", "country")
log_info("Curated orders and customers dataframes ready.", advance=0)
log_info("Curated datasets ready")


INFO: Curated datasets ready:  40%|████      | 4/10 [00:01<00:02,  2.70it/s, +0.00s, total 1.47s]                                                                                                                                                                                                                                                                                                                                                                               

## Do joins

Join curated datasets and pick the right join strategy for your domain (inner/left/anti).
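
Whatever join type you pick, a left join on a key that is not unique on the right-hand side silently multiplies order rows. The sketch below shows the check in plain Python over row dictionaries; `duplicate_keys` is a hypothetical helper, and in Spark the same idea is usually expressed as a `groupBy` on the key followed by a count filter.

```python
from collections import Counter
from typing import Iterable, List, Mapping


def duplicate_keys(rows: Iterable[Mapping[str, object]], key: str) -> List[object]:
    """Return key values that appear more than once, in first-seen order."""
    counts = Counter(row[key] for row in rows)
    return [value for value, n in counts.items() if n > 1]


customers = [
    {"customer_id": "C001", "segment": "retail"},
    {"customer_id": "C002", "segment": "enterprise"},
    {"customer_id": "C001", "segment": "wholesale"},  # duplicate key: would fan out orders
]

dupes = duplicate_keys(customers, "customer_id")
assert dupes == ["C001"]
```

Running a check like this before the join makes a later "order_id should be unique" failure much easier to trace back to its cause.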


In [16]:
log_info("Joining curated datasets...", advance=0)
joined_df = (
    curated_orders_df.alias("o")
    .join(curated_customers_df.alias("c"), on="customer_id", how="left")
)

log_info(f"Join complete. Sample:\n{preview(joined_df)}", advance=0)
log_info("Join complete")


INFO: Join complete. Sample:
rows=[{'customer_id': 'C001', 'order_id': 'O-1001', 'order_ts': '2024-01-05', 'order_total': 42.5, 'order_date': datetime.date(2024, 1, 5), 'order_month': '2024-01', 'processing_ts': '2026-01-20T13:47:26', 'segment': 'retail', 'country': 'US'}, {'customer_id': 'C002', 'order_id': 'O-1002', 'order_ts': '2024-01-06', 'order_total': 18.0, 'order_date': datetime.date(2024, 1, 6), 'order_month': '2024-01', 'processing_ts': '2026-01-20T13:47:26', 'segment': 'enterprise', 'country': 'CA'}, {'customer_id': 'C003', 'order_id': 'O-1003', 'order_ts': '2024-01-06', 'order_total': 120.75, 'order_date': datetime.date(2024, 1, 6), 'order_month': '2024-01', 'processing_ts': '2026-01-20T13:47:26', 'segment': 'retail', 'country': 'UK'}]

## Do data tests

Add lightweight checks so issues surface early during development.


In [17]:
log_info("Running in-memory data tests...", advance=0)
# Schema/column guardrails
ensure_columns(joined_df, ["order_id", "customer_id", "order_date"])

# Null/uniqueness/data quality checks (expand as needed)
assert joined_df.filter(F.col("order_id").isNull()).count() == 0, "order_id should be populated"
assert joined_df.filter(F.col("customer_id").isNull()).count() == 0, "customer_id should be populated"
assert joined_df.dropDuplicates(["order_id"]).count() == joined_df.count(), "order_id should be unique"

# Domain-specific rule example; swap column names for your metric
invalid_states = joined_df.filter(F.col("order_total") < 0).count()
assert invalid_states == 0, f"Found {invalid_states} negative order totals"
log_info("In-memory data tests passed")


INFO: In-memory data tests passed:  60%|██████    | 6/10 [00:03<00:02,  1.55it/s, +0.96s, total 3.33s]    
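
Chained `assert` statements stop at the first failure, which hides any later problems until the next run. A Spark-free sketch of an alternative that reports every failed check at once is shown below; `run_checks` is a hypothetical helper, and in the notebook each count would come from an expression such as `joined_df.filter(...).count()`.

```python
from typing import Mapping


def run_checks(violations: Mapping[str, int]) -> None:
    """Raise one AssertionError listing every check with a non-zero violation count."""
    failed = {name: count for name, count in violations.items() if count > 0}
    if failed:
        details = "; ".join(f"{name}: {count} row(s)" for name, count in failed.items())
        raise AssertionError(f"{len(failed)} data test(s) failed: {details}")


# All counts are zero, so this call returns without raising.
run_checks({"null order_id": 0, "duplicate order_id": 0, "negative order_total": 0})

# Two checks fail here; both appear in a single error message.
try:
    run_checks({"null order_id": 2, "duplicate order_id": 0, "negative order_total": 1})
except AssertionError as exc:
    message = str(exc)
```

The trade-off is that every count is computed before anything fails, so this suits cheap checks better than expensive ones.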

## Write data & post-write tests

Persist curated results, run post-write validations, and attempt rollback when Delta Lake is available.


In [18]:
output_path = "/tmp/spark_fuse/orders_enriched_ct"  # dummy local path; replace with real target (e.g., s3://bucket/silver/orders)
target_table = "orders_enriched_ct"  # metastore table name if you want one registered
log_info(f"Preparing to write dataset to {output_path}", advance=0)

delta_supported = False
pre_write_version = None
output_format = "delta"
table_exists = False
try:
    from delta.tables import DeltaTable
    delta_supported = True
    try:
        existing_table = DeltaTable.forPath(spark, output_path)
        table_exists = True
        # Record the current version so a failed validation can roll back to it.
        pre_write_version = existing_table.history(1).select("version").first()[0]
        log_info(f"Existing Delta table found at version {pre_write_version}; skipping DDL.", advance=0)
    except Exception:
        log_info("No existing Delta table found at output path; creating with change-tracking columns.", advance=0)
        (
            DeltaTable.createIfNotExists(spark)
            .tableName(target_table)
            .location(output_path)
            .addColumn("order_id", T.StringType())
            .addColumn("order_ts", T.StringType())
            .addColumn("order_total", T.DoubleType())
            .addColumn("customer_id", T.StringType())
            .addColumn("order_date", T.DateType())
            .addColumn("order_month", T.StringType())
            .addColumn("processing_ts", T.StringType())
            .addColumn("segment", T.StringType())
            .addColumn("country", T.StringType())
            .addColumn("effective_start_ts", T.TimestampType())
            .addColumn("effective_end_ts", T.TimestampType())
            .addColumn("is_current", T.BooleanType())
            .addColumn("version", T.LongType())
            .addColumn("row_hash", T.StringType())
            .addColumn("load_ts", T.TimestampType())
            .execute()
        )
except Exception:
    # Delta is missing or the DDL failed: switch off the Delta write path and rollback together.
    delta_supported = False
    output_format = "parquet"
    log_info("Delta Lake not available; falling back to parquet for write and disabling rollback.", advance=0)

log_info(f"Writing data to final path with format={output_format}...", advance=0)
log_info("Write prerequisites complete")

if delta_supported:
    change_tracking_options = {
        "business_keys": ["order_id"],
        "tracked_columns": [
            "order_id",
            "customer_id",
            "order_date",
            "order_month",
            "processing_ts",
            "segment",
            "country",
            "order_total",
        ],
        "load_ts_expr": "current_timestamp()",
        "create_if_not_exists": not table_exists,
        "allow_schema_evolution": True,
    }
    joined_df.write.change_tracking.options(
        change_tracking_mode="track_history",
        change_tracking_options=change_tracking_options,
    ).table(output_path)
else:
    (
        joined_df.write
        .option("mergeSchema", "true")
        .mode("overwrite")
        .format(output_format)
        .partitionBy("order_month")
        .save(output_path)
    )

log_info("Write complete")

log_info("Running post-write validations on persisted data...", advance=0)
persisted_df = spark.read.format(output_format).load(output_path)
current_df = persisted_df.filter(F.col("is_current") == F.lit(True)) if delta_supported else persisted_df

try:
    ensure_columns(current_df, ["order_id", "customer_id", "order_date", "order_month"])
    assert current_df.count() > 0, "Persisted dataset is empty"
    assert current_df.filter(F.col("order_id").isNull()).count() == 0, "order_id should be populated"
    assert current_df.dropDuplicates(["order_id"]).count() == current_df.count(), "order_id should be unique"
    invalid_persisted_states = current_df.filter(F.col("order_total") < 0).count()
    assert invalid_persisted_states == 0, f"Found {invalid_persisted_states} negative order totals after write"
    log_info("Post-write validations passed")
except Exception as exc:
    log_error(f"Post-write validation failed: {exc}", advance=0)
    if delta_supported and pre_write_version is not None:
        try:
            log_info(f"Attempting Delta rollback to version {pre_write_version} ...", advance=0)
            spark.sql(f"RESTORE TABLE delta.`{output_path}` TO VERSION AS OF {pre_write_version}")
            log_info("Rollback succeeded.", advance=0)
        except Exception as rollback_exc:
            log_error(f"Rollback attempt failed: {rollback_exc}", advance=0)
    else:
        log_info("No rollback available; inspect persisted data manually.", advance=0)
    raise


INFO: Write prerequisites complete:  60%|██████    | 6/10 [00:03<00:02,  1.55it/s, +0.00s, total 3.37s]                                    

26/01/20 13:47:33 WARN MapPartitionsRDD: RDD 90 was locally checkpointed, its lineage has been truncated and cannot be recomputed after unpersisting


INFO: Post-write validations passed:  90%|█████████ | 9/10 [00:09<00:01,  1.48s/it, +1.04s, total 9.99s]                      
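
The rollback branch only helps when `pre_write_version` holds a real Delta version number at the time validation fails. The sketch below builds the same `RESTORE` statement used in the cell above and rejects unusable inputs first; `build_restore_sql` is a hypothetical helper name, not a Delta Lake or `spark_fuse` function.

```python
def build_restore_sql(path: str, version: int) -> str:
    """Build a Delta RESTORE statement for a path-based table."""
    # bool is a subclass of int, so it is excluded explicitly.
    if isinstance(version, bool) or not isinstance(version, int) or version < 0:
        raise ValueError(f"version must be a non-negative integer, got {version!r}")
    if "`" in path:
        raise ValueError("path must not contain backticks")
    return f"RESTORE TABLE delta.`{path}` TO VERSION AS OF {version}"


sql = build_restore_sql("/tmp/spark_fuse/orders_enriched_ct", 3)
assert sql == "RESTORE TABLE delta.`/tmp/spark_fuse/orders_enriched_ct` TO VERSION AS OF 3"
```

Passing `None` raises `ValueError`, which surfaces a missing pre-write version as an explicit error instead of a silently skipped rollback.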

## Post-write Delta log

Show the recent Delta history after the write, including per-version metrics for `MERGE` operations.


In [19]:
from pathlib import Path

# NOTE: this pathlib check only works for local paths; object-store URIs need a different existence check.
log_path = Path(output_path) / '_delta_log'
if delta_supported and log_path.exists():
    try:
        from delta.tables import DeltaTable  # imported here so the cell still runs without Delta installed
        dt = DeltaTable.forPath(spark, output_path)
        history_df = dt.history(10)
        merge_ops = history_df.filter(F.col('operation') == 'MERGE')
        history_df.select('version','timestamp','operation','operationParameters','operationMetrics').show(truncate=False)
        # Turn the operationMetrics map into one column per metric for each MERGE version.
        pivoted = (
            merge_ops
            .select('version', F.explode('operationMetrics').alias('metric','value'))
            .where(F.col('value').isNotNull())
            .groupBy('version')
            .pivot('metric')
            .agg(F.first('value'))
            .orderBy('version')
        )
        pivoted.show(truncate=False)
    except Exception as exc:
        log_info(f"Delta history not available: {exc}", advance=0)
else:
    log_info("No Delta log found after write; ensure output_path is correct.", advance=0)


+-------+-----------------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|timestamp              |operation|operationP
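
The `_delta_log` existence check in the cell above relies on `pathlib`, which only sees the local filesystem. Once `output_path` points at object storage, a guard like the one sketched below avoids a misleading "No Delta log found" message; `has_local_delta_log` is a hypothetical helper and the scheme handling is an assumption about which URI forms you use.

```python
from pathlib import Path
from urllib.parse import urlparse


def has_local_delta_log(output_path: str) -> bool:
    """True only when output_path is a local directory containing _delta_log."""
    parsed = urlparse(output_path)
    if parsed.scheme == "file":
        local = Path(parsed.path)
    elif parsed.scheme == "" or len(parsed.scheme) == 1:
        # No scheme, or a single letter that is most likely a Windows drive.
        local = Path(output_path)
    else:
        # Remote schemes such as s3:// or abfss:// cannot be checked with pathlib.
        return False
    return (local / "_delta_log").is_dir()


# A remote URI is reported as "not locally checkable" instead of "missing".
assert has_local_delta_log("s3://bucket/silver/orders") is False
```

For remote targets, calling `DeltaTable.forPath` inside a `try` block, as the write cell already does, is the more reliable existence test.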

## Stop session

Shut down the session once the job completes.


In [20]:
log_info("Stopping Spark session.", advance=0)
spark.stop()
log_end("Spark session stopped")


END: Spark session stopped: 100%|██████████| 10/10 [00:11<00:00,  1.13s/it, +0.81s, total 11.29s]       
