In [1]:
from datetime import datetime, timezone

from pyspark.sql import functions as F

# One batch id for the whole run (helps tracking): every Bronze row written by
# this notebook is stamped with it in the `_batch_id` column.
# Built from the current UTC time as YYYYMMDD_HHMMSS. Uses an aware UTC "now"
# because datetime.utcnow() is deprecated since Python 3.12.
BATCH_ID = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")

print("BATCH_ID =", BATCH_ID)

# Source paths (Files area), keyed by logical dataset name.
# The suffix of each folder name records the file format stored there.
SRC = {
    "customers": "Files/source/customers_csv",
    "agents":    "Files/source/agents_csv",
    "policies":  "Files/source/policies_parquet",
    "premiums":  "Files/source/premiums_json",
    "claims":    "Files/source/claims_csv",
}


StatementMeta(, 961e8924-b44e-4496-8b1b-396e267357c3, 3, Finished, Available, Finished)

BATCH_ID = 20260129_163016


In [2]:
def read_source(name: str, fmt: str, path: str):
    """Read one raw source dataset and stamp it with Bronze ingestion metadata.

    Args:
        name: Logical dataset name (e.g. "customers"); used to give error
            messages context about which source failed.
        fmt: Source file format, one of "csv", "json" or "parquet".
            Matching ignores letter case and surrounding whitespace.
        path: Location handed to the Spark reader (e.g. "Files/source/...").

    Returns:
        A Spark DataFrame with the source columns plus four metadata columns:
        ``_ingest_ts`` (current timestamp), ``_batch_id`` (the global
        ``BATCH_ID``), ``_source_path`` (``path``) and ``_source_file``
        (``F.input_file_name()`` for each row).

    Raises:
        ValueError: if ``fmt`` is not a supported format. This is raised
            before any Spark read is attempted.
    """
    # str() keeps non-string inputs (e.g. None) on the ValueError path below.
    fmt_key = str(fmt).strip().lower()

    if fmt_key == "csv":
        # First line holds column names; inferSchema asks Spark to infer column
        # types rather than reading every column as string.
        df = (spark.read
              .option("header", "true")
              .option("inferSchema", "true")
              .csv(path))
    elif fmt_key == "json":
        df = spark.read.json(path)
    elif fmt_key == "parquet":
        df = spark.read.parquet(path)
    else:
        raise ValueError(
            f"Unsupported format: {fmt!r} for source {name!r} at {path!r} "
            "(expected 'csv', 'json' or 'parquet')"
        )

    # Add Bronze ingestion metadata so each row can be traced to its run and file.
    df = (df
          .withColumn("_ingest_ts", F.current_timestamp())
          .withColumn("_batch_id", F.lit(BATCH_ID))
          .withColumn("_source_path", F.lit(path))
          .withColumn("_source_file", F.input_file_name()))
    return df

StatementMeta(, 961e8924-b44e-4496-8b1b-396e267357c3, 4, Finished, Available, Finished)

In [3]:
# (logical name, file format, source path) for every raw dataset landed in Bronze.
# Each dataset is written to a Delta table named bronze_<name>.
datasets = [
    ("customers", "csv",     SRC["customers"]),
    ("agents",    "csv",     SRC["agents"]),
    ("policies",  "parquet", SRC["policies"]),
    ("premiums",  "json",    SRC["premiums"]),
    ("claims",    "csv",     SRC["claims"]),
]

for name, fmt, path in datasets:
    df = read_source(name, fmt, path)

    table_name = f"bronze_{name}"

    # For initial load: overwrite is fine (repeatable). overwriteSchema lets a
    # changed source layout replace the table schema instead of failing the write.
    (df.write
       .format("delta")
       .mode("overwrite")
       .option("overwriteSchema", "true")
       .saveAsTable(table_name))

    # Count the Delta table just written rather than `df`: counting `df` would
    # re-execute the whole source read (another scan of the raw files), and the
    # table count reports the rows that were actually persisted.
    row_count = spark.table(table_name).count()
    print(f"✅ Ingested {name} -> {table_name} | rows={row_count}")

StatementMeta(, 961e8924-b44e-4496-8b1b-396e267357c3, 5, Finished, Available, Finished)

✅ Ingested customers -> bronze_customers | rows=50000
✅ Ingested agents -> bronze_agents | rows=250
✅ Ingested policies -> bronze_policies | rows=120000
✅ Ingested premiums -> bronze_premiums | rows=1522899
✅ Ingested claims -> bronze_claims | rows=43294


In [4]:
# Sanity check: read every Bronze table back from the catalog and report its row
# and column counts (column counts include the four _-prefixed metadata columns).
# Iterating SRC keeps this list in step with the configured sources instead of
# maintaining a second hardcoded copy of the dataset names.
for name in SRC:
    t = f"bronze_{name}"
    df = spark.table(t)
    print(t, "rows =", df.count(), "| columns =", len(df.columns))

StatementMeta(, 961e8924-b44e-4496-8b1b-396e267357c3, 6, Finished, Available, Finished)

bronze_customers rows = 50000 | columns = 14
bronze_agents rows = 250 | columns = 9
bronze_policies rows = 120000 | columns = 16
bronze_premiums rows = 1522899 | columns = 23
bronze_claims rows = 43294 | columns = 21


In [5]:
# Eyeball the first 5 rows of selected Bronze tables (bronze_agents is not previewed).
for preview_table in ("bronze_customers", "bronze_policies", "bronze_premiums", "bronze_claims"):
    display(spark.table(preview_table).limit(5))

StatementMeta(, 961e8924-b44e-4496-8b1b-396e267357c3, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, cc9474ed-2f54-4478-b6d5-3181778b88eb)

SynapseWidget(Synapse.DataFrame, beadef83-0383-422f-9969-8b52a5c05025)

SynapseWidget(Synapse.DataFrame, 0bb435a0-1039-4cfe-bc90-14af7a9c3473)

SynapseWidget(Synapse.DataFrame, 2b8b6f91-d624-4dde-9c23-09346b7c584f)