# Logistics ETL – Function-Based PySpark Pipeline


In [ ]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window


In [ ]:
def read_csv_df(spark, path, header=True, infer_schema=True, sep=","):
    """Load a CSV source into a DataFrame.

    Args:
        spark: Session object whose ``read`` attribute supplies the reader.
        path: Location of the CSV data to load.
        header: Value passed to the reader's ``header`` option.
        infer_schema: Value passed to the reader's ``inferSchema`` option.
        sep: Value passed to the reader's ``sep`` (field delimiter) option.

    Returns:
        The DataFrame produced by the configured reader's ``csv`` call.
    """
    # Set all three reader options in one call instead of chaining .option().
    csv_reader = spark.read.options(
        header=header,
        inferSchema=infer_schema,
        sep=sep,
    )
    return csv_reader.csv(path)

def read_json_df(spark, path):
    """Load a JSON source into a DataFrame using the reader's defaults.

    Args:
        spark: Session object whose ``read`` attribute supplies the reader.
        path: Location of the JSON data to load.

    Returns:
        The DataFrame produced by the reader's ``json`` call.
    """
    json_reader = spark.read
    return json_reader.json(path)


In [ ]:
def cleanse_staff_df(df):
    """Remove unusable staff rows and default the vehicle type.

    Rows with no ``shipment_id`` are dropped; afterwards any missing
    ``vehicle_type`` is replaced with the literal ``"UNKNOWN"``.

    Args:
        df: Staff DataFrame to clean.

    Returns:
        A new DataFrame with the two cleansing steps applied in that order.
    """
    required_columns = ["shipment_id"]
    null_defaults = {"vehicle_type": "UNKNOWN"}

    with_shipment_id = df.dropna(subset=required_columns)
    return with_shipment_id.fillna(null_defaults)

def enrich_staff_df(df):
    """Stamp the staff DataFrame with a load timestamp.

    Args:
        df: Staff DataFrame to enrich.

    Returns:
        A new DataFrame with a ``load_dt`` column set to
        ``current_timestamp()``.
    """
    load_timestamp = current_timestamp()
    return df.withColumn("load_dt", load_timestamp)

def enrich_shipment_df(df):
    """Derive the shipment year from the shipment date.

    Args:
        df: Shipment DataFrame containing a ``shipment_date`` column.

    Returns:
        A new DataFrame with a ``shipment_year`` column holding
        ``year("shipment_date")``.
    """
    shipment_year = year("shipment_date")
    return df.withColumn("shipment_year", shipment_year)

def join_staff_shipments(staff_df, shipment_df):
    """Left-join shipment details onto the staff records.

    Every staff row is kept; shipment columns are attached where the
    ``shipment_id`` matches.

    Args:
        staff_df: Left-hand DataFrame (all of its rows are preserved).
        shipment_df: Right-hand DataFrame supplying shipment details.

    Returns:
        The joined DataFrame.
    """
    return staff_df.join(
        shipment_df,
        on="shipment_id",
        how="left",
    )


In [ ]:
# --- Extract -----------------------------------------------------------
# NOTE(review): `spark` and `display` are not defined in the visible source;
# presumably the notebook runtime provides them - confirm before running
# this as a plain script.
staff1, staff2 = (
    read_csv_df(spark, csv_path)
    for csv_path in (
        "/FileStore/logistics_source1.csv",
        "/FileStore/logistics_source2.csv",
    )
)
shipment = read_json_df(spark, "/FileStore/logistics_shipment_detail_3000.json")

# --- Transform ---------------------------------------------------------
# Columns are matched by name; allowMissingColumns=True lets the two CSV
# extracts differ in which columns they carry.
staff = staff1.unionByName(staff2, allowMissingColumns=True)
staff_clean = cleanse_staff_df(staff)
staff_enriched = enrich_staff_df(staff_clean)
shipment_enriched = enrich_shipment_df(shipment)

# --- Combine and show --------------------------------------------------
final_df = join_staff_shipments(staff_enriched, shipment_enriched)
display(final_df)
