In [0]:
configs = {"fs.azure.account.auth.type": "OAuth", 
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", 
           "fs.azure.account.oauth2.client.id": "Application (client) ID", #fill with Application (client) ID
           "fs.azure.account.oauth2.client.secret": 'secret key', # fill with secret key
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/Directory (tenant) ID/oauth2/token"} # fill with Directory (tenant) ID


dbutils.fs.mount(
source = "abfss://walmart-data@walmartdatavidit.dfs.core.windows.net", # contrainer@storageacc
mount_point = "/mnt/walmartdata",
extra_configs = configs)

In [0]:
%fs
ls "/mnt/walmartdata"

path,name,size,modificationTime
dbfs:/mnt/walmartdata/raw-data/,raw-data/,0,1738814961000
dbfs:/mnt/walmartdata/transformed-data/,transformed-data/,0,1738814980000


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.functions import avg, max, min, count
spark

In [0]:
f1 = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("/mnt/walmartdata/raw-data/features.csv")
s1 = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/walmartdata/raw-data/stores.csv")
tr1 = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/walmartdata/raw-data/train.csv")

In [0]:
f1.show()

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|Store|      Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|IsHoliday|
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|    1|2010-02-05|      42.31|     2.572|       NA|       NA|       NA|       NA|       NA|211.0963582|       8.106|    false|
|    1|2010-02-12|      38.51|     2.548|       NA|       NA|       NA|       NA|       NA|211.2421698|       8.106|     true|
|    1|2010-02-19|      39.93|     2.514|       NA|       NA|       NA|       NA|       NA|211.2891429|       8.106|    false|
|    1|2010-02-26|      46.63|     2.561|       NA|       NA|       NA|       NA|       NA|211.3196429|       8.106|    false|
|    1|2010-03-05|       46.5|     2.625|       NA|       NA|       NA|       NA|       NA|211.3501429|       8

Below we are type casting the correct datatypes since even after infer_schema =True data types were considered wrong

In [0]:
#Typecasting data type since even after infer_schema =True data types were considered wrong

f1 = f1.withColumn("Store",col("Store").cast("Integer"))
f1 = f1.withColumn("Date",col("Date").cast("Date"))
f1 = f1.withColumn("Temperature",col("Temperature").cast("Float"))

f1 = f1.withColumn("MarkDown1",col("MarkDown1").cast("Float"))
f1 = f1.withColumn("MarkDown2",col("MarkDown2").cast("Float"))
f1 = f1.withColumn("MarkDown3",col("MarkDown3").cast("Float"))
f1 = f1.withColumn("MarkDown4",col("MarkDown4").cast("Float"))
f1 = f1.withColumn("MarkDown5",col("MarkDown5").cast("Float"))
f1 = f1.withColumn("CPI",col("CPI").cast("Float"))
f1 = f1.withColumn("Unemployment",col("Unemployment").cast("Float"))
f1 = f1.withColumn("IsHoliday",col("IsHoliday").cast("String"))
f1.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Temperature: float (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: float (nullable = true)
 |-- MarkDown2: float (nullable = true)
 |-- MarkDown3: float (nullable = true)
 |-- MarkDown4: float (nullable = true)
 |-- MarkDown5: float (nullable = true)
 |-- CPI: float (nullable = true)
 |-- Unemployment: float (nullable = true)
 |-- IsHoliday: string (nullable = true)



Lets find out the Number of null values in each column of Markdown1 - 5

In [0]:
# finding number of Null values in MarkDown1-5
print(f1.where((f1.MarkDown1.isNull())).count(),
f1.where((f1.MarkDown2.isNull())).count(),
f1.where((f1.MarkDown3.isNull())).count(),
f1.where((f1.MarkDown4.isNull())).count(),
f1.where((f1.MarkDown5.isNull())).count())

4158 5269 4577 4726 4140


Finding Mean values per column to be filled in Null values

In [0]:
# Finding Mean values in each column
m1_mean = f1.select(mean("MarkDown1")).collect()[0][0]

m2_mean = f1.select(mean("MarkDown2")).collect()[0][0]
m3_mean = f1.select(mean("MarkDown3")).collect()[0][0]
m4_mean = f1.select(mean("MarkDown4")).collect()[0][0]
m5_mean = f1.select(mean("MarkDown5")).collect()[0][0]
print(m1_mean)
print(m2_mean)
print(m3_mean)
print(m4_mean)

7032.371786093377
3384.176592808865
1760.100175767131
3292.9358917338986


Filling Null values with Mean per column

In [0]:
f1=f1.fillna({"MarkDown1":m1_mean})
f1=f1.fillna({"MarkDown2":m2_mean})
f1=f1.fillna({"MarkDown3":m3_mean})
f1=f1.fillna({"MarkDown4":m4_mean})
f1=f1.fillna({"MarkDown5":m5_mean})

In [0]:
# Checking number of Null values after process in MarkDown1-5
print(f1.where((f1.MarkDown1.isNull())).count(),
f1.where((f1.MarkDown2.isNull())).count(),
f1.where((f1.MarkDown3.isNull())).count(),
f1.where((f1.MarkDown4.isNull())).count(),
f1.where((f1.MarkDown5.isNull())).count())

0 0 0 0 0


We have identified null values in Markdown1 - 5 columns and replaced it with mean value of specific columns 

Now Lets check if Date column is in the "yyyy-MM-dd"

In [0]:
f1.select("Date").show()

+----------+
|      Date|
+----------+
|2010-02-05|
|2010-02-12|
|2010-02-19|
|2010-02-26|
|2010-03-05|
|2010-03-12|
|2010-03-19|
|2010-03-26|
|2010-04-02|
|2010-04-09|
|2010-04-16|
|2010-04-23|
|2010-04-30|
|2010-05-07|
|2010-05-14|
|2010-05-21|
|2010-05-28|
|2010-06-04|
|2010-06-11|
|2010-06-18|
+----------+
only showing top 20 rows



Hence no format changing needed for date column,
Now we will calculate a 4-Week Moving Average for the train.csv file to make forecasting easy in Dashboard graphs

In [0]:
tr1 = tr1.withColumn("Weekly_Sales",col("Weekly_Sales").cast("Float"))
tr1.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Weekly_Sales: float (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



Defining a Window for 4-Week Moving Average (Including Current Row)

In [0]:
# Define a Window for 4-Week Moving Average (Including Current Row)
window_spec = Window.partitionBy("Store", "Dept").orderBy("Date").rowsBetween(-3, 0)

In [0]:
# Compute the Weekly Moving Average
tr1 = tr1.withColumn("4_Week_Moving_Avg", avg(col("Weekly_Sales")).over(window_spec))
tr1 = tr1.withColumn("4_Week_Moving_Avg",col("4_Week_Moving_Avg").cast("Float"))

In [0]:
tr1.show()

+-----+----+----------+------------+---------+-----------------+
|Store|Dept|      Date|Weekly_Sales|IsHoliday|4_Week_Moving_Avg|
+-----+----+----------+------------+---------+-----------------+
|    1|   1|2010-02-05|     24924.5|    false|          24924.5|
|    1|   1|2010-02-12|    46039.49|     true|        35481.992|
|    1|   1|2010-02-19|    41595.55|    false|        37519.848|
|    1|   1|2010-02-26|    19403.54|    false|         32990.77|
|    1|   1|2010-03-05|     21827.9|    false|         32216.62|
|    1|   1|2010-03-12|    21043.39|    false|        25967.596|
|    1|   1|2010-03-19|    22136.64|    false|        21102.867|
|    1|   1|2010-03-26|    26229.21|    false|        22809.285|
|    1|   1|2010-04-02|    57258.43|    false|        31666.918|
|    1|   1|2010-04-09|    42960.91|    false|        37146.297|
|    1|   1|2010-04-16|    17596.96|    false|         36011.38|
|    1|   1|2010-04-23|    16145.35|    false|        33490.414|
|    1|   1|2010-04-30|  

In [0]:
tr1.select(min("Weekly_Sales")).show()

+-----------------+
|min(Weekly_Sales)|
+-----------------+
|         -4988.94|
+-----------------+



Aggregating Total_Weekly_Sales, Avg_Weekly_Sales, Max_Weekly_Sales, Min_Weekly_Sales, Total_Transactions per Store to rank top performers as storewise_stats

In [0]:
#grouping Weekly_Sales per Store

storewise_stats = (
    tr1.groupBy("Store")
    .agg(
        sum("Weekly_Sales").alias("Total_Weekly_Sales"),
        avg("Weekly_Sales").alias("Avg_Weekly_Sales"),
        max("Weekly_Sales").alias("Max_Weekly_Sales"),
        min("Weekly_Sales").alias("Min_Weekly_Sales"),
        count("Weekly_Sales").alias("Total_Transactions")  # Counts number of sales records per store
    )
    .orderBy("Store")
    .withColumn("Total_Weekly_Sales", col("Total_Weekly_Sales").cast("decimal(20,2)"))
    .withColumn("Avg_Weekly_Sales", col("Avg_Weekly_Sales").cast("decimal(20,2)"))
    .withColumn("Max_Weekly_Sales", col("Max_Weekly_Sales").cast("decimal(20,2)"))
    .withColumn("Min_Weekly_Sales", col("Min_Weekly_Sales").cast("decimal(20,2)"))
)

storewise_stats.show()

+-----+------------------+----------------+----------------+----------------+------------------+
|Store|Total_Weekly_Sales|Avg_Weekly_Sales|Max_Weekly_Sales|Min_Weekly_Sales|Total_Transactions|
+-----+------------------+----------------+----------------+----------------+------------------+
|    1|      222402808.88|        21710.54|       203670.47|         -863.00|             10244|
|    2|      275382440.86|        26898.07|       285353.53|        -1098.00|             10238|
|    3|       57586735.05|         6373.03|       155897.94|        -1008.96|              9036|
|    4|      299543953.46|        29161.21|       385051.03|         -898.00|             10272|
|    5|       45475688.87|         5053.42|        93517.72|         -101.26|              8999|
|    6|      223756130.79|        21913.24|       342578.66|         -698.00|             10211|
|    7|       81598275.18|         8358.77|       222921.09|         -459.00|              9762|
|    8|      129951181.15|    

We are joining Stores.csv to storewise_stats for creating one more df for data analytics and BI reports called Transformedfile1

In [0]:
Transformedfile1 = storewise_stats.join(s1, on="Store", how="left")
Transformedfile1.show()

+-----+------------------+----------------+----------------+----------------+------------------+----+------+
|Store|Total_Weekly_Sales|Avg_Weekly_Sales|Max_Weekly_Sales|Min_Weekly_Sales|Total_Transactions|Type|  Size|
+-----+------------------+----------------+----------------+----------------+------------------+----+------+
|   12|      144287230.04|        14867.31|       360140.66|         -598.00|              9705|   B|112238|
|    1|      222402808.88|        21710.54|       203670.47|         -863.00|             10244|   A|151315|
|   13|      286517703.72|        27355.14|       292165.78|          -98.00|             10474|   A|219622|
|    6|      223756130.79|        21913.24|       342578.66|         -698.00|             10211|   A|202505|
|    3|       57586735.05|         6373.03|       155897.94|        -1008.96|              9036|   B| 37392|
|    5|       45475688.87|         5053.42|        93517.72|         -101.26|              8999|   B| 34875|
|   15|       89133

In [0]:
f1_tr1_join = f1.join(tr1, ["Store", "Date","IsHoliday"], "inner")
f1_tr1_join.show()

+-----+----------+---------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+----+------------+-----------------+
|Store|      Date|IsHoliday|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|      CPI|Unemployment|Dept|Weekly_Sales|4_Week_Moving_Avg|
+-----+----------+---------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+----+------------+-----------------+
|    1|2010-02-05|    false|      42.31|     2.572|7032.3716|3384.1765|1760.1002|3292.9358|4132.2163|211.09636|       8.106|   1|     24924.5|          24924.5|
|    1|2010-02-12|     true|      38.51|     2.548|7032.3716|3384.1765|1760.1002|3292.9358|4132.2163|211.24217|       8.106|   1|    46039.49|        35481.992|
|    1|2010-02-19|    false|      39.93|     2.514|7032.3716|3384.1765|1760.1002|3292.9358|4132.2163|211.28914|       8.106|   1|    41595.55|        37519.848|
|    1|2010-02-26|    false|      

In [0]:
f1_tr1_join.schema.names
#Transformedfile1.schema.names

['Store',
 'Date',
 'IsHoliday',
 'Temperature',
 'Fuel_Price',
 'MarkDown1',
 'MarkDown2',
 'MarkDown3',
 'MarkDown4',
 'MarkDown5',
 'CPI',
 'Unemployment',
 'Dept',
 'Weekly_Sales',
 '4_Week_Moving_Avg']

Saving the transformed data frames as csv files: 

f1_tr1_join as joined_data.csv, 
Transformedfile1 as store_wise_stats.csv


In [0]:
# Write Transformedfile1 to CSV
Transformedfile1.repartition(1).write.option("header", "true").mode("overwrite").csv("/mnt/walmartdata/transformed-data/store_wise_stats")

# Write f1_tr1_join to CSV
f1_tr1_join.repartition(1).write.option("header", "true").mode("overwrite").csv("/mnt/walmartdata/transformed-data/joined_data")
