# Spark Preparation
We check if we are in Google Colab.  If this is the case, install all necessary packages.

To run spark in Colab, we need to first install all the dependencies in Colab environment i.e. Apache Spark 3.3.3 with hadoop 3.3, Java 8 and Findspark to locate the spark in the system. The tools installation can be carried out inside the Jupyter Notebook of the Colab.
Learn more from [A Must-Read Guide on How to Work with PySpark on Google Colab for Data Scientists!](https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/)

In [None]:
try:
  import google.colab
  IN_COLAB = True
except:
  IN_COLAB = False

In [None]:
if IN_COLAB:
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !wget -q https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
    !tar xf spark-3.5.0-bin-hadoop3.tgz
    !mv spark-3.5.0-bin-hadoop3 spark
    !pip install -q findspark
    import os
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    os.environ["SPARK_HOME"] = "/content/spark"


In [None]:
!pip install  -q pyspark

# Start a Local Cluster

In [None]:
import findspark
findspark.init()

In [None]:
spark_url = 'local'

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [None]:
spark = SparkSession.builder\
        .master(spark_url)\
        .appName('Spark SQL')\
        .getOrCreate()

# Spark Assignment

Based on the movie review dataset in 'netflix-rotten-tomatoes-metacritic-imdb.csv', answer the below questions.

**Note:** do not clean or remove missing data

In [None]:
!wget https://raw.githubusercontent.com/pvateekul/2110531_DSDE_2023s1/main/code/Week10_Spark/netflix-rotten-tomatoes-metacritic-imdb.csv

--2023-10-30 07:01:43--  https://raw.githubusercontent.com/pvateekul/2110531_DSDE_2023s1/main/code/Week10_Spark/netflix-rotten-tomatoes-metacritic-imdb.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 14767685 (14M) [text/plain]
Saving to: ‘netflix-rotten-tomatoes-metacritic-imdb.csv.1’


2023-10-30 07:01:43 (90.7 MB/s) - ‘netflix-rotten-tomatoes-metacritic-imdb.csv.1’ saved [14767685/14767685]



In [None]:
path = '/content/netflix-rotten-tomatoes-metacritic-imdb.csv'

In [None]:
df = spark.read.option("delimiter", ",").option("header", True).csv(path)


In [None]:
df.show()

+--------------------+--------------------+--------------------+--------------------+---------------+----------------+--------------------+------------+--------------------+--------------------+--------------------+-----------+----------+---------------------+----------------+---------------+--------------------+------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+
|               Title|               Genre|                Tags|           Languages|Series or Movie|Hidden Gem Score|Country Availability|     Runtime|            Director|              Writer|              Actors|View Rating|IMDb Score|Rotten Tomatoes Score|Metacritic Score|Awards Received|Awards Nominated For|   Boxoffice|Release Date|Netflix Release Date|    Production House|        Netflix Link|           IMDb Link|             Summary|IMDb Votes|           

In [None]:
cols = [c.replace(' ', '_') for c in df.columns]
cols

['Title',
 'Genre',
 'Tags',
 'Languages',
 'Series_or_Movie',
 'Hidden_Gem_Score',
 'Country_Availability',
 'Runtime',
 'Director',
 'Writer',
 'Actors',
 'View_Rating',
 'IMDb_Score',
 'Rotten_Tomatoes_Score',
 'Metacritic_Score',
 'Awards_Received',
 'Awards_Nominated_For',
 'Boxoffice',
 'Release_Date',
 'Netflix_Release_Date',
 'Production_House',
 'Netflix_Link',
 'IMDb_Link',
 'Summary',
 'IMDb_Votes',
 'Image',
 'Poster',
 'TMDb_Trailer',
 'Trailer_Site']

In [None]:
df = df.toDF(*cols)

In [None]:
df.columns

['Title',
 'Genre',
 'Tags',
 'Languages',
 'Series_or_Movie',
 'Hidden_Gem_Score',
 'Country_Availability',
 'Runtime',
 'Director',
 'Writer',
 'Actors',
 'View_Rating',
 'IMDb_Score',
 'Rotten_Tomatoes_Score',
 'Metacritic_Score',
 'Awards_Received',
 'Awards_Nominated_For',
 'Boxoffice',
 'Release_Date',
 'Netflix_Release_Date',
 'Production_House',
 'Netflix_Link',
 'IMDb_Link',
 'Summary',
 'IMDb_Votes',
 'Image',
 'Poster',
 'TMDb_Trailer',
 'Trailer_Site']

In [None]:
df.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Languages: string (nullable = true)
 |-- Series_or_Movie: string (nullable = true)
 |-- Hidden_Gem_Score: string (nullable = true)
 |-- Country_Availability: string (nullable = true)
 |-- Runtime: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- Writer: string (nullable = true)
 |-- Actors: string (nullable = true)
 |-- View_Rating: string (nullable = true)
 |-- IMDb_Score: string (nullable = true)
 |-- Rotten_Tomatoes_Score: string (nullable = true)
 |-- Metacritic_Score: string (nullable = true)
 |-- Awards_Received: string (nullable = true)
 |-- Awards_Nominated_For: string (nullable = true)
 |-- Boxoffice: string (nullable = true)
 |-- Release_Date: string (nullable = true)
 |-- Netflix_Release_Date: string (nullable = true)
 |-- Production_House: string (nullable = true)
 |-- Netflix_Link: string (nullable = true)
 |-- IMDb_Link: string (null

Spark SQL seems to not perform any guess on datatype.  To convert to proper data type, we cast each column to proper type using **'cast'** and replace back to the same column using **'withColumn'**.

In [None]:
cols = ['Hidden_Gem_Score', 'IMDb_Score', 'Rotten_Tomatoes_Score', 'Metacritic_Score', 'Awards_Received', 'Awards_Nominated_For','IMDb_Votes']
for c in cols:
    df = df.withColumn(c, col(c).cast('double'))

In [None]:
df.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Languages: string (nullable = true)
 |-- Series_or_Movie: string (nullable = true)
 |-- Hidden_Gem_Score: double (nullable = true)
 |-- Country_Availability: string (nullable = true)
 |-- Runtime: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- Writer: string (nullable = true)
 |-- Actors: string (nullable = true)
 |-- View_Rating: string (nullable = true)
 |-- IMDb_Score: double (nullable = true)
 |-- Rotten_Tomatoes_Score: double (nullable = true)
 |-- Metacritic_Score: double (nullable = true)
 |-- Awards_Received: double (nullable = true)
 |-- Awards_Nominated_For: double (nullable = true)
 |-- Boxoffice: string (nullable = true)
 |-- Release_Date: string (nullable = true)
 |-- Netflix_Release_Date: string (nullable = true)
 |-- Production_House: string (nullable = true)
 |-- Netflix_Link: string (nullable = true)
 |-- IMDb_Link: string (null

In [None]:
df.show(3)

+-------------------+--------------------+--------------------+----------------+---------------+----------------+--------------------+------------+---------------+--------------------+--------------------+-----------+----------+---------------------+----------------+---------------+--------------------+----------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+
|              Title|               Genre|                Tags|       Languages|Series_or_Movie|Hidden_Gem_Score|Country_Availability|     Runtime|       Director|              Writer|              Actors|View_Rating|IMDb_Score|Rotten_Tomatoes_Score|Metacritic_Score|Awards_Received|Awards_Nominated_For| Boxoffice|Release_Date|Netflix_Release_Date|    Production_House|        Netflix_Link|           IMDb_Link|             Summary|IMDb_Votes|               Image|              

## What is the maximum and average of the overall hidden gem score?

In [None]:
from pyspark.sql.functions import avg, min, max, countDistinct

In [None]:
df.select(max('Hidden_Gem_Score'), avg('Hidden_Gem_Score')).show()

+---------------------+---------------------+
|max(Hidden_Gem_Score)|avg(Hidden_Gem_Score)|
+---------------------+---------------------+
|                  9.8|    5.937551386501226|
+---------------------+---------------------+



In [None]:
df.filter(df["Hidden_Gem_Score"] ==9.8).select('Director').show()

+-----------+
|   Director|
+-----------+
|Dorin Marcu|
+-----------+



## How many movies that are available in Korea?

Contains the other element. Returns a boolean Column based on a string match.
*`contains`*
link: https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.Column.contains.html#pyspark.sql.Column.contains

In [None]:
# filter rows from df which sentences in column "Languages" contains "Korean"
df2 = df.filter(df.Languages.contains('Korean')).collect()
len(df2)

735

## Which director has the highest average hidden gem score?

In [None]:
df3 = spark.createDataFrame(df.groupby('Director').agg({'Hidden_Gem_Score': 'avg'}).collect())

In [None]:
df3.show()

+--------------------+---------------------+
|            Director|avg(Hidden_Gem_Score)|
+--------------------+---------------------+
|       Arne Mattsson|                  7.7|
|   Gregory Kirchhoff|                  8.5|
|     Alexander Nanau|                  7.8|
|      Egor Abramenko|                  3.1|
|Christian Le Hémonet|                  7.3|
|        Danial Rifki|                  8.4|
| Natalie Erika James|                  3.4|
|Qaushiq Mukherjee...|                  6.0|
|      Péter Bergendy|                  7.7|
|         Erol Özlevi|                  6.4|
| Matthias Temmermans|                  7.6|
|   Wash Westmoreland|    3.233333333333333|
|      Kôichi Chigira|                 8.05|
|      Hong Chang-Pyo|                  7.9|
|       Patricia Font|                  6.8|
|   Jennifer Westcott|                  6.5|
|       Shingo Suzuki|                  8.1|
|        Go-Woon Jeon|                  8.2|
|          Manish Jha|                  5.9|
|     Taku

In [None]:
df3.select(max('avg(Hidden_Gem_Score)')).show()

+--------------------------+
|max(avg(Hidden_Gem_Score))|
+--------------------------+
|                       9.8|
+--------------------------+



In [None]:
df3.filter(df3['avg(Hidden_Gem_Score)']>= 9.8).show()

+-----------+---------------------+
|   Director|avg(Hidden_Gem_Score)|
+-----------+---------------------+
|Dorin Marcu|                  9.8|
+-----------+---------------------+



## How many genres are there in the dataset?

**ChatGPT recommendation**

This code does the following:

It creates a Spark session.

It creates a DataFrame from the sample data.

It splits the 'Genre' column by commas and then uses the explode function to separate each genre into a new row.

It counts the number of unique genres using the distinct function.

Finally, it prints the count of unique genres.

Make sure to adapt the code to your actual PySpark DataFrame and the 'Genre' column in your dataset.

In [None]:
#Thank you ChatGPT
from pyspark.sql.functions import split, explode
df4 = df.select(explode(split(df.Genre, ',\s*')).alias('Genre'))
unique_genres_count = df4.select('Genre').distinct().count()
print(f"Number of unique genres: {unique_genres_count}")

Number of unique genres: 28


In [None]:
df4.select('Genre').show()

+---------+
|    Genre|
+---------+
|    Crime|
|    Drama|
|  Fantasy|
|   Horror|
|  Romance|
|   Comedy|
|    Drama|
| Thriller|
|    Drama|
|Animation|
|    Short|
|    Drama|
|   Comedy|
|  Romance|
|    Drama|
|    Crime|
|    Drama|
|   Comedy|
|   Comedy|
|   Family|
+---------+
only showing top 20 rows



In [None]:
df.select('Genre').show()

+--------------------+
|               Genre|
+--------------------+
|Crime, Drama, Fan...|
|              Comedy|
|     Drama, Thriller|
|               Drama|
|Animation, Short,...|
|     Comedy, Romance|
|               Drama|
|               Crime|
|               Drama|
|              Comedy|
|Comedy, Family, F...|
|Crime, Drama, Fan...|
|               Drama|
|       Comedy, Drama|
|        Short, Drama|
|Crime, Drama, Thr...|
|Action, Adventure...|
|Adventure, Drama,...|
|               Music|
|     Drama, Thriller|
+--------------------+
only showing top 20 rows

