In [0]:
from pyspark.sql.types import (StructType,StructField,StringType,DoubleType,IntegerType,DateType)
from pyspark.sql.functions import col

def log(step):
    """Print a progress message for the ingestion stage, tagged with ``[INGESTION]``."""
    print("[INGESTION]{}".format(step))

In [0]:
log("Defining Schema of Walmart Sales")

# Expected column layout of the raw Walmart sales data. It mirrors the columns
# that the cleaning cell selects and casts: Holiday_Flag (not Department), and
# Weekly_Sales as a double so fractional amounts are not truncated.
# NOTE(review): this schema is not applied anywhere in this notebook (the raw
# table is read with spark.read.table), so it currently serves as documentation.
Sales_schema = StructType([
    StructField("Store", IntegerType(), True),
    StructField("Date", DateType(), True),
    StructField("Weekly_Sales", DoubleType(), True),
    StructField("Holiday_Flag", IntegerType(), True),
    StructField("Temperature", DoubleType(), True),
    StructField("Fuel_Price", DoubleType(), True),
    StructField("CPI", DoubleType(), True),
    StructField("Unemployment", DoubleType(), True),
])

In [0]:
# Load the raw Walmart sales table registered in the metastore.
sales_df = spark.table("walmart_sales")

In [0]:
# Peek at the first few raw rows.
sales_df.show(n=5)

In [0]:
# Print the raw table's column names and types as read from the metastore.
sales_df.printSchema()


In [0]:
from pyspark.sql.functions import col, to_date

# Normalise the raw table: rename Store/Date to STORE/DATE, parse the date, and
# give every measure an explicit type.
# - The date pattern uses `yyyy` (calendar year). `YYYY` is the week-based year,
#   which Spark 3+ rejects for parsing.
#   NOTE(review): assumes Date is either already a DATE or text in yyyy-MM-dd
#   form; confirm against the raw table.
# - Weekly_Sales is cast to double; casting to int silently truncated the
#   fractional part of every sales amount.
#   NOTE(review): if table `sales_df` already exists with an int Weekly_Sales
#   column, its overwrite may need .option("overwriteSchema", "true").
clean_sales_df = sales_df.select(
    col("Store").cast("int").alias("STORE"),
    to_date(col("Date"), "yyyy-MM-dd").alias("DATE"),
    col("Weekly_Sales").cast("double").alias("Weekly_Sales"),
    col("Holiday_Flag").cast("int").alias("Holiday_Flag"),
    col("Temperature").cast("double").alias("Temperature"),
    col("Fuel_Price").cast("double").alias("Fuel_Price"),
    col("CPI").cast("double").alias("CPI"),
    col("Unemployment").cast("double").alias("Unemployment"),
)



In [0]:
# Verify the renamed/typed columns before persisting them.
clean_sales_df.show(n=5)
clean_sales_df.printSchema()

In [0]:
# Persist the typed intermediate table, replacing any previous run's output.
clean_sales_df.write.saveAsTable("sales_df", mode="overwrite")

In [0]:
# Read the persisted intermediate table back; row-level cleaning starts from here.
raw_sales_df = spark.read.table("sales_df")
raw_sales_df.show(n=5)
raw_sales_df.printSchema()

In [0]:
from pyspark.sql.functions import col

# Keep only rows that are usable downstream:
# - a non-NULL, non-negative Weekly_Sales,
# - a non-NULL STORE,
# - a non-NULL DATE. Year/Month/Week are derived from DATE in the next cell and
#   are used as grouping and MERGE keys later. A NULL DATE (e.g. an unparseable
#   date string) would otherwise surface as NULL groups in the curated tables.
clean_df = (
    raw_sales_df
    .filter(col("Weekly_Sales").isNotNull())
    .filter(col("Weekly_Sales") >= 0)
    .filter(col("STORE").isNotNull())
    .filter(col("DATE").isNotNull())
)
print("Before Cleaning ", raw_sales_df.count())
print("After Cleaning", clean_df.count())


In [0]:
from pyspark.sql.functions import year, month, weekofyear

# Derive calendar parts from the parsed date; these become grouping keys for the
# curated aggregates further down.
date_col = col("Date")
clean_df = clean_df.withColumn("Year", year(date_col))
clean_df = clean_df.withColumn("Month", month(date_col))
clean_df = clean_df.withColumn("Week", weekofyear(date_col))

clean_df.select("Date", "Year", "Month", "Week").show(5)


In [0]:
from pyspark.sql.functions import when

# Boolean view of Holiday_Flag: exactly 1 -> True; anything else (including NULL) -> False.
holiday_expr = when(col("Holiday_Flag") == 1, True).otherwise(False)
clean_df = clean_df.withColumn("Is_Holiday", holiday_expr)
clean_df.show(5)

In [0]:
# Column order of the published clean table.
final_columns = [
    "STORE", "DATE", "Weekly_Sales", "Holiday_Flag",
    "Temperature", "Fuel_Price", "CPI", "Unemployment",
    "Year", "Month", "Week", "Is_Holiday",
]
final_clean_df = clean_df.select(*final_columns)

final_clean_df.show(5)
final_clean_df.printSchema()

In [0]:
# Publish the clean, enriched table, replacing any previous run's output.
final_clean_df.write.saveAsTable("Clean_Walmart_Sales", mode="overwrite")

In [0]:
# Row count of the published clean table (displayed as the cell result).
spark.read.table("Clean_Walmart_Sales").count()

In [0]:
# Read the clean table back under the same unqualified name it was written with.
# The write resolves against the session's current catalog/schema. Hard-coding
# `workspace.default.` here would read a different table whenever the notebook
# runs against another catalog or schema.
agg_df = spark.table("clean_walmart_sales")

agg_df.show(5)
agg_df.printSchema()

In [0]:
from pyspark.sql.functions import sum as spark_sum

# Total sales per store per calendar month.
monthly_total = spark_sum("Weekly_Sales").alias("Total_monthly_Sales")
monthly_sales_df = agg_df.groupBy("Store", "Year", "Month").agg(monthly_total)
monthly_sales_df.show(5)

In [0]:
from pyspark.sql.functions import sum as spark_sum

# Total sales per store per calendar year.
yearly_total = spark_sum("Weekly_Sales").alias("Total_yearly_Sales")
year_df = agg_df.groupBy("Store", "Year").agg(yearly_total)
year_df.show(5)

In [0]:
from pyspark.sql.functions import sum as spark_sum

# Total sales per store per year, split into holiday vs. non-holiday weeks.
holiday_total = spark_sum("Weekly_Sales").alias("Total_Sales")
holiday_sales_df = agg_df.groupBy("Store", "Year", "Is_Holiday").agg(holiday_total)
holiday_sales_df.show(5)

In [0]:
# Persist each curated aggregate, replacing any previous run's table.
curated_outputs = [
    (monthly_sales_df, "Curated_Monthly_Sales"),
    (year_df, "Curated_yearly_Sales"),
    (holiday_sales_df, "Curated_holiday_Sales"),
]
for curated_df, table_name in curated_outputs:
    curated_df.write.mode("overwrite").saveAsTable(table_name)


In [0]:
# Spot-check each persisted curated table.
for table_name in ("Curated_Monthly_Sales", "Curated_yearly_Sales", "Curated_holiday_Sales"):
    spark.table(table_name).show(5)

In [0]:
# Data-quality checks run against the published clean table.
DQ_df = spark.read.table("clean_walmart_sales")
total_rows = DQ_df.count()
print("Total Rows:", total_rows)

In [0]:
# Print the column names and types of the table under data-quality review.
DQ_df.printSchema()

In [0]:
from pyspark.sql.functions import count, when

# (source column, output alias) for every column whose NULLs are counted.
null_checks = [
    ("Weekly_Sales", "null_weekly_sales"),
    ("Temperature", "null_Temperature"),
    ("Fuel_Price", "null_Fuel_price"),
    ("CPI", "null_CPI"),
    ("Unemployment", "null_Unemployment"),
]

# count(expr) skips NULLs, so counting `when(isNull, 1)` counts exactly the NULL rows.
DQ_nulls_df = DQ_df.select(
    count("*").alias("total_rows"),
    *[count(when(col(source), ).isNull() if False else when(col(source).isNull(), 1)).alias(alias) for source, alias in []],
    *[count(when(col(source).isNull(), 1)).alias(alias) for source, alias in null_checks],
)

DQ_nulls_df.show()
                                                     

In [0]:
from pyspark.sql.functions import col, when

# NULL count column -> percentage column produced from it.
null_pct_columns = {
    "null_weekly_sales": "per_null_weekly_sales",
    "null_Temperature": "per_null_Temperature",
    "null_Fuel_price": "per_null_Fuel_price",
    "null_CPI": "per_null_CPI",
    "null_Unemployment": "per_null_Unemployment",
}

# Express each NULL count as a percentage of total_rows.
# The `total_rows > 0` guard yields NULL for an empty table instead of dividing
# by zero, which raises an error when ANSI mode is enabled.
DQ_metrics_nulls_df = DQ_nulls_df.select(
    col("total_rows"),
    *[
        when(col("total_rows") > 0, col(count_col) / col("total_rows") * 100).alias(pct_col)
        for count_col, pct_col in null_pct_columns.items()
    ],
)

DQ_metrics_nulls_df.show()

In [0]:
# Domain-rule violation counts on the clean table.
# - Sales: the cleaning step deliberately keeps Weekly_Sales >= 0, so only strictly
#   negative values are invalid. The previous `<= 0` flagged legitimate zero-sales
#   weeks and contradicted the "Negative Sales Found" assertion message.
# - Years: a NULL Year (from a NULL/unparseable DATE) is counted as invalid too,
#   because `Year <= 2000` is never true for NULL.
invalid_sales = agg_df.filter(col("Weekly_Sales") < 0).count()
invalid_stores = agg_df.filter(col("STORE") <= 0).count()
invalid_years = agg_df.filter(col("Year").isNull() | (col("Year") <= 2000)).count()

print("Invalid Weekly_Sales:", invalid_sales)
print("Invalid Store IDs:", invalid_stores)
print("Invalid Years values", invalid_years)

In [0]:
# Fail the run on the first domain check that found offending rows.
dq_assertions = (
    (invalid_sales, "Negative Sales Found"),
    (invalid_stores, "Invalid Store ID Found"),
    (invalid_years, "Invalid Years Found"),
)
for offending_rows, failure_message in dq_assertions:
    assert offending_rows == 0, failure_message

In [0]:
from pyspark.sql.functions import current_timestamp

# Stamp the NULL-percentage metrics with the time of this DQ run.
df_final_df = DQ_metrics_nulls_df.withColumn(
    "dq_run_timestamp", current_timestamp()
)

# NOTE(review): "overwrite" keeps only the latest run's metrics; switch to
# mode("append") if a DQ history is wanted.
df_final_df.write.mode("overwrite").saveAsTable("dq_walmart_sales_metrics")

# current_timestamp() is evaluated per query execution. Calling df_final_df.show()
# would re-run the plan and print a different timestamp than the one stored, so
# display the persisted rows instead.
spark.table("dq_walmart_sales_metrics").show(5)

In [0]:
clean_df = spark.table("clean_walmart_sales")

# Report the current shuffle-partition setting before overriding it.
# Previously the result of spark.conf.get was discarded (it was not the cell's
# last expression), so it was never shown.
print("spark.sql.shuffle.partitions (before):", spark.conf.get("spark.sql.shuffle.partitions"))
display(clean_df)

# Applies to shuffles in queries executed after this point in the session.
spark.conf.set("spark.sql.shuffle.partitions", 50)


In [0]:
# Rewrite the monthly aggregate with its partition count capped at 8.
optimized_monthly_df = monthly_sales_df.coalesce(8)
optimized_monthly_df.write.saveAsTable("curated_monthly_sales", mode="overwrite")

In [0]:
# Print the query plan of the monthly aggregation for inspection.
monthly_sales_df.explain()

In [0]:
# Snapshot curated_monthly_sales into a Delta table that the MERGE cell below updates.
# NOTE(review): because of IF NOT EXISTS, a re-run leaves an existing table
# (including rows changed by a previous MERGE) untouched rather than refreshing
# it from curated_monthly_sales.
spark.sql("""
          create table if not exists curated_monthly_sales_delta
          using delta
          as
          select * from curated_monthly_sales
          """)

In [0]:
# Show table-level Delta metadata for the snapshot table.
display(spark.sql("DESCRIBE DETAIL curated_monthly_sales_delta"))

In [0]:
from pyspark.sql.functions import lit

# Simulated correction for December 2012: one row per (STORE, Year, Month)
# carrying a placeholder monthly total.
# clean_walmart_sales has weekly rows, so a month has several rows per store.
# distinct() collapses them to one; otherwise the MERGE would see multiple
# source rows for the same target row and fail.
incremental_df = (
    spark.table("clean_walmart_sales")
    .filter("Year=2012 and Month=12")
    .withColumn("Total_Monthly_Sales", lit(999999.0))  # placeholder value, not a real total
    .select("STORE", "Year", "Month", "Total_Monthly_Sales")
    .distinct()
)

incremental_df.show(5)


In [0]:
# Expose the corrections to SQL and upsert them into the Delta table:
# rows matching on (Store, Year, Month) get the new Total_Monthly_Sales;
# source rows with no match are inserted as new rows.
incremental_df.createOrReplaceTempView("incremental_updates")
spark.sql("""
MERGE INTO curated_monthly_sales_delta AS target
USING incremental_updates AS source
ON target.Store = source.Store
AND target.Year = source.Year
AND target.Month = source.Month

WHEN MATCHED THEN
  UPDATE SET target.Total_Monthly_Sales = source.Total_Monthly_Sales

WHEN NOT MATCHED THEN
  INSERT *
""")

In [0]:
# Inspect the December 2012 rows after the MERGE.
(
    spark.table("curated_monthly_sales_delta")
    .filter("Year = 2012 AND Month = 12")
    .show()
)

In [0]:
# Delta time travel: read the table as of version 0, i.e. the state written when
# the table was first created.
# spark.sql only builds a DataFrame; as a bare last expression it renders as a
# schema repr rather than rows, so display it explicitly.
display(spark.sql("""
SELECT * FROM curated_monthly_sales_delta VERSION AS OF 0
"""))