In [None]:
# Data pipeline for Amazon Customer Review Analysis

import os
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, date_format,col,trim,lower,regexp_replace
from pyspark.sql.functions import month, year, dayofmonth


# Task 1: Data Extraction
#  ● Source: The data is in Parquet format stored on AWS S3
#  ● Task: Extract the data using PySpark, Python, or SQL to fetch the required datasets.

# Create a Spark session with Hadoop AWS dependencies
spark = SparkSession.builder \
    .appName("ReadParquetFromS3") \
    .config("spark.jars.packages",
            "org.apache.hadoop:hadoop-aws:3.3.1,"
            "com.amazonaws:aws-java-sdk-bundle:1.12.262") \
    .getOrCreate()

# S3A credentials are read from the environment instead of being hardcoded. Secrets typed
# into a notebook end up in version control and in every shared copy. The key pair that was
# previously embedded in this cell has been exposed, so it must be revoked and rotated in AWS IAM.
# Raises KeyError if either environment variable is unset.
# NOTE(review): `_jsc` is a private PySpark attribute. It is kept because it updates the
# Hadoop configuration of an already-running session (e.g. when this cell is re-run).
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
hadoop_conf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")

# Load the raw review data and preview the first 20 rows.
df = spark.read.parquet("s3a://reviewwanalysisbucket/review.parquet")
df.show()  # Display the DataFrame


+-----------+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----+-----------------+--------------------+--------------------+
|review_date|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes| vine|verified_purchase|     review_headline|         review_body|
+-----------+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----+-----------------+--------------------+--------------------+
| 1995-06-24|         US|   53096571| RHL4UW17ZK72A| 521314925|        9.81E8|Invention and Evo...|           Books|          5|            9|          9|false|            false|      BUY THIS BOOK!|This is a beautif...|
| 1995-06-24|         US|   53096571|R34N4QWDXX58WB| 870210092|        4.43E8|Arming and Fittin...|           Books|

In [None]:
# Task 2: Data Cleaning and Preprocessing

# Keep one copy of every fully identical row (all columns compared), then show how many rows remain.
df = df.distinct()
df.count()

150

In [None]:
# Correct data types (e.g., ensuring review_date is in a Date format, star_rating is an integer, etc.).
df.printSchema()

# Target Spark type for each column that needs correcting.
# - customer_id is already read as a long. It is deliberately kept 64-bit: narrowing it to
#   "int" would silently wrap IDs above 2^31 - 1 when spark.sql.ansi.enabled is false.
# - With ANSI mode off, review_date strings that cannot be parsed as dates become null here.
TARGET_TYPES = {
    "review_date": "date",
    "customer_id": "long",
    "star_rating": "int",
    "helpful_votes": "int",
    "total_votes": "int",
}
for column_name, spark_type in TARGET_TYPES.items():
    df = df.withColumn(column_name, col(column_name).cast(spark_type))
df.printSchema()

root
 |-- review_date: string (nullable = true)
 |-- marketplace: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: double (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: long (nullable = true)
 |-- helpful_votes: long (nullable = true)
 |-- total_votes: long (nullable = true)
 |-- vine: boolean (nullable = true)
 |-- verified_purchase: boolean (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)

root
 |-- review_date: date (nullable = true)
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: double (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nulla

In [None]:
# Handle missing or null values by either dropping or filling them.

# Defaults for columns whose missing values can be filled in instead of dropping the row.
FILL_DEFAULTS = {
    "review_headline": "No review",
    "review_body": "No review",
    "star_rating": 0,
    "helpful_votes": 0,
    "total_votes": 0,
}

# Identifying the null values (rows with a null in any column)
df_with_missing_values = df.filter(
    reduce(lambda a, b: a | b, [col(c).isNull() for c in df.columns])
)
df_with_missing_values.show()

# Identifying invalid dates. review_date is already a DateType column, and the earlier
# string -> date cast turns malformed values such as "0000-00-00" into null (ANSI mode off).
# Comparing the date column with that literal can therefore never match, so look for nulls instead.
df_with_invalid_dates = df.filter(col("review_date").isNull())
df_with_invalid_dates.show()

# Finding empty strings
df_with_empty_review_headline = df.filter((col("review_headline").isNull()) | (col("review_headline") == ""))
df_with_empty_review_headline.show()
df_with_empty_review_body = df.filter((col("review_body").isNull()) | (col("review_body") == ""))
df_with_empty_review_body.show()

# Filling the missing values with defaults. This must run BEFORE dropna(): dropping first
# removes every row that contains a null, which leaves the fill with nothing to do.
# Empty review text is treated as missing so that it receives the same default.
df = df.replace("", None, subset=["review_headline", "review_body"])
df = df.fillna(FILL_DEFAULTS)

# Dropping rows that are still incomplete (any remaining null, e.g. an unparseable review_date)
df = df.dropna()
df.count()

+-----------+-----------+-----------+---------+----------+--------------+-------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+-----------+
|review_date|marketplace|customer_id|review_id|product_id|product_parent|product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|review_headline|review_body|
+-----------+-----------+-----------+---------+----------+--------------+-------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+-----------+
+-----------+-----------+-----------+---------+----------+--------------+-------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+-----------+

+-----------+-----------+-----------+---------+----------+--------------+-------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+-----------+
|review_date|marketplace|cust

150

In [None]:
# Standardize fields like product category, marketplace, etc. to avoid inconsistencies.
# Each listed column has surrounding whitespace trimmed and is then lower-cased; the columns
# are processed in the listed order.
df = reduce(
    lambda frame, name: frame.withColumn(name, lower(trim(col(name)))),
    ("marketplace", "product_title", "product_category", "review_headline", "review_body"),
    df,
)
df.show()

+-----------+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----+-----------------+--------------------+--------------------+
|review_date|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes| vine|verified_purchase|     review_headline|         review_body|
+-----------+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----+-----------------+--------------------+--------------------+
| 1995-08-07|         us|   53096398| R61VUUBXYT6EL|042510107X|   3.4724671E7|    red storm rising|           books|          5|            0|          1|false|            false|the single best m...|red storm rising ...|
| 1995-09-29|         us|   53094816| RT94SPZ5CHLK6| 133627659|  3.20981057E8|criminal justice ...|           books|

In [None]:
#Task 3: Data Transformation

# Format the date to a standard format (e.g., 'YYYY-MM-DD')
# Spark datetime pattern letters are case-sensitive: "MM" is month-of-year, while "mm" is
# minute-of-hour. The previous pattern "yyyy-mm-dd" therefore never read the month.
# NOTE(review): review_date is already a DateType column after the earlier cast, and Spark
# presumably ignores the pattern for date input. The pattern matters only if that column
# reaches this cell as a string.
df = df.withColumn("review_date", to_date(col("review_date"), "yyyy-MM-dd"))


In [None]:
# Normalize text fields like review_body and review_headline to handle case sensitivity
# and remove unwanted characters.

# review_id is an identifier, not free text. It is only trimmed and never lower-cased, so it
# still matches the IDs in the raw source data and any other dataset keyed on review_id.
df = df.withColumn("review_id", trim(col("review_id")))

for text_col in ["product_title", "review_headline", "review_body"]:
    df = df.withColumn(text_col, lower(trim(col(text_col))))
    # Keep only ASCII letters, digits and whitespace. Note that accented and other
    # non-ASCII letters are removed as well.
    df = df.withColumn(text_col, regexp_replace(col(text_col), r"[^a-zA-Z0-9\s]", ""))
    # Removing punctuation can leave runs of spaces or a trailing space ("great book !").
    # Collapse whitespace runs to a single space and trim again.
    df = df.withColumn(text_col, trim(regexp_replace(col(text_col), r"\s+", " ")))

df.show(15)

+-----------+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----+-----------------+--------------------+--------------------+
|review_date|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes| vine|verified_purchase|     review_headline|         review_body|
+-----------+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----+-----------------+--------------------+--------------------+
| 1995-08-07|         us|   53096398| r61vuubxyt6el|042510107X|   3.4724671E7|    red storm rising|           books|          5|            0|          1|false|            false|the single best m...|red storm rising ...|
| 1995-09-29|         us|   53094816| rt94spz5chlk6| 133627659|  3.20981057E8|criminal justice ...|           books|

In [None]:
# Create additional features (e.g., "review_month" from review_date).

# Derive the calendar parts of review_date as new columns, appended in the order
# review_year, review_month, review_day. withColumn replaces a column of the same name,
# so re-running this cell does not duplicate them.
calendar_features = {
    "review_year": year,
    "review_month": month,
    "review_day": dayofmonth,
}
for feature_name, extract in calendar_features.items():
    df = df.withColumn(feature_name, extract(col("review_date")))

df.show(truncate=False)

+-----------+-----------+-----------+--------------+----------+--------------+-----------------------------------------------------------------------------------------------------------------------------+----------------+-----------+-------------+-----------+-----+-----------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Save the cleaned data back to AWS S3 as Parquet, replacing whatever is already stored under that prefix.
df.write.parquet("s3a://reviewwanalysisbucket/cleaned_review/", mode="overwrite")


In [None]:
# Read the cleaned Parquet data back from AWS S3 and preview the first 20 rows.
df_cleaned = spark.read.format("parquet").load("s3a://reviewwanalysisbucket/cleaned_review/")
df_cleaned.show()

+-----------+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----+-----------------+--------------------+--------------------+-----------+------------+----------+
|review_date|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes| vine|verified_purchase|     review_headline|         review_body|review_year|review_month|review_day|
+-----------+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----+-----------------+--------------------+--------------------+-----------+------------+----------+
| 1995-08-07|         us|   53096398| r61vuubxyt6el|042510107X|   3.4724671E7|    red storm rising|           books|          5|            0|          1|false|            false|the single best m...|red storm rising ...|       19