In [1]:
# Uncomment this block to run in Colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!curl https://dlcdn.apache.org/spark/spark-3.1.3/spark-3.1.3-bin-hadoop2.7.tgz -o spark-3.1.3-bin-hadoop2.7.tgz
!tar -xzf spark-3.1.3-bin-hadoop2.7.tgz
!pip install -q findspark

import findspark
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.3-bin-hadoop2.7"


findspark.init()

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  216M  100  216M    0     0   233M      0 --:--:-- --:--:-- --:--:--  233M


In [3]:
from pathlib import Path
import requests
import zipfile
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, desc, min, avg, max, split, explode, regexp_extract

In [4]:
data_dir = Path('./data')
data_dir.mkdir(exist_ok=True)

ml_data_dir = data_dir/'ml-latest-small'

In [5]:
if not ml_data_dir.exists():
    zip_output_path = data_dir/"ml-latest-small.zip"
    data_url = "http://files.grouplens.org/datasets/movielens/ml-latest-small.zip"
    response = requests.get(data_url)
    open(zip_output_path, "wb").write(response.content)
    with zipfile.ZipFile(zip_output_path, 'r') as zip_ref:
        zip_ref.extractall(data_dir)

In [7]:
spark = SparkSession \
            .builder \
            .appName("movielens-nb") \
            .master("local") \
            .config("spark.executor.memory", "512m") \
            .getOrCreate() \

In [8]:
movies_dataset_path = ml_data_dir/"movies.csv"
ratings_dataset_path = ml_data_dir/"ratings.csv"
tags_dataset_path = ml_data_dir/"tags.csv"

assert movies_dataset_path.exists()
assert ratings_dataset_path.exists()
assert tags_dataset_path.exists()

In [9]:
movies = spark.read.csv(str(movies_dataset_path), header=True)
ratings = spark.read.csv(str(ratings_dataset_path), header=True)
tags = spark.read.csv(str(tags_dataset_path), header=True)

## How many movies of genre `drama` are there?

In [10]:
movies.where(movies.genres.contains('Drama')).count()

4361

## How many unique movies are rated, how many are not rated?

In [11]:
print(f"Total number of movies in the dataset: {movies.count()}")

Total number of movies in the dataset: 9742


In [12]:
movie_ratings = movies.join(ratings, on="movieId", how="outer")

In [13]:
num_unique_rated_movies = movie_ratings \
                          .where(movie_ratings.rating.isNotNull()) \
                          .select("movieId") \
                          .distinct() \
                          .count()

print(f"Number of unique rated movies: {num_unique_rated_movies}")

Number of unique rated movies: 9724


In [14]:
num_unique_unrated_movies = movie_ratings \
                          .where(movie_ratings.rating.isNull()) \
                          .select("movieId") \
                          .distinct() \
                          .count()

print(f"Number of unique unrated movies: {num_unique_unrated_movies}")

Number of unique unrated movies: 18


## Who gave the most ratings, how many rates did the person make?

In [15]:
ratings \
  .groupBy("userId") \
  .agg(count("movieId").alias("numRatings")) \
  .sort(desc("numRatings")) \
  .show(1)

+------+----------+
|userId|numRatings|
+------+----------+
|   414|      2698|
+------+----------+
only showing top 1 row



## Compute min, average, max rating per movie.

In [16]:
ratings \
  .groupBy("movieId") \
  .agg(min("rating").alias("minRating"),
       max("rating").alias("maxRating"),
       avg("rating").alias("avgRating")) \
  .show(20)

+-------+---------+---------+------------------+
|movieId|minRating|maxRating|         avgRating|
+-------+---------+---------+------------------+
| 100553|      4.5|      4.5|               4.5|
| 102684|      3.5|      4.0|              3.75|
|   1090|      1.0|      5.0| 3.984126984126984|
| 112911|      0.5|      4.0|               2.0|
| 115713|      0.5|      5.0|3.9107142857142856|
| 117630|      1.0|      1.0|               1.0|
| 119655|      1.0|      3.5|              2.25|
| 120478|      3.5|      5.0| 4.333333333333333|
| 121007|      4.0|      4.0|               4.0|
|   1572|      2.5|      3.5|               3.0|
| 158813|      1.0|      3.0|               2.0|
| 173535|      4.5|      4.5|               4.5|
|   2069|      3.5|      5.0|              4.25|
|   2088|      1.0|      4.0|               2.5|
|   2136|      0.5|      5.0|2.4642857142857144|
|   2162|      1.0|      3.5|               2.5|
|   2294|      1.5|      5.0|3.2444444444444445|
|  26082|      4.0| 

## Output data-set containing users that have rated a movie but not tagged it.

In [17]:
# This join gives us the rows in ratings such that its `userId, movieId`
# combo does not exist in the `tags` table
rate_without_tagging = ratings.join(
    tags,
    how='left_anti',
    on=['userId', 'movieId']
).select('userId').distinct()

In [18]:
print(f'{rate_without_tagging.count()} users rated a movie without tagging it.')

610 users rated a movie without tagging it.


## Output data-set containing users that have rated AND tagged a movie.

In [19]:
rate_and_tag_users = ratings.join(
    tags,
    how='inner',
    on=['userId', 'movieId']
).select('userId').distinct()

In [20]:
print(f'{rate_and_tag_users.count()} users rated a movie and also tagged it.')

54 users rated a movie and also tagged it.


In [21]:
print(f"Total number of users that left ratings: {ratings.select('userId').distinct().count()}")

Total number of users that left ratings: 610


## Output data-set showing the number of movies per genre, per year

In [22]:
movies \
  .withColumn("genre_array", split(movies.genres, "\|")) \
  .withColumn('genre', explode(col('genre_array'))) \
  .withColumn('year', regexp_extract(movies.title, '\((\d+)\)', 1)) \
  .groupBy('genre', 'year') \
  .agg(count("movieId").alias("numMovies")) \
  .orderBy(['genre', 'year'], ascending=False) \
  .show(10) 

+-------+----+---------+
|  genre|year|numMovies|
+-------+----+---------+
|Western|2017|        2|
|Western|2016|        1|
|Western|2015|        4|
|Western|2014|        3|
|Western|2013|        1|
|Western|2012|        1|
|Western|2011|        2|
|Western|2010|        5|
|Western|2009|        2|
|Western|2008|        5|
+-------+----+---------+
only showing top 10 rows

