# CW1 - Amazon Bestsellers Analysis with PySpark


In this assignment you will explore a dataset containing the Top 50 best-selling books from Amazon between 2009 and 2019. Complete the exercises presented in the Google Colab notebook below. The assignment will be graded using CodeGrade.

Exercise 1 (5 Marks): Find the authors with the most entries in the bestsellers lists. For each author, find the number of unique titles, the average rating, the total number of reviews, and the highest position in the ranking.

Exercise 2 (5 Marks): For fiction and non-fiction books, find the average and total number of reviews for the top 10, 25, and 50 of the bestsellers lists, in each year.

Exercise 3 (10 Marks): For each year, find the average price of a fiction and non-fiction book in the top 10, 25 and 50 of the bestsellers lists.

Exercise 4 (10 Marks): For free books (where the price is zero), find the number of unique titles and authors. Compare the average rating and number of reviews in each year between free and priced books.


In [1]:
# CodeGrade Tag Init1

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# CodeGrade Tag Init2

# Apache Spark uses Java, so first we must install that
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Unpack Spark from google drive
!tar xzf /content/drive/MyDrive/spark-3.3.0-bin-hadoop3.tgz

# Set up environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.3.0-bin-hadoop3"

# Install findspark, which helps Python locate the pyspark module files
!pip install -q findspark
import findspark
findspark.init()

In [3]:
# Finally, we initialise a "SparkSession", which handles the computations
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

from pyspark.sql import functions as F

In [4]:
# Load the AmazonBooks.csv file into your notebook as a pyspark dataframe

CsvPath = '/content/drive/MyDrive/AmazonBooks-1.csv'

# Load .csv with header, ',' separators and inferred schema
BooksDF = spark.read\
                     .option('header', 'True')\
                     .option('sep', ',')\
                     .option('inferSchema', 'True')\
                     .csv(CsvPath)



In [5]:
# CodeGrade Tag Init3

BooksDF.printSchema()
BooksDF.show()

root
 |-- Name: string (nullable = true)
 |-- Author: string (nullable = true)
 |-- User Rating: double (nullable = true)
 |-- Reviews: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Rank: integer (nullable = true)

+--------------------+----------------+-----------+-------+-----+----+-----------+----+
|                Name|          Author|User Rating|Reviews|Price|Year|      Genre|Rank|
+--------------------+----------------+-----------+-------+-----+----+-----------+----+
|            The Help|Kathryn Stockett|        4.8|  13871|    6|2009|    Fiction|   1|
|Where the Wild Th...|  Maurice Sendak|        4.8|   9967|   13|2009|    Fiction|   2|
|The Last Olympian...|    Rick Riordan|        4.8|   4628|    7|2009|    Fiction|   3|
|Diary of a Wimpy ...|     Jeff Kinney|        4.8|   3837|   15|2009|    Fiction|   4|
|            Watchmen|      Alan Moore|        4.8|   3829|   42|2009|  

In [6]:
# pyspark.sql.functions contains all the column functions and aggregates you
# will need
from pyspark.sql import functions as F

# Exercise 1

Find the authors with the most entries in the bestsellers lists. For each author, find the number of unique titles, the average rating, the total number of reviews, and the highest position in the ranking.

Author, Number of books, Average Rating, Total Ratings, Highest rank

Sort by the number of titles in descending order.
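
As a cross-check on what the aggregation should return, here is a minimal plain-Python sketch on a made-up four-row sample. The rows and the `summarise_authors` helper are hypothetical and not taken from the dataset. It assumes that "entries" means rows in the lists, that "unique titles" means distinct `Name` values, and that the highest position is the smallest `Rank`.

```python
from collections import defaultdict

# Hypothetical sample rows: (name, author, user_rating, reviews, rank).
# These values are invented for illustration only.
SAMPLE_ROWS = [
    ("Book A", "Author X", 4.8, 100, 3),
    ("Book A", "Author X", 4.8, 100, 7),   # same title, listed in a second year
    ("Book B", "Author X", 4.6, 50, 12),
    ("Book C", "Author Y", 4.0, 10, 1),
]


def summarise_authors(rows):
    """Return one summary dict per author, sorted by entries (descending)."""
    grouped = defaultdict(list)
    for name, author, rating, reviews, rank in rows:
        grouped[author].append((name, rating, reviews, rank))

    summaries = []
    for author, books in grouped.items():
        summaries.append({
            "author": author,
            "entries": len(books),
            "unique_titles": len({name for name, _, _, _ in books}),
            "avg_rating": round(sum(r for _, r, _, _ in books) / len(books), 2),
            "total_reviews": sum(v for _, _, v, _ in books),
            "highest_rank": min(k for _, _, _, k in books),
        })
    # Most entries first; ties broken alphabetically so the order is stable.
    return sorted(summaries, key=lambda s: (-s["entries"], s["author"]))


summary = summarise_authors(SAMPLE_ROWS)
for row in summary:
    print(row)
```

Note that the review total counts a repeated title once per list appearance, which is also what summing the `Reviews` column row by row does.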

In [8]:
# CodeGrade Tag Ex1
# Perform aggregation on the DataFrame to calculate statistics grouped by author
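# Note: importing sum, min and round from pyspark.sql.functions shadows the
# Python built-ins of the same name for the rest of the notebook
from pyspark.sql.functions import col, countDistinct, avg, sum, min, round, count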

# Group data by author and calculate various statistics
author_statistics = BooksDF.groupBy("Author").agg(
    count("Name").alias("Number of Titles"),
    countDistinct("Name").alias("Unique_Number_of_books"),
    round(avg("User Rating"), 2).alias("Average_rating"),
    sum("Reviews").alias("Total_reviews"),
    min("Rank").alias("Highest_rank")
)

# Sort the results by the number of unique titles in descending order
author_statistics = author_statistics.orderBy(col("Unique_Number_of_books").desc())

# Display the calculated statistics
author_statistics.show()



+----------------+----------------+----------------------+--------------+-------------+------------+
|          Author|Number of Titles|Unique_Number_of_books|Average_rating|Total_reviews|Highest_rank|
+----------------+----------------+----------------------+--------------+-------------+------------+
|     Jeff Kinney|              13|                    13|          4.81|        93529|           4|
|    Rick Riordan|              11|                    10|          4.77|        44169|           3|
| Stephenie Meyer|               8|                     8|          4.68|       108273|          12|
|      Dav Pilkey|               8|                     7|           4.9|        82541|           4|
|   Bill O'Reilly|               7|                     6|          4.64|        63787|          13|
|    J.K. Rowling|               6|                     6|          4.45|        70535|           2|
| Suzanne Collins|              12|                     6|          4.67|       315502|    

# Exercise 2

For fiction and non-fiction books, find the average and total number of reviews for the top 10, 25, and 50 of the bestsellers lists, in each year.

Year, Genre, Average Number of Reviews, Total Reviews.

Sort by the year in ascending order.
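
The cutoffs are cumulative: a book at rank 7 belongs to the top 10, the top 25, and the top 50. A minimal plain-Python sketch of that bucketing follows, using invented rows and a hypothetical `reviews_by_cutoff` helper; it is an illustration of the logic, not the graded solution.

```python
from collections import defaultdict

CUTOFFS = (10, 25, 50)

# Hypothetical rows: (year, genre, reviews, rank). Values are invented.
SAMPLE_ROWS = [
    (2009, "Fiction", 1000, 1),
    (2009, "Fiction", 3000, 20),
    (2009, "Fiction", 500, 40),
    (2009, "Non Fiction", 200, 30),
]


def reviews_by_cutoff(rows, cutoffs=CUTOFFS):
    """Map (year, genre, cutoff) -> (average reviews, total reviews)."""
    buckets = defaultdict(list)
    for year, genre, reviews, rank in rows:
        for cutoff in cutoffs:
            if rank <= cutoff:  # a rank-7 book lands in all three buckets
                buckets[(year, genre, cutoff)].append(reviews)
    return {
        key: (round(sum(values) / len(values), 2), sum(values))
        for key, values in buckets.items()
    }


stats = reviews_by_cutoff(SAMPLE_ROWS)
for key in sorted(stats):
    print(key, stats[key])
```

In PySpark the same idea can probably be expressed in a single `groupBy` by wrapping each aggregated column in `F.when(F.col("Rank") <= n, ...)`, as an alternative to filtering three times and joining.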

In [9]:
# CodeGrade Tag Ex2
# Filter the top 10, 25, and 50 best-selling books
top_10_books = BooksDF.filter("Rank <= 10")
top_25_books = BooksDF.filter("Rank <= 25")
top_50_books = BooksDF.filter("Rank <= 50")

# Grouping and aggregation for the top 10, 25, and 50 best-selling books
grouped_data_top10 = top_10_books.groupby("Year", "Genre").agg(sum("Reviews").alias("TotalReviews_Top10"),
                                                               round(avg("Reviews"), 2).alias("AverageReviews_Top10"))
grouped_data_top25 = top_25_books.groupby("Year", "Genre").agg(sum("Reviews").alias("TotalReviews_Top25"),
                                                               round(avg("Reviews"), 2).alias("AverageReviews_Top25"))
grouped_data_top50 = top_50_books.groupby("Year", "Genre").agg(sum("Reviews").alias("TotalReviews_Top50"),
                                                               round(avg("Reviews"), 2).alias("AverageReviews_Top50"))

# Join all the DataFrames and sort by year (then genre, for a stable order)
combined_top_books_data = grouped_data_top10.join(grouped_data_top25, ["Year", "Genre"], "outer") \
                                            .join(grouped_data_top50, ["Year", "Genre"], "outer") \
                                            .orderBy("Year", "Genre")

# Display the joined DataFrame
combined_top_books_data.show(25)

+----+-----------+------------------+--------------------+------------------+--------------------+------------------+--------------------+
|Year|      Genre|TotalReviews_Top10|AverageReviews_Top10|TotalReviews_Top25|AverageReviews_Top25|TotalReviews_Top50|AverageReviews_Top50|
+----+-----------+------------------+--------------------+------------------+--------------------+------------------+--------------------+
|2009|    Fiction|             39313|             6552.17|            112100|             7473.33|            156824|             6534.33|
|2009|Non Fiction|             10364|              2591.0|             38198|              3819.8|             78682|             3026.23|
|2010|    Fiction|             96180|            10686.67|            121849|              8703.5|            168185|             8409.25|
|2010|Non Fiction|             29673|             29673.0|             67152|             6104.73|            105796|             3526.53|
|2011|    Fiction|         

# Exercise 3

For each year, find the average price of fiction and non-fiction books in the top 10, 25 and 50 of the bestsellers list. Make a dataframe where the columns are:

Year, Genre, Avg Price in Top 10, Avg Price in Top 25 and Avg Price in Top 50

Sort by the year in ascending order.
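
One thing to watch for in the wide layout: a genre may have no books inside a given cutoff in some year, so that cell has no average at all. The sketch below, in plain Python with invented prices and a hypothetical `widen` helper, combines per-cutoff averages the way an outer join would and leaves such cells as `None`.

```python
# Hypothetical per-cutoff average prices keyed by (year, genre).
# All figures are invented for illustration.
avg_top10 = {(2009, "Fiction"): 12.0}
avg_top25 = {(2009, "Fiction"): 14.5, (2009, "Non Fiction"): 20.0}
avg_top50 = {(2009, "Fiction"): 13.0, (2009, "Non Fiction"): 18.0}


def widen(*tables):
    """Outer-join dicts on their keys; missing entries become None."""
    all_keys = sorted(set().union(*tables))
    return [key + tuple(table.get(key) for table in tables) for key in all_keys]


# Each row is (year, genre, avg top 10, avg top 25, avg top 50).
wide_rows = widen(avg_top10, avg_top25, avg_top50)
for row in wide_rows:
    print(row)
```

An inner join would drop the second row entirely, which is why an outer join is the safer choice when every year and genre should appear in the result.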

In [10]:
# CodeGrade Tag Ex3
# Filter the top 10, 25, and 50 best-selling books
top_10_books = BooksDF.filter("Rank <= 10")
top_25_books = BooksDF.filter("Rank <= 25")
top_50_books = BooksDF.filter("Rank <= 50")

# Grouping and aggregation for average price
avg_price_top10 = top_10_books.groupBy("Year", "Genre").agg(avg("Price").alias("AvgPrice_Top10"))
avg_price_top25 = top_25_books.groupBy("Year", "Genre").agg(avg("Price").alias("AvgPrice_Top25"))
avg_price_top50 = top_50_books.groupBy("Year", "Genre").agg(avg("Price").alias("AvgPrice_Top50"))

# Create a DataFrame with the specified columns
avg_price_df = avg_price_top10.join(avg_price_top25, ["Year", "Genre"], "outer") \
                              .join(avg_price_top50, ["Year", "Genre"], "outer") \
                              .select("Year", "Genre", "AvgPrice_Top10", "AvgPrice_Top25", "AvgPrice_Top50") \
                              .orderBy("Year")

# Display the DataFrame
avg_price_df.show()



+----+-----------+------------------+------------------+------------------+
|Year|      Genre|    AvgPrice_Top10|    AvgPrice_Top25|    AvgPrice_Top50|
+----+-----------+------------------+------------------+------------------+
|2009|    Fiction|15.833333333333334|18.866666666666667|15.583333333333334|
|2009|Non Fiction|              14.5|              19.1| 15.23076923076923|
|2010|    Fiction|10.777777777777779|10.928571428571429|               9.7|
|2010|Non Fiction|              16.0|15.636363636363637|              16.0|
|2011|    Fiction|            10.375|10.923076923076923|11.619047619047619|
|2011|Non Fiction|              12.0|15.666666666666666|17.620689655172413|
|2012|    Fiction|13.333333333333334|              11.9|12.285714285714286|
|2012|Non Fiction|             17.25|17.933333333333334|17.482758620689655|
|2013|    Fiction| 9.333333333333334| 9.357142857142858|10.708333333333334|
|2013|Non Fiction|               8.0|14.363636363636363|18.192307692307693|
|2014|Non Fi

# Exercise 4

For free books, find the total number of unique titles and authors, and store these in variables called `free_titles` and `free_authors`.

Compare the average rating and number of reviews for free and priced books, in each year of the dataset. Create a dataframe where the columns are:

Year, Avg Rating Free, Avg Rating Priced, Total Ratings Free, Total Ratings Priced

Sort by the year in ascending order.
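
A minimal plain-Python sketch of the comparison, on invented rows, may help to pin down the expected shape of the result. The `yearly_comparison` helper and the `sample_*` names are hypothetical; the sketch assumes a book is free exactly when its price is 0 and reports `None` for a year in which one of the two groups is empty.

```python
from collections import defaultdict

# Hypothetical rows: (name, author, user_rating, reviews, price, year).
# Values are invented for illustration only.
SAMPLE_ROWS = [
    ("Free Book", "Author X", 4.8, 100, 0, 2010),
    ("Free Book", "Author X", 4.8, 100, 0, 2011),
    ("Paid Book", "Author Y", 4.0, 300, 12, 2010),
    ("Other Paid", "Author Z", 4.4, 500, 9, 2010),
    ("Paid Book", "Author Y", 4.0, 300, 12, 2012),
]

# Distinct titles and authors among the free books only.
free_rows = [row for row in SAMPLE_ROWS if row[4] == 0]
sample_free_titles = len({row[0] for row in free_rows})
sample_free_authors = len({row[1] for row in free_rows})


def yearly_comparison(rows):
    """Map year -> {"free": stats, "priced": stats}; stats is None if empty."""
    groups = defaultdict(lambda: {"free": [], "priced": []})
    for _, _, rating, reviews, price, year in rows:
        groups[year]["free" if price == 0 else "priced"].append((rating, reviews))

    def stats(pairs):
        if not pairs:
            return None  # no books in this group for the year
        avg_rating = round(sum(r for r, _ in pairs) / len(pairs), 2)
        return avg_rating, sum(v for _, v in pairs)

    return {
        year: {kind: stats(pairs) for kind, pairs in group.items()}
        for year, group in sorted(groups.items())
    }


comparison = yearly_comparison(SAMPLE_ROWS)
for year, row in comparison.items():
    print(year, row)
```

Returning `None` for an empty group keeps "no free books this year" distinguishable from a genuine average of 0, which a zero fill would hide.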

In [11]:
# CodeGrade Tag Ex4a
# Filter the DataFrame to include only books with a price of zero
free_books = BooksDF.filter(col("Price") == 0)

# Count the number of unique titles and authors separately
free_titles = free_books.select(countDistinct("Name").alias("Number_of_unique_titles")).collect()[0][0]
free_authors = free_books.select(countDistinct("Author").alias("Number_of_unique_authors")).collect()[0][0]

# Display the results
print("Number of unique titles for free books:", free_titles)
print("Number of unique authors for free books:", free_authors)


Number of unique titles for free books: 9
Number of unique authors for free books: 6


In [12]:
# CodeGrade Tag Ex4b
# Filter the DataFrame to include only free books (where the price is zero)
free_books = BooksDF.filter(col("Price") == 0)

# Group and aggregate data for free books by year
free_books_stats = free_books.groupBy("Year") \
                             .agg(avg("User Rating").alias("AvgRating_FreeBooks"),
                                  sum("Reviews").alias("TotalReviews_FreeBooks"))

# Filter the DataFrame to include only priced books
priced_books = BooksDF.filter(col("Price") > 0)

# Group and aggregate data for priced books by year
priced_books_stats = priced_books.groupBy("Year") \
                                 .agg(avg("User Rating").alias("AvgRating_PricedBooks"),
                                      sum("Reviews").alias("TotalReviews_PricedBooks"))

# Join the yearly statistics; years with no free books have no match in the
# outer join, and na.fill(0) displays those missing values as 0
comparison_data = free_books_stats.join(priced_books_stats, "Year", "outer") \
                                  .orderBy("Year") \
                                  .na.fill(0)

# Display the joined DataFrame
comparison_data.show()

+----+-------------------+----------------------+---------------------+------------------------+
|Year|AvgRating_FreeBooks|TotalReviews_FreeBooks|AvgRating_PricedBooks|TotalReviews_PricedBooks|
+----+-------------------+----------------------+---------------------+------------------------+
|2009|                0.0|                     0|                4.584|                  235506|
|2010|                4.6|                  2122|    4.557142857142857|                  271859|
|2011|                4.8|                  4505|    4.553061224489797|                  400536|
|2012|                0.0|                     0|                4.532|                  654546|
|2013|                4.8|                 33046|              4.54375|                  621861|
|2014|               4.75|                 32738|    4.610869565217391|                  760259|
|2015|                4.8|                 26234|    4.644897959183671|                  685435|
|2016|                4.8|    