In [1]:
!pip install pyspark py4j

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=d1393c6af37bc524723cee6423bbdccd4cfd279b2290ebafde685194866aafd1
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


![9 different shoes](https://image-cdn.hypb.st/https%3A%2F%2Fhypebeast.com%2Fwp-content%2Fblogs.dir%2F6%2Ffiles%2F2019%2F07%2Fsneaker-birthstones-nike-adidas-gucci-balenciaga-01-1.jpg?cbr=1&q=90)

# **Data Manipulation of Shoe Prices**

This project is divided into two parts: **Data Manipulation using Resilient Distributed Datasets (RDD)** and **Data  Manipulation using DataFrame.**

By the end of this project, I aim to answer the following questions by running SQL query:
1. What is the average price of different types of shoes?
2. What is the most expensive model per type of shoe?
3. How does the shoe material affect the price?


### **Part 1: Data Manipulation using an RDD**

In [2]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("ShoePricesApp") \
    .getOrCreate()

data_rdd = spark.sparkContext.textFile('shoe-prices.csv')
print(f'The number of rows in the RDD is: {data_rdd.count()}')

The number of rows in the RDD is: 1007


In [3]:
print('These are the first 20 rows of the dataset:')
data_rdd.take(20)

These are the first 20 rows of the dataset:


['Brand,Model,Type,Gender,Size,Color,Material,Price (USD)',
 'Nike,Air Jordan 1,Basketball,Men,US 10,Red/Black,Leather,$170.00 ',
 'Adidas,Ultra Boost 21,Running,Men,US 9.5,Black,Primeknit,$180.00 ',
 'Reebok,Classic Leather,Casual,Men,US 11,White,Leather,$75.00 ',
 'Converse,Chuck Taylor,Casual,Women,US 8,Navy,Canvas,$55.00 ',
 'Puma,Future Rider,Lifestyle,Women,US 7.5,Pink,Mesh,$80.00 ',
 'Vans,Old Skool,Skate,Men,US 8.5,Black/White,Suede/Canvas,$65.00 ',
 'New Balance,990v5,Running,Women,US 6.5,Grey,Suede/Mesh,$175.00 ',
 'Asics,Gel-Kayano 28,Running,Men,US 10.5,Blue,Mesh,$160.00 ',
 'Fila,Disruptor II,Fashion,Women,US 9,White,Leather/Synthetic,$65.00 ',
 'Skechers,Go Run Razor 3,Running,Men,US 9,Black,Mesh,$110.00 ',
 'Nike,Air Max 270,Running,Women,US 7,Pink,Mesh,$150.00 ',
 'Adidas,Stan Smith,Casual,Men,US 9,White/Green,Leather,$80.00 ',
 'Reebok,Club C 85,Casual,Women,US 8.5,Pink,Leather,$70.00 ',
 'Converse,One Star,Skate,Men,US 10,Black,Suede,$75.00 ',
 'Puma,RS-Fast,Lifestyle

In [4]:
# Function to parse the CSV data
def parse_csv(row):
    return row.split(',')

# Parse the data
parsed_data_rdd = data_rdd.map(parse_csv)

# Filter out the header row
header = parsed_data_rdd.first()
data_no_header_rdd = parsed_data_rdd.filter(lambda row: row != header)
print('Without the header, this is now the first row:')
data_no_header_rdd.first()

Without the header, this is now the first row:


['Nike',
 'Air Jordan 1',
 'Basketball',
 'Men',
 'US 10',
 'Red/Black',
 'Leather',
 '$170.00 ']

In [5]:
# Function to clean each row
def clean_row(row):
    row[4] = row[4].replace("US ", "") # Remove "US" from the Size column
    row[7] = row[7].replace("$", "").strip() # Remove "$" and any whitespace from the Price column
    return row

# Apply the clean_row function to each row
cleaned_data_rdd = data_no_header_rdd.map(clean_row)

# Show the first few rows of the cleaned data
cleaned_data_rdd.take(3)

[['Nike',
  'Air Jordan 1',
  'Basketball',
  'Men',
  '10',
  'Red/Black',
  'Leather',
  '170.00'],
 ['Adidas',
  'Ultra Boost 21',
  'Running',
  'Men',
  '9.5',
  'Black',
  'Primeknit',
  '180.00'],
 ['Reebok',
  'Classic Leather',
  'Casual',
  'Men',
  '11',
  'White',
  'Leather',
  '75.00']]

In [6]:
# Group by Brand
def to_key_value(row):
    return (row[0], row)  # Use the brand as the key

brand_rdd = cleaned_data_rdd.map(to_key_value)

# Sort brands alphabetically before grouping (optional)
sorted_brand_rdd = brand_rdd.sortByKey()

# Group by the sorted brands
grouped_by_brand_rdd = sorted_brand_rdd.groupByKey()

# Show the grouped data for a few brands
for brand, rows in grouped_by_brand_rdd.collect():
    print(f"Brand: {brand}")
    for row in rows:
        print(row)
    print()

Brand: Adidas
['Adidas', 'Ultra Boost 21', 'Running', 'Men', '9.5', 'Black', 'Primeknit', '180.00']
['Adidas', 'Stan Smith', 'Casual', 'Men', '9', 'White/Green', 'Leather', '80.00']
['Adidas', 'NMD_R1', 'Running', 'Women', '8.5', 'Black/Pink', 'Primeknit', '140.00']
['Adidas', 'Adizero Adios 5', 'Running', 'Women', '7.5', 'Blue', 'Mesh', '170.00']
['Adidas', 'NMD_R1', 'Running', 'Women', '9', 'Black', 'Knit', '140.00']
['Adidas', 'Adilette Cloudfoam', 'Slides', 'Women', '6', 'Black', 'Synthetic', '35.00']
['Adidas', 'NMD_R1', 'Lifestyle', 'Women', '7', 'Pink', 'Primeknit', '130.00']
['Adidas', 'Yeezy Boost 350', 'Lifestyle', 'Men', '10', 'Black', 'Primeknit', '220.00']
['Adidas', 'NMD_R1', 'Lifestyle', 'Men', '8.5', 'Black', 'Primeknit', '140.00']
['Adidas', 'Harden Stepback', 'Basketball', 'Men', '12', 'Grey', 'Synthetic', '80.00']
['Adidas', 'Superstar', 'Casual', 'Men', '8.5', 'Black/White', 'Leather', '80.00']
['Adidas', 'Ultraboost 5.0 DNA', 'Running', 'Women', '8.5', 'Grey', 'Pri

In [7]:
# Sort the grouped data by price in descending order
sorted_grouped_by_price_rdd = grouped_by_brand_rdd.mapValues(lambda rows: sorted(rows, key=lambda x: float(x[7]), reverse=True))

# Show the sorted grouped data for a few brands
for brand, rows in sorted_grouped_by_price_rdd.collect():
    print(f"Brand: {brand}")
    for row in rows:
        print(row)
    print()

Brand: Adidas
['Adidas', 'Yeezy Boost 350', 'Lifestyle', 'Men', '10', 'Black', 'Primeknit', '220.00']
['Adidas', 'Yeezy Boost 350 V2', 'Fashion', 'Men', '9', 'Zebra', 'Primeknit', '220.00']
['Adidas', 'Yeezy Boost 350 V2', 'Fashion', 'Women', '8.5', 'Cream', 'Primeknit', '220.00']
['Adidas', 'Yeezy Boost 350', 'Lifestyle', 'Women', '6.5', 'Cream', 'Primeknit', '220.00']
['Adidas', 'Yeezy Boost 350 V2', 'Fashion', 'Women', '8.5', 'Cream White', 'Primeknit/Synthetic', '220.00']
['Adidas', 'Yeezy Boost 350 V2', 'Fashion', 'Women', '7', 'Cream', 'Primeknit', '220.00']
['Adidas', 'Yeezy Boost 350 V2', 'Lifestyle', 'Men', '10', 'Cream', 'Primeknit', '220.00']
['Adidas', 'Yeezy Boost 350 V2', 'Lifestyle', 'Women', '6.5', 'Cloud White', 'Primeknit', '220.00']
['Adidas', 'Yeezy Boost 350 V2', 'Fashion', 'Women', '7.5', 'Cream White', 'Primeknit', '220.00']
['Adidas', 'Yeezy Boost 350', 'Fashion', 'Women', '8', 'Cream White', 'Primeknit', '220.00']
['Adidas', 'Yeezy Boost 350 V2', 'Lifestyle', '

### **Part 2: Data Manipulation using DataFrame**

In [8]:
# Load the dataset into dataframe
df = spark.read.csv('shoe-prices.csv', header=True, inferSchema=True)

# Count the number of rows
row_count = df.count()
print(f'The initial number of rows in the DataFrame: {row_count}')

# Show the first few rows of the DataFrame
df.show()

The initial number of rows in the DataFrame: 1006
+-----------+------------------+----------+------+-------+-----------+-----------------+-----------+
|      Brand|             Model|      Type|Gender|   Size|      Color|         Material|Price (USD)|
+-----------+------------------+----------+------+-------+-----------+-----------------+-----------+
|       Nike|      Air Jordan 1|Basketball|   Men|  US 10|  Red/Black|          Leather|   $170.00 |
|     Adidas|    Ultra Boost 21|   Running|   Men| US 9.5|      Black|        Primeknit|   $180.00 |
|     Reebok|   Classic Leather|    Casual|   Men|  US 11|      White|          Leather|    $75.00 |
|   Converse|      Chuck Taylor|    Casual| Women|   US 8|       Navy|           Canvas|    $55.00 |
|       Puma|      Future Rider| Lifestyle| Women| US 7.5|       Pink|             Mesh|    $80.00 |
|       Vans|         Old Skool|     Skate|   Men| US 8.5|Black/White|     Suede/Canvas|    $65.00 |
|New Balance|             990v5|   Runnin

In [9]:
from pyspark.sql.functions import regexp_replace, col

# Remove "US" from the Size column
df_cleaned = df.withColumn('Size', regexp_replace(col('Size'), 'US ', ''))

# Remove the dollar sign and any extra spaces from the Price (USD) column, then convert to float
df_cleaned = df_cleaned.withColumn('Price (USD)', regexp_replace(col('Price (USD)'), '[$,]', '').cast('float'))

# Rename the Size column to Size (US)
df_cleaned = df_cleaned.withColumnRenamed('Size', 'Size (US)')

# Clean the Type column by replacing "CrossFit" with "Crossfit"
df_cleaned = df_cleaned.withColumn('Type', regexp_replace(col('Type'), 'CrossFit', 'Crossfit'))

# Show the cleaned DataFrame
print('This is the cleaned dataframe')
df_cleaned.show()

This is the cleaned dataframe
+-----------+------------------+----------+------+---------+-----------+-----------------+-----------+
|      Brand|             Model|      Type|Gender|Size (US)|      Color|         Material|Price (USD)|
+-----------+------------------+----------+------+---------+-----------+-----------------+-----------+
|       Nike|      Air Jordan 1|Basketball|   Men|       10|  Red/Black|          Leather|      170.0|
|     Adidas|    Ultra Boost 21|   Running|   Men|      9.5|      Black|        Primeknit|      180.0|
|     Reebok|   Classic Leather|    Casual|   Men|       11|      White|          Leather|       75.0|
|   Converse|      Chuck Taylor|    Casual| Women|        8|       Navy|           Canvas|       55.0|
|       Puma|      Future Rider| Lifestyle| Women|      7.5|       Pink|             Mesh|       80.0|
|       Vans|         Old Skool|     Skate|   Men|      8.5|Black/White|     Suede/Canvas|       65.0|
|New Balance|             990v5|   Running|

In [10]:
# Remove duplicates
df_cleaned = df_cleaned.dropDuplicates()

# Count and show the cleaned DataFrame without duplicates
row_count_cleaned = df_cleaned.count()
print(f'The number of rows in the DataFrame without duplicates: {row_count_cleaned}')
df_cleaned.show()

The number of rows in the DataFrame without duplicates: 925
+-----------+------------------+--------+------+---------+-----------+--------------+-----------+
|      Brand|             Model|    Type|Gender|Size (US)|      Color|      Material|Price (USD)|
+-----------+------------------+--------+------+---------+-----------+--------------+-----------+
|New Balance|   Fresh Foam More| Running| Women|        9|Grey/Purple|          Mesh|      165.0|
|      Asics|   Gel-Quantum 180| Running| Women|      7.5|       Grey|          Mesh|      120.0|
|   Converse|          Chuck 70|  Casual| Women|      7.5|      Egret|        Canvas|       85.0|
|     Reebok|Floatride Energy 3| Running|   Men|      8.5|White/Black|Knit/Synthetic|      100.0|
|       Nike|React Infinity Run| Running|   Men|      9.5| Blue/Green|          Mesh|      160.0|
|       Nike|      Air Max 2090| Running|   Men|     10.5|      Green|          Mesh|      150.0|
|     Reebok|            Nano 9|Crossfit|   Men|      9.5|

In [11]:
# Group by 'Brand' and count the number of shoe models for each brand
brand_model_count = df_cleaned.groupBy("Brand").count()

# Sort the result by count in descending order
brand_model_count_sorted = brand_model_count.orderBy("Count", ascending=False)

# Show the result
brand_model_count_sorted.show()

+-----------+-----+
|      Brand|count|
+-----------+-----+
|New Balance|   98|
|       Fila|   97|
|       Nike|   95|
|      Asics|   95|
|       Puma|   94|
|     Reebok|   94|
|     Adidas|   94|
|   Skechers|   87|
|       Vans|   87|
|   Converse|   84|
+-----------+-----+



### **Running SQL query to answer questions**

In [12]:
# Register the DataFrame as a temporary SQL table
df_cleaned.createOrReplaceTempView("shoes")

# Run an SQL query to find the average price of different types of shoes
average_price_by_type = spark.sql("""
    SELECT Type, ROUND(AVG(`Price (USD)`), 2) as Average_Price_USD
    FROM shoes
    GROUP BY Type
    ORDER BY Average_Price_USD DESC""")

# Show the results
average_price_by_type.show()

+--------------+-----------------+
|          Type|Average_Price_USD|
+--------------+-----------------+
| Weightlifting|            187.5|
|      Crossfit|            130.0|
|Cross-training|            130.0|
|       Running|           129.11|
|     Lifestyle|           122.71|
|    Basketball|           114.88|
| Trail Running|           113.33|
|      Training|           111.67|
|        Racing|            110.0|
|         Trail|            99.17|
|         Retro|             90.0|
|       Fashion|            84.11|
|        Casual|            80.07|
|        Hiking|             77.5|
|       Walking|            65.09|
|         Skate|            62.36|
|        Slides|            31.67|
+--------------+-----------------+



In [13]:
# Run an SQL query to find the top 1 most expensive model per type of shoe
top_1_model_per_type = spark.sql("""
  SELECT Type, Brand, Model, `Price (USD)`
  FROM (
    SELECT Type, Brand, Model, `Price (USD)`, ROW_NUMBER() OVER(PARTITION BY Type ORDER BY `Price (USD)` DESC) as Rank
    FROM shoes
  ) tmp
  WHERE Rank = 1
  ORDER BY Type """)

# Show the result
top_1_model_per_type.show(top_1_model_per_type.count(), truncate=False)

+--------------+-----------+--------------------+-----------+
|Type          |Brand      |Model               |Price (USD)|
+--------------+-----------+--------------------+-----------+
|Basketball    |Nike       |Air Jordan 1        |170.0      |
|Casual        |Adidas     |Yeezy Boost 350 V2  |220.0      |
|Cross-training|Reebok     |Nano 9              |130.0      |
|Crossfit      |Reebok     |Nano 9              |130.0      |
|Fashion       |Adidas     |Yeezy Boost 350 V2  |220.0      |
|Hiking        |Fila       |Trailblazer         |85.0       |
|Lifestyle     |Adidas     |Yeezy Boost 350 V2  |220.0      |
|Racing        |New Balance|1500v6              |110.0      |
|Retro         |New Balance|327                 |100.0      |
|Running       |Nike       |ZoomX Vaporfly Next%|250.0      |
|Skate         |Nike       |SB Dunk Low         |100.0      |
|Slides        |Adidas     |Adilette Cloudfoam  |35.0       |
|Trail         |New Balance|Fresh Foam Hierro v6|135.0      |
|Trail R

In [14]:
# Run an SQL query to analyze the relationship between shoe material and price
material_price_analysis = spark.sql("""
    SELECT Material, ROUND(AVG(`Price (USD)`), 2) AS Average_Price_USD
    FROM shoes
    GROUP BY Material
    ORDER BY Average_Price_USD DESC
""")

# Show the results
material_price_analysis.show(material_price_analysis.count())

+--------------------+-----------------+
|            Material|Average_Price_USD|
+--------------------+-----------------+
| Primeknit/Synthetic|            220.0|
|        Mesh/Leather|            180.0|
|           Primeknit|           168.73|
|             Flyknit|           158.82|
|      Mesh/Synthetic|           133.33|
|     Textile/Leather|            130.0|
|           Flexweave|            130.0|
|Flexweave/Cushioning|            130.0|
|             Textile|            130.0|
|      Flexweave/Knit|            130.0|
| Flexweave/Synthetic|            130.0|
|      Knit/Synthetic|           123.33|
|        Leather/Mesh|            120.0|
|      Synthetic/Mesh|            115.0|
|                Mesh|           112.67|
|                Knit|           112.33|
|          Mesh/Suede|            110.0|
|   Synthetic/Textile|            105.0|
|       Leather/Suede|            105.0|
|          Suede/Mesh|           103.75|
|         Suede/Nylon|             92.5|
|             Le