In [1]:
from pyspark.sql import SparkSession

In [2]:
#creating the spark session
spark = SparkSession.builder.appName("AmazonApperalAnalysis").getOrCreate() 

24/11/18 15:47:41 WARN Utils: Your hostname, Mahajans-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.250.5.24 instead (on interface en0)
24/11/18 15:47:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/18 15:47:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
#now loading the dataset into the DataFrame
data = spark.read.csv("/Users/pallavmahajan/Downloads/DATA-228_Big_Data/Project/amazon_reviews_us_Apparel_v1_00.tsv", sep="\t", header=True, inferSchema=True)

                                                                                

In [4]:
data.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)



In [5]:
data.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   32158956|R1KKOXHNI8MSXU|B01KL6O72Y|      24485154|Easy Tool Stainle...|         Apparel|          4|            0|          0|   N|                Y|★ THESE REALLY DO...|These Really Do W...| 2013-01-14|
|         US|    2714559|R26SP2OPDK4HT7|B01ID3ZS5W|     363128556|V28 Women Cowl Ne...|         Apparel|          5|    

In [6]:
#counting the total rows
total_rows = data.count()
print(f"Total Rows: {total_rows}")



Total Rows: 5906333


                                                                                

In [7]:
#checking the missing values
data.select([((data[col].isNull()).cast("int").alias(col)) for col in ["customer_id", "product_id", "star_rating"]]).summary("count").show()

                                                                                

+-------+-----------+----------+-----------+
|summary|customer_id|product_id|star_rating|
+-------+-----------+----------+-----------+
|  count|    5906333|   5906333|    5906333|
+-------+-----------+----------+-----------+



In [10]:
#cleaning the data
cleaned_data = data.dropna(subset=["customer_id", "product_id", "star_rating"])

In [11]:
cleaned_rows = cleaned_data.count()
print(f"Rows after cleaning: {cleaned_rows}")



Rows after cleaning: 5906322


                                                                                

In [12]:
#creating new features and updating the csv
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Step 1: Creating Spark session and load dataset
spark = SparkSession.builder.appName("FeatureEngineering").getOrCreate()
data = spark.read.csv("/Users/pallavmahajan/Downloads/DATA-228_Big_Data/Project/amazon_reviews_us_Apparel_v1_00.tsv", sep="\t", header=True, inferSchema=True)

# Step 2: Adding demographic_group column
data = data.withColumn("demographic_group",
    when(col("product_title").rlike("Men|Men's"), "Men")
    .when(col("product_title").rlike("Women|Women's"), "Women")
    .when(col("product_title").rlike("Kids|Children"), "Children")
    .otherwise("Uncategorized")
)

# Step 3: Adding apparel_section column
data = data.withColumn("apparel_section",
    when(col("product_title").rlike("Shirt|Outerwear"), "Shirts/Outerwear")
    .when(col("product_title").rlike("Pants|Jeans"), "Pants/Bottoms")
    .when(col("product_title").rlike("Footwear|Shoes"), "Footwear")
    .when(col("product_title").rlike("Accessories|Hats"), "Accessories")
    .otherwise("Other")
)

# Step 4: Saving the updated dataset as a new CSV file
output_path = "/Users/pallavmahajan/Downloads/DATA-228_Big_Data/Project/updated_amazon_apparel.csv"
data.write.csv(output_path, header=True)

print(f"Updated dataset saved at: {output_path}")

24/11/18 15:59:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

Updated dataset saved at: /Users/pallavmahajan/Downloads/DATA-228_Big_Data/Project/updated_amazon_apparel.csv


                                                                                

In [13]:
# Combine all partitions into one
data.coalesce(1).write.csv("/Users/pallavmahajan/Downloads/DATA-228_Big_Data/Project/single_updated_amazon_apparel.csv", header=True)

print("Combined single CSV saved!")

[Stage 15:>                                                         (0 + 1) / 1]

Combined single CSV saved!


                                                                                

In [14]:
from pyspark.sql.functions import split, explode, lower, col
from pyspark.ml.feature import StopWordsRemover
import pandas as pd

# Tokenize product titles into words
data = data.withColumn("words", split(lower(col("product_title")), "\\s+"))

# Explode the list of words into individual rows
words_df = data.select(explode(col("words")).alias("word"))

# Remove duplicates and filter out stopwords
stopwords = StopWordsRemover().getStopWords()
unique_words = words_df.filter(~col("word").isin(stopwords)).distinct()

# Show sample unique words
unique_words.show(20, truncate=False)

# Save unique words to a text file
words_list = unique_words.collect()  # Collect as a list
with open("unique_words.txt", "w") as f:
    for row in words_list:
        f.write(row["word"] + "\n")

print("Unique words saved to unique_words.txt")

                                                                                

+--------------+
|word          |
+--------------+
|unlined       |
|pant          |
|travel        |
|jewelry       |
|newlywed      |
|tmnt          |
|lady's        |
|outfit        |
|art           |
|hope          |
|activewear    |
|gandalf's     |
|(sports       |
|minions'      |
|3x            |
|garters       |
|(xxl)         |
|“imagine”     |
|anime         |
|pain/recovery,|
+--------------+
only showing top 20 rows



                                                                                

Unique words saved to unique_words.txt


In [16]:
# Load the file
with open("unique_words.txt", "r") as file:
    words = [line.strip() for line in file]

# Initialize empty lists for categories
demographics_keywords = []
apparel_keywords = []
irrelevant_keywords = []

# Define demographics patterns (e.g., genders, age groups)
demographics_patterns = ["men", "women", "ladies", "kids", "children", "girls", "boys", "baby", "infant"]

# Define apparel-related patterns
apparel_patterns = ["shirt", "pants", "jeans", "jacket", "shoes", "dress", "skirt", "t-shirt", "sweater"]

# Categorize words
for word in words:
    word_lower = word.lower()
    if any(pattern in word_lower for pattern in demographics_patterns):
        demographics_keywords.append(word)
    elif any(pattern in word_lower for pattern in apparel_patterns):
        apparel_keywords.append(word)
    else:
        irrelevant_keywords.append(word)

# Save categorized words into separate files
with open("refined_demographics.txt", "w") as f:
    for word in demographics_keywords:
        f.write(word + "\n")

with open("refined_apparel_sections.txt", "w") as f:
    for word in apparel_keywords:
        f.write(word + "\n")

with open("irrelevant_keywords.txt", "w") as f:
    for word in irrelevant_keywords:
        f.write(word + "\n")

print("Categorization complete: refined_demographics.txt, refined_apparel_sections.txt, irrelevant_keywords.txt created.")

Categorization complete: refined_demographics.txt, refined_apparel_sections.txt, irrelevant_keywords.txt created.


In [20]:
# Split keywords into smaller chunks
def chunk_keywords(keywords, chunk_size):
    for i in range(0, len(keywords), chunk_size):
        yield keywords[i:i + chunk_size]

# Apply chunked matching
def match_patterns(column, keywords):
    chunks = list(chunk_keywords(keywords, 100))  # Split into chunks of 100
    conditions = [col(column).rlike("|".join(chunk)) for chunk in chunks]
    return when(conditions[0], lit("Matched")).otherwise("Other")

# Apply chunked logic for demographics
data = data.withColumn(
    "demographic_group",
    match_patterns("product_title", demographics_keywords)
)

# Apply chunked logic for apparel sections
data = data.withColumn(
    "apparel_section",
    match_patterns("product_title", apparel_keywords)
)

In [24]:
# Replace `column_name` with the actual column you want to process
data = data.withColumn("product_title", regexp_replace(col("product_title"), r"\(|\)", ""))

In [25]:
data.printSchema()


root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- demographic_group: string (nullable = false)
 |-- apparel_section: string (nullable = false)



In [26]:
from pyspark.sql.functions import regexp_replace

# Example for multiple columns
columns_to_process = ["product_title", "review_body"]
for col_name in columns_to_process:
    if col_name in data.columns:
        data = data.withColumn(col_name, regexp_replace(col(col_name), r"\(|\)", ""))

In [27]:
print(data.columns)


['marketplace', 'customer_id', 'review_id', 'product_id', 'product_parent', 'product_title', 'product_category', 'star_rating', 'helpful_votes', 'total_votes', 'vine', 'verified_purchase', 'review_headline', 'review_body', 'review_date', 'demographic_group', 'apparel_section']


In [28]:
from pyspark.sql.functions import regexp_replace, col

# Correct column name
data = data.withColumn("product_title", regexp_replace(col("product_title"), r"\(|\)", ""))

In [29]:
from pyspark.sql.functions import trim, lower
data = data.withColumn("product_title", trim(lower(col("product_title"))))
data = data.withColumn("review_body", trim(lower(col("review_body"))))

In [30]:
#EDA
data.groupBy("star_rating").count().show()



+-----------+-------+
|star_rating|  count|
+-----------+-------+
|          1| 445456|
|          3| 623471|
|          5|3320557|
|          4|1147237|
|          2| 369601|
|       NULL|     11|
+-----------+-------+



                                                                                

In [31]:
from pyspark.sql.functions import col

# Select relevant columns
final_data = data.select(
    col("customer_id").cast("int"),
    col("product_id").cast("int"),
    col("star_rating").cast("float")
).dropna()

In [32]:
final_data.show(5)
final_data.printSchema()



+-----------+----------+-----------+
|customer_id|product_id|star_rating|
+-----------+----------+-----------+
|   45924188|1933000716|        1.0|
|   41244251|1933000716|        5.0|
|   49704575|1848572972|        2.0|
|   40440645|1616593989|        5.0|
|     288347|1608325229|        3.0|
+-----------+----------+-----------+
only showing top 5 rows

root
 |-- customer_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- star_rating: float (nullable = true)



                                                                                

In [34]:
# Load the keywords into lists
with open("refined_demographics.txt", "r") as file:
    demographics_keywords = file.read().splitlines()

with open("refined_apparel_sections.txt", "r") as file:
    apparel_keywords = file.read().splitlines()

with open("irrelevant_keywords.txt", "r") as file:
    irrelevant_keywords = file.read().splitlines()

# Check the content of the lists
print(demographics_keywords[:5], apparel_keywords[:5], irrelevant_keywords[:5])

['baby-g', 'girls(s)', 'moolecole-men', '(women)', 'women,long'] ['skirt,medium,coral', 'tee-shirt', 'dress(drs-max,blua2-2x)', 'skirt(btm-skt,lor-l)', 'skirt-sk0062'] ['unlined', 'pant', 'travel', 'jewelry', 'newlywed']


In [36]:
from pyspark.sql.functions import col, when, lower
import re

# Escape special characters in keywords
demographics_keywords = [re.escape(k) for k in demographics_keywords]
apparel_keywords = [re.escape(k) for k in apparel_keywords]
irrelevant_keywords = [re.escape(k) for k in irrelevant_keywords]

# Join keywords with '|'
demographics_condition = "|".join(demographics_keywords)
apparel_condition = "|".join(apparel_keywords)
irrelevant_condition = "|".join(irrelevant_keywords)

# Add a category column
data = data.withColumn(
    "category",
    when(col("product_title").rlike(demographics_condition), "Demographics")
    .when(col("product_title").rlike(apparel_condition), "Apparel")
    .when(col("product_title").rlike(irrelevant_condition), "Irrelevant")
    .otherwise("Other")
)


In [37]:
data.select("product_title", "category").show(20, truncate=False)

24/11/18 16:50:41 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB


+----------------------------------------------------------------------------------+------------+
|product_title                                                                     |category    |
+----------------------------------------------------------------------------------+------------+
|easy tool stainless steel fruit pineapple corer slicer peeler cut one size, sliver|Irrelevant  |
|v28 women cowl neck knit stretchable elasticity long sleeve slim fit sweater dress|Demographics|
|james fiallo men's 12-pairs low cut athletic sport socks                          |Demographics|
|belfry gangster 100% wool stain-resistant crushable dress fedora in 4 colors      |Apparel     |
|jaeden women's beaded spaghetti straps sexy long formal prom evening dresses      |Demographics|
|levi's boys' 514 straight fit jeans                                               |Demographics|
|minimalist wallet & credit card holder men with slim design by raw                |Demographics|
|harriton men's barb

                                                                                

In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, rlike
import re

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Categorized Dataset Generation") \
    .getOrCreate()

# Load your dataset (adjust path as necessary)
data = spark.read.csv("/Users/pallavmahajan/Downloads/DATA-228_Big_Data/Project/single_updated_amazon_apparel.csv/part-00000-7965547f-5f28-490d-8d29-eb821c97d76d-c000.csv", header=True, inferSchema=True)

# Define keywords for each category
demographics_keywords = [
    "baby-g", "girls(s)", "momen", "baby-doll", "ladies", "toddler", "women's", "men’s"
    # Add more keywords for Demographics
]

apparel_keywords = [
    "shirt", "t-shirt", "jeans", "jacket", "dress", "skirt", "pants", "shorts"
    # Add more keywords for Apparel
]

irrelevant_keywords = [
    "travel", "jewelry", "newlywed", "toy", "book"
    # Add more keywords for Irrelevant
]

# Escape special characters in keywords to make them safe for regex
demographics_keywords = [re.escape(keyword) for keyword in demographics_keywords]
apparel_keywords = [re.escape(keyword) for keyword in apparel_keywords]
irrelevant_keywords = [re.escape(keyword) for keyword in irrelevant_keywords]

# Create regex patterns for each category
demographics_condition = "|".join(demographics_keywords)
apparel_condition = "|".join(apparel_keywords)
irrelevant_condition = "|".join(irrelevant_keywords)

# Add a new column for category based on conditions
data = data.withColumn(
    "category",
    when(col("product_title").rlike(demographics_condition), lit("Demographics"))
    .when(col("product_title").rlike(apparel_condition), lit("Apparel"))
    .when(col("product_title").rlike(irrelevant_condition), lit("Irrelevant"))
    .otherwise(lit("Other"))
)

# Show a sample of the categorized data
data.select("product_title", "category").show(20, truncate=False)

# Save the categorized dataset to a CSV file
data.write.csv("categorized_dataset.csv", header=True, mode="overwrite")

                                                                                

+------------------------------------------------------------------------------------+--------+
|product_title                                                                       |category|
+------------------------------------------------------------------------------------+--------+
|Easy Tool Stainless Steel Fruit Pineapple Corer Slicer Peeler Cut (One size, sliver)|Other   |
|V28 Women Cowl Neck Knit Stretchable Elasticity Long Sleeve Slim Fit Sweater Dress  |Other   |
|James Fiallo Men's 12-Pairs Low Cut Athletic Sport Socks                            |Other   |
|Belfry Gangster 100% Wool Stain-Resistant Crushable Dress Fedora in 4 Colors        |Other   |
|JAEDEN Women's Beaded Spaghetti Straps Sexy Long Formal Prom Evening Dresses        |Other   |
|Levi's Boys' 514 Straight Fit Jeans                                                 |Other   |
|Minimalist Wallet & Credit Card Holder Men with Slim Design by Raw                  |Other   |
|Harriton Men's Barbados Textured Camp S

                                                                                

In [42]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Combine CSV Files") \
    .getOrCreate()

# Define the folder path where all CSV chunks are saved
folder_path = "/Users/pallavmahajan/Downloads/DATA-228_Big_Data/Project/categorized_dataset.csv"

# Read all CSV files in the folder into a single DataFrame
combined_data = spark.read.csv(folder_path, header=True, inferSchema=True)

# Show the combined data
combined_data.show(20, truncate=False)

# Save the combined DataFrame as a single CSV file
combined_data.write.csv("combined_dataset.csv", header=True, mode="overwrite")

                                                                                

+-----------+-----------+--------------+----------+--------------+-------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+----------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [44]:
import os

write_path = "/Users/pallavmahajan/Downloads/DATA-228_Big_Data/Project/"
if not os.path.exists(write_path):
    os.makedirs(write_path)

In [45]:
import shutil
import glob

output_path = "/Users/pallavmahajan/Downloads/DATA-228_Big_Data/Project/combined_dataset"
final_file = "/Users/pallavmahajan/Downloads/DATA-228_Big_Data/Project/final_combined_dataset.csv"

# Get the part file from the directory
part_file = glob.glob(output_path + "/part-*")[0]

# Rename part file to desired output file
shutil.move(part_file, final_file)

# Remove the directory created by PySpark
shutil.rmtree(output_path)

print(f"Combined CSV saved as {final_file}")


IndexError: list index out of range

In [47]:
import os

output_path = "/Users/pallavmahajan/Downloads/DATA-228_Big_Data/Project/combined_dataset.csv"
print("Directory exists:", os.path.exists(output_path))
print("Files in directory:", os.listdir(output_path))


Directory exists: True
Files in directory: ['.part-00010-efb30cd2-8173-4e1a-991a-27e7f87ba8a1-c000.csv.crc', 'part-00012-efb30cd2-8173-4e1a-991a-27e7f87ba8a1-c000.csv', 'part-00009-efb30cd2-8173-4e1a-991a-27e7f87ba8a1-c000.csv', '.part-00011-efb30cd2-8173-4e1a-991a-27e7f87ba8a1-c000.csv.crc', 'part-00005-efb30cd2-8173-4e1a-991a-27e7f87ba8a1-c000.csv', 'part-00013-efb30cd2-8173-4e1a-991a-27e7f87ba8a1-c000.csv', '.part-00013-efb30cd2-8173-4e1a-991a-27e7f87ba8a1-c000.csv.crc', '.part-00008-efb30cd2-8173-4e1a-991a-27e7f87ba8a1-c000.csv.crc', '.part-00012-efb30cd2-8173-4e1a-991a-27e7f87ba8a1-c000.csv.crc', '.part-00009-efb30cd2-8173-4e1a-991a-27e7f87ba8a1-c000.csv.crc', 'part-00008-efb30cd2-8173-4e1a-991a-27e7f87ba8a1-c000.csv', 'part-00004-efb30cd2-8173-4e1a-991a-27e7f87ba8a1-c000.csv', '.part-00005-efb30cd2-8173-4e1a-991a-27e7f87ba8a1-c000.csv.crc', 'part-00010-efb30cd2-8173-4e1a-991a-27e7f87ba8a1-c000.csv', '._SUCCESS.crc', 'part-00007-efb30cd2-8173-4e1a-991a-27e7f87ba8a1-c000.csv', '.pa

In [48]:
# Combine all partitions into one
data.coalesce(1).write.csv(
    "/Users/pallavmahajan/Downloads/DATA-228_Big_Data/Project/combined_dataset.csv",
    header=True,
    mode="overwrite"
)

print("Combined single CSV saved successfully!")


[Stage 49:>                                                         (0 + 1) / 1]

Combined single CSV saved successfully!


                                                                                

In [49]:
import glob

# List all part files in the directory
part_files = glob.glob(output_path + "/part-*")
print("Part files found:", part_files)

# Check if any part files are found
if len(part_files) == 0:
    raise FileNotFoundError("No part-* files found in the specified output directory.")

Part files found: ['/Users/pallavmahajan/Downloads/DATA-228_Big_Data/Project/combined_dataset.csv/part-00000-1a4d93bd-865a-4f44-bda2-63018020065b-c000.csv']


In [50]:
import shutil

final_file = "/Users/pallavmahajan/Downloads/DATA-228_Big_Data/Project/final_combined_dataset.csv"

# Rename the first part file to the final combined file name
shutil.move(part_files[0], final_file)

# Remove the output directory
shutil.rmtree(output_path)

print(f"Combined CSV saved as {final_file}")

Combined CSV saved as /Users/pallavmahajan/Downloads/DATA-228_Big_Data/Project/final_combined_dataset.csv


In [51]:
data = spark.read.csv("final_combined_dataset.csv", header=True, inferSchema=True)
data.printSchema()
data.show(5)

                                                                                

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- demographic_group: string (nullable = true)
 |-- apparel_section: string (nullable = true)
 |-- category: string (nullable = true)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+------------

In [52]:
apparel_data = data.filter(data["category"] == "Apparel")

In [53]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="product_id", outputCol="product_index")
apparel_data = indexer.fit(apparel_data).transform(apparel_data)

                                                                                

In [54]:
# Split the data into training (80%) and testing (20%) datasets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

print(f"Training data count: {train_data.count()}")
print(f"Testing data count: {test_data.count()}")

                                                                                

Training data count: 4725721




Testing data count: 1180612


                                                                                