# Big Data Analytics with MovieLens Dataset

In this Jupyter Notebook, we use the [MovieLens 20M Dataset](https://grouplens.org/datasets/movielens/20m/) of movie ratings to work through several tasks with `pySpark`. The exercises are structured as a guided introduction to the PySpark syntax. Also have a look at the [official pySpark documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html).

**Introduction to the MovieLens dataset**

The introduction exercises have the following goals:
- Reading and understanding the schema of our MovieLens dataset
- Calculating some summary statistics of our dataset
- Learning how to perform joins and aggregations using Spark
- Intro exercise 1: Which movies are the most popular ones?
- Intro exercise 2: Understanding RDDs


**Exercises**
- Basic exercise 1: Which movies have the highest average ratings?
- Basic exercise 2: Something with RDDs
- Advanced exercise 1: Which movies are a matter of taste?

**Remark**: PySpark must be installed to run this Jupyter Notebook.



## Initialize the SparkSession

Execute the following cell to initialize a SparkSession:

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('MovieLens').getOrCreate()

## Introduction: Reading the dataset

Our MovieLens dataset contains 20 million ratings and 465,000 tag applications applied to 27,000 movies by 138,000 users. It also includes tag genome data with 12 million relevance scores across 1,100 tags.

The whole dataset contains six CSV files:
- genome-scores.csv
- genome-tags.csv
- links.csv
- movies.csv
- ratings.csv
- tags.csv

First, we will have a look at the **`movies`** and **`ratings`** DataFrames.

To read a CSV file from our "ml-20m" folder, we access the `DataFrameReader` through the `read` attribute of the SparkSession and call its `csv()` method. We also specify `option("header", "true")` since the first row of the file contains the column names.

In [2]:
ratings = spark.read.option("header", "true").csv("ml-20m/ratings.csv")
ratings.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
+------+-------+------+----------+
only showing top 5 rows



Each row of the `ratings` DataFrame represents one rating (`rating`) of one movie (`movieId`) by one user (`userId`). Ratings range from 0.5 stars (worst) up to 5.0 stars (best) in half-star increments.

We can also look at the schema of our dataset (column names and types) by using the `printSchema()` method.

In [3]:
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
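The schema shows that every column was read as a `string`, because we only set the `header` option. Spark's CSV reader can also infer types (for example `spark.read.csv(path, header=True, inferSchema=True)`), at the cost of an extra pass over the data. To make the intended types concrete, here is a plain-Python sketch using the standard `csv` module on the five sample rows shown above. The helper `parse_rating` is hypothetical, and we assume that `timestamp` holds seconds since the Unix epoch in UTC, as the MovieLens README describes.

```python
import csv
import io
from datetime import datetime, timezone

# The first lines of ml-20m/ratings.csv, copied from the show(5) output above
SAMPLE = """userId,movieId,rating,timestamp
1,2,3.5,1112486027
1,29,3.5,1112484676
1,32,3.5,1112484819
1,47,3.5,1112484727
1,50,3.5,1112484580
"""


def parse_rating(row):
    """Hypothetical helper: cast one CSV record (a dict of strings) to typed values."""
    return {
        "userId": int(row["userId"]),
        "movieId": int(row["movieId"]),
        "rating": float(row["rating"]),
        # Assumption: timestamp = seconds since the Unix epoch, interpreted as UTC
        "timestamp": datetime.fromtimestamp(int(row["timestamp"]), tz=timezone.utc),
    }


# DictReader takes its column names from the first line, like option("header", "true")
parsed = [parse_rating(row) for row in csv.DictReader(io.StringIO(SAMPLE))]

for record in parsed:
    print(record["userId"], record["movieId"], record["rating"], record["timestamp"].isoformat())
```

The first record prints as `1 2 3.5 2005-04-02T23:53:47+00:00`, with numeric types instead of strings.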



Do the same for the `movies.csv` file. What kind of data is available, and what does the schema look like?

In [4]:
movies = spark.read.option("header", "true").csv("ml-20m/movies.csv")
movies.show(5)

movies.printSchema()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



## Intro exercise 1: Which movies are the most popular ones?

To find the most popular movies, we look for the movies with the highest number of ratings. In this task, we use the number of ratings as a proxy for the number of views. To do this, we perform the following *transformations* on the `ratings` DataFrame:
- group by `movieId`
- count the number of users (`userId`) associated with each movie 
- rename this column to `num_ratings`
- sort by `num_ratings` in descending order 

We perform these transformations in `pySpark` and store the resulting DataFrame as `most_popular`. See the [official pySpark documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html) for the available functions.

**HINT**:
- Use `agg(count())` to perform an aggregate calculation on grouped data.
- Don't forget that transformations are [lazy](https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations) in Spark. We need to call an action (e.g. `show()`, `take()`) explicitly to see the results.

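To see what these transformations compute, here is a plain-Python sketch of the same group, count and sort logic on a handful of hypothetical `(userId, movieId)` pairs. The data is made up for illustration and is not taken from MovieLens. Spark distributes the same computation across partitions, so this sketch only illustrates the semantics.

```python
from collections import Counter

# Hypothetical (userId, movieId) rating pairs, made up for illustration
ratings_sample = [
    (1, 296), (2, 296), (3, 296),
    (1, 356), (2, 356),
    (3, 318),
]

# groupBy("movieId") + agg(count("userId")): one count per movieId
num_ratings = Counter(movie_id for _user_id, movie_id in ratings_sample)

# sort(desc("num_ratings")): highest count first
most_popular = sorted(num_ratings.items(), key=lambda kv: kv[1], reverse=True)
print(most_popular)  # [(296, 3), (356, 2), (318, 1)]
```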
In [5]:
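# Note: the wildcard import shadows Python built-ins such as min, max and sum
from pyspark.sql.functions import *

# Count the ratings per movie and sort the movies by that count, highest first
most_popular = (ratings.groupBy("movieId")
                .agg(count("userId"))
                .withColumnRenamed("count(userId)", "num_ratings")
                .sort(desc("num_ratings")))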

most_popular.show(5)

+-------+-----------+
|movieId|num_ratings|
+-------+-----------+
|    296|      67310|
|    356|      66172|
|    318|      63366|
|    593|      63299|
|    480|      59715|
+-------+-----------+
only showing top 5 rows



Unfortunately, the resulting table only contains `movieId` and `num_ratings`. The title of the movie is stored in the `movies` DataFrame. So, we need an inner join of our `most_popular` DataFrame with the `movies` DataFrame on `movieId`.

In [6]:
most_popular_movies = most_popular.join(movies, most_popular.movieId == movies.movieId)
most_popular_movies.show(5)

+-------+-----------+-------+--------------------+--------------------+
|movieId|num_ratings|movieId|               title|              genres|
+-------+-----------+-------+--------------------+--------------------+
|    296|      67310|    296| Pulp Fiction (1994)|Comedy|Crime|Dram...|
|    356|      66172|    356| Forrest Gump (1994)|Comedy|Drama|Roma...|
|    318|      63366|    318|Shawshank Redempt...|         Crime|Drama|
|    593|      63299|    593|Silence of the La...|Crime|Horror|Thri...|
|    480|      59715|    480|Jurassic Park (1993)|Action|Adventure|...|
+-------+-----------+-------+--------------------+--------------------+
only showing top 5 rows
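An inner join keeps only the rows whose key appears on both sides. The plain-Python sketch below mimics this with a dictionary lookup on hypothetical counts. Movie 999 has no title and movie 480 has no count, so both are dropped from the result. Two PySpark details are worth knowing here. First, passing the column name, as in `most_popular.join(movies, "movieId")`, keeps a single `movieId` column instead of the duplicated one in the output above. Second, a join is not guaranteed to preserve the order of a previously sorted input, so sorting after the join is the safer choice.

```python
# Hypothetical aggregated counts (movieId, num_ratings), already sorted descending
most_popular = [(296, 3), (356, 2), (999, 1)]

# Hypothetical slice of the movies table: movieId -> title
titles = {
    296: "Pulp Fiction (1994)",
    356: "Forrest Gump (1994)",
    480: "Jurassic Park (1993)",
}

# Inner join on movieId: keep a row only if its key exists on both sides
most_popular_movies = [
    (movie_id, num_ratings, titles[movie_id])
    for movie_id, num_ratings in most_popular
    if movie_id in titles
]
print(most_popular_movies)
```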



We now have a list of the most popular (that is, most-rated) movies in our MovieLens dataset.

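## Intro exercise 2: Understanding RDDs (Resilient Distributed Datasets)

This exercise introduces the lower-level RDD API. We load the raw ratings file as an RDD of text lines, split each line into its fields, and try out the `map()` and `flatMap()` transformations.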

In [10]:
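# The RDD API is accessed through the SparkContext of our SparkSession
sc = spark.sparkContext

# Load the raw file as an RDD of text lines (the header line is kept as data)
data = sc.textFile("ml-20m/ratings.csv")

# Split each line into its comma-separated fields; we use a new name
# so that the `ratings` DataFrame from the previous exercises is not overwritten
ratings_rdd = data.map(lambda l: l.split(','))
ratings_rdd.take(5)

[['userId', 'movieId', 'rating', 'timestamp'],
 ['1', '2', '3.5', '1112486027'],
 ['1', '29', '3.5', '1112484676'],
 ['1', '32', '3.5', '1112484819'],
 ['1', '47', '3.5', '1112484727']]

In [12]:
# map: exactly one output record per input record (here: reversed field order)
reversed_fields = ratings_rdd.map(lambda rating: rating[::-1])
reversed_fields.take(5)

[['timestamp', 'rating', 'movieId', 'userId'],
 ['1112486027', '3.5', '2', '1'],
 ['1112484676', '3.5', '29', '1'],
 ['1112484819', '3.5', '32', '1'],
 ['1112484727', '3.5', '47', '1']]

In [18]:
# flatMap: each record is expanded into its individual fields, which are
# concatenated into one flat RDD of strings
fields = ratings_rdd.flatMap(lambda rating: rating)
fields.take(10)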

['userId',
 'movieId',
 'rating',
 'timestamp',
 '1',
 '2',
 '3.5',
 '1112486027',
 '1',
 '29']
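In short, `map()` returns exactly one element per input element, while `flatMap()` may return zero or more elements per input and flattens them into a single sequence. That is why the `flatMap()` output above is a flat list of fields. Note also that `textFile()` knows nothing about CSV headers, so the header line is treated as an ordinary record. The plain-Python sketch below mirrors both transformations on three lines taken from the outputs above. It first drops the header the way one commonly does on an RDD (`header = data.first()` followed by `data.filter(lambda l: l != header)`). The naive `split(",")` assumes that no field contains a quoted comma, which is fine for the purely numeric `ratings.csv`.

```python
from itertools import chain

# Raw lines as textFile() would return them (taken from the outputs above)
lines = [
    "userId,movieId,rating,timestamp",
    "1,2,3.5,1112486027",
    "1,29,3.5,1112484676",
]

# Drop the header, like: header = data.first(); data.filter(lambda l: l != header)
header = lines[0]
records = [line.split(",") for line in lines if line != header]

# map(): exactly one output per input, so the result still has one list per record
reversed_records = [record[::-1] for record in records]

# flatMap(): every input yields several outputs, concatenated into one flat list
flat_fields = list(chain.from_iterable(records))

print(reversed_records)  # [['1112486027', '3.5', '2', '1'], ['1112484676', '3.5', '29', '1']]
print(flat_fields)       # ['1', '2', '3.5', '1112486027', '1', '29', '3.5', '1112484676']
```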

## Basic exercise 1: Which movies have the highest ratings (on average)?

Now we want to see which movies have the best ratings. We will use the `ratings` DataFrame and:

- Group by `movieId`
- Calculate the average rating for each movie and rename this column to `avg_rating`
- Sort by `avg_rating` in descending order
- Join the resulting DataFrame with the `movies` DataFrame to get the movie names

**NOTE**: Make sure you have read the `movies` file in the introduction part.

In [7]:
# Average rating per movie, best-rated first
top_rated = (ratings.groupBy("movieId")
             .agg(avg(col("rating")))
             .withColumnRenamed("avg(rating)", "avg_rating")
             .sort(desc("avg_rating")))

# Add the movie titles and genres
top_rated_movies = top_rated.join(movies, top_rated.movieId == movies.movieId)

top_rated_movies.show(5)

+-------+----------+-------+--------------------+-------------+
|movieId|avg_rating|movieId|               title|       genres|
+-------+----------+-------+--------------------+-------------+
|  95517|       5.0|  95517|Barchester Chroni...|        Drama|
| 109715|       5.0| 109715|Inquire Within (2...|       Comedy|
| 111548|       5.0| 111548|Welcome to Austra...|  Documentary|
| 129905|       5.0| 129905|The Floating Cast...| Comedy|Drama|
|  98761|       5.0|  98761|Shaolin Temple 2:...|Action|Comedy|
+-------+----------+-------+--------------------+-------------+
only showing top 5 rows



This result is not very meaningful, because a movie can reach an average of 5.0 with just a single rating. We should therefore also count the number of ratings per movie in the same `agg()` call.

In [8]:
# Number of ratings and average rating per movie
top_rated = (ratings.groupBy("movieId")
             .agg(count("userId"), avg(col("rating")))
             .withColumnRenamed("count(userId)", "num_ratings")
             .withColumnRenamed("avg(rating)", "avg_rating"))

# Join with the titles, then sort by average rating (ties broken by number of ratings)
top_rated_movies = (top_rated.join(movies, top_rated.movieId == movies.movieId)
                    .sort(desc("avg_rating"), desc("num_ratings")))

top_rated_movies.show(5)

+-------+-----------+----------+-------+--------------------+-------------+
|movieId|num_ratings|avg_rating|movieId|               title|       genres|
+-------+-----------+----------+-------+--------------------+-------------+
| 108527|          2|       5.0| 108527|  Catastroika (2012)|  Documentary|
| 103871|          2|       5.0| 103871|Consuming Kids: T...|  Documentary|
| 109715|          1|       5.0| 109715|Inquire Within (2...|       Comedy|
|  98761|          1|       5.0|  98761|Shaolin Temple 2:...|Action|Comedy|
| 129905|          1|       5.0| 129905|The Floating Cast...| Comedy|Drama|
+-------+-----------+----------+-------+--------------------+-------------+
only showing top 5 rows



The top-ranked movies, each with an `avg_rating` of exactly 5.0, have only one or two ratings. We should examine the distribution of `num_ratings` so that we only consider movies with a minimum number of ratings. Let's calculate some summary statistics within Spark.

In [35]:
# Calculate average, minimum, and maximum of num_ratings
top_rated_movies.select([mean('num_ratings'), min('num_ratings'), max('num_ratings')]).show(1)

+-----------------+----------------+----------------+
| avg(num_ratings)|min(num_ratings)|max(num_ratings)|
+-----------------+----------------+----------------+
|747.8411232425965|               1|           67310|
+-----------------+----------------+----------------+
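
As a simple cut-off, we keep only movies with more than 800 ratings, slightly above the mean of about 748 ratings per movie:
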
In [9]:
top_rated_movies.where("num_ratings > 800").show(5)

+-------+-----------+-----------------+-------+--------------------+--------------------+
|movieId|num_ratings|       avg_rating|movieId|               title|              genres|
+-------+-----------+-----------------+-------+--------------------+--------------------+
|    318|      63366|4.446990499637029|    318|Shawshank Redempt...|         Crime|Drama|
|    858|      41355|4.364732196832306|    858|Godfather, The (1...|         Crime|Drama|
|     50|      47006|4.334372207803259|     50|Usual Suspects, T...|Crime|Mystery|Thr...|
|    527|      50054|4.310175010988133|    527|Schindler's List ...|           Drama|War|
|   1221|      27398|4.275640557704942|   1221|Godfather: Part I...|         Crime|Drama|
+-------+-----------+-----------------+-------+--------------------+--------------------+
only showing top 5 rows
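The complete Basic exercise 1 pipeline (count and average per movie, keep only movies above a minimum number of ratings, sort by average) fits in a few lines of plain Python. The ratings below are hypothetical and were chosen so that a single 5.0 rating tops the unfiltered ranking. `MIN_RATINGS` stands in for the cut-off of 800 used above, and the comparison is strict, as in the `where()` call.

```python
from collections import defaultdict

# Hypothetical (movieId, rating) pairs
ratings_sample = [
    (10, 5.0),                          # a single enthusiastic rating
    (20, 4.5), (20, 4.0), (20, 5.0),    # well rated by several users
    (30, 3.0), (30, 3.5), (30, 2.5),
]
MIN_RATINGS = 2  # stands in for the threshold of 800 used on the full dataset

# groupBy("movieId") with count and avg
by_movie = defaultdict(list)
for movie_id, rating in ratings_sample:
    by_movie[movie_id].append(rating)
stats = {m: (len(rs), sum(rs) / len(rs)) for m, rs in by_movie.items()}


def rank(min_count):
    """Movies with more than min_count ratings, highest average first."""
    kept = [(m, n, avg) for m, (n, avg) in stats.items() if n > min_count]
    return sorted(kept, key=lambda t: t[2], reverse=True)


print([m for m, _, _ in rank(0)])            # [10, 20, 30]
print([m for m, _, _ in rank(MIN_RATINGS)])  # [20, 30]
```

Without the threshold, movie 10 wins on a single rating; with it, the ranking reflects movies that many users agree on.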



**Testing Spark SQL**

In [64]:
# Only DataFrames (not RDDs) can be registered as views for Spark SQL,
# so we first read the CSV file into a DataFrame using its header row.
ratings_df = spark.read.option("header", "true").csv("ml-20m/ratings.csv")

# A global temporary view is tied to the system-preserved database `global_temp`
ratings_df.createOrReplaceGlobalTempView("movielens")

In [66]:
# Global temporary views must be referenced with the `global_temp` prefix
spark.sql("SELECT * FROM global_temp.movielens").show(5)

In [50]:
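# Query the CSV file directly by path. No header option is applied here, so the
# header line appears as a data row and the columns are named _c0 to _c3.
spark.sql("SELECT * FROM csv.`ml-20m/ratings.csv`").show(5)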


+------+-------+------+----------+
|   _c0|    _c1|   _c2|       _c3|
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
+------+-------+------+----------+
only showing top 5 rows
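With a DataFrame registered as a view, the Intro exercise 1 aggregation can be written as plain SQL. To keep the example runnable without a Spark cluster, the sketch below executes such a query with Python's built-in `sqlite3` on a few hypothetical rows. The statement uses only standard `GROUP BY`/`ORDER BY` syntax, so we assume it carries over unchanged to `spark.sql()` against a registered view (for a global temporary view, with the `global_temp.` prefix on the table name). Treat that portability as an assumption.

```python
import sqlite3

# In-memory database standing in for the registered view
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE movielens (userId INTEGER, movieId INTEGER, rating REAL)")
conn.executemany(
    "INSERT INTO movielens VALUES (?, ?, ?)",
    [(1, 296, 5.0), (2, 296, 4.0), (3, 296, 4.5),
     (1, 356, 4.0), (2, 356, 3.5),
     (3, 318, 5.0)],  # hypothetical rows
)

# Intro exercise 1 in SQL: number of ratings per movie, most-rated first
query = """
    SELECT movieId, COUNT(userId) AS num_ratings
    FROM movielens
    GROUP BY movieId
    ORDER BY num_ratings DESC
"""
rows = conn.execute(query).fetchall()
print(rows)  # [(296, 3), (356, 2), (318, 1)]
conn.close()
```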



## Basic exercise 2: Something with RDDs

## Advanced exercise 1: Which movies are a matter of taste?

As you know, movies are a matter of taste. There are certainly movies that you would rate with 5 stars while your friend rates the same movie with 2. Such movies divide opinion. Try to find out which movies belong to this category.

**HINT**

- We need to consider the standard deviation of the movie ratings
- Also, try to consider only movies that have some minimum number of ratings (e.g. 700) 
- Join with the movies table to get the movie names
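Spark's `stddev` aggregate is an alias for `stddev_samp`, the sample standard deviation (dividing by n - 1). Python's `statistics.stdev` computes the same quantity, so the hints above can be sketched in plain Python. The ratings below are hypothetical: a polarizing movie rated only with 1s and 5s ends up with a much larger spread than a consistently rated movie with the same mean. `MIN_RATINGS` stands in for the minimum of 700 suggested above.

```python
import statistics
from collections import defaultdict

# Hypothetical (movieId, rating) pairs
ratings_sample = [
    (1, 1.0), (1, 5.0), (1, 1.0), (1, 5.0),  # polarizing: only 1s and 5s
    (2, 3.0), (2, 3.0), (2, 3.0), (2, 3.0),  # everyone agrees
    (3, 4.0),                                # too few ratings
]
MIN_RATINGS = 3  # stands in for the threshold of 700 from the hints

by_movie = defaultdict(list)
for movie_id, rating in ratings_sample:
    by_movie[movie_id].append(rating)

# Count, mean and sample standard deviation per movie, keeping only well-rated movies
# (the filter runs before stdev, so single-rating movies never reach stdev)
summary = [
    (m, len(rs), statistics.mean(rs), statistics.stdev(rs))
    for m, rs in by_movie.items()
    if len(rs) > MIN_RATINGS
]

# Most divisive movies first
summary.sort(key=lambda t: t[3], reverse=True)
for movie_id, n, mean_rating, std_rating in summary:
    print(movie_id, n, round(mean_rating, 2), round(std_rating, 2))
```

Both remaining movies have a mean of 3.0, but movie 1 has a standard deviation of about 2.31 and movie 2 has 0.0. This is the kind of difference the Spark query below looks for.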

In [10]:
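# Count, mean and standard deviation of the ratings per movie,
# restricted to movies with more than 700 ratings
ratings_stddev = (ratings.groupBy("movieId")
                  .agg(count("userId").alias("num_ratings"),
                       avg(col("rating")).alias("avg_rating"),
                       stddev(col("rating")).alias("std_rating"))
                  .where("num_ratings > 700"))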

ratings_stddev.show(5)

+-------+-----------+------------------+------------------+
|movieId|num_ratings|        avg_rating|        std_rating|
+-------+-----------+------------------+------------------+
|    296|      67310| 4.174231169217055|0.9760762295742448|
|   1090|      15808| 3.919977226720648|0.8272067263021853|
|   3959|       2869| 3.699372603694667|0.8607671626686736|
|   2294|      10163| 3.303207714257601|0.9047000233824075|
|   6731|       1173|3.5571184995737424| 0.918929235043451|
+-------+-----------+------------------+------------------+
only showing top 5 rows



In [67]:
matterofTaste_movies = ratings_stddev.join(movies, ratings_stddev.movieId == movies.movieId)

matterofTaste_movies.sort(desc("std_rating")).show(5)

+-------+-----------+------------------+------------------+-------+--------------------+--------------------+
|movieId|num_ratings|        avg_rating|        std_rating|movieId|               title|              genres|
+-------+-----------+------------------+------------------+-------+--------------------+--------------------+
|   1924|       2304|2.6319444444444446| 1.420171182322382|   1924|Plan 9 from Outer...|       Horror|Sci-Fi|
|   4255|       1550|2.1351612903225807|1.3504497277537106|   4255|Freddy Got Finger...|              Comedy|
|  78772|        884| 2.670814479638009|1.3485057430514158|  78772|Twilight Saga: Ec...|Fantasy|Romance|T...|
|  72407|       1135| 2.565638766519824|1.3367548401080391|  72407|Twilight Saga: Ne...|Drama|Fantasy|Hor...|
|   7318|       3130| 3.185623003194888| 1.335427370705759|   7318|Passion of the Ch...|               Drama|
+-------+-----------+------------------+------------------+-------+--------------------+--------------------+
only showing top 5 rows

Unsurprisingly, the Twilight movies are highly debated ;)