In [22]:
#installing pyspark
!pip install pyspark



In [23]:
#importing sparksession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("AmazonReviewsBigData") \
.getOrCreate()

In [24]:
#Load Dataset
df = spark.read.csv("/content/drive/MyDrive/Reviews.csv", header=True, inferSchema=True)
df.printSchema()
df.show(5)

root
 |-- Id: integer (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- UserId: string (nullable = true)
 |-- ProfileName: string (nullable = true)
 |-- HelpfulnessNumerator: string (nullable = true)
 |-- HelpfulnessDenominator: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Text: string (nullable = true)

+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Go

In [25]:
# Data Cleaning
from pyspark.sql.functions import col


# Remove rows with null critical values
clean_df = df.dropna(subset=["ProductId", "UserId", "Score", "Time"])

In [26]:
#convert unix time into time
from pyspark.sql.functions import from_unixtime, year


clean_df = clean_df.withColumn("review_date", from_unixtime(col("Time")))
clean_df = clean_df.withColumn("year", year(col("review_date")))


clean_df.select("Time", "review_date", "year").show(5)

+----------+-------------------+----+
|      Time|        review_date|year|
+----------+-------------------+----+
|1303862400|2011-04-27 00:00:00|2011|
|1346976000|2012-09-07 00:00:00|2012|
|1219017600|2008-08-18 00:00:00|2008|
|1307923200|2011-06-13 00:00:00|2011|
|1350777600|2012-10-21 00:00:00|2012|
+----------+-------------------+----+
only showing top 5 rows


In [27]:
#Analysis 1: Rating Distribution
rating_dist = clean_df.groupBy("Score").count().orderBy("Score")
rating_dist.show()

+--------------+-----+
|         Score|count|
+--------------+-----+
|          ..."|   23|
|     Author"""|    3|
|   Comp sci"""|    1|
|     Critic"""|    1|
|        Dad"""|    1|
|     Dance..."|    4|
|        Design|    1|
|        Ed..."|    1|
|       Hugs"""|    1|
|     Lyme ..."|    1|
|     Medit..."|    2|
|        Moscow|    1|
| Music Fan..."|    8|
|         RN"""|   23|
|        Sm..."|    1|
|      USA ..."|    1|
|   Video Games|    1|
|         a..."|   10|
|     and F..."|    1|
| and Kitten"""|  104|
+--------------+-----+
only showing top 20 rows


In [28]:
#Analysis 2: Top 10 Products by Number of Reviews
top_products = clean_df.groupBy("ProductId") \
.count() \
.orderBy(col("count").desc()) \
.limit(10)


top_products.show()

+----------+-----+
| ProductId|count|
+----------+-----+
|B007JFMH8M|  913|
|B0026RQTGE|  632|
|B002QWP89S|  632|
|B002QWHJOU|  632|
|B002QWP8H0|  632|
|B003B3OOPA|  623|
|B001EO5Q64|  567|
|B0013NUGDE|  564|
|B007M832YY|  564|
|B0026KNQSA|  564|
+----------+-----+



In [29]:
#Analysis 3: Helpfulness Ratio
from pyspark.sql.functions import when, col

help_df = clean_df.withColumn(
    "helpfulness_ratio",
    when(
        col("HelpfulnessDenominator").cast("int") > 0,
        col("HelpfulnessNumerator").cast("double") /
        col("HelpfulnessDenominator").cast("double")
    ).otherwise(0.0)
)

help_df.select("Score", "helpfulness_ratio").show(5)


+-----+-----------------+
|Score|helpfulness_ratio|
+-----+-----------------+
|    5|              1.0|
|    1|              0.0|
|    4|              1.0|
|    2|              1.0|
|    5|              0.0|
+-----+-----------------+
only showing top 5 rows


In [30]:
#Analysis 4: Most Helpful Reviews
from pyspark.sql.functions import col, when, expr

help_df = clean_df.withColumn(
    "helpfulness_ratio",
    when(
        expr("try_cast(HelpfulnessDenominator as double)") > 0,
        expr("try_cast(HelpfulnessNumerator as double)") /
        expr("try_cast(HelpfulnessDenominator as double)")
    ).otherwise(0.0)
)


In [31]:
#Analysis 5: Reviews Over Time (Date-wise â€“ Only DMY)
from pyspark.sql.functions import col, from_unixtime, to_date, expr

clean_df = clean_df.withColumn(
    "review_dmy",
    to_date(from_unixtime(expr("try_cast(Time as long)")))
)

# Date-wise review count
datewise_reviews = clean_df.groupBy("review_dmy") \
    .count() \
    .orderBy("review_dmy")

datewise_reviews.show()


+----------+-----+
|review_dmy|count|
+----------+-----+
|      NULL|    8|
|1970-01-01| 2738|
|1999-10-08|    1|
|1999-10-25|    1|
|1999-12-02|    1|
|1999-12-06|    3|
|2000-01-03|    1|
|2000-01-09|    3|
|2000-01-19|    3|
|2000-01-24|    1|
|2000-02-26|    3|
|2000-06-03|    3|
|2000-06-23|    1|
|2000-06-29|    1|
|2000-07-31|    1|
|2000-08-09|    2|
|2000-08-15|    3|
|2000-10-03|    3|
|2000-12-05|    1|
|2000-12-19|    3|
+----------+-----+
only showing top 20 rows
