In [1]:
# !pip install pyspark

In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

In [4]:
# spark = SparkSession.builder.appName('BRS').getOrCreate()
spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()

In [5]:
spark

In [6]:
# Books dataset

In [7]:
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType, LongType

In [8]:
schema1=StructType().add("ISBN",LongType(),True).add("BookTitle",StringType(),True).add("BookAuthor",StringType(),True).add("YearOfPublication",IntegerType(),True).add("Publisher",StringType(),True).add("ImageURLS",StringType(),True).add("ImageURLM",StringType(),True).add("ImageURLL",StringType(),True)

In [9]:
print(schema1) 

StructType([StructField('ISBN', LongType(), True), StructField('BookTitle', StringType(), True), StructField('BookAuthor', StringType(), True), StructField('YearOfPublication', IntegerType(), True), StructField('Publisher', StringType(), True), StructField('ImageURLS', StringType(), True), StructField('ImageURLM', StringType(), True), StructField('ImageURLL', StringType(), True)])


In [10]:
df_books=spark.read.format("csv").option("header","True").schema(schema1).load("C:/Users/pppon/BRS_FINAL/Cleaned Data/cleaned_books.csv")

In [11]:
df_books.printSchema()

root
 |-- ISBN: long (nullable = true)
 |-- BookTitle: string (nullable = true)
 |-- BookAuthor: string (nullable = true)
 |-- YearOfPublication: integer (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- ImageURLS: string (nullable = true)
 |-- ImageURLM: string (nullable = true)
 |-- ImageURLL: string (nullable = true)



In [12]:
df_books.toPandas()

Unnamed: 0,ISBN,BookTitle,BookAuthor,YearOfPublication,Publisher,ImageURLS,ImageURLM,ImageURLL
0,195153448.0,Classical Mythology,Mark P. O. Morford,2002.0,Oxford University Press,http://images.amazon.com/images/P/0195153448.0...,http://images.amazon.com/images/P/0195153448.0...,http://images.amazon.com/images/P/0195153448.0...
1,2005018.0,Clara Callan,Richard Bruce Wright,2001.0,HarperFlamingo Canada,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...
2,60973129.0,Decision in Normandy,Carlo D'Este,1991.0,HarperPerennial,http://images.amazon.com/images/P/0060973129.0...,http://images.amazon.com/images/P/0060973129.0...,http://images.amazon.com/images/P/0060973129.0...
3,374157065.0,Flu: The Story of the Great Influenza Pandemic...,Gina Bari Kolata,1999.0,Farrar Straus Giroux,http://images.amazon.com/images/P/0374157065.0...,http://images.amazon.com/images/P/0374157065.0...,http://images.amazon.com/images/P/0374157065.0...
4,393045218.0,The Mummies of Urumchi,E. J. W. Barber,1999.0,W. W. Norton &amp; Company,http://images.amazon.com/images/P/0393045218.0...,http://images.amazon.com/images/P/0393045218.0...,http://images.amazon.com/images/P/0393045218.0...
...,...,...,...,...,...,...,...,...
271355,440400988.0,There's a Bat in Bunk Five,Paula Danziger,1988.0,Random House Childrens Pub (Mm),http://images.amazon.com/images/P/0440400988.0...,http://images.amazon.com/images/P/0440400988.0...,http://images.amazon.com/images/P/0440400988.0...
271356,525447644.0,From One to One Hundred,Teri Sloat,1991.0,Dutton Books,http://images.amazon.com/images/P/0525447644.0...,http://images.amazon.com/images/P/0525447644.0...,http://images.amazon.com/images/P/0525447644.0...
271357,,Lily Dale : The True Story of the Town that Ta...,Christine Wicker,2004.0,HarperSanFrancisco,http://images.amazon.com/images/P/006008667X.0...,http://images.amazon.com/images/P/006008667X.0...,http://images.amazon.com/images/P/006008667X.0...
271358,192126040.0,Republic (World's Classics),Plato,1996.0,Oxford University Press,http://images.amazon.com/images/P/0192126040.0...,http://images.amazon.com/images/P/0192126040.0...,http://images.amazon.com/images/P/0192126040.0...


In [13]:
df_books.registerTempTable("books")



In [14]:
# Total number of publishers

In [15]:
query1=spark.sql("select distinct Publisher from books")

In [16]:
query1.count()

16841

In [17]:
# Total number of Books

In [18]:
query2=spark.sql("select distinct BookTitle from books")

In [19]:
query2.count()

242134

In [20]:
# Total number of Authors

In [21]:
query3=spark.sql("select distinct BookAuthor from books")

In [22]:
query3.count()

102051

In [23]:
# Year span

In [24]:
# query4=spark.sql("select count(distinct(books.YearOfPublication)) as Years, min(YearOfPublication) as Minimum, max(YearOfPublication) as Maximum from books")
query4=spark.sql("select min(YearOfPublication) as Minimum, max(YearOfPublication) as Maximum from books")

In [25]:
query4.show()

+-------+-------+
|Minimum|Maximum|
+-------+-------+
|   1806|   2021|
+-------+-------+



In [26]:
# Finding in which year maximum number of books were published

In [27]:
query5=spark.sql("select YearOfPublication, count(YearOfPublication) as Count from books group by YearOfPublication order by Count desc limit 1")

In [28]:
query5.show()

+-----------------+-----+
|YearOfPublication|Count|
+-----------------+-----+
|             2002|22255|
+-----------------+-----+



In [29]:
# Users dataset

In [30]:
schema2=StructType().add("UserID",LongType(),True).add("Location",StringType(),True).add("Age",IntegerType(),True)

In [31]:
print(schema2) 

StructType([StructField('UserID', LongType(), True), StructField('Location', StringType(), True), StructField('Age', IntegerType(), True)])


In [32]:
df_users=spark.read.format("csv").option("header","True").schema(schema2).load("C:/Users/pppon/BRS_FINAL/Cleaned Data/cleaned_users.csv")

In [33]:
df_users.printSchema()

root
 |-- UserID: long (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: integer (nullable = true)



In [34]:
df_users.show()

+------+--------------------+---+
|UserID|            Location|Age|
+------+--------------------+---+
|     1|  nyc, new york, usa| 24|
|     2|stockton, califor...| 18|
|     3|moscow, yukon ter...| 24|
|     4|porto, v.n.gaia, ...| 17|
|     5|farnborough, hant...| 24|
|     6|santa monica, cal...| 61|
|     7| washington, dc, usa| 24|
|     8|timmins, ontario,...| 24|
|     9|germantown, tenne...| 24|
|    10|albacete, wiscons...| 26|
|    11|melbourne, victor...| 14|
|    12|fort bragg, calif...| 24|
|    13|barcelona, barcel...| 26|
|    14|mediapolis, iowa,...| 24|
|    15|calgary, alberta,...| 24|
|    16|albuquerque, new ...| 24|
|    17|chesapeake, virgi...| 24|
|    18|rio de janeiro, r...| 25|
|    19|           weston, ,| 14|
|    20|langhorne, pennsy...| 19|
+------+--------------------+---+
only showing top 20 rows



In [35]:
df_users.registerTempTable("users")

In [36]:
# Total number of users

In [37]:
query6=spark.sql("select distinct UserID from users")

In [38]:
query6.count()

278859

In [39]:
# Age of maximum no.of users

In [40]:
query7=spark.sql("select Age,count(Age) as Count from users group by Age order by Count desc limit 1")

In [41]:
query7.show()

+---+------+
|Age| Count|
+---+------+
| 24|117428|
+---+------+



In [42]:
# Finding average age of the users

In [43]:
query8=spark.sql("select round(avg(Age)) as Average_Age from users")

In [44]:
query8.show()

+-----------+
|Average_Age|
+-----------+
|       30.0|
+-----------+



In [45]:
# Ratings dataset

In [46]:
schema3=StructType().add("UserID",LongType(),True).add("ISBN",LongType(),True).add("BookRating",IntegerType(),True)

In [47]:
print(schema3) 

StructType([StructField('UserID', LongType(), True), StructField('ISBN', LongType(), True), StructField('BookRating', IntegerType(), True)])


In [48]:
df_ratings=spark.read.format("csv").option("header","True").schema(schema3).load("C:/Users/pppon/BRS_FINAL/Cleaned Data/cleaned_ratings.csv")

In [49]:
df_ratings.printSchema()

root
 |-- UserID: long (nullable = true)
 |-- ISBN: long (nullable = true)
 |-- BookRating: integer (nullable = true)



In [50]:
df_ratings.registerTempTable("ratings")

In [51]:
# Books with maximum ratings

In [52]:
query9=spark.sql("select b.BookTitle, sum(r.BookRating) as Total_Ratings from books b join ratingS r on b.ISBN = r.ISBN group by b.BookTitle order by Total_Ratings desc limit 1")
query9.toPandas()

Unnamed: 0,BookTitle,Total_Ratings
0,The Lovely Bones: A Novel,5787


In [53]:
# Authors with maximum ratings

In [54]:
query10=spark.sql("select b.BookAuthor, sum(r.BookRating) as Total_Ratings from books b join ratingS r on b.ISBN = r.ISBN group by b.BookAuthor order by Total_Ratings desc limit 1")

In [55]:
query10.toPandas()

Unnamed: 0,BookAuthor,Total_Ratings
0,Stephen King,34202


In [56]:
# Publishers which have published maximum number of books

In [57]:
query11=spark.sql("select Publisher, count(Publisher) as Count from books group by Publisher order by Count desc limit 1")

In [58]:
query11.toPandas()

Unnamed: 0,Publisher,Count
0,Harlequin,7533
