# Project - Product analysis in Spark.
# Deadline - 8th of February, 2025

## Name -

## Objective
The objective of this project is to analyse various datasets in spark about commercial products and rating of e-commerce website NewChic.com.
## Overview of the datasets
For this project there are three csv files one for each product ( Shoe products, jewelry products and accessories).The datasets are available in the zip folder with th project.I advise you have the notebook and all the csv files in the same directory, so that it is easier when you are loading the csv files to pass in the csv files directly since they will be in the same directory as the notebook.<br>
There are 22 features in each dataset. The main features are
- Category ( men/accessory/jewelry/etc)
- Subcategory type of catgeory
- name of the product
- current price ==> price listed on the website  
- raw price ==> total price of the product before any discounts(i.e original price of the product)
- discount
- Currency ( currency of which the price listed in)
- likes count ==> popularity of the product
- isnew ==> tells whether the product is used or not (Binary value true or false)


# Tasks -

In [108]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [109]:
# Initialize SparkSession
spark = SparkSession.builder.appName("ProductAnalysis").getOrCreate()

In [None]:
# CSV Paths
accessory_path = "hdfs://namenode-master:8020/accessories.csv"
jewelry_path = "hdfs://namenode-master:8020/jewelry.csv"
shoe_path = "hdfs://namenode-master:8020/shoes.csv"
# Load Datasets
shoe_df = spark.read.options(header=True, inferSchema=True).csv(accessory_path)
jewelry_df = spark.read.options(header=True, inferSchema=True).csv(jewelry_path)
accessories_df = spark.read.options(header=True, inferSchema=True).csv(shoe_path)

                                                                                

In [None]:
# Print Schema
accessories_df.printSchema()
jewelry_df.printSchema()
shoe_df.printSchema()

root
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- raw_price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- likes_count: integer (nullable = true)
 |-- is_new: boolean (nullable = true)
 |-- brand: string (nullable = true)
 |-- brand_url: string (nullable = true)
 |-- codCountry: string (nullable = true)
 |-- variation_0_color: string (nullable = true)
 |-- variation_1_color: string (nullable = true)
 |-- variation_0_thumbnail: string (nullable = true)
 |-- variation_0_image: string (nullable = true)
 |-- variation_1_thumbnail: string (nullable = true)
 |-- variation_1_image: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- model: string (nullable = true)

root
 |-- category: string (nullable = true)
 |-- subca

In [None]:
all_products_df = shoe_df.union(jewelry_df).union(accessories_df)
all_products_df.printSchema()

root
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- raw_price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- likes_count: integer (nullable = true)
 |-- is_new: boolean (nullable = true)
 |-- brand: string (nullable = true)
 |-- brand_url: string (nullable = true)
 |-- codCountry: string (nullable = true)
 |-- variation_0_color: string (nullable = true)
 |-- variation_1_color: string (nullable = true)
 |-- variation_0_thumbnail: string (nullable = true)
 |-- variation_0_image: string (nullable = true)
 |-- variation_1_thumbnail: string (nullable = true)
 |-- variation_1_image: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- model: string (nullable = true)



## Transformation of the data -  building more features to help analyse each product

### First create functions which would take the dataframe as its input and return a new dataframe contianing the following columns.
1. Average price of each category
2. Average price of each subcategory
3. Average discount of each category
4. Average discount of each subcategory
5. Average likes of each category
6. Average likes of each subcategory
7. sum of the unused products in each category (tip - first convert the feature isnew to numeric to be able to get the sum of the unused products)
8. sum of the unused products in subcategory


In [None]:
def add_category_averages(df):
    """Adds average price, discount, and likes for each category."""

    category_stats = df.groupBy("category").agg(
        avg("current_price").alias("avg_price_category"),
        avg("discount").alias("avg_discount_category"),
        avg("likes_count").alias("avg_likes_category")
    )

    df_with_category_avgs = df.join(category_stats, "category", "left")
    return df_with_category_avgs

In [None]:
def add_subcategory_averages(df):
    """Adds average price, discount, and likes for each subcategory."""

    subcategory_stats = df.groupBy("subcategory").agg(
        avg("current_price").alias("avg_price_subcategory"),
        avg("discount").alias("avg_discount_subcategory"),
        avg("likes_count").alias("avg_likes_subcategory")
    )

    df_with_subcategory_avgs = df.join(subcategory_stats, "subcategory", "left")
    return df_with_subcategory_avgs

In [None]:
def add_unused_product_sums(df):
    """Adds the sum of unused products for each category and subcategory."""

    # Convert 'isnew' to numerical (1 for True, 0 for False)
    df = df.withColumn("isnew_numeric", when(df["is_new"] == True, 1).otherwise(0)) # 0 for used, 1 for unused.

    category_unused = df.groupBy("category").agg(sum("isnew_numeric").alias("sum_unused_category"))
    subcategory_unused = df.groupBy("subcategory").agg(sum("isnew_numeric").alias("sum_unused_subcategory"))

    df_with_unused_sums = df.join(category_unused, "category", "left").join(subcategory_unused, "subcategory", "left")
    return df_with_unused_sums

## Call these functions that were created to create the new columns.

In [None]:
# Apply the transformations
all_products_df = add_category_averages(all_products_df)
all_products_df = add_subcategory_averages(all_products_df)
new_all_products_df = add_unused_product_sums(all_products_df)

## Print the schema of the dataframe to check the new columns created

In [None]:
new_all_products_df.printSchema()

root
 |-- subcategory: string (nullable = true)
 |-- category: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- raw_price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- likes_count: integer (nullable = true)
 |-- is_new: boolean (nullable = true)
 |-- brand: string (nullable = true)
 |-- brand_url: string (nullable = true)
 |-- codCountry: string (nullable = true)
 |-- variation_0_color: string (nullable = true)
 |-- variation_1_color: string (nullable = true)
 |-- variation_0_thumbnail: string (nullable = true)
 |-- variation_0_image: string (nullable = true)
 |-- variation_1_thumbnail: string (nullable = true)
 |-- variation_1_image: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- model: string (nullable = true)
 |-- avg_price_category: double (nullable = true)
 |-- a

## Analysis - Utilising the newly created features to answer some questions<br>For this taks use the dataframe API to answer the following questions

### Which type of product is the least popular with the users

In [None]:
least_popular_type_df = new_all_products_df.groupBy("category").agg(sum("likes_count").alias("total_likes")).orderBy("total_likes")
least_popular_type = least_popular_type_df.first()
print(f"The least popular product type is: {least_popular_type['category']}")
least_popular_type_df.show()

                                                                                

The least popular product type is: accessories




+-----------+-----------+
|   category|total_likes|
+-----------+-----------+
|accessories|     613522|
|    jewelry|     806306|
|      shoes|    3358818|
+-----------+-----------+



                                                                                

### Which type of product is the most popular with the users

In [None]:
most_popular_type_df = new_all_products_df.groupBy("category").agg(sum("likes_count").alias("total_likes")).orderBy(desc("total_likes"))
most_popular_type = most_popular_type_df.first()
print(f"The most popular product type is: {most_popular_type['category']}")
most_popular_type_df.show()

                                                                                

The most popular product type is: shoes




+-----------+-----------+
|   category|total_likes|
+-----------+-----------+
|      shoes|    3358818|
|    jewelry|     806306|
|accessories|     613522|
+-----------+-----------+



                                                                                

### Which subcategory product is the most popular with the users

In [None]:
most_popular_sub_type_df = new_all_products_df.groupBy("category", "subcategory").agg(sum("likes_count").alias("total_likes")).orderBy(desc("total_likes"))
most_popular_sub_type = most_popular_sub_type_df.first()
print(f"The most popular sub product type is: {most_popular_sub_type['subcategory']}")
most_popular_sub_type_df.show()

                                                                                

The most popular sub product type is: Derbies & Mocassins




+-----------+--------------------+-----------+
|   category|         subcategory|total_likes|
+-----------+--------------------+-----------+
|      shoes| Derbies & Mocassins|     794351|
|      shoes|   Bottes & Bottines|     686245|
|      shoes|    Sandales & Mules|     511183|
|      shoes|           Mocassins|     335507|
|      shoes|  Sneakers & Baskets|     283253|
|    jewelry|  Boucles d'oreilles|     201918|
|      shoes|           Escarpins|     193397|
|    jewelry|            Colliers|     143060|
|      shoes|Bottes & Chaussur...|     125682|
|      shoes|             Baskets|     117481|
|accessories|          Beanie Hat|      96462|
|    jewelry|              Bagues|      88119|
|    jewelry|           Bracelets|      82264|
|      shoes| Chaussures de ville|      80466|
|      shoes|  Claquettes & Tongs|      77440|
|      shoes|            Sandales|      72548|
|accessories|Chaussettes & Col...|      70669|
|    jewelry|  Montres pour femme|      59951|
|accessories|

                                                                                

###  Which subcategory product is the least popular with the users

In [None]:
least_popular_sub_type_df = new_all_products_df.groupBy("category", "subcategory").agg(sum("likes_count").alias("total_likes")).orderBy("total_likes")
least_popular_sub_type = least_popular_sub_type_df.first()
print(f"The least popular sub product type is: {least_popular_sub_type['subcategory']}")
least_popular_sub_type_df.show()

The least popular sub product type is: Berets
+-----------+--------------------+-----------+
|   category|         subcategory|total_likes|
+-----------+--------------------+-----------+
|accessories|              Berets|         12|
|      shoes|    CHAUSSURES HOMME|         33|
|    jewelry|    Montres en bague|         44|
|accessories|        Military Hat|         45|
|accessories|   Men's Accessories|         60|
|    jewelry|          Bijouterie|         69|
|      shoes|        Plate-formes|        101|
|accessories|           Ipad Case|        166|
|accessories|Sun Protection Sl...|        224|
|accessories|           Ceintures|        266|
|      shoes|              Bottes|        338|
|      shoes|Chaussures de jeu...|        364|
|    jewelry|          Pendentifs|        418|
|accessories|              Voiles|        427|
|    jewelry|Boucles d'oreille...|        454|
|    jewelry| Mascarade & Cosplay|        539|
|    jewelry|         Accessoires|        554|
|    jewelry|P

### In the accessory products, which subcategory was the most expensive

In [None]:
most_expensive_sub_type_df = new_all_products_df.filter(col("category") == "accessories").groupBy("category", "subcategory").agg(max("current_price").alias("current_price")).orderBy(desc("current_price"))
most_expensive_sub_type = most_expensive_sub_type_df.first()
print(f"The most expensive sub product type is: {most_expensive_sub_type['subcategory']}")
most_expensive_sub_type_df.show()

                                                                                

The most expensive sub product type is: Lunettes
+-----------+--------------------+-------------+
|   category|         subcategory|current_price|
+-----------+--------------------+-------------+
|accessories|            Lunettes|        92.99|
|accessories| Lunettes de lecture|        83.05|
|accessories|         Chaussettes|        66.15|
|accessories|Ceintures & Brete...|        63.23|
|accessories|  Chapeaux & Bonnets|        62.51|
|accessories|           Flat Caps|        61.46|
|accessories|       Baseball Caps|         58.3|
|accessories| Foulards & Écharpes|        53.77|
|accessories|          Phone Case|        48.01|
|accessories|          Straw Hats|        45.27|
|accessories|          Beanie Hat|         43.5|
|accessories|          Bucket Hat|        42.42|
|accessories|Fournitures de dé...|        40.63|
|accessories|Boutons de manche...|        38.77|
|accessories|               Gants|        38.65|
|accessories|Cache-oreilles & ...|        37.51|
|accessories|       

### Which type of product had the highest discount offerings

In [None]:
highest_discount_type_df = new_all_products_df.groupBy("category").agg(max("discount").alias("discount")).orderBy(desc("discount"))
highest_discount_type = highest_discount_type_df.first()
print(f"The product type with the highest discount is: {highest_discount_type['category']}")
highest_discount_type_df.show()

The product type with the highest discount is: accessories
+-----------+--------+
|   category|discount|
+-----------+--------+
|accessories|     100|
|    jewelry|      98|
|      shoes|      90|
+-----------+--------+



### Which type of product had the lowest discount offerings

In [None]:
lowest_discount_type_df = new_all_products_df.groupBy("category").agg(min("discount").alias("discount")).orderBy("discount")
lowest_discount_type = lowest_discount_type_df.first()
print(f"The product type with the lowest discount is: {lowest_discount_type['category']}")
lowest_discount_type_df.show()

The product type with the lowest discount is: accessories
+-----------+--------+
|   category|discount|
+-----------+--------+
|accessories|       0|
|    jewelry|       0|
|      shoes|       0|
+-----------+--------+



### In the shoe's products, which subcategory was the most common.

In [None]:
most_common_sub_shoe_df = new_all_products_df.filter(col("category") == "shoes").groupBy("category", "subcategory").agg(count("*").alias("product_count")).orderBy(desc("product_count"))
most_common_sub_shoe = most_common_sub_shoe_df.first()
print(f"The most common sub product shoe is: {most_common_sub_shoe['subcategory']}")
most_common_sub_shoe_df.show()

The most common sub product shoe is: Mocassins
+--------+--------------------+-------------+
|category|         subcategory|product_count|
+--------+--------------------+-------------+
|   shoes|           Mocassins|         1703|
|   shoes| Derbies & Mocassins|         1596|
|   shoes|   Bottes & Bottines|         1515|
|   shoes|    Sandales & Mules|         1340|
|   shoes|             Baskets|         1318|
|   shoes|  Sneakers & Baskets|          966|
|   shoes|Bottes & Chaussur...|          715|
|   shoes|            Sandales|          695|
|   shoes| Chaussures de ville|          494|
|   shoes|  Claquettes & Tongs|          378|
|   shoes|           Escarpins|          372|
|   shoes|           Chaussons|          246|
|   shoes|             Slipper|          167|
|   shoes|             Sandals|           79|
|   shoes|          Plateforme|           78|
|   shoes|      Flat & Loafers|           54|
|   shoes|               Pumps|           39|
|   shoes| Chaussures de sport|  

### Which type of product had the most unused products listed for sale

In [None]:
most_unused_type_sale_df = new_all_products_df.filter(col("isnew_numeric") == 1).groupBy("category").agg(count("*").alias("unused_count")).orderBy(desc("unused_count"))
most_unused_type_sale = most_unused_type_sale_df.first()
print(f"The most unused products listed for sale is: {most_unused_type_sale['category']}")
most_unused_type_sale_df.show()

                                                                                

The most unused products listed for sale is: shoes
+-----------+------------+
|   category|unused_count|
+-----------+------------+
|      shoes|         271|
|    jewelry|         113|
|accessories|         107|
+-----------+------------+



### Which subcategory product  had the most unused products listed for sale

In [None]:
most_unused_sub_type_sale_df = new_all_products_df.filter((col("isnew_numeric") == 1)).groupBy("category", "subcategory").agg(count("*").alias("unused_count")).orderBy(desc("unused_count"))
most_unused_sub_type_sale = most_unused_sub_type_sale_df.first()
print(f"The most unused sub products listed for sale is: {most_unused_sub_type_sale['subcategory']}")
most_unused_sub_type_sale_df.show()

The most unused sub products listed for sale is: Baskets
+-----------+--------------------+------------+
|   category|         subcategory|unused_count|
+-----------+--------------------+------------+
|      shoes|             Baskets|          68|
|      shoes|  Sneakers & Baskets|          41|
|      shoes| Derbies & Mocassins|          38|
|    jewelry|            Colliers|          34|
|accessories|       Baseball Caps|          33|
|      shoes|  Claquettes & Tongs|          27|
|accessories|          Skull Caps|          25|
|accessories|           Flat Caps|          22|
|      shoes|           Mocassins|          21|
|accessories|          Bucket Hat|          20|
|      shoes|Bottes & Chaussur...|          19|
|      shoes|   Bottes & Bottines|          18|
|    jewelry|  Montres connectées|          17|
|    jewelry|              Bagues|          15|
|      shoes|    Sandales & Mules|          15|
|    jewelry|  Boucles d'oreilles|          12|
|      shoes|            Sandal

## - Partition and Load transformed datasets into HDFS.

In [None]:
import os

num_cores = os.cpu_count()
print(f"Number of cores on the local machine: {num_cores}")

Number of cores on the local machine: 8


In [None]:
## Since my machine has 8 cores, the dataframe will be partitioned into 8 partitions so that each core takes one partition to process the data efficiently

partitioned_df = new_all_products_df.repartition(8)
# Display the number of partitions
print(f"Number of partitions: {partitioned_df.rdd.getNumPartitions()}")



Number of partitions: 8


In [None]:
# Output path in HDFS for partitioned data
output_path = "hdfs://namenode-master:8020/partitioned_products" 

# Partition the DataFrame by category and write to HDFS
new_all_products_df.write.partitionBy("category").parquet(output_path)

print(f"DataFrame partitioned and written to: {output_path}")

                                                                                

DataFrame partitioned and written to: hdfs://namenode-master:8020/partitioned_products


## Read each table as a dataframe and just print the schema of each one.

In [None]:
# Load the partitioned Parquet file for shoe products
shoe_df = spark.read.parquet("hdfs://namenode-master:8020/partitioned_products/category=shoes")
print("Shoe Products Schema:")
shoe_df.printSchema()

# Load the partitioned Parquet file for jewelry products
jewelry_df = spark.read.parquet("hdfs://namenode-master:8020/partitioned_products/category=jewelry")
print("Jewelry Products Schema:")
jewelry_df.printSchema()

# Load the partitioned Parquet file for accessories products
accessories_df = spark.read.parquet("hdfs://namenode-master:8020/partitioned_products/category=accessories")
print("Accessories Products Schema:")
accessories_df.printSchema()

Shoe Products Schema:
root
 |-- subcategory: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- raw_price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- likes_count: integer (nullable = true)
 |-- is_new: boolean (nullable = true)
 |-- brand: string (nullable = true)
 |-- brand_url: string (nullable = true)
 |-- codCountry: string (nullable = true)
 |-- variation_0_color: string (nullable = true)
 |-- variation_1_color: string (nullable = true)
 |-- variation_0_thumbnail: string (nullable = true)
 |-- variation_0_image: string (nullable = true)
 |-- variation_1_thumbnail: string (nullable = true)
 |-- variation_1_image: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- model: string (nullable = true)
 |-- avg_price_category: double (nullable = true)
 |-- avg_discount_catego

In [None]:
spark.stop()