In [0]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Amazon Reviews Analysis").getOrCreate()

# Load JSON data
df = spark.read.format("json").load("dbfs:/FileStore/shared_uploads/zainsaeed024@gmail.com/Automotive_5-1.json")

# Inspect the data
df.printSchema()
df.show(5)

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)

+----------+--------+-------+--------------------+-----------+--------------+---------------+--------------------+--------------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|   reviewerName|             summary|unixReviewTime|
+----------+--------+-------+--------------------+-----------+--------------+---------------+--------------------+--------------+
|B00002243X|  [4, 4]|    5.0|I needed a set of...|08 17, 2011|A3F73SC1LY51OO|Alan Montgomery|Work Well - Shoul...|    1313539200|
|B00002243X|  [1, 1]|    4.0|These long cables...| 09 4, 2011|

## Basic Statistics

In [0]:
print(f"Total Reviews: {df.count()}")

Total Reviews: 20473


## Check for nulls

In [0]:
from pyspark.sql.functions import col

null_counts = df.select([col(c).isNull().alias(c) for c in df.columns]).groupBy().sum()
null_counts.show()

++
||
++
||
++



## Generic Filtering of nulls

In [0]:
clean_df = df.filter(
    col("reviewerID").isNotNull() &
    col("reviewText").isNotNull() &
    col("overall").isNotNull()
)

In [0]:
# Average ratings by product
avg_ratings = clean_df.groupBy("asin").avg("overall").orderBy("avg(overall)", ascending=False)
avg_ratings.show(10)

+----------+------------+
|      asin|avg(overall)|
+----------+------------+
|B000221M94|         5.0|
|B000CSD6RE|         5.0|
|B0000UUX34|         5.0|
|B0002UEN1A|         5.0|
|B000B8WCBG|         5.0|
|B000C6KMAK|         5.0|
|B000A0CA6W|         5.0|
|B000BYDA8E|         5.0|
|B000CQOIVE|         5.0|
|B000C5O1VW|         5.0|
+----------+------------+
only showing top 10 rows



In [0]:
# Most reviewed products
most_reviews = clean_df.groupBy("asin").count().orderBy("count", ascending=False)
most_reviews.show(10)

+----------+-----+
|      asin|count|
+----------+-----+
|B000CITK8S|  169|
|B007TG7HFO|  118|
|B001V8U12M|  111|
|B002BC4N5I|   90|
|B002OUMVWY|   82|
|B0009IQZFM|   79|
|B00068XCQU|   79|
|B000NCOKZQ|   79|
|B0014Y82UQ|   79|
|B001LHVOVK|   77|
+----------+-----+
only showing top 10 rows



In [0]:
# Sentiment classification
from pyspark.sql.functions import when

sentiment_df = clean_df.withColumn(
    "sentiment",
    when(col("reviewText").rlike("(?i)good|excellent|amazing|great|love"), "Positive")
    .when(col("reviewText").rlike("(?i)bad|terrible|poor|hate"), "Negative")
    .otherwise("Neutral")
)

sentiment_counts = sentiment_df.groupBy("sentiment").count()
sentiment_counts.show()

+---------+-----+
|sentiment|count|
+---------+-----+
| Positive| 9635|
|  Neutral|10286|
| Negative|  552|
+---------+-----+



In [0]:
helpful_df = clean_df.withColumn("helpful_votes", col("helpful")[0]).withColumn("total_votes", col("helpful")[1])
helpful_df.show(5)

+----------+--------+-------+--------------------+-----------+--------------+---------------+--------------------+--------------+-------------+-----------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|   reviewerName|             summary|unixReviewTime|helpful_votes|total_votes|
+----------+--------+-------+--------------------+-----------+--------------+---------------+--------------------+--------------+-------------+-----------+
|B00002243X|  [4, 4]|    5.0|I needed a set of...|08 17, 2011|A3F73SC1LY51OO|Alan Montgomery|Work Well - Shoul...|    1313539200|            4|          4|
|B00002243X|  [1, 1]|    4.0|These long cables...| 09 4, 2011|A20S66SKYXULG2|       alphonse|    Okay long cables|    1315094400|            1|          1|
|B00002243X|  [0, 0]|    5.0|Can't comment muc...|07 25, 2013|A2I8LFSN2IS5EO|          Chris|Looks and feels h...|    1374710400|            0|          0|
|B00002243X|[19, 19]|    5.0|I absolutley love...|12 21, 2010|A3

In [0]:
# Reviews with highest helpfullness ratio

from pyspark.sql.functions import expr

top_helpful_reviews = helpful_df.withColumn(
    "helpfulness_ratio",
    expr("helpful_votes / total_votes")
).orderBy("helpfulness_ratio", ascending=False)
top_helpful_reviews.show(10)

+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+-------------+-----------+-----------------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|helpful_votes|total_votes|helpfulness_ratio|
+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+-------------+-----------+-----------------+
|B008FYKOX6|  [1, 1]|    5.0|I got the 21&#34;...| 01 5, 2013| A1E1LEVQ9VQNK|         J. Chambers|My first impressi...|    1357344000|            1|          1|              1.0|
|B0028PJ10K|  [1, 1]|    5.0|No more boards.  ...|07 11, 2013|A1MC3OZBUUYMYQ|    Andrew D Gardner|Durable and easy ...|    1373500800|            1|          1|              1.0|
|B0000AXNMO|  [2, 2]|    4.0|Although the trea...|03 31, 2014|A34KAA7V803KJA|Charlie E. "Charlie"|       