# Create Spark Session and Load data in pyspark dataframe

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length



# Create a Spark session
spark = SparkSession.builder \
    .appName("AmazonReviews")\
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .config("spark.hadoop.home.dir","file:///") \
    .config("spark.jars", "C:/Users/Unnati/anaconda3/envs/spark_env/Lib/site-packages/pyspark/jars/postgresql-42.7.1.jar") \
    .getOrCreate()

# Read the reviews file
reviews_df = spark.read.json("../Downloads/Digital_Music.json.gz")

# Display schema and first few rows
reviews_df.printSchema()
reviews_df.show(5)


root
 |-- asin: string (nullable = true)
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- style: struct (nullable = true)
 |    |-- Color:: string (nullable = true)
 |    |-- Format:: string (nullable = true)
 |    |-- Size:: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- vote: string (nullable = true)

+----------+-----+-------+--------------------+-----------+--------------+------------------+--------------------+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID|      reviewerName|               style|             summary|unixReviewTime|verified|vote|
+--------

----

# Item having least rating

### a) Items recieved a rating equal to least rating in enitre dataset

In [2]:
from pyspark.sql.functions import min

# Find the minimum rating
min_rating = reviews_df.agg(min("overall")).first()[0]

# Filter items with the overall minimum rating and take distinct items
least_rated_items = reviews_df.filter(col("overall") == min_rating).select("asin", "overall").distinct()

least_rated_items.show()

least_rated_items.count()


+----------+-------+
|      asin|overall|
+----------+-------+
|B0011Z3ETM|    1.0|
|B0012328SQ|    1.0|
|B00136LDKQ|    1.0|
|B001O03NRM|    1.0|
|B0020VTEZO|    1.0|
|B002WXJU4A|    1.0|
|B004Z6KSF2|    1.0|
|B00534REC0|    1.0|
|B006JHVRNW|    1.0|
|B00ANGVEJM|    1.0|
|B00EE96DWM|    1.0|
|B00GR0FD86|    1.0|
|B00MHGH50I|    1.0|
|B00O6DQIYE|    1.0|
|B00W8DLF7Y|    1.0|
|B0108UOUBG|    1.0|
|B000079BCM|    1.0|
|B00065L59Y|    1.0|
|B000MM1H1W|    1.0|
|B000QOLYN2|    1.0|
+----------+-------+
only showing top 20 rows



26043

### b) Items with least average rating (for items having atleast 10 ratings )

In [3]:
from pyspark.sql.functions import avg, count

# Set a threshold for the minimum number of ratings an item should have
min_ratings_threshold = 10  # You can adjust this based on your dataset and requirements

# Calculate average rating and count of ratings per item
item_ratings_agg = (
    reviews_df.groupBy("asin")
    .agg(avg("overall").alias("avg_rating"), count("overall").alias("num_ratings"))
)

# Filter items with at least the minimum number of ratings
filtered_items = item_ratings_agg.filter(col("num_ratings") >= min_ratings_threshold)

# Find the minimum average rating
min_avg_rating = filtered_items.agg({"avg_rating": "min"}).collect()[0][0]

# Filter items with the minimum average rating
least_avg_rated_items = (
    filtered_items
    .filter(col("avg_rating") == min_avg_rating)
    .select("asin", "avg_rating", "num_ratings")
)

least_avg_rated_items.show()


+----------+----------+-----------+
|      asin|avg_rating|num_ratings|
+----------+----------+-----------+
|B00S33PD6W|       1.0|         73|
|B00NIJY63W|       1.0|         44|
|B012VAF74U|       1.0|         11|
+----------+----------+-----------+



----

# Item having most rating

### a) Items recieved a rating equal to max rating in enitre dataset

In [4]:
from pyspark.sql.functions import max

# Find the maximum rating
max_rating = reviews_df.agg(max("overall")).first()[0]

# Filter items with the overall maximum rating and take distinct items
most_rated_items = reviews_df.filter(col("overall") == max_rating).select("asin", "overall").distinct()

most_rated_items.show()

most_rated_items.count()


+----------+-------+
|      asin|overall|
+----------+-------+
|B000QONMQ4|    5.0|
|B000QVL4NU|    5.0|
|B000S43HH6|    5.0|
|B000SXHB6K|    5.0|
|B000SX8DW6|    5.0|
|B000TEIZ0Y|    5.0|
|B000V61688|    5.0|
|B000V615YS|    5.0|
|B000VRQSVM|    5.0|
|B000VZWMZ0|    5.0|
|B000W1AO4Y|    5.0|
|B000WLF1UG|    5.0|
|B000XEI154|    5.0|
|B000Z7S78U|    5.0|
|B00113SLBA|    5.0|
|B0011W7H5C|    5.0|
|B001229998|    5.0|
|B00122IQP6|    5.0|
|B00122YSJ4|    5.0|
|B00123FVMQ|    5.0|
+----------+-------+
only showing top 20 rows



392580

### b) Items with max average rating (for items having atleast 10 ratings )

In [5]:
from pyspark.sql.functions import avg, count

# Set a threshold for the minimum number of ratings an item should have
max_ratings_threshold = 10  # You can adjust this based on your dataset and requirements

# Calculate average rating and count of ratings per item
item_ratings_agg = (
    reviews_df.groupBy("asin")
    .agg(avg("overall").alias("avg_rating"), count("overall").alias("num_ratings"))
)

# Filter items with at least the minimum number of ratings
filtered_items = item_ratings_agg.filter(col("num_ratings") >= min_ratings_threshold)

# Find the minimum average rating
max_avg_rating = filtered_items.agg({"avg_rating": "max"}).collect()[0][0]

# Filter items with the maximum average rating
max_avg_rated_items = (
    filtered_items
    .filter(col("avg_rating") == max_avg_rating)
    .select("asin", "avg_rating", "num_ratings")
)

max_avg_rated_items.show()
max_avg_rated_items.count()


+----------+----------+-----------+
|      asin|avg_rating|num_ratings|
+----------+----------+-----------+
|B000YOIQEY|       5.0|         20|
|B0011W4304|       5.0|         14|
|B00137IN62|       5.0|         18|
|B00137QLPW|       5.0|         12|
|B004ABW9V8|       5.0|         17|
|B00E7H70F0|       5.0|         11|
|B00GAVAJC2|       5.0|         42|
|B017L0HNE2|       5.0|         11|
|B010XMNW8Q|       5.0|         10|
|B000X6TZ6G|       5.0|         36|
|B00122NUXE|       5.0|         12|
|B001EL6940|       5.0|         11|
|B001R0IINI|       5.0|         11|
|B005F1TI9I|       5.0|         11|
|B006U60MK6|       5.0|         11|
|B00MXBUWV6|       5.0|         14|
|B00PXO0NM8|       5.0|         10|
|B0159RK5OE|       5.0|         10|
|B00021Z7TQ|       5.0|         12|
|B00123N8Z8|       5.0|         12|
+----------+----------+-----------+
only showing top 20 rows



2294

-----

# Items having longest reviews

### a) Item with the longest review

In [6]:
longest_review_item = reviews_df.withColumn("review_length", length("reviewText")).orderBy(col("review_length").desc()).limit(1)

longest_review_item.show()

+----------+-----+-------+--------------------+-----------+--------------+-------------+--------------------+--------------------+--------------+--------+----+-------------+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID| reviewerName|               style|             summary|unixReviewTime|verified|vote|review_length|
+----------+-----+-------+--------------------+-----------+--------------+-------------+--------------------+--------------------+--------------+--------+----+-------------+
|B00FZ11C0G| NULL|    3.0|This is my novel-...|03 26, 2013|A2NAWWR03ZBUTB|Just Some Guy|{NULL,  Audio CD,...|What if Thomas Ke...|    1364256000|    true|  26|        32501|
+----------+-----+-------+--------------------+-----------+--------------+-------------+--------------------+--------------------+--------------+--------+----+-------------+



### b) Item with the max average review length

In [7]:
from pyspark.sql.functions import length,avg,count

# Calculate the length of reviews per item
item_reviews_length = (
    reviews_df.groupBy("asin")
    .agg(avg("overall").alias("avg_rating"), count("overall").alias("num_ratings"), avg(length("reviewText")).alias("avg_review_length"))
)

# Find the maximum average review length
max_avg_review_length = item_reviews_length.agg({"avg_review_length": "max"}).collect()[0][0]

# Filter items with the maximum average review length
longest_reviews_items = (
    item_reviews_length
    .filter(col("avg_review_length") == max_avg_review_length)
    .select("asin", "avg_rating", "num_ratings", "avg_review_length")
)

longest_reviews_items.show()

+----------+----------+-----------+-----------------+
|      asin|avg_rating|num_ratings|avg_review_length|
+----------+----------+-----------+-----------------+
|B0018IE3JM|       4.0|          1|          29701.0|
+----------+----------+-----------+-----------------+



----

# Transform: change the date MM-DD-YYYY format.

In [8]:
from pyspark.sql.functions import date_format,to_date



# Transform: Change the date format to MM-DD-YYYY
reviews_df_transformed = reviews_df.withColumn("reviewTime", to_date(reviews_df["reviewTime"], "MM dd, yyyy")).withColumn("reviewTime", date_format("reviewTime", "MM-dd-yyyy"))

reviews_df_transformed.show()

+----------+-----+-------+--------------------+----------+--------------+-------------------+--------------------+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText|reviewTime|    reviewerID|       reviewerName|               style|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+----------+--------------+-------------------+--------------------+--------------------+--------------+--------+----+
|0001388703| NULL|    5.0|This is a great c...|12-22-2013|A1ZCPG3D3HGRSS|     mark l. massey|{NULL,  Audio CD,...|    Great worship cd|    1387670400|    true|NULL|
|0001388703| NULL|    5.0|So creative!  Lov...|09-11-2013| AC2PL52NKPL29|       Norma Mushen|{NULL,  Audio CD,...|Gotta listen to t...|    1378857600|    true|NULL|
|0001388703| NULL|    5.0|Keith Green, gone...|03-02-2013|A1SUZXBDZSDQ3A| Herbert W. Shurley|{NULL,  Audio CD,...|Great approach st...|    1362182400|    true|NULL|
|000138870

-----
# Time Trend Analysis of Ratings (Commulative Average Ratings )eeds.

The operation involves

1. Extracting the year and month from the review date.
2. Using a window function to calculate the cumulative average rating over time for each item.

This operation demonstrates how user ratings have evolved for different items over time.

In [9]:
from pyspark.sql.functions import to_date, avg,year,month
from pyspark.sql.window import Window
from pyspark.sql import functions as F

reviews_df_transformed = reviews_df_transformed.withColumn("year_month",  date_format(to_date(reviews_df_transformed["reviewTime"], "MM-dd-yyyy"), "MM-yyyy"))


# Define a window specification for the aggregation

window_spec = Window().partitionBy("asin").orderBy("year_month").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Calculate the cumulative average rating over time for each item
cumulative_avg_ratings = (
    reviews_df_transformed.groupBy("asin", "year_month")
    .agg(avg("overall").alias("avg_rating"))
    .withColumn("cumulative_avg_rating", F.avg("avg_rating").over(window_spec))
)

cumulative_avg_ratings.show()


+----------+----------+----------+---------------------+
|      asin|year_month|avg_rating|cumulative_avg_rating|
+----------+----------+----------+---------------------+
|0633039640|   10-2010|       5.0|                  5.0|
|0819815586|   01-2014|       5.0|                  5.0|
|0819815586|   01-2018|       1.0|                  3.0|
|0819815586|   08-2013|       5.0|   3.6666666666666665|
|0898005965|   06-2010|       4.5|                  4.5|
|0910558337|   03-2018|       5.0|                  5.0|
|0910558337|   04-2017|       2.5|                 3.75|
|0972433503|   12-2013|       1.0|                  1.0|
|097761493X|   11-2013|       5.0|                  5.0|
|097761493X|   12-2013|       5.0|                  5.0|
|0983470812|   08-2011|       5.0|                  5.0|
|0984099239|   03-2011|       4.0|                  4.0|
|1070313335|   12-2010|       5.0|                  5.0|
|1570198268|   04-2008|       5.0|                  5.0|
|1570198985|   11-2013|       5

------
# Save the data into a postgres table

In [10]:
# JDBC connection properties
jdbc_url = "jdbc:postgresql://localhost:5432/postgres"
jdbc_properties = {"user": "postgres", "password": "12345678", "driver": "org.postgresql.Driver"}

# Save the DataFrame to PostgreSQL
flattened_df=reviews_df_transformed.withColumn("style_color",col("style.Color:"))\
                        .withColumn("style_color",col("style.Color:"))\
                        .withColumn("style_color",col("style.Color:"))\
                        .drop("style")

flattened_df.write.jdbc(url=jdbc_url, table="your_table", mode="overwrite", properties=jdbc_properties)


------
# Save Data in parquet file

In [11]:
# Convert the whole file into Parquet format

flattened_df.write.mode("overwrite").parquet("../Downloads/Transformed_Digital_Music.parquet")
