In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from delta import *

In [2]:
# download latest spark binary from https://spark.apache.org/downloads.html
! export SPARK_HOME="/Users/kanchanapadmanabhan/OneDrive/Personal-Course/Vector/Model Deployment/mlops_course/Week 3/spark/spark-3.4.1-bin-hadoop3"
! export PATH=$SPARK_HOME/bin:$PATH

In [3]:
spark = SparkSession.builder \
    .appName("PySparkExample") \
    .master("local[*]")\
    .config("spark.default.parallelism", "4")\
    .config("spark.driver.bindAddress","localhost")\
    .config("spark.ui.port","4050")\
    .getOrCreate()


23/08/20 11:06:11 WARN Utils: Your hostname, MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.26 instead (on interface en0)
23/08/20 11:06:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/20 11:06:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
features_df = spark.read.format("csv").option("header",True).options(inferSchema='True',delimiter=',').load("../../retail_kaggle_dataset_spark/Features*")
features_df.printSchema()

[Stage 1:>                                                          (0 + 4) / 4]

root
 |-- Store: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: string (nullable = true)
 |-- MarkDown2: string (nullable = true)
 |-- MarkDown3: string (nullable = true)
 |-- MarkDown4: string (nullable = true)
 |-- MarkDown5: string (nullable = true)
 |-- CPI: string (nullable = true)
 |-- Unemployment: string (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



                                                                                

In [5]:
features_df.head(5)

[Row(Store=1, Date='05/02/2010', Temperature=42.31, Fuel_Price=2.572, MarkDown1='NA', MarkDown2='NA', MarkDown3='NA', MarkDown4='NA', MarkDown5='NA', CPI='211.0963582', Unemployment='8.106', IsHoliday=False),
 Row(Store=1, Date='12/02/2010', Temperature=38.51, Fuel_Price=2.548, MarkDown1='NA', MarkDown2='NA', MarkDown3='NA', MarkDown4='NA', MarkDown5='NA', CPI='211.2421698', Unemployment='8.106', IsHoliday=True),
 Row(Store=1, Date='19/02/2010', Temperature=39.93, Fuel_Price=2.514, MarkDown1='NA', MarkDown2='NA', MarkDown3='NA', MarkDown4='NA', MarkDown5='NA', CPI='211.2891429', Unemployment='8.106', IsHoliday=False),
 Row(Store=1, Date='26/02/2010', Temperature=46.63, Fuel_Price=2.561, MarkDown1='NA', MarkDown2='NA', MarkDown3='NA', MarkDown4='NA', MarkDown5='NA', CPI='211.3196429', Unemployment='8.106', IsHoliday=False),
 Row(Store=1, Date='05/03/2010', Temperature=46.5, Fuel_Price=2.625, MarkDown1='NA', MarkDown2='NA', MarkDown3='NA', MarkDown4='NA', MarkDown5='NA', CPI='211.3501429

In [6]:
filtered_df = features_df.filter("IsHoliday = True")

In [7]:
output_path = "features_df_filter_new.csv"
filtered_df.write.csv(output_path, header=True, mode="overwrite")

                                                                                

In [8]:
features_df = features_df.withColumn("Date", F.to_date("Date", "yyyy-MM-dd"))
features_df = features_df.withColumn("IsHoliday", features_df["IsHoliday"].cast("boolean"))

# Compute year_month_first and month columns
features_df = features_df.withColumn("year_month_first", F.date_trunc("month", F.add_months("Date", -1)))
features_df = features_df.withColumn("month", F.month("Date"))

# Group by Store and year_month_first, then aggregate using Spark functions
features_df_month = features_df.groupby("Store", "year_month_first").agg(
    F.expr("percentile_approx(Temperature, 0.5)").alias("Temperature_median"),
    F.expr("percentile_approx(Fuel_Price, 0.5)").alias("Fuel_Price_median"),
    F.avg("MarkDown1").alias("MarkDown1_mean"),
    F.avg("MarkDown2").alias("MarkDown2_mean"),
    F.avg("MarkDown3").alias("MarkDown3_mean"),
    F.avg("MarkDown4").alias("MarkDown4_mean"),
    F.avg("MarkDown5").alias("MarkDown5_mean"),
    F.avg("CPI").alias("CPI_mean"),
    F.avg("Unemployment").alias("Unemployment_mean"),
    F.max("IsHoliday").alias("IsHoliday_sum"),
    F.max("month").alias("month_max")
)


In [9]:
# Show the resulting DataFrame
features_df_month.show()

                                                                                

+-----+----------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------+---------+
|Store|year_month_first|Temperature_median|Fuel_Price_median|    MarkDown1_mean|    MarkDown2_mean|    MarkDown3_mean|    MarkDown4_mean|    MarkDown5_mean|          CPI_mean| Unemployment_mean|IsHoliday_sum|month_max|
+-----+----------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------+---------+
|   31|            null|             67.87|            3.353|10432.855111111097| 3049.626438356164|1221.0650000000005| 3400.065333333329| 7546.233666666672|  216.918639708284| 7.403958579881612|         true|     null|
|   34|            null|             58.59|            3.354| 5385.857111111113|2145.8228378378376|1371.2926744186034|1709.0

In [10]:
output_path = "features_df_month_new.csv"
features_df_month.write.csv(output_path, header=True, mode="overwrite")

                                                                                

In [11]:
# Stop the Spark session
spark.stop()