# 01_modular_etl
Modular ETL pipeline (Ingest -> Enrich -> Validate). Each stage reads the previous stage's output and overwrites its own Delta table.

In [None]:
from pyspark.sql import functions as F

# CONFIG - edit these values for your environment.
catalog = "finance"
schema = "kyc_ml"

# Three-part table names: <catalog>.<schema>.<table>.
raw_table = ".".join((catalog, schema, "raw_paysim"))
enriched_table = ".".join((catalog, schema, "customer_enriched"))
validated_table = ".".join((catalog, schema, "validated_data"))

# Location of the input CSV; point this at your copy of the data.
source_path = "dbfs:/FileStore/paysim.csv"

def ingest(path=None):
    """Read the source CSV and persist it as the raw Delta table.

    Args:
        path: CSV location to read. When omitted (or ``None``), the
            module-level ``source_path`` is used. It is looked up at call
            time, so editing and re-running the CONFIG cell takes effect
            without redefining this function.

    Returns:
        The DataFrame read directly from the CSV (header row used for
        column names, schema inferred). This is not the persisted table
        and it is not cached.

    Side effects:
        Overwrites ``raw_table`` (Delta format) and prints a confirmation
        line. ``spark`` is expected to be the session provided by the
        notebook runtime; it is not defined in this cell.
    """
    if path is None:
        # Resolve the default at call time. The previous ``path=source_path``
        # default froze the value CONFIG had when this cell was first run.
        path = source_path
    df = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(path)
    )
    df.write.format("delta").mode("overwrite").saveAsTable(raw_table)
    print("Ingested ->", raw_table)
    return df

def enrich():
    """Build the enriched table from the raw table.

    Reads ``raw_table`` and, in this order:
      * casts ``step`` to int (replacing the column in place),
      * copies ``type`` into a new ``accountType`` column,
      * adds ``date`` = 2020-01-01 plus ``step / 24`` truncated to an int
        number of days,
      * drops ``nameOrig`` / ``nameDest`` if they are present.

    The result overwrites ``enriched_table`` (Delta format), a confirmation
    line is printed, and the enriched DataFrame is returned.
    """
    epoch = F.to_date(F.lit("2020-01-01"))
    # NOTE(review): if ``step`` counts hours starting at 1, then steps 24, 48,
    # ... fall on the following day. Confirm whether (step - 1) / 24 was
    # intended.
    # NOTE(review): passing a Column as ``days`` to date_add needs a recent
    # PySpark; older releases accept only an int. Confirm the runtime version.
    days_since_epoch = (F.col("step") / 24).cast("int")

    enriched = (
        spark.table(raw_table)
        .withColumn("step", F.col("step").cast("int"))
        .withColumn("accountType", F.col("type"))
        .withColumn("date", F.date_add(epoch, days_since_epoch))
    )

    # Only drop the name columns that actually exist (exact-name match).
    present = [name for name in ("nameOrig", "nameDest") if name in enriched.columns]
    if present:
        enriched = enriched.drop(*present)

    enriched.write.format("delta").mode("overwrite").saveAsTable(enriched_table)
    print("Enriched ->", enriched_table)
    return enriched

def validate():
    """Keep only rows whose ``amount`` is non-null and strictly positive.

    Reads ``enriched_table``, overwrites ``validated_table`` (Delta format)
    with the surviving rows, prints a confirmation line, and returns the
    filtered DataFrame.
    """
    amount = F.col("amount")
    keep_row = amount.isNotNull() & (amount > 0)

    validated = spark.table(enriched_table).where(keep_row)
    (
        validated.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(validated_table)
    )
    print("Validated ->", validated_table)
    return validated

# To run the pipeline, call the stages in order:
#   ingest()    # CSV         -> raw_table
#   enrich()    # raw_table   -> enriched_table
#   validate()  # enriched    -> validated_table
print("Notebook loaded. Call ingest(), enrich(), validate() to run pipeline.")