In [None]:
%%configure -f
{
  "conf": {
    "spark.notebook.parameters": "{\"name\": \"BFF-1M-LH-to-WH-Increment\", \"dataset_name\": \"1m\", \"source\": \"lakehouse\", \"format\": \"warehouse\", \"update_strategy\": \"Increment\", \"AZURE_SQL_SERVER\": \"benchmarking-bff\", \"AZURE_SQL_DB\": \"benchmarking\", \"AZURE_SQL_SCHEMA\": \"dbo\"}"
  },
  "defaultLakehouse": {
    "name": "BenchmarkLakehouse"
  }
}


# 📓 1. Ingest Data
## Ingestion Module — single parameter-set run

This notebook ingests one parameter set and supports lakehouse/sql sources and delta/warehouse targets.
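Behavior: strict parameter expectations (no silent defaults for required keys). Required keys in `spark.notebook.parameters`: `name`, `dataset_name`, `source`, `format`, `update_strategy`. If `source == 'sql'`, `AZURE_SQL_SERVER` and `AZURE_SQL_DB` are also required. Optional keys fall back to defaults: `target_lakehouse` (`BenchmarkLakehouse`), `target_warehouse` (`BenchmarkWarehouse`), `AZURE_SQL_SCHEMA` (`dbo`).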

In [None]:
import json
import time
from datetime import datetime
import re
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, FloatType
from pyspark.sql.functions import lit

print('Setup imports done')


In [None]:
# Params cell — ABFSS URIs using workspace/display-name containers (apply_updates style)
#
# Reads the JSON string stored under spark.notebook.parameters (set by the
# %%configure cell), validates it, and defines the core run parameters used by
# the later cells. Every validation failure stops the run via SystemExit.
spark = SparkSession.builder.getOrCreate()
conf_key = 'spark.notebook.parameters'
conf_str = None
try:
    conf_str = spark.conf.get(conf_key, None)
except Exception:
    conf_str = None
if not conf_str:
    # Fall back to the SparkContext-level conf when the session conf has no value.
    try:
        conf_str = spark.sparkContext.getConf().get(conf_key, None)
    except Exception:
        conf_str = None
if not conf_str:
    raise SystemExit('Missing required spark.notebook.parameters. Provide parameter-set keys in the %%configure cell.')

# Turn malformed input into a clear stop message instead of a raw traceback.
try:
    raw_params = json.loads(conf_str)
except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
    raise SystemExit(f"spark.notebook.parameters is not valid JSON: {exc}") from exc
if not isinstance(raw_params, dict):
    raise SystemExit("spark.notebook.parameters must be a JSON object of parameter-set keys")

# Required keys: absent, null and empty-string values all count as missing,
# because each of them would otherwise produce broken paths or table names.
required = ["name", "dataset_name", "source", "format", "update_strategy"]
missing = [k for k in required if raw_params.get(k) in (None, "")]
if missing:
    raise SystemExit(f"Missing required parameters in spark.notebook.parameters: {', '.join(missing)}")

# Core params (name is coerced to str like source/format so .strip() below cannot fail)
test_case_name = str(raw_params["name"])
dataset_name = raw_params["dataset_name"]
source = str(raw_params["source"]).lower()
fmt = str(raw_params["format"]).lower()
update_strategy = raw_params["update_strategy"]

# Fail fast on values the read/write cells would reject anyway, before any
# expensive read happens. Messages match the ones raised by those cells.
if source not in ("lakehouse", "sql"):
    raise SystemExit(f'Unsupported source: {source}')
if fmt not in ("delta", "warehouse"):
    raise SystemExit(f'Unsupported format: {fmt}')

# derive sanitized target table name from the parameter-set display name
# produce safe SQL identifier: lowercase letters, numbers and underscores
# convert spaces and hyphens to underscores, then remove any other invalid chars
sanitized_name = re.sub(r"[^a-z0-9_]", "", re.sub(r"[\s-]+", "_", test_case_name.strip().lower()))
if not sanitized_name:
    # e.g. a name made only of punctuation would leave nothing to use as a table name
    raise SystemExit(f"Parameter 'name' ({test_case_name!r}) does not yield a usable table name after sanitizing")
target_table = sanitized_name

# Display-name anchors
controller_workspace_name = "BFF-Controller"           # ABFSS container for DataSourceLakehouse & MetricsLakehouse
controller_lakehouse_name = "DataSourceLakehouse"     # folder under workspace lakehouse area

# Benchmark workspace/container uses the test_case display name
benchmark_workspace_container = test_case_name

# ABFSS account/host used in this workspace
abfss_account = "onelake.dfs.fabric.microsoft.com"

# Helper to convert display-name -> container-like string used elsewhere in repo (apply_updates uses container names without spaces)
def _container_from_display(name):
    return name.replace(" ", "")

# Source data location: <controller container>/<controller lakehouse>.Lakehouse/Files/<dataset_name>base
# (the generator writes to e.g. 10kbase)
src_container = _container_from_display(controller_workspace_name)
data_source_lakehouse_path = f"abfss://{src_container}@{abfss_account}/{controller_lakehouse_name}.Lakehouse/Files/{dataset_name}base"

# Target display names — optional parameters, defaulting to the same simple names as apply_updates
target_lakehouse = raw_params.get("target_lakehouse", "BenchmarkLakehouse")
target_warehouse = raw_params.get("target_warehouse", "BenchmarkWarehouse")

# Target ABFSS URIs for diagnostics / fallback, rooted in the benchmark workspace container
tgt_container = _container_from_display(benchmark_workspace_container)
target_lakehouse_abfss_tables = f"abfss://{tgt_container}@{abfss_account}/{target_lakehouse}.Lakehouse/Tables/{target_table}"
target_lakehouse_abfss_files = f"abfss://{tgt_container}@{abfss_account}/{target_lakehouse}.Lakehouse/Files/{dataset_name}/{target_table}"

# Azure SQL settings: server and database are mandatory only for source == 'sql';
# the schema falls back to "dbo".
AZURE_SQL_SERVER = raw_params.get("AZURE_SQL_SERVER")
AZURE_SQL_DB = raw_params.get("AZURE_SQL_DB")
AZURE_SQL_SCHEMA = raw_params.get("AZURE_SQL_SCHEMA", "dbo")
if source == "sql" and not (AZURE_SQL_SERVER and AZURE_SQL_DB):
    raise SystemExit("SOURCE=sql requires AZURE_SQL_SERVER and AZURE_SQL_DB in spark.notebook.parameters")

# Publish the resolved values as globals for the downstream cells
globals().update(
    raw_params=raw_params,
    test_case_name=test_case_name,
    dataset_name=dataset_name,
    source=source,
    fmt=fmt,
    update_strategy=update_strategy,
    sanitized_name=sanitized_name,
    target_table=target_table,
    data_source_lakehouse_path=data_source_lakehouse_path,
    target_lakehouse=target_lakehouse,
    target_warehouse=target_warehouse,
    target_lakehouse_abfss_tables=target_lakehouse_abfss_tables,
    target_lakehouse_abfss_files=target_lakehouse_abfss_files,
    AZURE_SQL_SERVER=AZURE_SQL_SERVER,
    AZURE_SQL_DB=AZURE_SQL_DB,
    AZURE_SQL_SCHEMA=AZURE_SQL_SCHEMA,
)

# Echo the resolved configuration so the run log shows exactly what was used
print(f"Loaded parameter set: name={test_case_name} sanitized_name={sanitized_name}")
print(f" dataset_name={dataset_name} source={source} format={fmt} update_strategy={update_strategy}")
print(f"Using data_source_lakehouse_path = {data_source_lakehouse_path}")
print(f"Target lakehouse (display) = {target_lakehouse} | target_warehouse = {target_warehouse}")
print(f"Target tables ABFSS (diagnostic) = {target_lakehouse_abfss_tables}")


In [None]:
# Spark schema for one metrics record. Every field is nullable, and the column
# order matches the CREATE TABLE statement below.
metrics_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("test_case_id", StringType()),
        ("timestamp", TimestampType()),
        ("source", StringType()),
        ("format", StringType()),
        ("rows", IntegerType()),
        ("update_strategy", StringType()),
        ("action", StringType()),
        ("ingest_time_s", FloatType()),
        ("spinup_time_s", FloatType()),
        ("query_type", StringType()),
        ("query_time_s", FloatType()),
        ("notes", StringType()),
    )
])

# Create the metrics table in the target lakehouse on first use; an existing
# table is left untouched (IF NOT EXISTS).
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {target_lakehouse}.metrics (
    test_case_id STRING,
    timestamp TIMESTAMP,
    source STRING,
    format STRING,
    rows INT,
    update_strategy STRING,
    action STRING,
    ingest_time_s FLOAT,
    spinup_time_s FLOAT,
    query_type STRING,
    query_time_s FLOAT,
    notes STRING
)
""")
print('Ensured metrics table exists in', target_lakehouse)


In [None]:
# SQL helpers (only used when source == 'sql')
# notebookutils is imported on a best-effort basis: if the import fails,
# mssparkutils is set to None and _token_struct() raises RuntimeError when it
# is actually called, instead of this cell failing.
try:
    from notebookutils import mssparkutils
except Exception:
    mssparkutils = None

# Key used in the attrs_before mapping handed to pyodbc.connect() to supply the
# access-token structure before the connection is opened (see _pyodbc_conn_with_retry).
# NOTE(review): 1256 is presumably the ODBC driver's SQL_COPT_SS_ACCESS_TOKEN
# attribute id — confirm against the driver documentation.
SQL_COPT_SS_ACCESS_TOKEN = 1256

def _token_struct():
    """Build the access-token byte structure passed to pyodbc via attrs_before.

    Requests a token for 'https://database.windows.net/' from
    mssparkutils.credentials and packs it as a 4-byte length prefix (native
    byte order, standard size) followed by the token encoded as UTF-16-LE.

    Returns:
        bytes: length prefix + encoded token.

    Raises:
        RuntimeError: if mssparkutils is not available in this environment.
    """
    import struct

    if not mssparkutils:
        raise RuntimeError('mssparkutils not available to get token')
    token = mssparkutils.credentials.getToken('https://database.windows.net/')
    # UTF-16-LE yields the same "byte + 0x00" layout the previous hand-rolled
    # interleaving produced for ASCII tokens, and stays correct for any
    # non-ASCII character.
    token_bytes = token.encode('utf-16-le')
    return struct.pack('=i', len(token_bytes)) + token_bytes

def _pyodbc_conn_with_retry(server=None, database=None, timeout=120, retries=2, backoff=2):
    """Open a token-authenticated pyodbc connection, retrying on failure.

    Args:
        server: SQL server name; falls back to AZURE_SQL_SERVER when falsy.
            '.database.windows.net' is appended when the name does not
            already end with it.
        database: Database name; falls back to AZURE_SQL_DB when falsy.
        timeout: Passed through as the ``timeout`` argument of pyodbc.connect.
        retries: Total number of connection attempts. Values below 1 are
            treated as 1, so at least one attempt is always made.
        backoff: Base delay in seconds; the wait after a failed attempt is
            ``backoff * attempt`` (grows linearly).

    Returns:
        The connection object returned by pyodbc.connect.

    Raises:
        RuntimeError: if no server or database can be determined.
        Exception: the error from the final attempt, re-raised unchanged.
    """
    import pyodbc
    server = server or AZURE_SQL_SERVER
    database = database or AZURE_SQL_DB
    if not server or not database:
        raise RuntimeError('AZURE_SQL_SERVER and AZURE_SQL_DB must be set (or passed in)')
    if not server.lower().endswith('.database.windows.net'):
        server = server.rstrip('.') + '.database.windows.net'
    conn_str = (
        'Driver={ODBC Driver 18 for SQL Server};'
        f'Server=tcp:{server},1433;'
        f'Database={database};'
        'Encrypt=yes;TrustServerCertificate=no;'
    )
    # Always try at least once: with retries < 1 the loop used to run zero
    # times and the function ended in `raise None`, which is a TypeError.
    attempts = max(1, retries)
    for attempt in range(1, attempts + 1):
        try:
            # A fresh token structure is built for every attempt.
            return pyodbc.connect(conn_str, attrs_before={SQL_COPT_SS_ACCESS_TOKEN: _token_struct()}, timeout=timeout)
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(backoff * attempt)

print('SQL helper functions ready')


In [None]:
# --- Read source (updated) — single-pass timestamp canonicalization in Spark
# Loads the base dataset into the Spark DataFrame `df` (parquet folder for
# source == 'lakehouse', Azure SQL table for source == 'sql'), casts ts_*
# columns to timestamp, repartitions, and stores the elapsed wall-clock time
# in `spinup_duration`.
spinup_start = time.time()
base_folder = data_source_lakehouse_path.rstrip('/')
print('Reading parquet from ABFSS folder:', base_folder)

# Optional listing for debugging — best effort, never fails the run
try:
    from notebookutils import mssparkutils
    try:
        entries = mssparkutils.fs.ls(base_folder)
        print('Listed entries:', [getattr(e, 'path', e) for e in entries[:20]])
    except Exception as _e:
        print('mssparkutils.fs.ls failed:', _e)
except Exception as _import_err:
    # Say why the listing was skipped instead of swallowing the error silently.
    print('notebookutils not available; skipping folder listing:', _import_err)

from pyspark.sql.functions import col

if source == 'lakehouse':
    try:
        df = spark.read.parquet(base_folder)
        print('Read rows (spark):', df.count())
    except Exception as e:
        # Chain the cause so the original Spark error stays attached to the exit.
        raise SystemExit(f"Spark failed to read parquet from {base_folder}. Underlying error: {e}") from e

elif source == 'sql':
    # Read from Azure SQL (token-based, using mssparkutils if available)
    table_name_sql = f"{AZURE_SQL_SCHEMA}.base_{dataset_name}"
    print(f'Reading Azure SQL table {table_name_sql} from {AZURE_SQL_SERVER}/{AZURE_SQL_DB} (token-based)')
    conn = _pyodbc_conn_with_retry(server=AZURE_SQL_SERVER, database=AZURE_SQL_DB)
    try:
        pdf = pd.read_sql(f'SELECT * FROM {table_name_sql}', conn)
        print('Pandas rows read from SQL:', len(pdf))
        # create Spark DataFrame from pandas; canonicalize timestamps in Spark below
        df = spark.createDataFrame(pdf)
        print('Converted pandas -> spark rows:', df.count())
    finally:
        # Release the connection whether or not the read succeeded.
        conn.close()
else:
    raise SystemExit(f'Unsupported source: {source}')

# Single pass: canonicalize any timestamp columns named ts_* into legacy Spark TimestampType
# (cast in Spark to avoid creating TimestampNTZType). Do NOT loop twice over all columns.
ts_cols = [c for c in df.columns if c.startswith('ts_')]
if ts_cols:
    print("Canonicalizing timestamp columns to legacy Spark TimestampType:", ts_cols)
    for c in ts_cols:
        df = df.withColumn(c, col(c).cast('timestamp'))

# Repartition by ts_1 to reduce skew (tuned partition count). Datasets without
# a ts_1 column used to crash here; they now get a plain repartition with the
# same partition count. The lookup ignores case so that a column Spark would
# resolve case-insensitively still takes the keyed path.
if any(name.lower() == "ts_1" for name in df.columns):
    df = df.repartition(200, "ts_1")
else:
    print("Column ts_1 not found; repartitioning without a partition key")
    df = df.repartition(200)

spinup_end = time.time()
spinup_duration = spinup_end - spinup_start
print('Spinup duration (s):', spinup_duration)


In [None]:
# synapsesql needs this import to work
from com.microsoft.spark.fabric import Constants

import time
from pyspark.sql.functions import col

# Write `df` to the configured target (overwrite mode) and measure how long
# the write takes. The clock starts before the format dispatch and stops right
# after the chosen writer returns; writer errors propagate unchanged.
ingest_start = time.time()

if fmt == 'delta':
    # Lakehouse target: managed Delta table <target_lakehouse>.<target_table>
    table_full = f"{target_lakehouse}.{target_table}"
    print('Writing Delta table ->', table_full)
    df.write.mode('overwrite').saveAsTable(table_full)

elif fmt == 'warehouse':
    # Warehouse target: <target_warehouse>.dbo.<target_table> via the synapsesql writer
    table_full = f"{target_warehouse}.dbo.{target_table}"
    print('Writing Warehouse table ->', table_full)

    # Per the original note, the Fabric writer fails on Spark TimestampNTZType,
    # so any column whose type string is "timestamp_ntz" is cast to the legacy
    # timestamp type before the write.
    ntz_cols = [
        field.name
        for field in df.schema.fields
        if getattr(field.dataType, "simpleString", lambda: "")().lower() == "timestamp_ntz"
    ]
    if ntz_cols:
        print("Casting TimestampNTZ columns to legacy timestamp for synapsesql:", ntz_cols)
        for c in ntz_cols:
            df = df.withColumn(c, col(c).cast("timestamp"))

    # No try/except on purpose: connector or schema problems should surface as-is.
    df.write.mode('overwrite').synapsesql(table_full)

else:
    raise SystemExit(f'Unsupported format: {fmt}')

# Reached only after a successful write in one of the two branches above.
ingest_end = time.time()
ingest_duration = ingest_end - ingest_start

print('Ingest duration (s):', ingest_duration)


In [None]:
# Assemble the single metrics record for this ingest, keyed by column name, and
# append it to <target_lakehouse>.metrics. There is no query phase in this
# notebook, so query_type is 'N/A' and query_time_s is NaN.
metrics_values = {
    "test_case_id": test_case_name,
    "timestamp": datetime.now(),
    "source": source,
    "format": fmt.upper(),
    "rows": int(df.count()),
    "update_strategy": update_strategy,
    "action": 'initial_load',
    "ingest_time_s": float(ingest_duration),
    "spinup_time_s": float(spinup_duration),
    "query_type": 'N/A',
    "query_time_s": float('nan'),
    "notes": f'Ingest from {source} into {fmt} target table {target_table}',
}
# Order the values by the schema's field order to form the row tuple.
metrics_row = [tuple(metrics_values[field.name] for field in metrics_schema.fields)]
(
    spark.createDataFrame(metrics_row, schema=metrics_schema)
    .write
    .mode('append')
    .saveAsTable(f'{target_lakehouse}.metrics')
)
print('Metrics appended to', target_lakehouse + '.metrics')


In [None]:
# Closing banner: confirms completion and echoes the key run parameters,
# printed as three lines in one call.
print(
    'Ingest step completed successfully for single parameter set.',
    'Summary:',
    f" test_case: {test_case_name} | dataset: {dataset_name} | source: {source} | format: {fmt} | target_table: {target_table}",
    sep='\n',
)
