# Spark Analytics with MovieLens Dataset

In this Jupyter Notebook, we use the [MovieLens 20M Dataset](https://grouplens.org/datasets/movielens/20m/) of movie ratings to work through several tasks with `PySpark`. The exercises are structured as a guide to getting familiar with the PySpark syntax. Also have a look at the [official PySpark documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html).

**Introduction to the MovieLens dataset**

The introduction exercises have the following goals:
- Reading and understanding the schema of our MovieLens dataset
- Calculating some summary statistics of our dataset
- Learning how to perform joins and aggregations using Spark

These goals are illustrated by guided exercises that give a first understanding of Spark:
- Guided Exercise 1: Which movies are the most popular ones?
- Guided Exercise 2: What are the distinct genres in the MovieLens dataset (RDD)?

**Exercises for you:**
- Exercise 1: Which movies have the highest average ratings?
- Exercise 2: What's the number of movies in each genre?
- Exercise 3: Which movies are a matter of taste?


## Initialize SparkSession

Execute the following cell to initialize a SparkSession:

In [1]:
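import os
# Point PySpark at the JDK and the Spark installation (paths are specific to this cluster; adjust them to your setup)
os.environ["JAVA_HOME"] = "/usr/lib/jvm/zulu8.78.0.19-ca-jdk8.0.412-linux_aarch64"
os.environ["SPARK_HOME"] = "/opt/spark"

from pyspark.sql import SparkSession

# Connect to the standalone Spark master on host rpi0, port 7077
spark = SparkSession.builder \
    .master("spark://rpi0:7077") \
    .appName("MovieLens") \
    .getOrCreate()
# The SparkContext is needed for the RDD API used later in this notebook
sc = spark.sparkContext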

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/29 21:05:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Introduction: Reading the dataset

Our MovieLens dataset contains 20 million ratings and 465'000 tag applications applied to 27'000 movies by 138'000 users. It also includes tag genome data with 12 million relevance scores across 1100 tags.

The whole dataset contains six CSV files:
- genome-scores.csv
- genome-tags.csv
- links.csv
- movies.csv
- ratings.csv
- tags.csv

In this introduction, we will look at the **`movies`** and **`ratings`** DataFrames.

To read a CSV file from our "ml-20m" folder, we access the `DataFrameReader` through `spark.read` and call its `csv()` method. We also specify `option("header", "true")` since the first row of the file contains the column names.

In [2]:
ratings = spark.read.option("header", "true").csv("hdfs://rpi0:8020/data/ml-20m/ratings.csv")
ratings.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
+------+-------+------+----------+
only showing top 5 rows



Each row of the `ratings` DataFrame represents one rating (`rating`) of one movie (`movieId`) by one user (`userId`). Ratings range from 0.5 stars (worst) to 5.0 stars (best) in half-star steps.

We can also inspect the schema of our dataset (column names and types) with the `printSchema()` method.

In [3]:
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



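Note that every column is read as a `string`: by default the CSV reader does not infer column types. You can pass `option("inferSchema", "true")` when reading, or cast columns explicitly, e.g. `col("rating").cast("double")`.

Do the same for the `movies.csv` file. What kind of data is available, and what does the schema look like?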

## Guided Exercise 1: Which movies are the most popular ones?

To get the most popular movies, we look for the movies with the highest number of ratings, i.e. we use the number of ratings as a proxy for popularity. To do this, we perform the following *transformations* on the `ratings` DataFrame:
- group by `movieId`
- count the number of users (`userId`) associated with each movie 
- rename this column to `num_ratings`
- sort by `num_ratings` in descending order 

We perform these transformations in `PySpark` and store the resulting DataFrame as `most_popular`. See the [official PySpark documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html) for the available functions.

**HINT**:
- Use `agg(count())` to perform an aggregate calculation on grouped data. 
- Don't forget that transformations are [lazy](https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations) in Spark. We need to call an action explicitly (e.g. `show()` for DataFrames, `take()` for RDDs) to see the results.
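To make the semantics of `groupBy().agg(count())` followed by a descending sort concrete, here is a minimal plain-Python sketch of the same computation on a few made-up `(userId, movieId)` pairs. The sample data and the helper name `most_popular_counts` are hypothetical; Spark distributes this work across partitions, but the result has the same shape.

```python
from collections import Counter

def most_popular_counts(ratings):
    """Count ratings per movieId and sort by that count, descending.

    `ratings` is an iterable of (userId, movieId) pairs. This mirrors
    groupBy("movieId").agg(count("userId")) followed by sort(desc(...));
    note that Spark's count(col) skips nulls, and this sample has none.
    """
    counts = Counter(movie_id for _user_id, movie_id in ratings)
    # Sort by count descending; ties are broken by movieId for a stable result
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))

# Hypothetical sample: four ratings for movie "296", two for "356", one for "318"
sample = [("1", "296"), ("2", "296"), ("3", "296"), ("4", "296"),
          ("1", "356"), ("2", "356"), ("5", "318")]
print(most_popular_counts(sample))
# → [('296', 4), ('356', 2), ('318', 1)]
```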

In [4]:
from pyspark.sql.functions import *  # provides count(), desc() and the other column functions

# Number of ratings per movie, most-rated movies first
most_popular = (
    ratings.groupBy("movieId")
    .agg(count("userId").alias("num_ratings"))
    .sort(desc("num_ratings"))
)

most_popular.show(5)



+-------+-----------+
|movieId|num_ratings|
+-------+-----------+
|    296|      67310|
|    356|      66172|
|    318|      63366|
|    593|      63299|
|    480|      59715|
+-------+-----------+
only showing top 5 rows

Unfortunately, the resulting table only contains `movieId` and `num_ratings`. The title of the movie is stored in the `movies` DataFrame. So, we need an inner join of our `most_popular` DataFrame with the `movies` DataFrame on `movieId`.
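An inner join keeps only the keys that appear on both sides and pairs every matching row. The following plain-Python sketch, on hypothetical `(movieId, num_ratings)` and `(movieId, title)` pairs, shows that behavior; `inner_join` is an illustrative helper, not a PySpark function. It also re-sorts after joining, because in Spark a join generally shuffles the data and does not preserve the ordering of its inputs.

```python
def inner_join(left, right):
    """Inner join two lists of (key, value) pairs on key.

    Returns (key, left_value, right_value) for every matching pair; keys that
    appear on only one side are dropped, as in a SQL/Spark inner join.
    """
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    return [(k, lv, rv) for k, lv in left for rv in right_by_key.get(k, [])]

# Hypothetical counts and titles; movie "999" has no title and is dropped
counts = [("296", 4), ("356", 2), ("999", 1)]
titles = [("356", "Movie B (1994)"), ("296", "Movie A (1994)")]

joined = inner_join(counts, titles)
# Re-sort by the count after the join instead of relying on input order
print(sorted(joined, key=lambda row: row[1], reverse=True))
# → [('296', 4, 'Movie A (1994)'), ('356', 2, 'Movie B (1994)')]
```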

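In [5]:
# Read movies.csv (columns: movieId, title, genres)
movies = spark.read.option("header", "true").csv("hdfs://rpi0:8020/data/ml-20m/movies.csv")

# Join on the shared column name so the result has a single movieId column,
# then re-sort: a join does not preserve the ordering of its inputs
most_popular_movies = most_popular.join(movies, on="movieId").sort(desc("num_ratings"))
most_popular_movies.select("title", "num_ratings").show(5, truncate=False)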

We now have a list of the most popular (i.e. most-rated) movies in our MovieLens dataset. Have you already watched all of them? :)

## Guided Exercise 2: Understanding RDD (Resilient Distributed Datasets) operations

We will look at two core data abstractions of Spark: DataFrames and RDDs.
Both can be created from external data sources (e.g. HDFS, SQL databases) or from other processing steps, by transforming existing RDDs and DataFrames. DataFrames are the easier abstraction to work with: they resemble a traditional table with named columns and rows and are generally used for workflows on structured data. If the data is unstructured (has no schema) or needs to be manipulated in non-standard ways, RDDs are the better fit. Even though our data is structured, we will use some RDD operations to understand how RDD transformations work.

Have a look at the PySpark documentation for RDD operations: [PySpark Package](https://spark.apache.org/docs/1.5.1/api/python/pyspark.html)



Execute the following cells and try to understand what `map()`, `flatMap()` and `take()` do.

In [None]:
# Load the data into RDD
data = sc.textFile("hdfs://rpi0:8020/data/ml-20m/movies.csv")

# Split each line into a list of fields on commas
moviesRDD = data.map(lambda l: l.split(','))
moviesRDD.take(5)

In [None]:
# map: exactly one output element per input (here: each row's fields reversed)
reversed_rdd = moviesRDD.map(lambda m: m[::-1])
reversed_rdd.take(5)

In [None]:
# flatMap: each row's list of fields is flattened into individual elements
fields = moviesRDD.flatMap(lambda m: list(m))
fields.take(10)
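If the difference between the two cells above is not obvious, the following plain-Python sketch imitates the semantics on two hypothetical, already-split rows: `map()` returns exactly one output per input (the result is still a list of lists), while `flatMap()` concatenates the iterables returned for each input. The helpers `rdd_map`, `rdd_flat_map` and `take` are illustrative stand-ins, not PySpark functions, and the rows are made up.

```python
from itertools import chain, islice

# Two hypothetical rows, already split on commas like moviesRDD
rows = [["1", "Movie A (1995)", "Comedy|Drama"],
        ["2", "Movie B (1996)", "Action"]]

def rdd_map(f, data):
    """Like RDD.map: exactly one output element per input element."""
    return [f(x) for x in data]

def rdd_flat_map(f, data):
    """Like RDD.flatMap: f returns an iterable and the results are concatenated."""
    return list(chain.from_iterable(f(x) for x in data))

def take(n, data):
    """Like RDD.take: the first n elements."""
    return list(islice(data, n))

reversed_rows = rdd_map(lambda m: m[::-1], rows)
fields = rdd_flat_map(lambda m: list(m), rows)

print(take(1, reversed_rows))  # → [['Comedy|Drama', 'Movie A (1995)', '1']]
print(take(4, fields))         # → ['1', 'Movie A (1995)', 'Comedy|Drama', '2']
```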

## Extracting Distinct Movie Genres

Now we combine the operations above to extract all distinct movie genres in our MovieLens data. You need to:
- Read the CSV file `ml-20m/movies.csv`
- Split the data and select the genre column via `map()`
- `flatMap()` the data. **Remark**: a movie can have several genres, delimited by `|`
- Print the `distinct()` genres with `take()`


In [6]:
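# Load movies.csv as an RDD of text lines
movies_rdd = sc.textFile("hdfs://rpi0:8020/data/ml-20m/movies.csv")

# Split each line into fields and keep the third field (genres).
# NOTE: a plain split(',') misparses quoted titles that contain commas, which is
# where the ' The (1995)"' entry in the output comes from; the header line also
# contributes a 'genres' value.
movies_split = movies_rdd.map(lambda line: line.split(','))
genres = movies_split.map(lambda fields: fields[2])

# One element per genre: "Comedy|Romance" -> "Comedy", "Romance"
genre = genres.flatMap(lambda g: g.split('|'))
genres_distinct = genre.distinct()

print(genres_distinct.take(10))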


['Adventure', 'Children', 'Comedy', 'Romance', 'Drama', 'Action', ' The (1995)"', 'Horror', 'Sci-Fi', 'IMAX']
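The output above contains a spurious entry, `' The (1995)"'`. It comes from titles that contain a comma and are therefore quoted in `movies.csv` (for example `"American President, The (1995)"`): a plain `split(',')` splits inside the quotes and shifts the genre column. A minimal sketch of a more robust parser, assuming each line is a complete CSV record (no embedded newlines) with `genres` as the last column, uses Python's `csv` module; the function name `parse_genres` is hypothetical.

```python
import csv

def parse_genres(line):
    """Return the list of genres from one line of movies.csv.

    csv.reader honours the double quotes around titles that contain commas,
    so the genres field is always the last one. Blank lines and the header
    line (first field "movieId") yield an empty list.
    """
    fields = next(csv.reader([line]))
    if not fields or fields[0] == "movieId":
        return []
    return fields[-1].split("|")

# Usage with Spark (not executed here):
#   genres_distinct = movies_rdd.flatMap(parse_genres).distinct()

print(parse_genres('11,"American President, The (1995)",Comedy|Drama|Romance'))
# → ['Comedy', 'Drama', 'Romance']
```

Returning an empty list for the header works naturally with `flatMap()`, which simply emits nothing for that line, so no separate `filter()` step is needed.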



## Exercise 1: Which movies have the highest average ratings?

Now it's your turn. We want to find out which movies are rated best. Use the `ratings` DataFrame and:

- Group by `movieId`
- Calculate the average rating for each movie and rename this column to `avg_rating`
- Sort by `avg_rating` in descending order
- Join the resulting DataFrame with the `movies` DataFrame to get the movie titles.


**NOTE**: Make sure you have read `movies.csv` into the `movies` DataFrame.

In [None]:
top_rated = 

You will notice that all movies with an `avg_rating` of exactly 5.0 have two or fewer ratings (`num_ratings`). We therefore need to investigate the distribution of `num_ratings` and only consider movies with a minimum number of ratings. Calculate some summary statistics in Spark (consider `mean()`, `min()` and `max()`) and choose a sensible threshold to filter your results.
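The effect described above is easy to reproduce on a toy example. The plain-Python sketch below, on hypothetical `(movieId, rating)` pairs, computes the average rating and the number of ratings per movie and keeps only movies with at least `min_ratings` ratings, ranked by average. In Spark the equivalent would be a `groupBy("movieId").agg(avg(...), count(...))` followed by a `filter` on the count; the helper name `top_rated_with_min_count` and the thresholds are illustrative, not the notebook's solution.

```python
from collections import defaultdict

def top_rated_with_min_count(ratings, min_ratings):
    """Rank movies by average rating, ignoring movies with too few ratings.

    ratings: iterable of (movieId, rating) pairs with numeric ratings.
    Returns (movieId, avg_rating, num_ratings) tuples, best average first.
    """
    per_movie = defaultdict(list)
    for movie_id, rating in ratings:
        per_movie[movie_id].append(rating)

    stats = [(m, sum(rs) / len(rs), len(rs)) for m, rs in per_movie.items()]
    # Drop movies below the threshold, then sort by average rating, descending
    kept = [s for s in stats if s[2] >= min_ratings]
    return sorted(kept, key=lambda s: s[1], reverse=True)

# Hypothetical data: movie "a" has a single 5.0 rating, "b" and "c" have three each
sample = [("a", 5.0), ("b", 4.5), ("b", 4.0), ("b", 5.0),
          ("c", 3.0), ("c", 3.5), ("c", 2.5)]
print(top_rated_with_min_count(sample, min_ratings=1))
# → [('a', 5.0, 1), ('b', 4.5, 3), ('c', 3.0, 3)]
print(top_rated_with_min_count(sample, min_ratings=3))
# → [('b', 4.5, 3), ('c', 3.0, 3)]
```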

## Exercise 2: What's the number of movies in each genre?

In this exercise, we want to calculate the number of movies in each genre. This exercise is similar to Guided Exercise 2:

- Read the CSV file `ml-20m/movies.csv`
- Split the data and select the genre column via `map()`
- `flatMap()` the data. **Remark**: a movie can have several genres, delimited by `|`
- Have a look at the [official PySpark documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html) and check what the `reduceByKey()` function does. It is needed to sum up the number of movies in each genre.
- Sort the results using the `sortBy()` function
- Print the results with `take()`
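`reduceByKey(func)` merges all values that share a key using an associative function such as addition; to count genres, each genre is first turned into a `(genre, 1)` pair. The following plain-Python sketch imitates that pattern on a few hypothetical genre strings so the data flow is visible without a cluster; `reduce_by_key` is an illustrative helper, not part of PySpark.

```python
from operator import add

def reduce_by_key(func, pairs):
    """Merge the values of each key with func, like RDD.reduceByKey."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())

# Hypothetical genres fields, one per movie
genre_fields = ["Comedy|Romance", "Drama", "Comedy|Drama", "Comedy"]

# flatMap each field into single genres, then map each genre to a (genre, 1) pair
pairs = [(g, 1) for field in genre_fields for g in field.split("|")]
counts = reduce_by_key(add, pairs)

# Comparable to sortBy(lambda kv: kv[1], ascending=False) on an RDD
print(sorted(counts, key=lambda kv: kv[1], reverse=True))
# → [('Comedy', 3), ('Drama', 2), ('Romance', 1)]
```

In PySpark, the same chain could read `movies_rdd.flatMap(...).map(lambda g: (g, 1)).reduceByKey(add).sortBy(lambda kv: kv[1], ascending=False)`, where the `flatMap` step still needs a correct genre extractor.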

In [None]:
movies_rdd =

## Exercise 3: Which movies are a matter of taste?

As you know, movies are a matter of taste. Surely there are some movies you would rate with a 5 while your friend rates the same movie with a 2. These are the movies that divide opinion. Try to find out which movies belong to this category.

**HINT**

- We need to consider the standard deviation of the movie ratings
- Also, try to consider only movies that have some minimum number of ratings (e.g. 700) 
- Join with the movies table to get the movie names
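To see why the standard deviation captures "a matter of taste", compare two hypothetical movies with the same average but a different spread. The sketch below uses `statistics.stdev`, which is the *sample* standard deviation; Spark's `stddev()` aggregate is likewise an alias for `stddev_samp()`. The helper name `most_divisive` and the data are hypothetical, and `min_ratings` plays the role of the suggested minimum number of ratings (e.g. 700).

```python
from collections import defaultdict
from statistics import mean, stdev

def most_divisive(ratings, min_ratings):
    """Rank movies by the sample standard deviation of their ratings.

    ratings: iterable of (movieId, rating). Movies with fewer than
    min_ratings ratings (or fewer than 2, since stdev needs two values)
    are skipped. Returns (movieId, mean, stdev, num_ratings), most divisive first.
    """
    per_movie = defaultdict(list)
    for movie_id, rating in ratings:
        per_movie[movie_id].append(rating)

    rows = [(m, mean(rs), stdev(rs), len(rs))
            for m, rs in per_movie.items() if len(rs) >= max(min_ratings, 2)]
    return sorted(rows, key=lambda r: r[2], reverse=True)

# Both movies average 3.0, but "split" receives only 1s and 5s
sample = [("split", 1.0), ("split", 5.0), ("split", 1.0), ("split", 5.0),
          ("steady", 3.0), ("steady", 3.0), ("steady", 2.5), ("steady", 3.5)]
for row in most_divisive(sample, min_ratings=4):
    print(row)
```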

In [None]:
matterofTaste_movies = 