In [1]:
# pip install pyspark --default-timeout=100

In [1]:
import os
import subprocess
import sys

In [3]:
''' Code to set JAVA HOME and Create Environment for Spark Session
os.environ["JAVA_HOME"] = "C:/Users/student/Desktop/Lillian Charles/Y3 S2/COMP 3610 - Big Data/Assignments/Java/jdk8u452-b09"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "\\bin;" + os.environ["PATH"]

os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
'''

In [5]:
#Code to Check Java Setup
print("JAVA_HOME:", os.environ.get("JAVA_HOME"))

try:
    result = subprocess.run(["java", "-version"], capture_output=True, text=True)
    print(result.stderr)
except Exception as e:
    print("Error running java:", e)

JAVA_HOME: C:/Users/student/Desktop/Lillian Charles/Y3 S2/COMP 3610 - Big Data/Assignments/Java/jdk8u452-b09
openjdk version "1.8.0_452"
OpenJDK Runtime Environment (Temurin)(build 1.8.0_452-b09)
OpenJDK 64-Bit Server VM (Temurin)(build 25.452-b09, mixed mode)



In [None]:
#Creating the Spark Session
from pyspark.sql import SparkSession 

spark = SparkSession.builder.appName("Amazon Review Project").config("spark.driver.memory", "12g").getOrCreate()

In [None]:
from datasets import load_from_disk, concatenate_datasets
from pyspark.sql.functions import col, udf, size, split, to_timestamp, year
from pyspark.sql.types import StringType
import os
import pandas as pd

In [7]:
# Function to convert rows in the data to String. Prevents inference errors
import json

def stringify_fields(rows, fields):
    for row in rows:
        for field in fields:
            if field in row and row[field] is not None:
                try:
                    row[field] = json.dumps(row[field])
                except:
                    row[field] = str(row[field])
    return rows

In [13]:
# Function to remove empyt columns from the dataset
def remove_empty_columns(rows):
    if not rows:
        return rows
    keys = list(rows[0].keys())
    for key in keys:
        if all(row.get(key) is None for row in rows):
            for row in rows:
                row.pop(key, None)
    return rows

In [9]:
from pyspark.sql.types import *

#Function to automatically set the schema for the datasets.
def infer_spark_schema(sample_row):
    type_mapping = {
        str: StringType(),
        int: IntegerType(),
        float: FloatType(),
        bool: BooleanType(),
        type(None): StringType()  # Treat None as StringType
    }

    fields = []
    for key, value in sample_row.items():
        if isinstance(value, (dict, list)):
            fields.append(StructField(key, StringType(), True))  # stringify complex stuff
        else:
            spark_type = type_mapping.get(type(value), StringType())
            fields.append(StructField(key, spark_type, True))
    
    return StructType(fields)

In [11]:
# Function to load the categories into the program and execute data cleaning
def load_and_process_categories(base_path, category):

    print(f"⚙️ Loading category {category}...")
    
    review_path = os.path.join(base_path, f"raw_review_{category}")
    meta_path = os.path.join(base_path, f"raw_meta_{category}")

    review_data = load_from_disk(review_path)["full"]
    meta_data = load_from_disk(meta_path)["full"]
    
    review_rows = review_data.to_list()
    meta_rows = meta_data.to_list()

   # Auto schema: meta
    meta_schema = infer_spark_schema(meta_rows[0])

    # Stringify problematic fields
    problem_fields = ["images", "features", "description", "specs", "feature_vectors"]
    review_rows = stringify_fields(review_rows, problem_fields)
    meta_rows = stringify_fields(meta_rows, problem_fields)

    review_rows = remove_empty_columns(review_rows)
    meta_rows = remove_empty_columns(meta_rows)

    # Create Spark DataFrames safely
    try:
        review_schema = infer_spark_schema(review_rows[0])
        review_df = spark.createDataFrame(review_rows, schema=review_schema)
    except Exception as e:
        print(f"⚠️ Could not infer schema for review in {category}: {e}. Using Spark's automatic inference.")
        review_df = spark.createDataFrame(review_rows)
    
    meta_df = spark.createDataFrame(meta_rows, schema=meta_schema)


    for col_name in ["images", "title"]:
        if col_name in meta_df.columns:
            meta_df = meta_df.withColumnRenamed(col_name, f"{col_name}_meta")
        
    # Merge on parent_asin
    merged_df = review_df.join(meta_df, on="parent_asin", how="left")

    #Drop rows where rating is missing or not in [1–5], drop rows if text (the review body) is empty or null
    merged_df = merged_df.filter((col("rating").between(1, 5) & (col("Text").isNotNull()) & (col("text") != "")))

    def get_brand(details, store):
        if isinstance(details, dict) and "brand" in details:
            return details["brand"]
        return store if store else "Unknown"

    brand_udf = udf(get_brand, StringType())
    merged_df = merged_df.withColumn("brand", brand_udf(col("details"), col("store")))

    # Remove duplicates
    merged_df = merged_df.dropDuplicates(["user_id", "asin", "text"])

    # Derived columns
    merged_df = merged_df.withColumn("review_length",size(split(col("text"), "")))
    merged_df = merged_df.withColumn("timestamp", to_timestamp((col("timestamp")/1000).cast("long")))
    merged_df = merged_df.withColumn("year", year(col("timestamp")))

    return merged_df

In [13]:
from bigdata_a3_utils import VALID_CATEGORIES

# Path where files are stored
base_path = "./review_files"
all_dfs = [] # array to hold the processed and loaded categories

# Code to split the categories into 5 batches for processing
batch_size = 5
category_batches = [VALID_CATEGORIES[i:i + batch_size] for i in range(0, len(VALID_CATEGORIES), batch_size)]

In [45]:
# Categories already processed
processed_categories = set([cat for cat, _ in all_dfs])

# loop to process categories
for category in category_batches:
        
    if category in processed_categories:
        print(f"⏭️ Skipping {category}, already processed.")
        continue
            
    try:
        df = load_and_process_categories(base_path, category)
    
        all_dfs.append((category, df))
    
        print(f"✅ {category} addedd to all_dfs")
    
        del df
    
    except Exception as e:
        print(f"❌ Failed on {category}: {e}")

⚙️ Loading category All_Beauty...
⚠️ Could not infer schema for review in All_Beauty: [VALUE_OUT_OF_BOUND] Value for `obj` must be greater than 2147483647 or less than -2147483648, got 1588687728923. Using Spark's automatic inference.
✅ All_Beauty addedd to all_dfs
⚙️ Loading category Amazon_Fashion...
⚠️ Could not infer schema for review in Amazon_Fashion: [VALUE_OUT_OF_BOUND] Value for `obj` must be greater than 2147483647 or less than -2147483648, got 1578528394489. Using Spark's automatic inference.
✅ Amazon_Fashion addedd to all_dfs
⚙️ Loading category Appliances...
⚠️ Could not infer schema for review in Appliances: [VALUE_OUT_OF_BOUND] Value for `obj` must be greater than 2147483647 or less than -2147483648, got 1519317108692. Using Spark's automatic inference.
✅ Appliances addedd to all_dfs
⚙️ Loading category Arts_Crafts_and_Sewing...
⚠️ Could not infer schema for review in Arts_Crafts_and_Sewing: [VALUE_OUT_OF_BOUND] Value for `obj` must be greater than 2147483647 or less tha

# Unified Output

In [47]:
# Extract the DataFrames from the (category, df) tuples
dfs_only = [df for _, df in all_dfs]

# Start with the first DataFrame
merged_df = dfs_only[0]

# Union the rest of them
for df in dfs_only[1:]:
    merged_df = merged_df.unionByName(df, allowMissingColumns=True)

In [51]:
merged_df.printSchema()

root
 |-- parent_asin: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- images: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- verified_purchase: boolean (nullable = true)
 |-- main_category: string (nullable = true)
 |-- title_meta: string (nullable = true)
 |-- average_rating: float (nullable = true)
 |-- rating_number: integer (nullable = true)
 |-- features: string (nullable = true)
 |-- description: string (nullable = true)
 |-- price: string (nullable = true)
 |-- images_meta: string (nullable = true)
 |-- videos: string (nullable = true)
 |-- store: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- details: string (nullable = true)
 |-- bought_together: string (nullable = true)
 |-- subtitle: string (nullable = true)
 |-- aut

In [None]:
merged_df.show(5)

# Exploratory Data Analysis (EDA)

In [59]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import string

In [None]:
#Star Rating Histogram
filtered_ratings = merged_df.filter((col("rating") >= 1) & (col("rating") <= 5))

rating_counts = filtered_ratings.groupBy("rating").count().orderBy("rating")

rating_counts_pd = rating_counts.limit(10).toPandas()

# Plotting the histogram
plt.figure(figsize=(8, 6))
plt.bar(rating_counts_pd['rating'], rating_counts_pd['count'], color='skyblue', edgecolor='black')
plt.title('Histogram of Star Ratings (1-5)', fontsize=14)
plt.xlabel('Rating', fontsize=12)
plt.ylabel('Frequency', fontsize=12)
plt.xticks(range(1, 6))  # Set x-ticks to be the star ratings 1-5
plt.grid(True, alpha=0.3)
plt.show()

In [None]:
#Bar chart showing top 10 categories
category_counts = merged_df.groupBy("category").count().orderBy("count", ascending=False)

top_10_categories = category_counts.limit(10)

top_10_categories_pd = top_10_categories.toPandas()

plt.figure(figsize=(10, 6))
plt.bar(top_10_categories_pd['category'], top_10_categories_pd['count'], color='orange', edgecolor='black')
plt.title('Top 10 Categories by Total Review Count', fontsize=14)
plt.xlabel('Category', fontsize=12)
plt.ylabel('Total Review Count', fontsize=12)
plt.xticks(rotation=45, ha='right')  # Rotate category names for better readability
plt.grid(True, alpha=0.3)
plt.show()


In [None]:
#Bar chart showing top 10 brands (excluding "Unknown")
filtered_df = merged_df.filter(merged_df["brand"] != "Unknown")

brand_counts = filtered_df.groupBy("brand").count().orderBy("count", ascending=False)

top_10_brands = brand_counts.limit(10)

top_10_brands_pd = top_10_brands.toPandas()

plt.figure(figsize=(10, 6))
plt.bar(top_10_brands_pd['brand'], top_10_brands_pd['count'], color='blue', edgecolor='black')
plt.title('Top 10 Brands by Total Review Count (Excluding "Unknown")', fontsize=14)
plt.xlabel('Brand', fontsize=12)
plt.ylabel('Total Review Count', fontsize=12)
plt.xticks(rotation=45, ha='right')  # Rotate brand names for better readability
plt.grid(True, alpha=0.3)
plt.show()


In [None]:
#Line chart of average star rating per year
from pyspark.sql import functions as F

time_based_trend = (
    merged_df
    .filter(col("rating").isNotNull())  # Filter out null ratings
    .groupBy("year")
    .agg(F.avg("rating").alias("avg_rating"))
    .orderBy("year")
)

time_based_trend_pd = time_based_trend.toPandas()

# Step 3: Plot the time-based trend
plt.figure(figsize=(10, 6))
plt.plot(time_based_trend_pd['year'], time_based_trend_pd['avg_rating'], marker='o', color='b', linestyle='-', linewidth=2, markersize=6)
plt.title('Average Star Rating Per Year')
plt.xlabel('Year')
plt.ylabel('Average Star Rating')
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [None]:
#Calculate Pearson correlation
from pyspark.sql.functions import length, col

merged_df_with_length = merged_df.withColumn('review_length', length(col('text')))

correlation = merged_df_with_length.stat.corr('review_length', 'rating')

print(f"Pearson Correlation between review length and star rating: {correlation}")

# Binary Sentiment Prediction (Logistic Regression)

In [68]:
from pyspark.sql.functions import when, col

# Create binary sentiment label: 1 = positive, 0 = negative
binary_df = df.withColumn(
    "label",
    when(col("rating") >= 4, 1).otherwise(0)
).select("label", "text")

In [70]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
tf = HashingTF(inputCol="filtered", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="features")

In [72]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)

In [74]:
pipeline = Pipeline(stages=[tokenizer, remover, tf, idf, lr])

In [None]:
train_data, test_data = binary_df.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(train_data)
predictions = model.transform(test_data)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)

print(f"✅ AUC: {roc_auc:.4f}")

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)

f1_score = evaluator_f1.evaluate(predictions)
print(f"F1 Score: {f1_score:.4f}")

In [None]:
confusion_df = predictions.groupBy("label", "prediction").count().orderBy("label", "prediction")
confusion_df.show()

# Recommender System (ALS)

In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
ratings_df = df.select("user_id", "asin", "rating") \
    .dropna() \
    .withColumn("user_id", df["user_id"].cast("string")) \
    .withColumn("asin", df["asin"].cast("string")) \
    .withColumn("rating", df["rating"].cast("float"))

In [None]:
# Converting string IDs to numeric
from pyspark.ml.feature import StringIndexer
user_indexer = StringIndexer(inputCol="user_id", outputCol="user_index").fit(ratings_df)
item_indexer = StringIndexer(inputCol="asin", outputCol="item_index").fit(ratings_df)

indexed = user_indexer.transform(ratings_df)
indexed = item_indexer.transform(indexed)

In [None]:
# ALS model
als = ALS(
    userCol="user_index",
    itemCol="item_index",
    ratingCol="rating",
    coldStartStrategy="drop",  # drops rows with NaN predictions
    implicitPrefs=False
)

model = als.fit(indexed)

In [None]:
# Evaluate
predictions = model.transform(indexed)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

print(f"RMSE: {rmse:.4f}")

# Clustering

In [None]:
from pyspark.sql.functions import avg, count
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

user_features = df.groupBy("user_id").agg(
    count("asin").alias("review_count"),
    avg("rating").alias("avg_rating"),
    avg("review_length").alias("avg_review_length")
).dropna()

assembler = VectorAssembler(
    inputCols=["review_count", "avg_rating", "avg_review_length"],
    outputCol="features"
)

feature_df = assembler.transform(user_features)

In [None]:
kmeans = KMeans(k=4, seed=42)
model = kmeans.fit(feature_df)

clusters = model.transform(feature_df)
clusters.select("user_id", "prediction").show(10)

In [None]:
from pyspark.sql.functions import avg, count
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.clustering import KMeans

product_stats = merged_df.groupBy("asin").agg(
    avg("rating").alias("mean_rating"),
    count("rating").alias("total_reviews")
)


product_meta = merged_df.select("asin", "brand", "main_category").dropDuplicates(["asin"])


brand_indexer = StringIndexer(inputCol="brand", outputCol="brand_id")
product_meta = brand_indexer.fit(product_meta).transform(product_meta)


category_indexer = StringIndexer(inputCol="main_category", outputCol="category_id")
product_meta = category_indexer.fit(product_meta).transform(product_meta)


product_features = product_stats.join(product_meta.select("asin", "brand_id", "category_id"), on="asin", how="left")


assembler = VectorAssembler(
    inputCols=["mean_rating", "total_reviews", "brand_id", "category_id"],
    outputCol="features"
)

final_df = assembler.transform(product_features)

final_df.select("asin", "mean_rating", "total_reviews", "brand_id", "category_id", "features").show(10, truncate=False)

In [None]:
kmeans = KMeans(featuresCol="features", k=5, seed=1)
model = kmeans.fit(final_df)

clustered = model.transform(final_df)
clustered.select("asin", "prediction").show()