# Data Cleaning and Analysis

The stored parquet files for each category (review and meta) will be accessed and merged. Following the merge, they will be cleaned and then consolidated into one large dataset.

In [1]:
import findspark
from pyspark.sql import SparkSession
from pyspark import SparkContext 
import pyspark
from datasets import load_from_disk
from pyspark.sql.functions import col, length, trim, when, lit, from_json, split, size, from_unixtime, year, count, avg, countDistinct, first
from pyspark.sql.types import StructType, StructField, StringType
from functools import reduce
from pyspark.sql import DataFrame
import matplotlib.pyplot as plt
from pyspark.sql.functions import isnan
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.recommendation import ALS
import random
from pyspark.ml.clustering import KMeans

In [2]:
# Initialise findspark so the kernel can locate the local Spark installation.
# NOTE(review): the imports cell above already imports pyspark before this call
# runs — confirm whether this call is still needed in this position.
findspark.init()

In [3]:
# Single local Spark session shared by the whole notebook: all local cores,
# 14g configured for driver and executor, scratch files under D:/BigData/spark_temp.
spark = (
    SparkSession.builder
    .appName("Amazon Reviews")
    .master("local[*]")
    .config("spark.local.dir", "D:/BigData/spark_temp")
    .config("spark.driver.memory", "14g")
    .config("spark.executor.memory", "14g")
    .getOrCreate()
)

In [4]:
# Collects one cleaned (review + meta) DataFrame per category; the loading loop
# appends to it and the union step later reduces it into a single DataFrame.
all_categories_cleaned = []

In [5]:
# Category names used to build the review/meta parquet paths, one per line.
categories = [
    "All_Beauty",
    "Amazon_Fashion",
    "Appliances",
    "Arts_Crafts_and_Sewing",
    "Automotive",
    "Baby_Products",
    "Beauty_and_Personal_Care",
    "Books",
    "CDs_and_Vinyl",
    "Cell_Phones_and_Accessories",
    "Clothing_Shoes_and_Jewelry",
    "Digital_Music",
    "Electronics",
    "Gift_Cards",
    "Grocery_and_Gourmet_Food",
    "Handmade_Products",
    "Health_and_Household",
    "Health_and_Personal_Care",
    "Home_and_Kitchen",
    "Industrial_and_Scientific",
    "Kindle_Store",
    "Magazine_Subscriptions",
    "Movies_and_TV",
    "Musical_Instruments",
    "Office_Products",
    "Patio_Lawn_and_Garden",
    "Pet_Supplies",
    "Software",
    "Sports_and_Outdoors",
    "Subscription_Boxes",
    "Tools_and_Home_Improvement",
    "Toys_and_Games",
    "Video_Games",
    "Unknown",
]

### Function: Assigning the Brand

This function will parse the 'details' column for brand or will otherwise extract the brand name from the non-null value in the 'store' column.

In [6]:
def set_brand(df):
    """Add a ``brand`` column to ``df``.

    The brand is taken, in priority order, from:
      1. the ``Brand`` key of the JSON string in ``details`` (trimmed, if non-blank);
      2. the ``store`` column (trimmed, if non-blank);
      3. the literal ``"Unknown"``.

    Args:
        df: DataFrame with a JSON-string ``details`` column and a ``store`` column.

    Returns:
        ``df`` with the extra ``brand`` column; the temporary ``details_parsed``
        column used for JSON extraction is dropped again.
    """
    # Only the "Brand" key is needed, so the JSON schema declares just that field.
    brand_only_schema = StructType([StructField("Brand", StringType(), True)])
    with_parsed = df.withColumn(
        "details_parsed", from_json(col("details"), brand_only_schema)
    )

    details_brand = trim(col("details_parsed.Brand"))
    store_name = trim(col("store"))

    has_details_brand = col("details_parsed.Brand").isNotNull() & (details_brand != "")
    has_store_name = col("store").isNotNull() & (store_name != "")

    brand_expr = (
        when(has_details_brand, details_brand)
        .when(has_store_name, store_name)
        .otherwise(lit("Unknown"))
    )

    return with_parsed.withColumn("brand", brand_expr).drop("details_parsed")

### Function: Cleaning the Dataset

This function combines all the necessary cleaning tasks. It also extracts the brand, which is handled by its own function call. After the review and meta datasets are merged for each category, this function is called on the merged dataset.

In [7]:
def clean_dataset(df):
    """Clean a merged review + metadata DataFrame and add derived columns.

    Steps, in order:
      1. keep only rows whose ``rating`` is exactly 1.0, 2.0, 3.0, 4.0 or 5.0
         (rows with a null rating are removed as well);
      2. drop rows whose ``text`` is null or blank;
      3. add the ``brand`` column via ``set_brand``;
      4. remove duplicates on (``user_id``, ``asin``, ``text``);
      5. add ``review_length``: number of whitespace-separated tokens in ``text``;
      6. add ``year``: calendar year of ``timestamp``, or null when it is missing.

    Args:
        df: merged DataFrame with at least ``rating``, ``text``, ``user_id``,
            ``asin``, ``timestamp``, ``details`` and ``store`` columns.

    Returns:
        The cleaned DataFrame with ``brand``, ``review_length`` and ``year`` added.
    """
    # The predicate has to be passed to filter(); as a bare expression statement
    # it is evaluated and discarded, leaving invalid ratings in the data.
    df = df.filter(col("rating").isin([1.0, 2.0, 3.0, 4.0, 5.0]))
    df = df.filter((col("text").isNotNull()) & (length(trim(col("text"))) > 0))
    df = set_brand(df)
    df = df.dropDuplicates(["user_id", "asin", "text"])
    # Trim spaces before splitting so leading/trailing spaces do not add empty
    # tokens to the word count.
    df = df.withColumn(
        "review_length",
        size(split(trim(col("text")), r"\s+"))
    )
    # timestamp is divided by 1000 before conversion, i.e. it is treated as
    # epoch milliseconds.
    df = df.withColumn(
        "year",
        when(
            col("timestamp").isNotNull(),
            year(from_unixtime(col("timestamp") / 1000))
        ).otherwise(None)
    )
    return df

In [8]:
# Start from an empty list every time this cell runs, so re-running it does not
# append each category a second time (which would duplicate rows in the union).
all_categories_cleaned = []

for category in categories:
    try:
        review_path = f"D:/BigData/review_parquet_{category}"
        meta_path = f"D:/BigData/meta_parquet_{category}"

        review_df = spark.read.parquet(review_path)
        meta_df = spark.read.parquet(meta_path)

        # Both frames carry `images` and `title`; give each side its own name
        # so the columns stay distinguishable after the join.
        review_df = review_df.withColumnRenamed("images", "review_images")
        review_df = review_df.withColumnRenamed("title", "review_title")
        meta_df = meta_df.withColumnRenamed("images", "meta_images")
        meta_df = meta_df.withColumnRenamed("title", "meta_title")

        # Inner join: reviews without matching product metadata are dropped.
        merged_df = review_df.join(meta_df, on="parent_asin", how="inner")

        cleaned_df = clean_dataset(merged_df)
        all_categories_cleaned.append(cleaned_df)

        print(f"{category}: cleaned successfully")

    except Exception as e:
        # Best effort: report the failing category and continue with the rest.
        print(f"{category}: failed with error - {e}")

All_Beauty: cleaned successfully
Amazon_Fashion: cleaned successfully
Appliances: cleaned successfully
Arts_Crafts_and_Sewing: cleaned successfully
Automotive: cleaned successfully
Baby_Products: cleaned successfully
Beauty_and_Personal_Care: cleaned successfully
Books: cleaned successfully
CDs_and_Vinyl: cleaned successfully
Cell_Phones_and_Accessories: cleaned successfully
Clothing_Shoes_and_Jewelry: cleaned successfully
Digital_Music: cleaned successfully
Electronics: cleaned successfully
Gift_Cards: cleaned successfully
Grocery_and_Gourmet_Food: cleaned successfully
Handmade_Products: cleaned successfully
Health_and_Household: cleaned successfully
Health_and_Personal_Care: cleaned successfully
Home_and_Kitchen: cleaned successfully
Industrial_and_Scientific: cleaned successfully
Kindle_Store: cleaned successfully
Magazine_Subscriptions: cleaned successfully
Movies_and_TV: cleaned successfully
Musical_Instruments: cleaned successfully
Office_Products: cleaned successfully
Patio_Lawn

In [9]:
# Fold the per-category frames into one DataFrame, matching columns by name.
consolidated_df = reduce(
    lambda stacked, category_df: stacked.unionByName(category_df),
    all_categories_cleaned,
)
consolidated_df

DataFrame[parent_asin: string, rating: double, review_title: string, text: string, review_images: array<struct<attachment_type:string,large_image_url:string,medium_image_url:string,small_image_url:string>>, asin: string, user_id: string, timestamp: bigint, helpful_vote: bigint, verified_purchase: boolean, main_category: string, meta_title: string, average_rating: double, rating_number: bigint, features: array<string>, description: array<string>, price: string, meta_images: struct<hi_res:array<string>,large:array<string>,thumb:array<string>,variant:array<string>>, videos: struct<title:array<string>,url:array<string>,user_id:array<string>>, store: string, categories: array<string>, details: string, bought_together: string, subtitle: string, author: string, brand: string, review_length: int, year: int]

In [10]:
# Print the column tree of the consolidated DataFrame (names, types, nullability).
consolidated_df.printSchema()

root
 |-- parent_asin: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- review_title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- review_images: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- attachment_type: string (nullable = true)
 |    |    |-- large_image_url: string (nullable = true)
 |    |    |-- medium_image_url: string (nullable = true)
 |    |    |-- small_image_url: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- verified_purchase: boolean (nullable = true)
 |-- main_category: string (nullable = true)
 |-- meta_title: string (nullable = true)
 |-- average_rating: double (nullable = true)
 |-- rating_number: long (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- description: array (nullable = true)
 |

In [None]:
# Distinct main_category values present in the consolidated data.
# - Stored under its own name so the `categories` list that drives the parquet
#   loading loop is not overwritten by this cell.
# - Nulls are excluded because sampleBy only accepts float/int/str stratum keys.
main_category_rows = (
    consolidated_df
    .select("main_category")
    .filter(col("main_category").isNotNull())
    .distinct()
    .collect()
)
main_categories = [row["main_category"] for row in main_category_rows]

# Stratified sample: the same 10% fraction for every category.
fractions = {cat: 0.1 for cat in main_categories}

sampled_df = consolidated_df.stat.sampleBy("main_category", fractions, seed=42)

## Exploratory Data Analysis (EDA)


Rating Histogram for ratings 1–5

In [None]:
# Review count per star rating, restricted to ratings within [1, 5],
# listed from the lowest rating to the highest.
ratings_in_range = consolidated_df.where(col("rating").between(1, 5))
rating_histogram = ratings_in_range.groupBy("rating").count().orderBy("rating")

rating_histogram.show()

Bar chart of categories by total review count

In [None]:
# Review count per main category (rows without a category are ignored),
# largest categories first; only the top 10 are displayed.
rows_with_category = consolidated_df.where(col("main_category").isNotNull())
category_counts = (
    rows_with_category
    .groupBy("main_category")
    .count()
    .orderBy(col("count").desc())
)

category_counts.show(10)

Bar chart of brand by total review count excluding “Unknown” from the top 10

In [None]:
# Review count per brand, skipping null brands and the "Unknown" placeholder,
# largest brands first; only the top 10 are displayed.
has_known_brand = col("brand").isNotNull() & (col("brand") != "Unknown")
brand_counts = (
    consolidated_df
    .where(has_known_brand)
    .groupBy("brand")
    .count()
    .orderBy(col("count").desc())
)

brand_counts.show(10)

Time-Based Trend: a line chart of average star rating per year

In [None]:
# Mean star rating per review year, in chronological order.
# Rows missing either the year or the rating are excluded first.
rows_with_year_and_rating = consolidated_df.where(
    col("year").isNotNull() & col("rating").isNotNull()
)
avg_rating_by_year = (
    rows_with_year_and_rating
    .groupBy("year")
    .agg(avg("rating").alias("avg_rating"))
    .orderBy("year")
)

avg_rating_by_year.show()

Pearson correlation between review length and star rating

In [None]:
# Pearson correlation coefficient between the word count of a review and its
# star rating (the method is spelled out; it is also the default).
correlation = consolidated_df.stat.corr(
    "review_length",
    "rating",
    method="pearson",
)
print(f"Pearson correlation between review length and rating: {correlation:.4f}")

##  Binary Sentiment Prediction (Logistic Regression)

A label column will be created to represent positive (1) if rating > 3, otherwise negative (0)

In [12]:
# Binary sentiment target: label 1 when rating > 3, otherwise 0.
# Only the review text and the label are kept; rows without text are dropped.
labeled_df = (
    consolidated_df
    .where(col("text").isNotNull())
    .select(
        "text",
        when(col("rating") > 3, 1).otherwise(0).alias("label"),
    )
)

Train/Test Split: 80/20 split, random shuffling

In [13]:
# 80/20 random split of the labelled reviews; the fixed seed keeps the split
# repeatable between runs.
train_df, test_df = labeled_df.randomSplit([0.8, 0.2], seed=42)

TF-IDF on review text (lowercase, split on whitespace/punctuation), discarding tokens in fewer than 5 reviews or in over 80% of reviews

In [14]:
# Stage 1 - split the review text on non-word characters.
regex_tokenizer = RegexTokenizer(
    pattern="\\W",
    inputCol="text",
    outputCol="tokens",
)

# Stage 2 - remove stop words from the token list.
stop_remover = StopWordsRemover(
    inputCol="tokens",
    outputCol="filtered_tokens",
)

# Stage 3 - term counts; keep tokens found in at least 5 reviews and in at
# most 80% of reviews.
count_vectorizer = CountVectorizer(
    minDF=5.0,
    maxDF=0.8,
    inputCol="filtered_tokens",
    outputCol="raw_features",
)

# Stage 4 - re-weight the raw counts by inverse document frequency.
idf = IDF(
    inputCol="raw_features",
    outputCol="features",
)

Classifier: Logistic Regression (default hyperparameters)

In [15]:
# Logistic regression on the TF-IDF `features` column against the binary `label`;
# no hyperparameters are overridden.
lr = LogisticRegression(featuresCol="features", labelCol="label")

In [None]:
# Chain tokenising, stop-word removal, counting, IDF weighting and the classifier
# into one pipeline, then fit every stage on the training split only.
pipeline = Pipeline(stages=[regex_tokenizer, stop_remover, count_vectorizer, idf, lr])
model = pipeline.fit(train_df)

Evaluation

In [None]:
# Score the held-out reviews with the fitted pipeline.
predictions = model.transform(test_df)

evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_acc.evaluate(predictions)

evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score = evaluator_f1.evaluate(predictions)

# MulticlassMetrics loads the (prediction, label) pairs into a DoubleType schema,
# which rejects Python ints. `label` is an integer column here, so cast it to
# double before handing the pairs over.
preds_rdd = (
    predictions
    .select(col("prediction"), col("label").cast("double").alias("label"))
    .rdd.map(tuple)
)
metrics = MulticlassMetrics(preds_rdd)

# Confusion matrix layout: rows are actual classes, columns are predicted
# classes, both in ascending label order (0 = negative, 1 = positive).
conf_matrix = metrics.confusionMatrix().toArray()
TP = int(conf_matrix[1][1])
FP = int(conf_matrix[0][1])
TN = int(conf_matrix[0][0])
FN = int(conf_matrix[1][0])

In [None]:
# Report the headline metrics (4 decimal places) followed by the raw
# confusion-matrix counts computed in the evaluation cell.
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1_score:.4f}")
print("Confusion Matrix:")
print(f"TP: {TP}, FP: {FP}, TN: {TN}, FN: {FN}")

## Recommender (ALS)
Creating a collaborative filtering model using Alternating Least Squares (ALS)

Data Setup: Retain (user id, product id, rating). Drop users with fewer than 5 total reviews.

In [None]:
# Number of rated reviews written by each user.
user_review_counts = (
    consolidated_df
    .groupBy("user_id")
    .agg(count("rating").alias("review_count"))
)

# Users with at least 5 reviews are the only ones kept for the recommender.
filtered_users = user_review_counts.where("review_count >= 5").select("user_id")

# (user_id, asin, rating) triples with a non-null rating, limited to those users.
als_data = (
    consolidated_df
    .where(col("rating").isNotNull())
    .select("user_id", "asin", "rating")
    .join(filtered_users, on="user_id", how="inner")
)

In [None]:
# Map the string user and item ids to numeric indices for ALS. Both indexers are
# fit on the full ALS dataset, so every id in either split has an index.
user_indexer = StringIndexer(inputCol="user_id", outputCol="userIndex").fit(als_data)
item_indexer = StringIndexer(inputCol="asin", outputCol="itemIndex").fit(als_data)

als_indexed = item_indexer.transform(user_indexer.transform(als_data))

# Split the ratings data itself. `train_df` / `test_df` belong to the sentiment
# section and only hold (text, label), so they have no user_id, asin or rating
# columns for the indexers or ALS to work with.
train_indexed, test_indexed = als_indexed.randomSplit([0.8, 0.2], seed=42)

# coldStartStrategy="drop" removes rows whose prediction would be NaN, so the
# RMSE computed later stays defined; nonnegative=True constrains the factors.
als = ALS(userCol="userIndex", itemCol="itemIndex", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True)

als_model = als.fit(train_indexed)

Evaluation: RMSE on the test set (predicted rating vs. actual)

In [None]:
# Predict ratings for the held-out interactions and compare them with the
# actual ratings using root-mean-square error.
predictions = als_model.transform(test_indexed)

evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)

print(f"RMSE on test data: {rmse:.4f}")

Demo: Show top 5 recommendations for 3 random users in the test set, including predicted ratings

In [None]:
# Distinct user indices in the test split, sorted so the seeded draw below picks
# the same users on every run regardless of the order Spark returns rows in.
test_users = sorted(
    row[0] for row in test_indexed.select("userIndex").distinct().collect()
)

# Dedicated seeded generator: reproducible without touching the global RNG state.
sample_users = random.Random(42).sample(test_users, 3)

user_subset = spark.createDataFrame([(uid,) for uid in sample_users], ["userIndex"])

# Top 5 recommended items (with predicted ratings) for each sampled user.
user_recs = als_model.recommendForUserSubset(user_subset, 5)

user_recs.show(truncate=False)

## Clustering / Segmentation (k-means)
Segmentation of products using k-means with k = 5

Features (per product): (mean rating, total reviews, brand id, category id)
- mean rating: Average user rating per product (based on your merged data, not necessarily average rating from metadata, though you could compare them.)
- total reviews: Count of all reviews for that product
- brand id: Map each distinct brand string to an integer
- category id: Map each main category or top-level category string to an integer

In [11]:
# One row per product (asin) with its aggregated review statistics.
per_product_aggregates = [
    avg("rating").alias("mean_rating"),               # mean of the review ratings
    count("rating").alias("total_reviews"),           # number of rated reviews
    countDistinct("user_id").alias("unique_users"),   # distinct reviewers
    avg("average_rating").alias("avg_meta_rating"),   # metadata rating, for comparison
    count("review_title").alias("title_count"),       # reviews that have a title
    first("brand").alias("brand"),
    first("main_category").alias("main_category"),
]

product_features = (
    consolidated_df
    .groupBy("asin")
    .agg(*per_product_aggregates)
    .where(col("mean_rating").isNotNull())
)

In [None]:
# Fit one string-to-index mapping per categorical column; handleInvalid="keep"
# is set on both so invalid values are kept rather than raising.
brand_indexer = (
    StringIndexer(handleInvalid="keep", inputCol="brand", outputCol="brand_id")
    .fit(product_features)
)
category_indexer = (
    StringIndexer(handleInvalid="keep", inputCol="main_category", outputCol="category_id")
    .fit(product_features)
)

# Apply the brand mapping first, then the category mapping.
indexed_df = category_indexer.transform(brand_indexer.transform(product_features))

In [None]:
# Pack the four per-product inputs into a single `features` vector column.
# NOTE(review): the columns are assembled unscaled; brand_id / total_reviews may
# span a much wider range than mean_rating — confirm whether standardising
# before clustering is wanted.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["mean_rating", "total_reviews", "brand_id", "category_id"],
)
feature_df = assembler.transform(indexed_df)

k-means: Exactly k = 5, default initialization, until convergence.

In [None]:
# k-means with exactly 5 clusters; the seed fixes the initialisation so the
# cluster assignment can be reproduced.
kmeans = KMeans(
    k=5,
    seed=42,
    featuresCol="features",
    predictionCol="cluster",
)
kmeans_model = kmeans.fit(feature_df)

# Attach each product's cluster id in the `cluster` column.
clustered_df = kmeans_model.transform(feature_df)

Cluster Analysis (for each cluster)
- Size: number of products in the cluster
- Average mean rating, average total reviews
- Average brand id and category id
- A short interpretation (e.g., high-rating electronics, unknown-brand items, etc.)

In [None]:
# Per-cluster profile: size plus the mean of every clustering input.
cluster_profile_aggregates = [
    count("asin").alias("num_products"),
    avg("mean_rating").alias("avg_rating"),
    avg("total_reviews").alias("avg_reviews"),
    avg("brand_id").alias("avg_brand_id"),
    avg("category_id").alias("avg_category_id"),
]

cluster_summary = (
    clustered_df
    .groupBy("cluster")
    .agg(*cluster_profile_aggregates)
    .orderBy("cluster")
)

cluster_summary.show(truncate=False)