# Spark Preparation
We check whether we are running in Google Colab. If so, we install all the necessary packages.

To run Spark in Colab, we first need to install its dependencies in the Colab environment: Apache Spark 3.5.1 with Hadoop 3, Java 8, and findspark, which locates the Spark installation on the system. The installation can be carried out from inside the Colab notebook itself.
Learn more from [A Must-Read Guide on How to Work with PySpark on Google Colab for Data Scientists!](https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/)

In [1]:
import os
import sys
try:
  import google.colab
  IN_COLAB = True
except ImportError:
  IN_COLAB = False

if IN_COLAB:
  !sudo apt update
  !apt-get install openjdk-8-jdk-headless -qq > /dev/null
  # Check https://spark.apache.org/downloads.html for the latest release; the version in the URL, the archive name and SPARK_HOME below must all match
  !wget -q https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
  !tar xf spark-3.5.1-bin-hadoop3.tgz
  !pip install -q findspark
  !pip install pyspark
  !pip install py4j

  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
  os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
[33m0% [Waiting for headers] [Waiting for headers] [1 InRelease 0 B/3,626 B 0%] [Co[0m[33m0% [Waiting for headers] [Waiting for headers] [Connected to ppa.launchpadconte[0m                                                                               Get:2 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
[33m0% [Waiting for headers] [2 InRelease 14.2 kB/110 kB 13%] [Connected to ppa.lau[0m                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:6 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Get:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelea
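Before starting Spark, it can help to confirm that the two environment variables set above point at directories that actually exist. Otherwise a mismatch between the downloaded archive and `SPARK_HOME` is only noticed when Spark fails to start. A minimal sketch follows; the helper name `missing_spark_paths` is hypothetical and not part of findspark or PySpark.

```python
import os

def missing_spark_paths(env=None, names=("JAVA_HOME", "SPARK_HOME")):
    """Return the variables in `names` that are unset or do not point to a directory."""
    env = os.environ if env is None else env
    return [name for name in names if not os.path.isdir(env.get(name, ""))]

# An empty list means both paths look usable.
print(missing_spark_paths())
```

Passing a plain dict as `env` makes the check easy to try without touching the real environment.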

# Start a Local Cluster

In [119]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

findspark.init()
spark_url = 'local'

spark = SparkSession.builder\
        .master(spark_url)\
        .appName('Spark SQL')\
        .getOrCreate()

In [3]:
!wget -q https://github.com/pvateekul/2110446_DSDE_2023s2/raw/main/code/Week10_Spark/netflix-rotten-tomatoes-metacritic-imdb.csv

# Spark Assignment

Based on the movie review dataset in 'netflix-rotten-tomatoes-metacritic-imdb.csv', answer the questions below.

**Note:** Do not clean or remove missing data.

In [120]:
path = '/content/netflix-rotten-tomatoes-metacritic-imdb.csv'
df = spark.read.option("delimiter", ",").option("header", True).csv(path)

In [121]:
cols = [c.replace(' ', '_') for c in df.columns]
df = df.toDF(*cols)

In [122]:
df.columns, df.printSchema(), df.show(3)

root
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Languages: string (nullable = true)
 |-- Series_or_Movie: string (nullable = true)
 |-- Hidden_Gem_Score: string (nullable = true)
 |-- Country_Availability: string (nullable = true)
 |-- Runtime: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- Writer: string (nullable = true)
 |-- Actors: string (nullable = true)
 |-- View_Rating: string (nullable = true)
 |-- IMDb_Score: string (nullable = true)
 |-- Rotten_Tomatoes_Score: string (nullable = true)
 |-- Metacritic_Score: string (nullable = true)
 |-- Awards_Received: string (nullable = true)
 |-- Awards_Nominated_For: string (nullable = true)
 |-- Boxoffice: string (nullable = true)
 |-- Release_Date: string (nullable = true)
 |-- Netflix_Release_Date: string (nullable = true)
 |-- Production_House: string (nullable = true)
 |-- Netflix_Link: string (nullable = true)
 |-- IMDb_Link: string (null

(['Title',
  'Genre',
  'Tags',
  'Languages',
  'Series_or_Movie',
  'Hidden_Gem_Score',
  'Country_Availability',
  'Runtime',
  'Director',
  'Writer',
  'Actors',
  'View_Rating',
  'IMDb_Score',
  'Rotten_Tomatoes_Score',
  'Metacritic_Score',
  'Awards_Received',
  'Awards_Nominated_For',
  'Boxoffice',
  'Release_Date',
  'Netflix_Release_Date',
  'Production_House',
  'Netflix_Link',
  'IMDb_Link',
  'Summary',
  'IMDb_Votes',
  'Image',
  'Poster',
  'TMDb_Trailer',
  'Trailer_Site'],
 None,
 None)

In [123]:
df = df.withColumn('Hidden_Gem_Score',df.Hidden_Gem_Score.cast('double'))

## What are the maximum and average of the overall hidden gem score?

In [124]:
df.agg({'Hidden_Gem_score': 'avg', 'Hidden_Gem_Score': 'max'}).show()

+---------------------+---------------------+
|avg(Hidden_Gem_score)|max(Hidden_Gem_Score)|
+---------------------+---------------------+
|    5.937551386501226|                  9.8|
+---------------------+---------------------+
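The dictionary form of `agg` takes one function per key, and a Python dict keeps only the last value for a repeated key. The cell above gets two aggregates on one column by spelling the key with different capitalisation, which works because Spark resolves column names case-insensitively by default. The sketch below shows the dict behaviour and an explicit alternative. `max_and_avg` is a hypothetical helper name and the aliases are illustrative.

```python
# A dict literal with a repeated key keeps only the last value.
same_case = {'Hidden_Gem_Score': 'avg', 'Hidden_Gem_Score': 'max'}
# Different capitalisation gives two distinct keys, hence two aggregates.
mixed_case = {'Hidden_Gem_score': 'avg', 'Hidden_Gem_Score': 'max'}
print(same_case)
print(mixed_case)

def max_and_avg(df, column='Hidden_Gem_Score'):
    """Explicit alternative: one aggregate expression per statistic, each with an alias."""
    from pyspark.sql import functions as F  # imported lazily; only needed when called
    return df.agg(F.max(column).alias('max_score'),
                  F.avg(column).alias('avg_score'))
```

With the notebook's DataFrame, `max_and_avg(df).show()` would print the same two values under the aliased column names.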



## How many movies are available in Korea?




In [183]:
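from pyspark.sql.functions import lower
# Note: this filters on 'Languages', so it counts titles whose languages mention 'korea', not titles available in Korea
df.filter(lower(col('Languages')).contains('korea')).count()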

735

In [180]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
def is_available_in_korea(country):
  return 'korea' in country.lower()

is_available = udf(is_available_in_korea, BooleanType())

df\
.filter(df['Country_Availability'].isNotNull())\
.select(is_available('Country_Availability'))\
.groupBy('is_available_in_korea(Country_Availability)')\
.count()\
.show()

+-------------------------------------------+-----+
|is_available_in_korea(Country_Availability)|count|
+-------------------------------------------+-----+
|                                       true| 4845|
|                                      false|10616|
+-------------------------------------------+-----+



In [181]:
# Cross-check: collect the non-null availability strings to the driver and count matches in plain Python
country = df\
.filter(col('Country_Availability').isNotNull())\
.select('Country_Availability')\
.collect()

sum(1 for i in country if 'korea' in i['Country_Availability'].lower())

4845
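Both counts above run the substring test in Python, either row by row through a UDF or on the driver after `collect()`. The same test can be sketched with built-in column functions so that it stays inside Spark. `count_available` and `mentions` are hypothetical helper names, and the sample strings are made up for illustration. Note that a substring match counts any country whose name contains the search text.

```python
def mentions(value, needle="korea"):
    """Plain-Python mirror of the filter: null-safe, case-insensitive substring test."""
    return value is not None and needle in value.lower()

def count_available(df, needle="korea", column="Country_Availability"):
    """Count rows whose `column` contains `needle`, using built-in column functions."""
    from pyspark.sql import functions as F  # imported lazily; only needed when called
    # A null value makes the condition null and filter() drops such rows,
    # so no separate isNotNull() step is required.
    return df.filter(F.lower(F.col(column)).contains(needle)).count()

sample = ["Japan,South Korea", None, "Thailand,Singapore"]
print(sum(mentions(v) for v in sample))  # 1
```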

## Which director has the highest average hidden gem score?

In [103]:
from pyspark.sql.functions import avg

In [104]:
df.groupby('Director') \
.agg(avg('Hidden_Gem_Score')) \
.filter(col('avg(Hidden_Gem_Score)').isNotNull()) \
.orderBy('avg(Hidden_Gem_Score)',ascending = False) \
.show(1)

+-----------+---------------------+
|   Director|avg(Hidden_Gem_Score)|
+-----------+---------------------+
|Dorin Marcu|                  9.8|
+-----------+---------------------+
only showing top 1 row
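`show(1)` prints a single row, so if several directors share the top average only one of them appears, and the sort does not guarantee which one. The plain-Python sketch below mirrors the group-and-average logic on made-up rows to show the tie case. The keys and scores are illustrative, not taken from the dataset, and `top_by_average` is a hypothetical helper.

```python
from collections import defaultdict

def top_by_average(rows):
    """Return (best_average, sorted keys sharing it) for (key, score) pairs.

    Pairs whose score is None are skipped, as Spark's avg ignores nulls.
    """
    totals = defaultdict(lambda: [0.0, 0])  # key -> [sum of scores, count]
    for key, score in rows:
        if score is not None:
            totals[key][0] += score
            totals[key][1] += 1
    averages = {k: s / n for k, (s, n) in totals.items()}
    best = max(averages.values())
    return best, sorted(k for k, v in averages.items() if v == best)

rows = [("A", 9.8), ("B", 9.8), ("C", 7.0), ("C", 9.0), ("D", None)]
print(top_by_average(rows))  # (9.8, ['A', 'B'])
```

In Spark, one way to surface ties is to filter on the maximum average instead of relying on `show(1)`.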



## How many genres are there in the dataset?

In [105]:
from pyspark.sql.functions import split

genres = df \
.filter(col('Genre').isNotNull()) \
.withColumn('Genre', split(col('Genre'), ', ')) \
.select('Genre') \
.distinct() \
.collect()
# Merge the per-row genre lists into one set of distinct genre names
s = set()

for i in genres:
  if isinstance(i['Genre'], list):
    s.update(i['Genre'])

print(len(s))


28
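The cell above collects every distinct genre combination to the driver and merges them in Python. A possible alternative is to let Spark flatten the lists with `explode` and count the distinct values itself. This is a sketch under that assumption: `count_genres` and `distinct_genres` are hypothetical helper names, and the sample strings are made up.

```python
def distinct_genres(values, sep=", "):
    """Plain-Python mirror: union of the separator-split parts of every non-null value."""
    found = set()
    for value in values:
        if value is not None:
            found.update(value.split(sep))
    return found

def count_genres(df, column="Genre", sep=", "):
    """Count distinct genre names without collecting rows to the driver."""
    from pyspark.sql import functions as F  # imported lazily; only needed when called
    # split() returns null for a null input and explode() emits no rows for it,
    # so missing genres are skipped without removing any data from df.
    exploded = df.select(F.explode(F.split(F.col(column), sep)).alias("genre"))
    return exploded.distinct().count()

sample = ["Comedy, Drama", None, "Drama, Thriller"]
print(len(distinct_genres(sample)))  # 3
```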


In [118]:
spark.stop()