### Practice 1: Task 1: We will work with a new dataset from BookCrossing (http://www.bookcrossing.com), a global community of book lovers who exchange books and share their experiences.

In [63]:
#pip install pyspark
#pip install findspark

In [64]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import*
from pyspark.sql.window import Window

In [65]:
spark = SparkSession.builder.appName("Merve_Homework").getOrCreate()

In [66]:
# fields = [StructField("Title", StringType(), True), 
#           StructField("Author", StringType(), True), 
#           StructField("Year", IntegerType(), True), 
#           StructField("Publisher", StringType(), True)]

In [67]:
# import all data from csv files
df1 = spark.read.csv("archive/Books.csv", sep = ";", header = True, inferSchema = True)
df1.show()

df2 = spark.read.csv("archive/Ratings.csv", sep = ";", header = True, inferSchema = True)
df2.show()

df3 = spark.read.csv("archive/Users.csv", sep = ";", header = True, inferSchema = True)
df3.show()

+----------+--------------------+--------------------+----+--------------------+
|      ISBN|               Title|              Author|Year|           Publisher|
+----------+--------------------+--------------------+----+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|2002|Oxford University...|
|0002005018|        Clara Callan|Richard Bruce Wright|2001|HarperFlamingo Ca...|
|0060973129|Decision in Normandy|        Carlo D'Este|1991|     HarperPerennial|
|0374157065|Flu: The Story of...|    Gina Bari Kolata|1999|Farrar Straus Giroux|
|0393045218|The Mummies of Ur...|     E. J. W. Barber|1999|W. W. Norton & Co...|
|0399135782|The Kitchen God's...|             Amy Tan|1991|    Putnam Pub Group|
|0425176428|What If?: The Wor...|       Robert Cowley|2000|Berkley Publishin...|
|0671870432|     PLEADING GUILTY|         Scott Turow|1993|          Audioworks|
|0679425608|Under the Black F...|     David Cordingly|1996|        Random House|
|074322678X|Where You'll Fin

In [68]:
# Join two dataframes (Books and Ratings) on ISBN
df4 = df1.join(df2, on = "ISBN", how = "inner")

# Join two dataframes (Users and df4) on User-ID
df5 = df4.join(df3, on = "User-ID", how = "inner")

# Reorder columns in order User-ID, Age, Title, Author, Year, Publisher, ISBN, Rating
df5 = df5.select("User-ID", "Age", "Title", "Author", "Year", "Publisher", "ISBN", "Rating")
df5.show()

+-------+----+--------------------+----------------+----+--------------------+----------+------+
|User-ID| Age|               Title|          Author|Year|           Publisher|      ISBN|Rating|
+-------+----+--------------------+----------------+----+--------------------+----------+------+
|  23902|null|Nothing Can Be Be...|    Barns & Budd|1996|            Atlantic|0001048473|     0|
|  16319|  43|       Which Colour?|     Sue Dreamer|1989|HarperCollins Pub...|0001372564|     0|
|  26583|null|Huck Scarry's Ste...|     Huck Scarry|1979|HarperCollins Pub...|0001382381|     0|
| 176062|  37|ARE YOU MY MOTHER...|           Seuss|1993|HarperCollins Pub...|0001718126|     0|
| 114216|null|                Bess|   Robert Leeson|1975|             Collins|0001840517|     0|
|  11676|null| THE COAL HOUSE T/PB|   Andrew Taylor|1986|HarperCollins Pub...|0001848445|     0|
| 131402|null| THE COAL HOUSE T/PB|   Andrew Taylor|1986|HarperCollins Pub...|0001848445|     0|
| 145927|  53| THE COAL HOUSE 

## Question1

Find the title of the book with the highest number of ratings for each publisher.

In [69]:
from pyspark.sql.functions import dense_rank
from pyspark.sql.functions import max, min, count, col

# Show the title of book had highest rating from each Publisher by Spark Window Functions and sort by Rating
# windowSpec = Window.partitionBy(df4["Publisher"]).orderBy(df4["Rating"].desc())
# df4.withColumn("rank", dense_rank().over(windowSpec)).orderBy(df4["Rating"].desc()).filter("rank == 1").select("Title", "Publisher", "Rating").show()

# Show the maximum number of rating of each Publisher and then sort by Number of Ratings
df5 = df4.groupBy("Publisher", "Title").agg(count("Rating").alias("Number of Ratings"))
df5 = df5.groupBy("Publisher", "Title").agg(max("Number of Ratings")).orderBy("max(Number of Ratings)", ascending = False).show()

+--------------------+--------------------+----------------------+
|           Publisher|               Title|max(Number of Ratings)|
+--------------------+--------------------+----------------------+
|             Too Far|         Wild Animus|                  2502|
|       Little, Brown|The Lovely Bones:...|                  1295|
|           Doubleday|   The Da Vinci Code|                   884|
|           Perennial|Divine Secrets of...|                   732|
|         Picador USA|The Red Tent (Bes...|                   723|
|Dell Publishing C...|     A Painted House|                   649|
|        Warner Books|        The Notebook|                   647|
|   Vintage Books USA|Snow Falling on C...|                   618|
|       Penguin Books|The Secret Life o...|                   615|
|         Pocket Star|     Angels & Demons|                   586|
|        Warner Books|Where the Heart I...|                   585|
|Arthur A. Levine ...|Harry Potter and ...|                   

## Question2

Calculate the difference between:

- The number of ratings for each book

- The number of ratings of the highest-rated book from the same publisher



In [70]:
from pyspark.sql.functions import max, min, count, col

# Show the maximum number of rating of each Publisher and then sort by Number of Ratings
df6 = df4.groupBy("Publisher", "Title").agg(count("Rating").alias("Number of Ratings"))
df7 = df6.groupBy("Publisher").agg(max("Number of Ratings").alias("Max Number of Ratings in Publisher"))
# df7.orderBy(df7["Max Number of Ratings in Publisher"].desc()).show()

df8 = df6.select("Publisher", "Title", "Number of Ratings").join(df7, on = "Publisher", how = "inner")

# Find the differences between the max rating of each publisher and the rating of each book
df8 = df8.withColumn("Difference", df8["Max Number of Ratings in Publisher"] - df8["Number of Ratings"])
df8.show()
# df8.orderBy(df8["Number of Ratings"].desc()).show()

+--------------------+--------------------+-----------------+----------------------------------+----------+
|           Publisher|               Title|Number of Ratings|Max Number of Ratings in Publisher|Difference|
+--------------------+--------------------+-----------------+----------------------------------+----------+
|Harper Mass Marke...| Playing With Cobras|                4|                               144|       140|
|Harper Mass Marke...|         The Natural|                1|                               144|       143|
|Harper Mass Marke...|       Heart's Honor|                4|                               144|       140|
|Harper Mass Marke...|Omega Sub (Omega ...|                3|                               144|       141|
|Harper Mass Marke...|        City of Gold|                5|                               144|       139|
|Harper Mass Marke...|   Heaven Comes Home|                1|                               144|       143|
|Harper Mass Marke...|      