In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta.tables import DeltaTable
import os

In [0]:
# Explicit schema applied when reading the raw purchases CSV files.
# Every column is declared nullable; Purchase_Date is kept as a plain string.
purchases_schema = (
    StructType()
    .add("Customer_ID", StringType(), True)
    .add("Age", IntegerType(), True)
    .add("Gender", StringType(), True)
    .add("City", StringType(), True)
    .add("Category", StringType(), True)
    .add("Product_Name", StringType(), True)
    .add("Purchase_Date", StringType(), True)
    .add("Purchase_Amount", DoubleType(), True)
    .add("Payment_Method", StringType(), True)
    .add("Discount_Applied", StringType(), True)
    .add("Rating", IntegerType(), True)
    .add("Repeat_Customer", StringType(), True)
)

In [0]:
# Folder that holds the raw input files.
raw_data_path = "/Volumes/walmart_purchases/landing/rawdata/"

# Components of the target table name, joined as <catalog>.<schema>.<table>.
catalog_name, schema_name, table_name = "walmart_purchases", "bronze", "bronze_purchases"
bronze_table = ".".join((catalog_name, schema_name, table_name))

In [0]:
# Discover every CSV file in the landing folder.
# os.listdir returns entries in arbitrary order, so sort the names to make the
# read (and later union) order deterministic from run to run.
csv_files = sorted(
    file_name for file_name in os.listdir(raw_data_path) if file_name.endswith(".csv")
)

# Fail fast with a clear message instead of letting a later cell hit an
# IndexError on an empty list of DataFrames.
if not csv_files:
    raise FileNotFoundError(f"No .csv files found in {raw_data_path}")

# Read each file with the explicit schema (no inference); the first line of
# every file is treated as a header row.
combined_dfs = [
    spark.read.format("csv")
    .option("header", True)
    .schema(purchases_schema)
    .load(os.path.join(raw_data_path, file_name))
    for file_name in csv_files
]


In [0]:
# Stack the per-file DataFrames into a single DataFrame.
# Columns are matched by name rather than by position.
remaining_dfs = combined_dfs[1:]
landing_df = combined_dfs[0]
for next_df in remaining_dfs:
    landing_df = landing_df.unionByName(next_df)


In [0]:
# Stamp every row with the load date, rendered as a "yyyy-MM-dd" string.
ingestion_date_col = date_format(current_timestamp(), "yyyy-MM-dd")
bronze_df = landing_df.withColumn("ingestion_date", ingestion_date_col)


In [0]:
# Incremental load into the bronze Delta table.
#   - Table already exists: keep only rows whose Purchase_Date is greater than
#     the largest Purchase_Date already stored, then append them.
#   - Table does not exist yet: write the full DataFrame to create it.
table_exists = spark.catalog.tableExists(bronze_table)
write_mode = "append" if table_exists else "overwrite"

if table_exists:
    # `max` is the Spark aggregate here (the wildcard import from
    # pyspark.sql.functions shadows the Python builtin).
    # NOTE(review): Purchase_Date is read as a string, so both this max and the
    # ">" filter below compare lexicographically — confirm the date format
    # sorts chronologically.
    last_load_date = (
        spark.table(bronze_table)
        .agg(max("Purchase_Date").alias("last_date"))
        .first()["last_date"]
    )

    # Skip the filter when the table holds no usable Purchase_Date value.
    # NOTE(review): rows with a null Purchase_Date never satisfy the ">"
    # comparison, so they are dropped on incremental runs — confirm intended.
    if last_load_date:
        bronze_df = bronze_df.filter(col("Purchase_Date") > last_load_date)

bronze_df.write.format("delta").mode(write_mode).saveAsTable(bronze_table)

In [0]:
%sql
-- Preview the contents of the bronze purchases table.
SELECT * FROM walmart_purchases.bronze.bronze_purchases