In [1]:
!wget -q https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz 
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/default-java"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"
!pip install -q findspark
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [7]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession

!head -5 /content/drive/MyDrive/data/IMDb_movies.csv
!head -5 /content/drive/MyDrive/data/IMDb_ratings.csv
!head -5 /content/drive/MyDrive/data/netflix_dataset.csv



imdb_title_id,title,original_title,year,date_published,genre,duration,country,language,director,writer,production_company,actors,description,avg_vote,votes,budget,usa_gross_income,worlwide_gross_income,metascore,reviews_from_users,reviews_from_critics
tt0000009,Miss Jerry,Miss Jerry,1894,1894-10-09,Romance,45,USA,None,Alexander Black,Alexander Black,Alexander Black Photoplays,"Blanche Bayliss, William Courtenay, Chauncey Depew",The adventures of a female reporter in the 1890s.,5.9,154,,,,,1,2
tt0000574,The Story of the Kelly Gang,The Story of the Kelly Gang,1906,26-12-1906,"Biography, Crime, Drama",70,Australia,None,Charles Tait,Charles Tait,J. and N. Tait,"Elizabeth Tait, John Tait, Norman Campbell, Bella Cola, Will Coyne, Sam Crewes, Jack Ennis, John Forde, Vera Linden, Mr. Marshall, Mr. McKenzie, Frank Mills, Ollie Wilson",True story of notorious Australian outlaw Ned Kelly (1855-80).,6.1,589,$ 2250,,,,7,7
tt0001892,Den sorte drøm,Den sorte drøm,1911,19-08-1911,Drama,53,"Germany,

In [9]:
movie_schema = StructType([
        StructField("imdb_title_id", StringType(), True),
        StructField("title", StringType(), True),
        StructField("original_title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("date_published", StringType(), True),
        StructField("genre", StringType(), True),
        StructField("duration", IntegerType(), True),
        StructField("country", StringType(), True),
        StructField("language", StringType(), True),
        StructField("director", StringType(), True)])

df_movie = spark.read.csv('/content/drive/MyDrive/data/IMDb_movies.csv', sep=',',schema = movie_schema, header=True)

df_movie.show()

+-------------+--------------------+--------------------+----+--------------+--------------------+--------+----------------+---------------+--------------------+
|imdb_title_id|               title|      original_title|year|date_published|               genre|duration|         country|       language|            director|
+-------------+--------------------+--------------------+----+--------------+--------------------+--------+----------------+---------------+--------------------+
|    tt0000009|          Miss Jerry|          Miss Jerry|1894|    1894-10-09|             Romance|      45|             USA|           None|     Alexander Black|
|    tt0000574|The Story of the ...|The Story of the ...|1906|    26-12-1906|Biography, Crime,...|      70|       Australia|           None|        Charles Tait|
|    tt0001892|      Den sorte drøm|      Den sorte drøm|1911|    19-08-1911|               Drama|      53|Germany, Denmark|           null|           Urban Gad|
|    tt0002101|           Cl

In [71]:
ratings_schema = StructType([
        StructField("imdb_title_id", StringType(), True),
        StructField("weighted_average_vote", StringType(), True),
        StructField("total_votes", IntegerType(), True),
        StructField("mean_vote", FloatType(), True),
        StructField("median_vote", IntegerType(), True),
        StructField("vote_10", IntegerType(), True),
        StructField("vote_9", IntegerType(), True),
        StructField("vote_8", IntegerType(), True),
        StructField("vote_7", IntegerType(), True),
        StructField("vote_6", IntegerType(), True)])
      
df_ratings = spark.read.csv('/content/drive/MyDrive/data/IMDb_ratings.csv', sep=',',schema = ratings_schema, header=True)       
df_ratings.show()

+-------------+---------------------+-----------+---------+-----------+-------+------+------+------+------+
|imdb_title_id|weighted_average_vote|total_votes|mean_vote|median_vote|vote_10|vote_9|vote_8|vote_7|vote_6|
+-------------+---------------------+-----------+---------+-----------+-------+------+------+------+------+
|    tt0000009|                  5.9|        154|      5.9|          6|     12|     4|    10|    43|    28|
|    tt0000574|                  6.1|        589|      6.3|          6|     57|    18|    58|   137|   139|
|    tt0001892|                  5.8|        188|      6.0|          6|      6|     6|    17|    44|    52|
|    tt0002101|                  5.2|        446|      5.3|          5|     15|     8|    16|    62|    98|
|    tt0002130|                    7|       2237|      6.9|          7|    210|   225|   436|   641|   344|
|    tt0002199|                  5.7|        484|      5.8|          6|     33|    15|    48|    80|   123|
|    tt0002423|             

In [44]:
netflix_schema = StructType([
        StructField("show_id", StringType(), True),
        StructField("type", StringType(), True),
        StructField("title", StringType(), True),
        StructField("director", StringType(), True),
        StructField("cast", StringType(), True),
        StructField("country", StringType(), True),
        StructField("date_added", StringType(), True),
        StructField("release_year", IntegerType(), True),
        StructField("rating", StringType(), True),
        StructField("duration", StringType(), True)])
      
df_netflix = spark.read.csv('/content/drive/MyDrive/data/netflix_dataset.csv', sep=',',schema = netflix_schema, header=True)       
df_netflix.show()

+-------+-------+------+--------------------+--------------------+--------------------+-----------------+------------+------+---------+
|show_id|   type| title|            director|                cast|             country|       date_added|release_year|rating| duration|
+-------+-------+------+--------------------+--------------------+--------------------+-----------------+------------+------+---------+
|     s1|TV Show|    3%|                null|João Miguel, Bian...|              Brazil|  August 14, 2020|        2020| TV-MA|4 Seasons|
|     s2|  Movie| 07:19|   Jorge Michel Grau|Demián Bichir, Hé...|              Mexico|December 23, 2016|        2016| TV-MA|   93 min|
|     s3|  Movie| 23:59|        Gilbert Chan|Tedd Chan, Stella...|           Singapore|December 20, 2018|        2011|     R|   78 min|
|     s4|  Movie|     9|         Shane Acker|Elijah Wood, John...|       United States|November 16, 2017|        2009| PG-13|   80 min|
|     s5|  Movie|    21|      Robert Luketic|Jim

In [45]:
df_movie_model=df_movie.select('imdb_title_id', 'title')
df_movie_model.show( truncate= False)

+-------------+---------------------------------------------------+
|imdb_title_id|title                                              |
+-------------+---------------------------------------------------+
|tt0000009    |Miss Jerry                                         |
|tt0000574    |The Story of the Kelly Gang                        |
|tt0001892    |Den sorte drøm                                     |
|tt0002101    |Cleopatra                                          |
|tt0002130    |L'Inferno                                          |
|tt0002199    |From the Manger to the Cross; or, Jesus of Nazareth|
|tt0002423    |Madame DuBarry                                     |
|tt0002445    |Quo Vadis?                                         |
|tt0002452    |Independenta Romaniei                              |
|tt0002461    |Richard III                                        |
|tt0002646    |Atlantis                                           |
|tt0002844    |Fantômas - À l'ombre de la guillo

In [56]:
df_ratings_model=df_ratings.select('imdb_title_id', 'weighted_average_vote')
df_ratings_model.show( truncate= False)

+-------------+---------------------+
|imdb_title_id|weighted_average_vote|
+-------------+---------------------+
|tt0000009    |5.9                  |
|tt0000574    |6.1                  |
|tt0001892    |5.8                  |
|tt0002101    |5.2                  |
|tt0002130    |7                    |
|tt0002199    |5.7                  |
|tt0002423    |6.8                  |
|tt0002445    |6.2                  |
|tt0002452    |6.7                  |
|tt0002461    |5.5                  |
|tt0002646    |6.6                  |
|tt0002844    |7                    |
|tt0003014    |7.1                  |
|tt0003037    |7                    |
|tt0003102    |6.2                  |
|tt0003131    |6.5                  |
|tt0003165    |7                    |
|tt0003167    |5.8                  |
|tt0003419    |6.5                  |
|tt0003471    |6                    |
+-------------+---------------------+
only showing top 20 rows



In [57]:
df_movie_ratings = df_ratings_model.join(df_movie_model, 'imdb_title_id',) \
                                                .select(df_movie_model['title'],  
                                                df_ratings_model['imdb_title_id'],
                                                df_ratings_model['weighted_average_vote'])
                                                
df_movie_ratings.show()

+--------------------+-------------+---------------------+
|               title|imdb_title_id|weighted_average_vote|
+--------------------+-------------+---------------------+
|          Miss Jerry|    tt0000009|                  5.9|
|The Story of the ...|    tt0000574|                  6.1|
|      Den sorte drøm|    tt0001892|                  5.8|
|           Cleopatra|    tt0002101|                  5.2|
|           L'Inferno|    tt0002130|                    7|
|From the Manger t...|    tt0002199|                  5.7|
|      Madame DuBarry|    tt0002423|                  6.8|
|          Quo Vadis?|    tt0002445|                  6.2|
|Independenta Roma...|    tt0002452|                  6.7|
|         Richard III|    tt0002461|                  5.5|
|            Atlantis|    tt0002646|                  6.6|
|Fantômas - À l'om...|    tt0002844|                    7|
|Il calvario di un...|    tt0003014|                  7.1|
|Juve contre Fantômas|    tt0003037|                    

In [48]:
df_netflix_model=df_netflix.select('title', 'type', 'rating')
df_netflix_model.show( truncate= False)

+------+-------+------+
|title |type   |rating|
+------+-------+------+
|3%    |TV Show|TV-MA |
|07:19 |Movie  |TV-MA |
|23:59 |Movie  |R     |
|9     |Movie  |PG-13 |
|21    |Movie  |PG-13 |
|46    |TV Show|TV-MA |
|122   |Movie  |TV-MA |
|187   |Movie  |R     |
|706   |Movie  |TV-14 |
|1920  |Movie  |TV-MA |
|1922  |Movie  |TV-MA |
|1983  |TV Show|TV-MA |
|1994  |TV Show|TV-MA |
|2,215 |Movie  |TV-MA |
|3022  |Movie  |R     |
|Oct-01|Movie  |TV-14 |
|Feb-09|TV Show|TV-14 |
|22-Jul|Movie  |R     |
|15-Aug|Movie  |TV-14 |
|'89   |Movie  |TV-PG |
+------+-------+------+
only showing top 20 rows



In [49]:
df_netflix_ratings = df_netflix_model.join(df_movie_ratings, 'title',) \
                                                .select(df_movie_ratings['weighted_average_vote'], 
                                                df_netflix_model['title'],
                                                df_netflix_model['type'])

df_netflix_rating_clean = df_netflix_ratings.na.drop()
df_netflix_rating_clean.show()                                         


+---------------------+--------------------+-------+
|weighted_average_vote|               title|   type|
+---------------------+--------------------+-------+
|                    6|            The Trap|  Movie|
|                    6|Sweeney Todd: The...|  Movie|
|                    7|             Pandora|  Movie|
|                    6|         The Fighter|  Movie|
|                    7|                Amar|  Movie|
|                    7|              Gumrah|  Movie|
|                    6|            Jonathan|  Movie|
|                    7|          Dad's Army|TV Show|
|                    8|            Bawarchi|  Movie|
|                    8|            Veronica|  Movie|
|                    7|              Faraar|  Movie|
|                    8|     Chashme Buddoor|  Movie|
|                    7|             Requiem|TV Show|
|                    5|                 Joy|  Movie|
|                    6|               Pukar|  Movie|
|                    5|              Sahara|  

In [50]:
df_netflix_rating_clean.sort(df_netflix_rating_clean.weighted_average_vote.desc()).show(truncate=False)

+---------------------+---------------------+-------+
|weighted_average_vote|title                |type   |
+---------------------+---------------------+-------+
|9                    |Breakout             |TV Show|
|9                    |Innocent             |TV Show|
|8                    |Bawarchi             |Movie  |
|8                    |Veronica             |Movie  |
|8                    |Brother              |Movie  |
|8                    |My Sassy Girl        |TV Show|
|8                    |Chashme Buddoor      |Movie  |
|8                    |Dallas Buyers Club   |Movie  |
|8                    |Magnolia             |Movie  |
|8                    |I Am Kalam           |Movie  |
|8                    |Company              |Movie  |
|8                    |Casino Royale        |Movie  |
|8                    |Sin City             |Movie  |
|8                    |Ip Man               |Movie  |
|8                    |Dev.D                |Movie  |
|8                    |The H

In [51]:
df_netflix = df_netflix_rating_clean.sort(df_netflix_rating_clean.weighted_average_vote.asc()).show(truncate=False)


+---------------------+--------------------------------------+-------+
|weighted_average_vote|title                                 |type   |
+---------------------+--------------------------------------+-------+
|2                    |Amy                                   |Movie  |
|2                    |Jackpot                               |Movie  |
|2                    |Killers                               |Movie  |
|2                    |Battle                                |Movie  |
|3                    |Knock Knock                           |Movie  |
|3                    |Death House                           |Movie  |
|3                    |Detention                             |TV Show|
|3                    |The Last Resort                       |Movie  |
|3                    |Haunted                               |TV Show|
|3                    |A Scandall                            |Movie  |
|3                    |Drive                                 |Movie  |
|3    

In [67]:
df_ratings.show()

df_text = df_ratings.select('imdb_title_id', 'weighted_average_vote')
df_text.show()

+-------------+---------------------+-----------+---------+-----------+-------+------+------+------+------+
|imdb_title_id|weighted_average_vote|total_votes|mean_vote|median_vote|vote_10|vote_9|vote_8|vote_7|vote_6|
+-------------+---------------------+-----------+---------+-----------+-------+------+------+------+------+
|    tt0000009|                  5.9|        154|      5.9|          6|     12|     4|    10|    43|    28|
|    tt0000574|                  6.1|        589|      6.3|          6|     57|    18|    58|   137|   139|
|    tt0001892|                  5.8|        188|      6.0|          6|      6|     6|    17|    44|    52|
|    tt0002101|                  5.2|        446|      5.3|          5|     15|     8|    16|    62|    98|
|    tt0002130|                    7|       2237|      6.9|          7|    210|   225|   436|   641|   344|
|    tt0002199|                  5.7|        484|      5.8|          6|     33|    15|    48|    80|   123|
|    tt0002423|             

In [72]:
df_text.dtypes

[('imdb_title_id', 'string'), ('weighted_average_vote', 'string')]

In [40]:
!pip install vaderSentiment
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer



In [81]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
analyser = SentimentIntensityAnalyzer()
vader_udf = udf(lambda text: analyser.polarity_scores(text).get('pos'), FloatType())

In [82]:
# changed weight avg vote to String in the schema (in the chapt10 sentiment notes - string appeared to work, float type does not)
# however, the positivity is still showing as 0 not sure why - also tried changing the Float in the previous code section/import the StringType - no luck

df_sentiment = df_text.withColumn("positivity", vader_udf("weighted_average_vote"))
df_sentiment.show()

+-------------+---------------------+----------+
|imdb_title_id|weighted_average_vote|positivity|
+-------------+---------------------+----------+
|    tt0000009|                  5.9|       0.0|
|    tt0000574|                  6.1|       0.0|
|    tt0001892|                  5.8|       0.0|
|    tt0002101|                  5.2|       0.0|
|    tt0002130|                    7|       0.0|
|    tt0002199|                  5.7|       0.0|
|    tt0002423|                  6.8|       0.0|
|    tt0002445|                  6.2|       0.0|
|    tt0002452|                  6.7|       0.0|
|    tt0002461|                  5.5|       0.0|
|    tt0002646|                  6.6|       0.0|
|    tt0002844|                    7|       0.0|
|    tt0003014|                  7.1|       0.0|
|    tt0003037|                    7|       0.0|
|    tt0003102|                  6.2|       0.0|
|    tt0003131|                  6.5|       0.0|
|    tt0003165|                    7|       0.0|
|    tt0003167|     

In [73]:
df_ratings.show()

df_text_2 = df_ratings.select('imdb_title_id', 'total_votes')
df_text_2.show()

df_text_2.dtypes

+-------------+---------------------+-----------+---------+-----------+-------+------+------+------+------+
|imdb_title_id|weighted_average_vote|total_votes|mean_vote|median_vote|vote_10|vote_9|vote_8|vote_7|vote_6|
+-------------+---------------------+-----------+---------+-----------+-------+------+------+------+------+
|    tt0000009|                  5.9|        154|      5.9|          6|     12|     4|    10|    43|    28|
|    tt0000574|                  6.1|        589|      6.3|          6|     57|    18|    58|   137|   139|
|    tt0001892|                  5.8|        188|      6.0|          6|      6|     6|    17|    44|    52|
|    tt0002101|                  5.2|        446|      5.3|          5|     15|     8|    16|    62|    98|
|    tt0002130|                    7|       2237|      6.9|          7|    210|   225|   436|   641|   344|
|    tt0002199|                  5.7|        484|      5.8|          6|     33|    15|    48|    80|   123|
|    tt0002423|             

[('imdb_title_id', 'string'), ('total_votes', 'int')]

In [70]:
# also tried with total votes to test it out (switched total_votes schema to string as well) - did not work 

df_sentiment = df_text_2.withColumn("positivity", vader_udf("total_votes"))
df_sentiment.show()

+-------------+-----------+----------+
|imdb_title_id|total_votes|positivity|
+-------------+-----------+----------+
|    tt0000009|        154|       0.0|
|    tt0000574|        589|       0.0|
|    tt0001892|        188|       0.0|
|    tt0002101|        446|       0.0|
|    tt0002130|       2237|       0.0|
|    tt0002199|        484|       0.0|
|    tt0002423|        753|       0.0|
|    tt0002445|        273|       0.0|
|    tt0002452|        198|       0.0|
|    tt0002461|        225|       0.0|
|    tt0002646|        331|       0.0|
|    tt0002844|       1944|       0.0|
|    tt0003014|        948|       0.0|
|    tt0003037|       1349|       0.0|
|    tt0003102|        100|       0.0|
|    tt0003131|        124|       0.0|
|    tt0003165|       1050|       0.0|
|    tt0003167|        187|       0.0|
|    tt0003419|       1768|       0.0|
|    tt0003471|        552|       0.0|
+-------------+-----------+----------+
only showing top 20 rows



In [74]:
vader_neg_udf = udf(lambda text: analyser.polarity_scores(text).get('neg'), FloatType())

In [78]:
# testing to see if maybe negativity would work - did not
# I imagine it has something to do with the data - but can't figure out what exactly 

df_sentiment = df_text.withColumn("negativity", vader_neg_udf("weighted_average_vote"))
df_sentiment.show(truncate = False)

+-------------+---------------------+----------+
|imdb_title_id|weighted_average_vote|negativity|
+-------------+---------------------+----------+
|tt0000009    |5.9                  |0.0       |
|tt0000574    |6.1                  |0.0       |
|tt0001892    |5.8                  |0.0       |
|tt0002101    |5.2                  |0.0       |
|tt0002130    |7                    |0.0       |
|tt0002199    |5.7                  |0.0       |
|tt0002423    |6.8                  |0.0       |
|tt0002445    |6.2                  |0.0       |
|tt0002452    |6.7                  |0.0       |
|tt0002461    |5.5                  |0.0       |
|tt0002646    |6.6                  |0.0       |
|tt0002844    |7                    |0.0       |
|tt0003014    |7.1                  |0.0       |
|tt0003037    |7                    |0.0       |
|tt0003102    |6.2                  |0.0       |
|tt0003131    |6.5                  |0.0       |
|tt0003165    |7                    |0.0       |
|tt0003167    |5.8  