**Setting up PySpark and creating a Spark Session (since this is a new notebook)**

**Step 1**: Install Spark

In [None]:
# Mount Google Drive at /content/drive so that files stored on Drive can be read by path
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [None]:
import os

# Tell the Spark tooling where the Java runtime and the unpacked Spark distribution live.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.0.0-bin-hadoop3.2",
    }
)

In [None]:
# Install findspark with pip (-q keeps pip's output quiet); it is imported and initialised in a later cell
!pip install -q findspark

In [None]:
!pip3 install pyspark==3.0.2

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.0.2
  Downloading pyspark-3.0.2.tar.gz (204.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.8/204.8 MB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 KB[0m [31m17.1 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.2-py2.py3-none-any.whl size=205186690 sha256=333eff5a842a7719bf096e0e90d35157b4a95c99ae9509e8eec5e382a3d77002
  Stored in directory: /root/.cache/pip/wheels/aa/8e/b9/ed8017fb2997a648f5868a4b728881f320e3d1bd2b0274f137
Successfully built pyspark
Installing collected packages: py4j, py

In [None]:
# Initialise findspark before pyspark is imported in the next cell.
# NOTE(review): presumably this locates Spark through the SPARK_HOME variable set earlier
# and makes pyspark importable — confirm against the findspark documentation.
import findspark
findspark.init()

In [None]:
# Bring in the SparkSession entry point
from pyspark.sql import SparkSession

# Build (or reuse) a session whose master is "local[*]"
local_builder = SparkSession.builder.master("local[*]")
spark = local_builder.getOrCreate()

# Leave the session as the cell's last expression so that its summary is displayed
spark


In [None]:
# Create spark_session, the handle used by the following cells to read data, run SQL and build DataFrames.
# No master is specified here; getOrCreate() is expected to hand back the session built in the
# previous cell rather than start a new one.
spark_session = SparkSession.builder.getOrCreate()

In [None]:
# Keep a reference to the SparkContext behind the session.
# NOTE(review): `sc` is not used in the cells visible here — confirm whether it is still needed.
sc = spark_session.sparkContext

In [None]:
# Load the books dataset from Google Drive; the first line of the file supplies the column names
books_csv_path = "/content/drive/My Drive/Colab Notebooks/datasets/books.csv"
df = spark_session.read.csv(books_csv_path, header=True)

In [None]:
# Preview the loaded data as a text table (long cell values are truncated, hence the "..." in the output)
df.show()

+------+--------------------+--------------------+--------------+----------+-------------+-------------+-----------+-------------+------------------+----------------+--------------------+
|bookID|               title|             authors|average_rating|      isbn|       isbn13|language_code|  num_pages|ratings_count|text_reviews_count|publication_date|           publisher|
+------+--------------------+--------------------+--------------+----------+-------------+-------------+-----------+-------------+------------------+----------------+--------------------+
|     1|Harry Potter and ...|J.K. Rowling/Mary...|          4.57|0439785960|9780439785969|          eng|        652|      2095690|             27591|       9/16/2006|     Scholastic Inc.|
|     2|Harry Potter and ...|J.K. Rowling/Mary...|          4.49|0439358078|9780439358071|          eng|        870|      2153167|             29221|        9/1/2004|     Scholastic Inc.|
|     4|Harry Potter and ...|        J.K. Rowling|          

In [None]:
# Display the data types of the columns.
# Every column is reported as 'string': the CSV was read with only the header option set,
# with no schema or type inference requested. Note also that the page-count column name
# carries leading spaces ('  num_pages').

df.dtypes

[('bookID', 'string'),
 ('title', 'string'),
 ('authors', 'string'),
 ('average_rating', 'string'),
 ('isbn', 'string'),
 ('isbn13', 'string'),
 ('language_code', 'string'),
 ('  num_pages', 'string'),
 ('ratings_count', 'string'),
 ('text_reviews_count', 'string'),
 ('publication_date', 'string'),
 ('publisher', 'string')]

# Using SQL to interface with Spark

**Step 1**: Register the DataFrame as a SQL temporary view so that you can interact with it using SQL commands


In [None]:
# Register df as the temporary view "books" so that it can be queried by name from SQL
df.createOrReplaceTempView("books")

**Step 2**: Query the temporary view using SQL

In [None]:
# Select every row and column from the "books" view, then preview the result
select_all_books = "SELECT * FROM books"
sqlDF = spark_session.sql(select_all_books)
sqlDF.show()

+------+--------------------+--------------------+--------------+----------+-------------+-------------+-----------+-------------+------------------+----------------+--------------------+
|bookID|               title|             authors|average_rating|      isbn|       isbn13|language_code|  num_pages|ratings_count|text_reviews_count|publication_date|           publisher|
+------+--------------------+--------------------+--------------+----------+-------------+-------------+-----------+-------------+------------------+----------------+--------------------+
|     1|Harry Potter and ...|J.K. Rowling/Mary...|          4.57|0439785960|9780439785969|          eng|        652|      2095690|             27591|       9/16/2006|     Scholastic Inc.|
|     2|Harry Potter and ...|J.K. Rowling/Mary...|          4.49|0439358078|9780439358071|          eng|        870|      2153167|             29221|        9/1/2004|     Scholastic Inc.|
|     4|Harry Potter and ...|        J.K. Rowling|          

**Step 3**: Filter the data so that only records where the average rating is greater than 4.5 are kept

In [None]:
# Keep only the books rated above 4.5 (where() is an alias of filter())
top_rated_books = sqlDF.where("average_rating > 4.5")
top_rated_books.show()

+------+--------------------+--------------------+--------------+----------+-------------+-------------+-----------+-------------+------------------+----------------+--------------------+
|bookID|               title|             authors|average_rating|      isbn|       isbn13|language_code|  num_pages|ratings_count|text_reviews_count|publication_date|           publisher|
+------+--------------------+--------------------+--------------+----------+-------------+-------------+-----------+-------------+------------------+----------------+--------------------+
|     1|Harry Potter and ...|J.K. Rowling/Mary...|          4.57|0439785960|9780439785969|          eng|        652|      2095690|             27591|       9/16/2006|     Scholastic Inc.|
|     5|Harry Potter and ...|J.K. Rowling/Mary...|          4.56|043965548X|9780439655484|          eng|        435|      2339585|             36325|        5/1/2004|     Scholastic Inc.|
|     8|Harry Potter Boxe...|J.K. Rowling/Mary...|          

**Step 4**: Filter the dataset to include only books that have an average rating greater than 4.5 and whose publisher is 'Ballantine Books'

In [None]:
# Apply the two conditions one after the other: rating first, then publisher
rated_above_4_5 = sqlDF.where("average_rating > 4.5")
ballantine_top_rated = rated_above_4_5.where("publisher == 'Ballantine Books'")
ballantine_top_rated.show()

+------+--------------------+--------------+--------------+----------+-------------+-------------+-----------+-------------+------------------+----------------+----------------+
|bookID|               title|       authors|average_rating|      isbn|       isbn13|language_code|  num_pages|ratings_count|text_reviews_count|publication_date|       publisher|
+------+--------------------+--------------+--------------+----------+-------------+-------------+-----------+-------------+------------------+----------------+----------------+
|    30|J.R.R. Tolkien 4-...|J.R.R. Tolkien|          4.59|0345538374|9780345538376|          eng|       1728|       101233|              1550|       9/25/2012|Ballantine Books|
+------+--------------------+--------------+--------------+----------+-------------+-------------+-----------+-------------+------------------+----------------+----------------+



**Step 5**: Add a new column that shows, for each book, the proportion of its overall ratings count that are text reviews

In [None]:
import pyspark.sql.functions as F

# Share of each book's ratings that came with a text review, rounded to 2 decimal places
review_share = sqlDF.text_reviews_count / sqlDF.ratings_count
sqlDF = sqlDF.withColumn("proportion_of_ratings", F.bround(review_share, 2))

# average_rating was loaded as a string; add a Decimal(4,2) copy for numeric aggregation
rating_as_decimal = sqlDF.average_rating.cast('Decimal(4,2)')
sqlDF = sqlDF.withColumn("avg_rating_decimal", rating_as_decimal)

sqlDF.show()

+------+--------------------+--------------------+--------------+----------+-------------+-------------+-----------+-------------+------------------+----------------+--------------------+---------------------+------------------+
|bookID|               title|             authors|average_rating|      isbn|       isbn13|language_code|  num_pages|ratings_count|text_reviews_count|publication_date|           publisher|proportion_of_ratings|avg_rating_decimal|
+------+--------------------+--------------------+--------------+----------+-------------+-------------+-----------+-------------+------------------+----------------+--------------------+---------------------+------------------+
|     1|Harry Potter and ...|J.K. Rowling/Mary...|          4.57|0439785960|9780439785969|          eng|        652|      2095690|             27591|       9/16/2006|     Scholastic Inc.|                 0.01|              4.57|
|     2|Harry Potter and ...|J.K. Rowling/Mary...|          4.49|0439358078|97804393

# Aggregating data using GroupBy

**Step 6**: Find the lowest average_rating where the publisher is 'Broadway Books'

In [None]:
# Restrict to Broadway Books, then take the minimum over the whole (ungrouped) result
broadway_books = sqlDF.where("publisher == 'Broadway Books'")
broadway_books.groupBy().min("avg_rating_decimal").show()


+-----------------------+
|min(avg_rating_decimal)|
+-----------------------+
|                   3.25|
+-----------------------+



**Step 7**: Find the highest average_rating where the publisher is 'Broadway Books'


In [None]:
# Restrict to Broadway Books and aggregate the whole result down to its maximum rating
broadway_titles = sqlDF.where("publisher == 'Broadway Books'")
broadway_titles.agg({"avg_rating_decimal": "max"}).show()

+-----------------------+
|max(avg_rating_decimal)|
+-----------------------+
|                   4.43|
+-----------------------+



**Step 8**: Ascertain the cumulative count of ratings received for Broadway Books

In [None]:
#First convert from string type to integer
sqlDF = sqlDF.withColumn("total_rating_count", sqlDF.ratings_count.cast('Integer'))
sqlDF.filter("publisher == 'Broadway Books'").groupBy().sum("total_rating_count").show()

+-----------------------+
|sum(total_rating_count)|
+-----------------------+
|                 695164|
+-----------------------+



**Step 9:** Ascertain the average count of ratings received for Broadway Books

In [None]:
# Mean number of ratings per Broadway Books title
broadway_only = sqlDF.where("publisher == 'Broadway Books'")
broadway_only.groupBy().avg("total_rating_count").show()

+-----------------------+
|avg(total_rating_count)|
+-----------------------+
|     14187.020408163266|
+-----------------------+



**Step 10**: Show average count of ratings for each publisher

In [None]:
# Group the books by publisher and average the integer ratings count within each group
books_by_publisher = sqlDF.groupBy("publisher")
books_by_publisher.avg("total_rating_count").show()


+--------------------+-----------------------+
|           publisher|avg(total_rating_count)|
+--------------------+-----------------------+
|       The New Press|                 1693.0|
|        Chosen Books|                  536.5|
|       Digireads.com|     24628.166666666668|
|               Ember|               351258.4|
|           IVP Books|                   10.5|
|       No Exit Press|                43253.8|
|Dabel Brothers Pr...|                  276.0|
|                 DAW|     5079.1463414634145|
|      Celestial Arts|                  442.0|
|Arcadia Publishin...|                    5.0|
|Hachette Littérature|                    1.0|
|         Cleis Press|                  428.5|
|Chicago Review Press|      6315.857142857143|
|                 HQN|                 7769.0|
|World Wrestling E...|                 1154.0|
|Wayne State Unive...|                   45.0|
|           Doubleday|      8012.916666666667|
|               Verso|     1204.8666666666666|
|   Time Life

We will utilize **.agg()** method - this method lets you pass an aggregate column expression that uses any of the aggregate functions from the pyspark.sql.functions submodule.

The *pyspark.sql.functions* submodule includes a number of useful functions for performing a variety of computations, such as standard deviations. Note that all the aggregation functions in this submodule take the name of a column in a GroupedData table, as demonstrated in the example below.

**Step 11**: Compute the standard deviation of ratings count by publisher

In [None]:
# The aggregate functions live in pyspark.sql.functions, imported here as F
import pyspark.sql.functions as F

# Sample standard deviation of ratings_count within each publisher
ratings_count_stddev = F.stddev("ratings_count")
sqlDF.groupBy("publisher").agg(ratings_count_stddev).show()

+--------------------+--------------------------+
|           publisher|stddev_samp(ratings_count)|
+--------------------+--------------------------+
|       The New Press|        4021.5151825694215|
|        Chosen Books|        457.49808742769625|
|       Digireads.com|        51201.525162505335|
|               Ember|         691000.7816122931|
|           IVP Books|        14.849242404917497|
|       No Exit Press|         17332.11097068098|
|Dabel Brothers Pr...|                       NaN|
|                 DAW|         6372.894532945668|
|      Celestial Arts|         612.3544725075502|
|Arcadia Publishin...|        2.9439202887759492|
|Hachette Littérature|                       NaN|
|         Cleis Press|        488.69792987761537|
|Chicago Review Press|         9251.830511283904|
|                 HQN|         7846.056844045931|
|World Wrestling E...|                       NaN|
|Wayne State Unive...|         46.66904755831214|
|           Doubleday|        24104.695025350382|


# Joining datasets using Spark SQL

In [None]:
# Build two small (name, id) tables that share some names, to demonstrate joins
join_demo_columns = ['name', 'id']

valuesA = [('Dog', 1), ('Monkey', 2), ('Elephant', 3), ('Penguin', 4)]
valuesB = [('Lizard', 1), ('Dog', 2), ('Elephant', 3), ('Rat', 4)]

TableA = spark_session.createDataFrame(valuesA, join_demo_columns)
TableB = spark_session.createDataFrame(valuesB, join_demo_columns)

# Print both tables for reference
for demo_table in (TableA, TableB):
    demo_table.show()

+--------+---+
|    name| id|
+--------+---+
|     Dog|  1|
|  Monkey|  2|
|Elephant|  3|
| Penguin|  4|
+--------+---+

+--------+---+
|    name| id|
+--------+---+
|  Lizard|  1|
|     Dog|  2|
|Elephant|  3|
|     Rat|  4|
+--------+---+



In [None]:
# Give each table a short alias so that its columns can be told apart in the joins below
T1, T2 = TableA.alias('T1'), TableB.alias('T2')

In [None]:
# Inner join on name: keeps only the rows whose name appears in both tables
names_match = T1.name == T2.name
T1.join(T2, on=names_match, how='inner').show()


+--------+---+--------+---+
|    name| id|    name| id|
+--------+---+--------+---+
|Elephant|  3|Elephant|  3|
|     Dog|  1|     Dog|  2|
+--------+---+--------+---+



In [None]:
# Left join: every row of T1 is kept; T2's columns are filled in where the name matches and are null otherwise
names_match = T1.name == T2.name
T1.join(T2, on=names_match, how='left').show()

+--------+---+--------+----+
|    name| id|    name|  id|
+--------+---+--------+----+
|Elephant|  3|Elephant|   3|
|  Monkey|  2|    null|null|
|     Dog|  1|     Dog|   2|
| Penguin|  4|    null|null|
+--------+---+--------+----+



In [None]:
# Left join, then drop the rows for which T2 supplied no match (T2.name is null)
left_join = T1.join(T2, on=(T1.name == T2.name), how='left')
matched_rows = left_join.where(T2.name.isNotNull())
matched_rows.show()

+--------+---+--------+---+
|    name| id|    name| id|
+--------+---+--------+---+
|Elephant|  3|Elephant|  3|
|     Dog|  1|     Dog|  2|
+--------+---+--------+---+



In [None]:
# Right join: every row of T2 is kept; T1's columns are filled in where the name matches and are null otherwise
join_condition = T1.name == T2.name
T1.join(T2, join_condition, how='right').show()

+--------+----+--------+---+
|    name|  id|    name| id|
+--------+----+--------+---+
|Elephant|   3|Elephant|  3|
|    null|null|     Rat|  4|
|    null|null|  Lizard|  1|
|     Dog|   1|     Dog|  2|
+--------+----+--------+---+



In [None]:
# Full outer join: keeps all rows from both tables, with nulls on whichever side has no match
join_condition = T1.name == T2.name
T1.join(T2, join_condition, 'full').show()

+--------+----+--------+----+
|    name|  id|    name|  id|
+--------+----+--------+----+
|Elephant|   3|Elephant|   3|
|    null|null|     Rat|   4|
|    null|null|  Lizard|   1|
|  Monkey|   2|    null|null|
|     Dog|   1|     Dog|   2|
| Penguin|   4|    null|null|
+--------+----+--------+----+

