In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
#Creating spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [3]:
#Create Spark Dataraframe and reading data using Dataframe reader API
shopping_df= spark.read.format('csv').option('header',True).load('shopping.csv')
shopping_df.printSchema()

root
 |-- Customer ID: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Item Purchased: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Purchase Amount (USD): string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- Review Rating: string (nullable = true)
 |-- Subscription Status: string (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Shipping Type: string (nullable = true)
 |-- Discount Applied: string (nullable = true)
 |-- Promo Code Used: string (nullable = true)
 |-- Previous Purchases: string (nullable = true)
 |-- Preferred Payment Method: string (nullable = true)
 |-- Frequency of Purchases: string (nullable = true)



In [4]:
#Validating the schema of the Dataframe using inferSchema function
shopping_df= spark.read.format('csv').option('inferSchema',True).option('header',True).load('shopping.csv')
shopping_df.show()

+-----------+---+------+--------------+-----------+---------------------+-------------+----+---------+------+-------------+-------------------+--------------+--------------+----------------+---------------+------------------+------------------------+----------------------+
|Customer ID|Age|Gender|Item Purchased|   Category|Purchase Amount (USD)|     Location|Size|    Color|Season|Review Rating|Subscription Status|Payment Method| Shipping Type|Discount Applied|Promo Code Used|Previous Purchases|Preferred Payment Method|Frequency of Purchases|
+-----------+---+------+--------------+-----------+---------------------+-------------+----+---------+------+-------------+-------------------+--------------+--------------+----------------+---------------+------------------+------------------------+----------------------+
|          1| 55|  Male|        Blouse|   Clothing|                   53|     Kentucky|   L|     Gray|Winter|          3.1|                Yes|   Credit Card|       Express|     

In [5]:
#Checking the schema after validation
shopping_df.printSchema()

root
 |-- Customer ID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Item Purchased: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Purchase Amount (USD): integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- Review Rating: double (nullable = true)
 |-- Subscription Status: string (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Shipping Type: string (nullable = true)
 |-- Discount Applied: string (nullable = true)
 |-- Promo Code Used: string (nullable = true)
 |-- Previous Purchases: integer (nullable = true)
 |-- Preferred Payment Method: string (nullable = true)
 |-- Frequency of Purchases: string (nullable = true)



In [6]:
shopping_df.describe().show()

+-------+------------------+-----------------+------+--------------+-----------+---------------------+--------+----+------+------+------------------+-------------------+--------------+--------------+----------------+---------------+------------------+------------------------+----------------------+
|summary|       Customer ID|              Age|Gender|Item Purchased|   Category|Purchase Amount (USD)|Location|Size| Color|Season|     Review Rating|Subscription Status|Payment Method| Shipping Type|Discount Applied|Promo Code Used|Previous Purchases|Preferred Payment Method|Frequency of Purchases|
+-------+------------------+-----------------+------+--------------+-----------+---------------------+--------+----+------+------+------------------+-------------------+--------------+--------------+----------------+---------------+------------------+------------------------+----------------------+
|  count|              3900|             3900|  3900|          3900|       3900|                 390

In [11]:
#Renaming the column names using the ColumnRenamed
shopping_df = shopping_df.withColumnRenamed("Customer ID", "Customer_ID").withColumnRenamed("Item Purchased", "Item_Purchased")\
.withColumnRenamed("Purchase Amount (USD)","Purchase_Amount_(USD)")\
.withColumnRenamed("Review Rating","Review_Rating")\
.withColumnRenamed("Subscription Status","Subscription_Status")\
.withColumnRenamed("Payment Method","Payment_Method")\
.withColumnRenamed("Shipping Type", "Shipping_Type")\
.withColumnRenamed("Discount Applied", "Discount_Applied")\
.withColumnRenamed("Promo Code Used", "Promo_Code_Used")\
.withColumnRenamed("Previous Purchases", "Previous_Purchases")\
.withColumnRenamed("Preferred Payment Method", "Preferred_Payment_Method")\
.withColumnRenamed("Frequency of Purchases", "Frequency_of_Purchases")


shopping_df.show()

+-----------+---+------+--------------+-----------+---------------------+-------------+----+---------+------+-------------+-------------------+--------------+--------------+----------------+---------------+------------------+------------------------+----------------------+
|Customer_ID|Age|Gender|Item_Purchased|   Category|Purchase_Amount_(USD)|     Location|Size|    Color|Season|Review_Rating|Subscription_Status|Payment_Method| Shipping_Type|Discount_Applied|Promo_Code_Used|Previous_Purchases|Preferred_Payment_Method|Frequency_of_Purchases|
+-----------+---+------+--------------+-----------+---------------------+-------------+----+---------+------+-------------+-------------------+--------------+--------------+----------------+---------------+------------------+------------------------+----------------------+
|          1| 55|  Male|        Blouse|   Clothing|                   53|     Kentucky|   L|     Gray|Winter|          3.1|                Yes|   Credit Card|       Express|     

In [14]:
#Changing the Datatype of the Columns
from pyspark.sql.types import IntegerType,DoubleType
shopping_df = shopping_df.withColumn("Customer_ID", shopping_df["Customer_ID"].cast(IntegerType()))
shopping_df = shopping_df.withColumn("Age", shopping_df["Age"].cast(IntegerType()))
shopping_df = shopping_df.withColumn("Purchase_Amount_(USD)", shopping_df["Purchase_Amount_(USD)"].cast(IntegerType()))
shopping_df = shopping_df.withColumn("Review_Rating", shopping_df["Review_Rating"].cast(DoubleType()))
shopping_df = shopping_df.withColumn("Previous_Purchases", shopping_df["Previous_Purchases"].cast(IntegerType()))



In [15]:
shopping_df.printSchema()

root
 |-- Customer_ID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Item_Purchased: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Purchase_Amount_(USD): integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- Review_Rating: double (nullable = true)
 |-- Subscription_Status: string (nullable = true)
 |-- Payment_Method: string (nullable = true)
 |-- Shipping_Type: string (nullable = true)
 |-- Discount_Applied: string (nullable = true)
 |-- Promo_Code_Used: string (nullable = true)
 |-- Previous_Purchases: integer (nullable = true)
 |-- Preferred_Payment_Method: string (nullable = true)
 |-- Frequency_of_Purchases: string (nullable = true)



In [16]:
ecommerce_df.show()

+-----------+---+------+--------------+-----------+---------------------+-------------+----+---------+------+-------------+-------------------+--------------+--------------+----------------+---------------+------------------+------------------------+----------------------+
|Customer_ID|Age|Gender|Item_Purchased|   Category|Purchase_Amount_(USD)|     Location|Size|    Color|Season|Review_Rating|Subscription_Status|Payment_Method| Shipping_Type|Discount_Applied|Promo_Code_Used|Previous_Purchases|Preferred_Payment_Method|Frequency_of_Purchases|
+-----------+---+------+--------------+-----------+---------------------+-------------+----+---------+------+-------------+-------------------+--------------+--------------+----------------+---------------+------------------+------------------------+----------------------+
|          1| 55|  Male|        Blouse|   Clothing|                   53|     Kentucky|   L|     Gray|Winter|          3.1|                Yes|   Credit Card|       Express|     

In [21]:
from pyspark.sql.functions import col
shopping_df.filter(shopping_df.Customer_ID.isNull()).show()
shopping_df.filter(shopping_df.Age.isNull()).show()
shopping_df.filter(shopping_df.Gender.isNull()).show()
shopping_df.filter(shopping_df.Item_Purchased.isNull()).show()
shopping_df.filter(shopping_df.Category.isNull()).show()
shopping_df.filter(shopping_df.Promo_Code_Used.isNull()).show()
shopping_df.filter(shopping_df.Location.isNull()).show()
shopping_df.filter(shopping_df.Size.isNull()).show()
shopping_df.filter(shopping_df.Color.isNull()).show()
shopping_df.filter(shopping_df.Season.isNull()).show()
shopping_df.filter(shopping_df.Review_Rating.isNull()).show()
shopping_df.filter(shopping_df.Subscription_Status.isNull()).show()
shopping_df.filter(shopping_df.Payment_Method.isNull()).show()
shopping_df.filter(shopping_df.Shipping_Type.isNull()).show()
shopping_df.filter(shopping_df.Discount_Applied.isNull()).show()
shopping_df.filter(shopping_df.Previous_Purchases.isNull()).show()
shopping_df.filter(shopping_df.Preferred_Payment_Method.isNull()).show()
shopping_df.filter(shopping_df.Frequency_of_Purchases.isNull()).show()

+-----------+---+------+--------------+--------+---------------------+--------+----+-----+------+-------------+-------------------+--------------+-------------+----------------+---------------+------------------+------------------------+----------------------+
|Customer_ID|Age|Gender|Item_Purchased|Category|Purchase_Amount_(USD)|Location|Size|Color|Season|Review_Rating|Subscription_Status|Payment_Method|Shipping_Type|Discount_Applied|Promo_Code_Used|Previous_Purchases|Preferred_Payment_Method|Frequency_of_Purchases|
+-----------+---+------+--------------+--------+---------------------+--------+----+-----+------+-------------+-------------------+--------------+-------------+----------------+---------------+------------------+------------------------+----------------------+
+-----------+---+------+--------------+--------+---------------------+--------+----+-----+------+-------------+-------------------+--------------+-------------+----------------+---------------+------------------+-----

In [23]:
shopping_dfw = shopping_df.dropna(subset=["Review_Rating"], how = 'any' )

In [26]:
from pyspark.sql.functions import current_timestamp
shopping_in = shopping_dfw.withColumn("Ingestion_Date",current_timestamp())
shopping_in.show()

+-----------+---+------+--------------+-----------+---------------------+-------------+----+---------+------+-------------+-------------------+--------------+--------------+----------------+---------------+------------------+------------------------+----------------------+--------------------+
|Customer_ID|Age|Gender|Item_Purchased|   Category|Purchase_Amount_(USD)|     Location|Size|    Color|Season|Review_Rating|Subscription_Status|Payment_Method| Shipping_Type|Discount_Applied|Promo_Code_Used|Previous_Purchases|Preferred_Payment_Method|Frequency_of_Purchases|      Ingestion_Date|
+-----------+---+------+--------------+-----------+---------------------+-------------+----+---------+------+-------------+-------------------+--------------+--------------+----------------+---------------+------------------+------------------------+----------------------+--------------------+
|          1| 55|  Male|        Blouse|   Clothing|                   53|     Kentucky|   L|     Gray|Winter|      

In [36]:
from datetime import date
from pyspark.sql.functions import *

shopping_df_final = shopping_in.select(
    shopping_in.Customer_ID,
    shopping_in.Discount_Applied.alias('Promo_Code_Used'),
    to_date(shopping_in['Purchase_Amount_(USD)']).alias('Purchase_Amount_(USD)'),  # Removed square brackets
    shopping_in.Age,
    shopping_in.Gender,
    shopping_in.Item_Purchased,
    shopping_in.Category,
    shopping_in.Discount_Applied,
    shopping_in.Location,
    shopping_in.Size,
    shopping_in.Color,
    shopping_in.Season,
    shopping_in.Review_Rating,
    shopping_in.Subscription_Status,
    shopping_in.Payment_Method,
    shopping_in.Shipping_Type,
    shopping_in.Previous_Purchases,
    shopping_in.Preferred_Payment_Method,
    shopping_in.Frequency_of_Purchases,
    shopping_in.Ingestion_Date
)

shopping_df_final.show()

+-----------+---------------+---------------------+---+------+--------------+-----------+----------------+-------------+----+---------+------+-------------+-------------------+--------------+--------------+------------------+------------------------+----------------------+--------------------+
|Customer_ID|Promo_Code_Used|Purchase_Amount_(USD)|Age|Gender|Item_Purchased|   Category|Discount_Applied|     Location|Size|    Color|Season|Review_Rating|Subscription_Status|Payment_Method| Shipping_Type|Previous_Purchases|Preferred_Payment_Method|Frequency_of_Purchases|      Ingestion_Date|
+-----------+---------------+---------------------+---+------+--------------+-----------+----------------+-------------+----+---------+------+-------------+-------------------+--------------+--------------+------------------+------------------------+----------------------+--------------------+
|          1|            Yes|                 NULL| 55|  Male|        Blouse|   Clothing|             Yes|     Kent

In [38]:
shopping_df_final.createOrReplaceTempView("rfa")
spark.sql("SELECT COUNT(*) FROM rfa WHERE Promo_Code_Used = 'No'").show()

+--------+
|count(1)|
+--------+
|    2223|
+--------+



In [39]:
shopping_df_final.show()
shopping_df_final.write.mode("overwrite").partitionBy('Promo_Code_Used').parquet("shopping")

+-----------+---------------+---------------------+---+------+--------------+-----------+----------------+-------------+----+---------+------+-------------+-------------------+--------------+--------------+------------------+------------------------+----------------------+--------------------+
|Customer_ID|Promo_Code_Used|Purchase_Amount_(USD)|Age|Gender|Item_Purchased|   Category|Discount_Applied|     Location|Size|    Color|Season|Review_Rating|Subscription_Status|Payment_Method| Shipping_Type|Previous_Purchases|Preferred_Payment_Method|Frequency_of_Purchases|      Ingestion_Date|
+-----------+---------------+---------------------+---+------+--------------+-----------+----------------+-------------+----+---------+------+-------------+-------------------+--------------+--------------+------------------+------------------------+----------------------+--------------------+
|          1|            Yes|                 NULL| 55|  Male|        Blouse|   Clothing|             Yes|     Kent

In [64]:
Shopping_No = spark.read.format('parquet').option('header', True).load("shopping/Promo_Code_Used=No/part-00000-3134cf2a-b59d-4159-b48e-a61bd593fa5d.c000.snappy.parquet")
shopping_1 = spark.read.format('parquet').option('header', True).load("shopping/Promo_Code_Used=No/part-00000-3134cf2a-b59d-4159-b48e-a61bd593fa5d.c000.snappy.parquet")

Shopping_No.createOrReplaceTempView("emr")
shopping_1.createOrReplaceTempView("ems")

emc_df = spark.sql("SELECT * FROM emr UNION ALL SELECT * FROM ems")
emc_df.createOrReplaceTempView("emc_No")

In [110]:
# goal1 1.### Analyze the dataset to gain insights into specific customer demographics,Discount Applied,Frequency of purchase
spark.sql("""
    SELECT 
        Age,
        Gender,
        LAG(Gender) OVER (ORDER BY Gender) AS previous_row,
       Discount_Applied - LAG(Gender) OVER (ORDER BY Gender) AS difference
    FROM emc_No
    WHERE Age = 24
""").show()


+---+------+------------+----------+
|Age|Gender|previous_row|difference|
+---+------+------------+----------+
| 24|Female|        NULL|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
| 24|Female|      Female|      NULL|
+---+------+------------+----------+
only showing top 20 rows



In [77]:
#Goal_2 ### Exploring the factors contributing to discount applied by customer age .
spark.sql("""
    SELECT 
        Age,
        Discount_Applied,
        Frequency_of_Purchases,
        LAG(Discount_Applied) OVER (ORDER BY Discount_Applied) AS previous_row,
       Discount_Applied - LAG(Discount_Applied) OVER (ORDER BY Discount_Applied) AS difference
    FROM emc_No
    WHERE Age = 22
""").show()


+---+----------------+----------------------+------------+----------+
|Age|Discount_Applied|Frequency_of_Purchases|previous_row|difference|
+---+----------------+----------------------+------------+----------+
| 22|              No|              Annually|        NULL|      NULL|
| 22|              No|                Weekly|          No|      NULL|
| 22|              No|              Annually|          No|      NULL|
| 22|              No|               Monthly|          No|      NULL|
| 22|              No|             Bi-Weekly|          No|      NULL|
| 22|              No|              Annually|          No|      NULL|
| 22|              No|           Fortnightly|          No|      NULL|
| 22|              No|           Fortnightly|          No|      NULL|
| 22|              No|               Monthly|          No|      NULL|
| 22|              No|        Every 3 Months|          No|      NULL|
| 22|              No|              Annually|          No|      NULL|
| 22|              N

In [78]:
spark.sql("select * from rfa where age = 22 order by Discount_Applied").show()

+-----------+---------------+---------------------+---+------+--------------+-----------+----------------+------------+----+------+------+-------------+-------------------+--------------+--------------+------------------+------------------------+----------------------+--------------------+
|Customer_ID|Promo_Code_Used|Purchase_Amount_(USD)|Age|Gender|Item_Purchased|   Category|Discount_Applied|    Location|Size| Color|Season|Review_Rating|Subscription_Status|Payment_Method| Shipping_Type|Previous_Purchases|Preferred_Payment_Method|Frequency_of_Purchases|      Ingestion_Date|
+-----------+---------------+---------------------+---+------+--------------+-----------+----------------+------------+----+------+------+-------------+-------------------+--------------+--------------+------------------+------------------------+----------------------+--------------------+
|       1687|             No|                 NULL| 22|  Male|        Gloves|Accessories|              No|    Illinois|   L|   

In [81]:
mrf = spark.sql("SELECT DISTINCT Age FROM rfa WHERE Discount_Applied = 'yes'")
mrf.createOrReplaceTempView('Applied')

In [83]:
mpf = spark.sql("SELECT DISTINCT Age FROM rfa WHERE Discount_Applied = 'No'")
mpf.createOrReplaceTempView('Not_Applied')

In [85]:
spark.sql("""
    SELECT Age FROM Applied
    INTERSECT
    SELECT Age FROM `Not_Applied`
""").show()

+---+
|Age|
+---+
+---+



In [93]:
# Goal 3 Analyze the size of the customer based upon the category.
spark.sql("select count(*) from rfa where Category = 'Clothing'").show()
spark.sql("SELECT * FROM rfa WHERE Category = 'Clothing' And Size = 's' LIMIT 75").show()
spark.sql("SELECT * FROM rfa WHERE Category = 'Clothing' AND Size = 'M' LIMIT 75").show()


+--------+
|count(1)|
+--------+
|    1737|
+--------+

+-----------+---------------+---------------------+---+------+--------------+--------+----------------+--------+----+-----+------+-------------+-------------------+--------------+-------------+------------------+------------------------+----------------------+--------------+
|Customer_ID|Promo_Code_Used|Purchase_Amount_(USD)|Age|Gender|Item_Purchased|Category|Discount_Applied|Location|Size|Color|Season|Review_Rating|Subscription_Status|Payment_Method|Shipping_Type|Previous_Purchases|Preferred_Payment_Method|Frequency_of_Purchases|Ingestion_Date|
+-----------+---------------+---------------------+---+------+--------------+--------+----------------+--------+----+-----+------+-------------+-------------------+--------------+-------------+------------------+------------------------+----------------------+--------------+
+-----------+---------------+---------------------+---+------+--------------+--------+----------------+--------+----

In [96]:
#Goal 4. Analyzing the Previous Purchases based upon the customer age groups
spark.sql("SELECT Category, Age, SUM(Previous_Purchases) AS Sales FROM rfa GROUP BY Category, Age ORDER BY Category, Age").show(100)


+-----------+---+-----+
|   Category|Age|Sales|
+-----------+---+-----+
|Accessories| 18|  387|
|Accessories| 19|  682|
|Accessories| 20|  369|
|Accessories| 21|  454|
|Accessories| 22|  625|
|Accessories| 23|  563|
|Accessories| 24|  527|
|Accessories| 25|  806|
|Accessories| 26|  609|
|Accessories| 27|  689|
|Accessories| 28|  613|
|Accessories| 29|  662|
|Accessories| 30|  664|
|Accessories| 31|  488|
|Accessories| 32|  682|
|Accessories| 33|  455|
|Accessories| 34|  600|
|Accessories| 35|  706|
|Accessories| 36|  644|
|Accessories| 37|  608|
|Accessories| 38|  512|
|Accessories| 39|  775|
|Accessories| 40|  517|
|Accessories| 41|  699|
|Accessories| 42|  596|
|Accessories| 43|  512|
|Accessories| 44|  657|
|Accessories| 45|  574|
|Accessories| 46|  435|
|Accessories| 47|  494|
|Accessories| 48|  665|
|Accessories| 49|  427|
|Accessories| 50|  512|
|Accessories| 51|  546|
|Accessories| 52|  491|
|Accessories| 53|  583|
|Accessories| 54|  822|
|Accessories| 55|  648|
|Accessories| 56

In [102]:
# Goal 5 Analyze review ratings and subscription status
spark.sql("""
    SELECT
        Review_Rating,
        Subscription_Status,
        COUNT(*) AS count_Review_Rating
    FROM
        rfa
    GROUP BY
        Review_Rating,
        Subscription_Status
    ORDER BY
        Review_Rating,
        Subscription_Status
""").show()

+-------------+-------------------+-------------------+
|Review_Rating|Subscription_Status|count_Review_Rating|
+-------------+-------------------+-------------------+
|          2.5|                 No|                 47|
|          2.5|                Yes|                 19|
|          2.6|                 No|                111|
|          2.6|                Yes|                 48|
|          2.7|                 No|                116|
|          2.7|                Yes|                 38|
|          2.8|                 No|                 97|
|          2.8|                Yes|                 39|
|          2.9|                 No|                129|
|          2.9|                Yes|                 41|
|          3.0|                 No|                118|
|          3.0|                Yes|                 44|
|          3.1|                 No|                119|
|          3.1|                Yes|                 38|
|          3.2|                 No|             

In [104]:
#Goal 6	Analyze the frequency of customer purchases.
spark.sql("""
    SELECT
        Frequency_of_Purchases,
        COUNT(*) AS count_Customers
    FROM
        rfa
    GROUP BY
        Frequency_of_Purchases
    ORDER BY
        Frequency_of_Purchases
""").show()

+----------------------+---------------+
|Frequency_of_Purchases|count_Customers|
+----------------------+---------------+
|              Annually|            572|
|             Bi-Weekly|            547|
|        Every 3 Months|            584|
|           Fortnightly|            542|
|               Monthly|            553|
|             Quarterly|            563|
|                Weekly|            539|
+----------------------+---------------+

