# Tutorial 2: Exploring the MovieLens dataset with Spark DataFrames and Spark SQL

In this session, we use the MovieLens dataset to introduce the essential features of the Spark DataFrame API and showcase its power. Along the way, the tutorial links to the relevant Spark documentation and other material.

# Prerequisites

## Install Spark Environment
Since we are not running on Databricks, we need to install Spark ourselves every time we start a session.  

In [1]:
!pip install pyspark


In [6]:
from pyspark.sql import SparkSession

# getOrCreate() returns the already-running session if there is one, so re-running
# this cell no longer fails with "Cannot run multiple SparkContexts at once"
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.ui.port", "4050")  # port used by the optional ngrok tunnel below
    .getOrCreate()
)
sc = spark.sparkContext

In [3]:
import matplotlib.pyplot as plt
%matplotlib inline

## Optional step: enabling the Spark UI through a secure tunnel

This step is only useful if you want to look at the Spark UI.
First, create a free ngrok account at https://dashboard.ngrok.com/login.  
Then log in on the website and copy your AuthToken.

In [4]:
"""# this step downloads ngrok, configures your AuthToken, then starts the tunnel
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
#!./ngrok authtoken my_ngrok_auth_token_retrieved_from_website # <-------------- change this line !
get_ipython().system_raw('./ngrok http 4050 &')"""

"# this step downloads ngrok, configures your AuthToken, then starts the tunnel\n!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip\n!unzip ngrok-stable-linux-amd64.zip\n#!./ngrok authtoken my_ngrok_auth_token_retrieved_from_website # <-------------- change this line !\nget_ipython().system_raw('./ngrok http 4050 &')"

## Other imports

In [5]:
%matplotlib inline

import urllib
import urllib.request as req
import zipfile
import glob
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# PySpark DataFrame
import pyspark.sql.functions as F




## Downloading and unzipping the data (run only once!)

In [41]:
import os

url = "http://files.grouplens.org/datasets/movielens/ml-20m.zip"

# Download and extract the archive only if ml-20m/ is not already there
if not os.path.isdir("ml-20m"):
    filehandle, _ = urllib.request.urlretrieve(url)
    with zipfile.ZipFile(filehandle, "r") as zip_file_object:
        print(zip_file_object.namelist())
        zip_file_object.extractall()

In [9]:
!cd ml-20m ; ls

README.txt          genome-tags.csv     movies.csv          sampled_ratings.csv
genome-scores.csv   links.csv           ratings.csv         tags.csv


## Spark DataFrames essentials


### Reading data

In [10]:
movies_path = "ml-20m/movies.csv"
ratings_path = "ml-20m/ratings.csv"

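We read the CSV files using [`spark.read`](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html). Only a random 1% sample of the ratings is kept (`sample(0.01)`, with no seed), so results computed from `ratings_df` depend on the sample drawn and vary between runs.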

In [14]:
movies_df = spark.read.options(header=True).csv(movies_path)
ratings_df = spark.read.options(header=True).csv(ratings_path).sample(0.01)
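
`DataFrame.sample(fraction)` without replacement is not guaranteed to return exactly that fraction of the rows: roughly speaking, each row is kept independently with probability `fraction` (Bernoulli sampling), so the sample size only approximates 1% and changes between runs unless a seed is passed, for example `sample(fraction=0.01, seed=42)`. The plain-Python sketch below illustrates that behaviour on synthetic row ids; the helper `bernoulli_sample` is hypothetical and is not how Spark implements sampling internally.

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction` (hypothetical helper)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(100_000))  # stand-in for 100,000 rating rows

sample_a = bernoulli_sample(rows, 0.01, seed=42)
sample_b = bernoulli_sample(rows, 0.01, seed=42)
sample_c = bernoulli_sample(rows, 0.01, seed=7)

# The size is close to, but usually not exactly, 1% of the rows
print(len(sample_a), len(sample_c))
# The same seed reproduces the same sample; a different seed gives another one
print(sample_a == sample_b, sample_a == sample_c)
```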

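We cache the dataframes we read to avoid reloading them in subsequent computations. Since no schema is given and `inferSchema` is not set, every column is read as a string, as the output below shows.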

In [15]:
movies_df.cache()
ratings_df.cache()

DataFrame[userId: string, movieId: string, rating: string, timestamp: string]

We then print a few rows from each dataframe.

In [16]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+


In [17]:
ratings_df.show(5)



+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|   3000|   3.5|1112484569|
|     1|   3265|   3.5|1112484525|
|     3|   1214|   5.0| 944918856|
|     4|    733|   5.0| 840879322|
|     5|    454|   5.0| 851527723|
+------+-------+------+----------+


### Manipulating data

In [18]:
movies_df.select("title").show(5)

+--------------------+
|               title|
+--------------------+
|    Toy Story (1995)|
|      Jumanji (1995)|
|Grumpier Old Men ...|
|Waiting to Exhale...|
|Father of the Bri...|
+--------------------+


In [19]:
ratings_df.filter("rating=5").show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     3|   1214|   5.0| 944918856|
|     4|    733|   5.0| 840879322|
|     5|    454|   5.0| 851527723|
|    10|    858|   5.0| 943497439|
|    11|   1225|   5.0|1230784460|
+------+-------+------+----------+


In [20]:
ratings_df.groupby("userId").agg({"movieId": "count"}).show(5)



+------+--------------+
|userId|count(movieId)|
+------+--------------+
|   675|             1|
|   829|             1|
|  1090|             2|
|  1159|             1|
|  1436|             3|
+------+--------------+
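
`agg({"movieId": "count"})` applies `count` to `movieId` within each `userId` group. Like SQL `COUNT(col)`, it counts non-null values only, so a group whose `movieId` values are all null still appears, with a count of 0. The plain-Python sketch below spells this out on a few made-up rows; it only shows what the aggregation computes, not how Spark distributes it.

```python
# Made-up (userId, movieId) rows; None stands for a missing movieId
rows = [("675", "10"), ("829", None), ("1090", "20"), ("1090", "30"), ("1436", "40"), ("1436", None)]

counts = {}
for user_id, movie_id in rows:
    counts.setdefault(user_id, 0)  # every group gets a row, even if all its values are null
    if movie_id is not None:       # count(col) skips null values
        counts[user_id] += 1

print(counts)  # {'675': 1, '829': 0, '1090': 2, '1436': 1}
```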


In [21]:
ratings_df.withColumn("is_rating_high", ratings_df["rating"] >= 4).show(5)

+------+-------+------+----------+--------------+
|userId|movieId|rating| timestamp|is_rating_high|
+------+-------+------+----------+--------------+
|     1|   3000|   3.5|1112484569|         false|
|     1|   3265|   3.5|1112484525|         false|
|     3|   1214|   5.0| 944918856|          true|
|     4|    733|   5.0| 840879322|          true|
|     5|    454|   5.0| 851527723|          true|
+------+-------+------+----------+--------------+


In [22]:
ratings_df.withColumn("is_rating_low", ratings_df.rating < 4).show(5)

+------+-------+------+----------+-------------+
|userId|movieId|rating| timestamp|is_rating_low|
+------+-------+------+----------+-------------+
|     1|   3000|   3.5|1112484569|         true|
|     1|   3265|   3.5|1112484525|         true|
|     3|   1214|   5.0| 944918856|        false|
|     4|    733|   5.0| 840879322|        false|
|     5|    454|   5.0| 851527723|        false|
+------+-------+------+----------+-------------+


In [23]:
ratings_df.withColumnRenamed("rating", "note").show(5)

+------+-------+----+----------+
|userId|movieId|note| timestamp|
+------+-------+----+----------+
|     1|   3000| 3.5|1112484569|
|     1|   3265| 3.5|1112484525|
|     3|   1214| 5.0| 944918856|
|     4|    733| 5.0| 840879322|
|     5|    454| 5.0| 851527723|
+------+-------+----+----------+


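### Built-in transformations and aggregations

The available built-in functions are listed in the [`pyspark.sql.functions` documentation](https://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html#module-pyspark.sql.functions) (Spark 2.3.0 version).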

In [24]:
ratings_df.select(F.avg("rating"), F.min("rating"), F.max("rating")).show()

+-----------------+-----------+-----------+
|      avg(rating)|min(rating)|max(rating)|
+-----------------+-----------+-----------+
|3.528868591573276|        0.5|        5.0|
+-----------------+-----------+-----------+


### Joining Dataframes

In [25]:
ratings_df.join(movies_df, "movieId").show(5)

+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|   3000|     1|   3.5|1112484569|Princess Mononoke...|Action|Adventure|...|
|   3265|     1|   3.5|1112484525|Hard-Boiled (Lat ...|Action|Crime|Dram...|
|   1214|     3|   5.0| 944918856|        Alien (1979)|       Horror|Sci-Fi|
|    733|     4|   5.0| 840879322|    Rock, The (1996)|Action|Adventure|...|
|    454|     5|   5.0| 851527723|    Firm, The (1993)|      Drama|Thriller|
+-------+------+------+----------+--------------------+--------------------+
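
`ratings_df.join(movies_df, "movieId")` is an inner join by default: a rating whose `movieId` has no match in `movies_df` is dropped, and because the key is given by name, `movieId` appears only once, as the first column. A plain-Python sketch of those semantics, using two rows from the output above plus a made-up movie id `"999999"` that has no matching movie:

```python
# Rows shaped like (userId, movieId, rating); "999999" is a made-up id with no movie
rating_rows = [("3", "1214", "5.0"), ("4", "733", "5.0"), ("9", "999999", "4.0")]
# movieId -> title lookup, as in movies_df
movie_titles = {"1214": "Alien (1979)", "733": "Rock, The (1996)"}

# Inner join on movieId: keep only ratings whose movieId exists in movie_titles,
# and emit the join key once, in first position
joined = [
    (movie_id, user_id, rating, movie_titles[movie_id])
    for user_id, movie_id, rating in rating_rows
    if movie_id in movie_titles
]
print(joined)
```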


### User-defined functions (UDFs)

In [26]:
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType


def length(string: str):
    return len(string)


length_udf = udf(length, LongType())

In [27]:
movies_df.select(length_udf("title")).show(5)

+-------------+
|length(title)|
+-------------+
|           16|
|           14|
|           23|
|           24|
|           34|
+-------------+


In [28]:
@udf("string")
def length2(string: str):
    return len(string)

In [29]:
movies_df.select(length2("title")).show(5)

+--------------+
|length2(title)|
+--------------+
|            16|
|            14|
|            23|
|            24|
|            34|
+--------------+


In [30]:
title_lengths = movies_df.select(length2("title").alias("title_length"))

In [31]:
title_lengths.select(F.max("title_length")).show()

+-----------------+
|max(title_length)|
+-----------------+
|               99|
+-----------------+


In [32]:
title_lengths.select(F.min("title_length")).show()

+-----------------+
|min(title_length)|
+-----------------+
|               10|
+-----------------+
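
Because `length2` is declared with return type `"string"`, the `title_length` column holds strings, and `F.max`/`F.min` on a string column compare values lexicographically, character by character, rather than numerically. The 99 and 10 above are therefore the largest and smallest lengths in dictionary order, which need not be the true numeric extremes; declaring the UDF as `@udf("long")`, or using the built-in `F.length("title")`, yields numeric lengths. Plain Python strings compare the same way, as this sketch with made-up lengths shows:

```python
# Made-up title lengths, stored as strings (as returned by a "string" UDF)
lengths_as_strings = ["16", "9", "100", "99", "23"]

# String comparison is lexicographic: "99" > "100" and "100" < "9"
print(max(lengths_as_strings), min(lengths_as_strings))  # 99 100

# Converting to integers restores the numeric order
lengths = [int(s) for s in lengths_as_strings]
print(max(lengths), min(lengths))  # 100 9
```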


### Query plan inspection and caching

In [33]:
title_lengths.select(F.max("title_length")).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortAggregate(key=[], functions=[max(title_length#1326)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=677]
      +- SortAggregate(key=[], functions=[partial_max(title_length#1326)])
         +- Project [pythonUDF0#1539 AS title_length#1326]
            +- BatchEvalPython [length2(title#18)#1325], [pythonUDF0#1539]
               +- InMemoryTableScan [title#18]
                     +- InMemoryRelation [movieId#17, title#18, genres#19], StorageLevel(disk, memory, deserialized, 1 replicas)
                           +- FileScan csv [movieId#17,title#18,genres#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/quyenlinhta/IASD/bigdata/ml-20m/movies.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<movieId:string,title:string,genres:string>


In [34]:
title_lengths.select(F.min("title_length")).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortAggregate(key=[], functions=[min(title_length#1326)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=699]
      +- SortAggregate(key=[], functions=[partial_min(title_length#1326)])
         +- Project [pythonUDF0#1590 AS title_length#1326]
            +- BatchEvalPython [length2(title#18)#1325], [pythonUDF0#1590]
               +- InMemoryTableScan [title#18]
                     +- InMemoryRelation [movieId#17, title#18, genres#19], StorageLevel(disk, memory, deserialized, 1 replicas)
                           +- FileScan csv [movieId#17,title#18,genres#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/quyenlinhta/IASD/bigdata/ml-20m/movies.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<movieId:string,title:string,genres:string>


In [35]:
title_lengths.cache()
title_lengths.select(F.max("title_length")).show()

+-----------------+
|max(title_length)|
+-----------------+
|               99|
+-----------------+


In [36]:
title_lengths.select(F.min("title_length")).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortAggregate(key=[], functions=[min(title_length#1326)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=775]
      +- SortAggregate(key=[], functions=[partial_min(title_length#1326)])
         +- InMemoryTableScan [title_length#1326]
               +- InMemoryRelation [title_length#1326], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- AdaptiveSparkPlan isFinalPlan=false
                        +- Project [pythonUDF0#1638 AS title_length#1326]
                           +- BatchEvalPython [length2(title#18)#1325], [pythonUDF0#1638]
                              +- InMemoryTableScan [title#18]
                                    +- InMemoryRelation [movieId#17, title#18, genres#19], StorageLevel(disk, memory, deserialized, 1 replicas)
                                          +- FileScan csv [movieId#17,title#18,genres#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemo
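
Comparing the plans: before `cache()`, both the `max` and the `min` queries contain a `BatchEvalPython` step, i.e. the Python UDF is evaluated again for every action. After `title_lengths.cache()` and one action (the `max` query, which fills the cache), the `min` plan reads `title_length` from an `InMemoryRelation`; the UDF steps still appear underneath it, but they describe how the cached data was built rather than work redone. The sketch below is only a rough plain-Python analogy, using `functools.lru_cache` to count how often a stand-in UDF runs; it is not how Spark's cache is implemented.

```python
from functools import lru_cache

titles = ["Toy Story (1995)", "Jumanji (1995)", "Heat (1995)"]
udf_calls = 0


def title_length(title):
    """Stand-in for the Python UDF; counts how often it runs."""
    global udf_calls
    udf_calls += 1
    return len(title)


def lengths_uncached():
    # Like an uncached DataFrame: each action re-runs the UDF on every row
    return [title_length(t) for t in titles]


@lru_cache(maxsize=None)
def lengths_cached():
    # Like a cached DataFrame: computed by the first action, reused afterwards
    return tuple(title_length(t) for t in titles)


max(lengths_uncached())
min(lengths_uncached())
uncached_calls = udf_calls  # 2 actions x 3 rows

udf_calls = 0
max(lengths_cached())
min(lengths_cached())
cached_calls = udf_calls  # 1 computation x 3 rows
print(uncached_calls, cached_calls)  # 6 3
```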

### Writing CSV

In [37]:
# The default save mode ("errorifexists") raises an error if the output path already exists
movies_df.sample(0.1).write.csv("ml-20m/movies-sample.csv")

In [38]:
# mode("overwrite") replaces any existing output at that path
movies_df.sample(0.1).write.mode("overwrite").csv("ml-20m/movies-sample.csv")

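Check that the output was written to the filesystem. Note that Spark writes `movies-sample.csv` as a directory of `part-*` files rather than as a single CSV file, and that no header row is written unless `.option("header", True)` is set.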

In [39]:
!cd ml-20m ; ls

README.txt          links.csv           ratings.csv
genome-scores.csv   movies-sample.csv   sampled_ratings.csv
genome-tags.csv     movies.csv          tags.csv


In [40]:
import os

os.listdir("ml-20m")

['links.csv',
 'sampled_ratings.csv',
 'movies-sample.csv',
 'tags.csv',
 'genome-tags.csv',
 'ratings.csv',
 'README.txt',
 'genome-scores.csv',
 'movies.csv']

This cell reads the full ratings CSV (not the 1% sample) and writes it in Parquet format:

In [41]:
ratings_parquet_path = "ml-20m/ratings.parquet"
spark.read.options(header=True).csv(ratings_path).write.parquet(ratings_parquet_path)


# Questions

## Question 0: Compare the processing time and the number of executors used when reading from CSV versus reading from Parquet, for the following pipelines:
- count the total number of records
- count the number of records for user 1
- count the distinct timestamps

**hint** The `countDistinct` function can be used for the third pipeline.

In [44]:
from prettytable import PrettyTable

In [42]:
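import functools
import time


def timeit(func):
    """Decorator that prints the wall-clock time of each call to `func`."""
    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()
        print(f"Execution time: {end - start}")
        return result

    return wrapper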

In [43]:
import warnings

warnings.filterwarnings("ignore")

In [75]:
@timeit
def count_records(df):
    return df.count()


@timeit
def count_records_for_user(df, user_id):
    return df.filter(f"userId={user_id}").count()


@timeit
def count_distinct_timestamps(df):
    return df.select(F.countDistinct("timestamp")).show()


for_df = spark.read.options(header=True).csv(ratings_path)
for_parquet = spark.read.parquet(ratings_parquet_path)
print("Counting records for csv:", count_records(for_df))
print("Counting records for parquet:", count_records(for_parquet))

print("Counting records for user 1 for csv:", count_records_for_user(for_df, 1))
print("Counting records for user 1 for parquet:", count_records_for_user(for_parquet, 1))

print("Counting distinct timestamps for csv:")
count_distinct_timestamps(for_df)
print("Counting distinct timestamps for parquet:")
count_distinct_timestamps(for_parquet)

Execution time: 0.9606108665466309
Counting records for csv: 20000263
Execution time: 0.21493005752563477
Counting records for parquet: 20000263


Execution time: 3.397679328918457
Counting records for user 1 for csv: 175


Execution time: 0.9519250392913818
Counting records for user 1 for parquet: 175
Counting distinct timestamps for csv:


+-------------------------+
|count(DISTINCT timestamp)|
+-------------------------+
|                 15351121|
+-------------------------+

Execution time: 9.734538078308105
Counting distinct timestamps for parquet:


+-------------------------+
|count(DISTINCT timestamp)|
+-------------------------+
|                 15351121|
+-------------------------+

Execution time: 6.573633909225464
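
To compare the measurements above more easily, the ratio of CSV to Parquet execution time can be computed directly from the printed timings. These are single runs on one machine, so treat the ratios as indicative only. A plausible explanation for the gap is that Parquet is a columnar format: queries that need few or no columns (such as a count, or a filter on `userId`) can skip the rest of the data, while the CSV reader has to parse every line in full.

```python
# Execution times in seconds, copied from the outputs above: (CSV, Parquet)
timings = {
    "count": (0.9606108665466309, 0.21493005752563477),
    "count for user 1": (3.397679328918457, 0.9519250392913818),
    "distinct timestamps": (9.734538078308105, 6.573633909225464),
}

# Speedup = CSV time / Parquet time
speedups = {name: csv / parquet for name, (csv, parquet) in timings.items()}
for name, speedup in speedups.items():
    print(f"{name}: Parquet is {speedup:.1f}x faster than CSV")
```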

## Question 1: Compute the (average, max, min) rating per movie, and find the highest- and lowest-rated movies.

**hint** A straightforward `groupBy` followed by an aggregation.

In [45]:
agg_ratings = ratings_df.groupBy("movieId").agg(
    F.avg("rating").alias("avg_rating"),
    F.max("rating").alias("max_rating"),
    F.min("rating").alias("min_rating")
)

agg_ratings_with_title = agg_ratings.join(movies_df, "movieId")
highest_rated = agg_ratings_with_title.orderBy(F.desc("avg_rating")).first()
lowest_rated = agg_ratings_with_title.orderBy("avg_rating").first()

print("Highest Rated Movie:", highest_rated)
print("Lowest Rated Movie:", lowest_rated)

Highest Rated Movie: Row(movieId='100444', avg_rating=5.0, max_rating='5.0', min_rating='5.0', title="Rumble in the Air-Conditioned Auditorium: O'Reilly vs. Stewart 2012, The (2012)", genres='Comedy')
Lowest Rated Movie: Row(movieId='101243', avg_rating=0.5, max_rating='0.5', min_rating='0.5', title='Klip (Clip) (2012)', genres='Drama|Romance')
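
Both movies above have identical average, max and min ratings, so all of their sampled ratings are equal; with a 1% sample, many movies likely have only a handful of ratings, and several movies can share the extreme averages of 5.0 and 0.5. `orderBy(...).first()` then returns just one of them, and which one is not guaranteed. The plain-Python sketch below, on made-up averages, keeps every tied movie instead; in Spark the same idea would be to compute the extreme with `F.max` or `F.min` and `filter` the rows equal to it.

```python
# Made-up per-movie average ratings: movieId -> avg_rating
avg_by_movie = {"a": 5.0, "b": 4.2, "c": 5.0, "d": 0.5, "e": 3.1}

best = max(avg_by_movie.values())
worst = min(avg_by_movie.values())

# Keep every movie tied at the extreme instead of one arbitrary row
tied_highest = sorted(m for m, avg in avg_by_movie.items() if avg == best)
tied_lowest = sorted(m for m, avg in avg_by_movie.items() if avg == worst)
print(tied_highest, tied_lowest)  # ['a', 'c'] ['d']
```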


## Question 2: Among movies that were rated by at least 20 users, which movies have the highest and lowest rating standard deviation?

**hint** How do you use a join to keep only a subset of movies?

In [46]:
std_dev_ratings = ratings_df.groupBy("movieId").agg(
    F.stddev("rating").alias("rating_std_dev"),
    F.count("rating").alias("rating_count")
)

std_dev_ratings_filtered = std_dev_ratings.filter("rating_count >= 20")

std_dev_ratings_with_title = std_dev_ratings_filtered.join(movies_df, "movieId")

highest_std_dev = std_dev_ratings_with_title.orderBy(F.desc("rating_std_dev")).first()
lowest_std_dev = std_dev_ratings_with_title.orderBy("rating_std_dev").first()

print("Movie with Highest Standard Deviation:", highest_std_dev)
print("Movie with Lowest Standard Deviation:", lowest_std_dev)

Movie with Highest Standard Deviation: Row(movieId='711', rating_std_dev=1.459992790176863, rating_count=20, title='Flipper (1996)', genres='Adventure|Children')
Movie with Lowest Standard Deviation: Row(movieId='3196', rating_std_dev=0.44577791412086437, rating_count=27, title='Stalag 17 (1953)', genres='Drama|War')
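
`F.stddev` is an alias for `F.stddev_samp`, the sample standard deviation (dividing by n - 1), whereas `F.stddev_pop` divides by n. With at least 20 ratings per movie the difference is small, but it grows for tiny groups. Python's `statistics` module makes the same distinction, shown here on two made-up ratings:

```python
import statistics

movie_ratings = [1.0, 5.0]  # made-up ratings of a single movie

# Sample standard deviation (n - 1 in the denominator), like F.stddev / F.stddev_samp
sample_sd = statistics.stdev(movie_ratings)
# Population standard deviation (n in the denominator), like F.stddev_pop
population_sd = statistics.pstdev(movie_ratings)
print(round(sample_sd, 2), population_sd)  # 2.83 2.0
```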


## Question 3: Compute the (average, max, min) rating per genre, and find the highest- and lowest-rated genres, as well as the genres with the highest rating standard deviation.

**hint** How can you extract the individual genres from the `genres` column? How do you use a custom function to do this?

In [47]:
from pyspark.sql.functions import explode

exploded_genres = movies_df.withColumn("genre", explode(F.split("genres", "\\|")))

genre_ratings = exploded_genres.join(ratings_df, "movieId")

genre_agg = genre_ratings.groupBy("genre").agg(
    F.avg("rating").alias("avg_rating"),
    F.max("rating").alias("max_rating"),
    F.min("rating").alias("min_rating"),
    F.stddev("rating").alias("std_dev_rating")
)

highest_rated_genre = genre_agg.orderBy(F.desc("avg_rating")).first()
lowest_rated_genre = genre_agg.orderBy("avg_rating").first()

highest_std_dev_genre = genre_agg.orderBy(F.desc("std_dev_rating")).first()

print("Highest Rated Genre:", highest_rated_genre)
print("Lowest Rated Genre:", lowest_rated_genre)
print("Genre with Highest Standard Deviation:", highest_std_dev_genre)

Highest Rated Genre: Row(genre='Film-Noir', avg_rating=4.00327868852459, max_rating='5.0', min_rating='0.5', std_dev_rating=0.8607272946276987)
Lowest Rated Genre: Row(genre='Horror', avg_rating=3.267199514366653, max_rating='5.0', min_rating='0.5', std_dev_rating=1.140700034285225)
Genre with Highest Standard Deviation: Row(genre='Horror', avg_rating=3.267199514366653, max_rating='5.0', min_rating='0.5', std_dev_rating=1.140700034285225)
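
`F.split` interprets its pattern as a regular expression, which is why the pipe is escaped as `"\\|"`: an unescaped `|` is an alternation of two empty patterns and does not split on the literal pipe. The plain-Python sketch below mimics `split` followed by `explode` with `re.split`, on the `(movieId, genres)` pairs whose genres are fully visible in the `movies_df.show(5)` output:

```python
import re

# (movieId, genres) pairs taken from the movies_df.show(5) output
movie_genres = [("3", "Comedy|Romance"), ("4", "Comedy|Drama|Romance"), ("5", "Comedy")]

# split + explode: one output row per (movieId, genre)
exploded = [
    (movie_id, genre)
    for movie_id, genres in movie_genres
    for genre in re.split(r"\|", genres)  # escaped pipe, as in F.split("genres", "\\|")
]
print(exploded)

# An unescaped "|" matches the empty string, so it does not split on the pipe
print(re.split("|", "Comedy|Romance") == ["Comedy", "Romance"])  # False
```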


## Question 4: Extract the year from the title and compute the average rating per year (for years in which more than 10 movies came out). How does this quantity evolve?

**hint** Extracting the year from the title can be done with a Regular Expression

In [48]:
from pyspark.sql.functions import regexp_extract

movies_df_with_year = movies_df.withColumn("year", regexp_extract("title", "\\((\\d{4})\\)", 1))
yearly_ratings = ratings_df.join(movies_df_with_year, "movieId")
avg_rating_per_year = yearly_ratings.groupBy("year").agg(F.avg("rating").alias("avg_rating"))
movies_per_year = movies_df_with_year.groupBy("year").count()
years_with_more_than_10_movies = movies_per_year.filter("count > 10").select("year")
result = avg_rating_per_year.join(years_with_more_than_10_movies, "year").orderBy("year")
result.show()

+----+------------------+
|year|        avg_rating|
+----+------------------+
|    |               3.5|
|1915|               4.0|
|1916|               2.5|
|1917|               4.0|
|1919|               3.8|
|1920|3.4285714285714284|
|1921|3.6666666666666665|
|1922|3.8289473684210527|
|1923|3.5833333333333335|
|1924|3.1666666666666665|
|1925|3.9482758620689653|
|1926|          3.953125|
|1927|4.1923076923076925|
|1928| 3.893939393939394|
|1929|3.7916666666666665|
|1930|3.9423076923076925|
|1931|               4.0|
|1932|3.8424657534246576|
|1933| 3.733128834355828|
|1934|3.8472222222222223|
+----+------------------+
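
`regexp_extract` returns an empty string when the pattern does not match, which is where the blank `year` row at the top of the output comes from: titles without a `(YYYY)` year. Such rows can be dropped with a filter such as `F.col("year") != ""`. The function also returns the first match, so a title containing another parenthesised four-digit number before the year would yield the wrong value; anchoring the pattern at the end of the title (`\((\d{4})\)\s*$`) is one way to guard against that. A plain-Python sketch of both patterns; the helper `extract_year` and the titles are made up for illustration:

```python
import re


def extract_year(title, pattern):
    """Mimic regexp_extract(title, pattern, 1): group 1 of the first match, or ""."""
    match = re.search(pattern, title)
    return match.group(1) if match else ""


first_match = r"\((\d{4})\)"     # pattern used in the notebook
anchored = r"\((\d{4})\)\s*$"    # the year must be the last thing in the title

# Made-up titles for illustration
titles = ["Some Movie (1995)", "Untitled Project", "Remake of (1931) Classic (2004)"]
for title in titles:
    print(repr(extract_year(title, first_match)), repr(extract_year(title, anchored)))
```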


## Question 5: What are the top 3 genres per year?

**hint** Look at the answer here https://stackoverflow.com/questions/38397796/retrieve-top-n-in-each-group-of-a-dataframe-in-pyspark

In [51]:
from pyspark.sql.functions import regexp_extract, explode
from pyspark.sql.window import Window

movies_df_with_year = movies_df.withColumn("year", regexp_extract("title", "\\((\\d{4})\\)", 1))
exploded_genres = movies_df_with_year.withColumn("genre", explode(F.split("genres", "\\|")))
genre_ratings = exploded_genres.join(ratings_df, "movieId")
genre_year_avg = genre_ratings.groupBy("year", "genre").agg(F.avg("rating").alias("avg_rating"))
windowSpec = Window.partitionBy("year").orderBy(F.desc("avg_rating"))

ranked_genres = genre_year_avg.withColumn("rank", F.rank().over(windowSpec))
top_3_genres_per_year = ranked_genres.filter("rank <= 3")
top_3_genres_per_year.show()

+----+---------+------------------+----+
|year|    genre|        avg_rating|rank|
+----+---------+------------------+----+
|    |   Sci-Fi|               3.5|   1|
|    |   Comedy|               3.5|   1|
|1903|    Crime|               3.5|   1|
|1903|  Western|               3.5|   1|
|1915|      War|               4.0|   1|
|1915|    Drama|               4.0|   1|
|1916|Adventure|               4.0|   1|
|1916|   Action|               4.0|   1|
|1916|   Sci-Fi|               4.0|   1|
|1917|    Drama|               4.0|   1|
|1918|      War|               3.0|   1|
|1918|   Comedy|               3.0|   1|
|1919|  Romance|               4.5|   1|
|1919|   Comedy|               4.0|   2|
|1919|    Drama|               3.8|   3|
|1920|    Drama|               4.0|   1|
|1920|   Sci-Fi|               4.0|   1|
|1920|   Horror|3.4285714285714284|   3|
|1921|    Drama|3.7777777777777777|   1|
|1921|   Comedy| 3.772727272727273|   2|
+----+---------+------------------+----+
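
The output contains ties: in 1920, Drama and Sci-Fi share rank 1 at 4.0 and Horror gets rank 3, because `F.rank()` gives tied rows the same rank and then skips ranks. `F.dense_rank()` does not skip ranks, and `F.row_number()` numbers rows consecutively, breaking ties arbitrarily, so `rank <= 3` keeps at most three rows per year. A plain-Python sketch of the three numbering schemes on the 1920 averages (the helper functions are illustrative, not Spark's implementation):

```python
# (genre, avg_rating) for 1920, taken from the output above, sorted by avg_rating desc
scores = [("Drama", 4.0), ("Sci-Fi", 4.0), ("Horror", 3.4285714285714284)]


def rank(values):
    """Like F.rank(): ties share a rank, and the next rank skips ahead."""
    return [1 + sum(v > x for v in values) for x in values]


def dense_rank(values):
    """Like F.dense_rank(): ties share a rank, with no gaps."""
    distinct = sorted(set(values), reverse=True)
    return [distinct.index(x) + 1 for x in values]


def row_number(values):
    """Like F.row_number(): consecutive numbers; ties broken by input order here."""
    return list(range(1, len(values) + 1))


values = [score for _, score in scores]
print(rank(values), dense_rank(values), row_number(values))
# [1, 1, 3] [1, 1, 2] [1, 2, 3]
```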


## Question 6: Which title words co-occur the most with each genre? Is the number of co-occurrences enough? Compute the [pointwise mutual information](https://en.wikipedia.org/wiki/Pointwise_mutual_information) between genres and movie title words, and filter out words that appear fewer than 100 times.

In [50]:
from pyspark.sql.functions import split, explode, size

movies_with_words = movies_df.withColumn("word", explode(split("title", "\\s+")))
genre_word = exploded_genres.join(movies_with_words, "movieId")

word_genre_count = genre_word.groupBy("genre", "word").count()
filtered_word_genre_count = word_genre_count.filter("count >= 100")
total_count = filtered_word_genre_count.groupBy().sum("count").first()[0]
pmi = filtered_word_genre_count.withColumn("pmi", F.log2((F.col("count") / total_count) /
                                                         (F.sum("count").over(
                                                             Window.partitionBy("genre")) / total_count) /
                                                         (F.sum("count").over(
                                                             Window.partitionBy("word")) / total_count)))
pmi.show()

+------+------+-----+------------------+
| genre|  word|count|               pmi|
+------+------+-----+------------------+
|Comedy|     &|  134|1.4457503875762543|
| Drama|     &|  111|0.3952230392473445|
| Drama|(1970)|  116|1.5374451118998091|
| Drama|(1971)|  102|1.5374451118998091|
| Drama|(1972)|  111|1.5374451118998091|
| Drama|(1982)|  106|1.5374451118998091|
| Drama|(1984)|  114|1.5374451118998091|
| Drama|(1985)|  103|1.5374451118998096|
| Drama|(1986)|  117|1.5374451118998091|
|Comedy|(1987)|  131|1.3162991361210525|
| Drama|(1987)|  131|0.5374451118998093|
|Comedy|(1988)|  111|1.1623477708864811|
| Drama|(1988)|  136|0.6765407215654711|
|Comedy|(1989)|  119|1.2922536621544014|
| Drama|(1989)|  123|0.5610963799644545|
|Comedy|(1990)|  109|1.1072035373980624|
| Drama|(1990)|  143|0.7200365251782824|
|Comedy|(1991)|  125|1.1946205795328002|
| Drama|(1991)|  147|0.6496546154858341|
|Comedy|(1992)|  135|1.2132056431569491|
+------+------+-----+------------------+
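
For reference, the pointwise mutual information between a genre g and a word w is `PMI(g, w) = log2(p(g, w) / (p(g) * p(w)))`. The cell above filters on the (genre, word) pair count and computes the marginals p(g) and p(w) after that filter, and its whitespace tokens keep punctuation and years such as `(1970)`, whereas the question asks to drop words that appear fewer than 100 times. The plain-Python sketch below is one possible reading of the question, under assumptions not taken from the original: each (genre, title word) pair of a movie counts as one co-occurrence, words are lower-cased alphabetic tokens with the year removed, and marginals come from all pairs before the rare-word filter. The helper names and the toy titles are hypothetical.

```python
import math
import re
from collections import Counter


def title_words(title):
    """Lower-cased alphabetic tokens of a title, with the "(YYYY)" year removed."""
    title = re.sub(r"\(\d{4}\)", " ", title)
    return set(re.findall(r"[a-z]+", title.lower()))


def genre_word_pmi(movies, min_word_count):
    """PMI between genres and title words for a list of (title, genres) pairs."""
    pair_counts = Counter()   # co-occurrences of (genre, word)
    word_titles = Counter()   # number of titles containing each word
    for title, genres in movies:
        words = title_words(title)
        word_titles.update(words)
        for genre in genres.split("|"):
            for word in words:
                pair_counts[(genre, word)] += 1

    # Marginals are computed on ALL pairs, before filtering rare words
    n = sum(pair_counts.values())
    genre_counts, word_counts = Counter(), Counter()
    for (genre, word), count in pair_counts.items():
        genre_counts[genre] += count
        word_counts[word] += count

    # PMI = log2(p(g, w) / (p(g) * p(w))) = log2(c(g, w) * n / (c(g) * c(w)))
    return {
        (genre, word): math.log2(count * n / (genre_counts[genre] * word_counts[word]))
        for (genre, word), count in pair_counts.items()
        if word_titles[word] >= min_word_count  # drop words found in too few titles
    }


# Made-up toy data; the question's threshold is 100, here 2 for a tiny example
toy_movies = [
    ("War (2001)", "Drama"),
    ("War (2002)", "Drama"),
    ("Party (2003)", "Comedy"),
    ("Love (2004)", "Comedy"),
    ("Love (2005)", "Drama"),
]
toy_pmi = genre_word_pmi(toy_movies, min_word_count=2)
for (genre, word), value in sorted(toy_pmi.items(), key=lambda kv: -kv[1]):
    print(f"{genre:6} {word:5} {value:+.3f}")
```

A positive PMI means the word appears with the genre more often than independence would predict; filtering rare words matters because PMI is unstable for words seen only a few times.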
