# 202 Spark - MovieLens

The goal of this lab is to run some analysis on the [MovieLens](https://grouplens.org/datasets/movielens/) dataset.

- [Spark programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
- [PySpark RDD APIs](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html)

This lab's notebook is in the ```material``` folder; the solutions will be released in the same folder.

The cluster configuration is the same as in 201.
- Clone the previous cluster
- Update the addresses in PuTTY

Download the dataset [here](https://big.csr.unibo.it/downloads/bigdata/ml-dataset.zip), unzip it and upload the files to S3.

- ml_movies.csv (<u>movieId</u>:Long, title:String, genres:String) 
    - genres are separated by pipes (e.g., "comedy|drama|action")
    - each movie is associated with many ratings

- ml_ratings.csv (<u>userId</u>:Long, <u>movieId</u>:Long, rating:Double, year:Int)
    - each rating is associated with many tags
- ml_tags.csv (<u>userId</u>:Long, <u>movieId</u>:Long, <u>tag</u>:String, year:Int) 

In [1]:
%%configure -f
{"executorMemory":"8G", "numExecutors":2, "executorCores":3, "conf": {"spark.dynamicAllocation.enabled": "false"}}

In [6]:
bucketname = "univ-tours-bd2223-egallinucci"

path_ml_movies = "s3a://"+bucketname+"/datasets/movielens/ml-movies.csv"
path_ml_ratings = "s3a://"+bucketname+"/datasets/movielens/ml-ratings.csv"
path_ml_tags = "s3a://"+bucketname+"/datasets/movielens/ml-tags.csv"

sc.applicationId

"SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/" + sc.applicationId + "/"

'SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/application_1669799482197_0002/'

In [3]:
import re

commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
pipeRegex = "\\|(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"

quotes = "\""
def parseText(title):
    # Trim whitespace, then strip any leading and trailing double quotes.
    try:
        title = title.strip()
        while (re.match(quotes,title[0:1])):
            title = title[1:]
        while (re.match(quotes,title[len(title)-1:])):
            title = title[:len(title)-1]
        return title
    except Exception:
        # Missing (None) or non-string fields become an empty string.
        return ""
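
For string inputs, `parseText` behaves like trimming whitespace and then trimming double quotes. The following is a minimal plain-Python sketch of that behavior (no Spark needed); `parse_text_simple` and the sample strings are hypothetical names and values used only for illustration, and the sketch assumes every field is either a `str` or `None`.

```python
def parse_text_simple(title):
    """Hypothetical shorthand: trim whitespace, then surrounding double quotes."""
    if title is None:  # a missing CSV field is assumed to arrive as None
        return ""
    return title.strip().strip('"')

# Sample inputs shaped like quoted CSV titles (illustrative values only).
samples = ['"Toy Story (1995)"', '  "American President, The (1995)" ', None, '']
cleaned = [parse_text_simple(s) for s in samples]
print(cleaned)
```

Quotes inside the title are left untouched; only the leading and trailing ones are removed.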

In [7]:
from itertools import islice
from datetime import datetime
import re

rddMovies = spark.read.option("header","true").csv(path_ml_movies).rdd.map(lambda row: (int(row[0]),parseText(row[1]),row[2]))
rddRatings = spark.read.option("header","true").csv(path_ml_ratings).rdd.map(lambda row: (int(row[0]),int(row[1]),float(row[2]),datetime.utcfromtimestamp(int(row[3])).strftime('%Y')))
#rddTags = spark.read.option("header","true").csv(path_ml_tags).rdd.map(lambda row: (int(row[0]),int(row[1]),parseText(row[2]),datetime.utcfromtimestamp(int(row[3])).strftime('%Y')))

## 303-1 Datasets exploration

Cache the dataset and answer the following questions:

- How many (distinct) users, movies, ratings, and tags?
- How many (distinct) genres?
- On average, how many ratings per user?
- On average, how many ratings per movie?
- On average, how many genres per movie?
- What is the range of ratings?
- On average, how many ratings per year?
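
One way to approach the "on average" questions is to count per key first and then reduce `(count, 1)` pairs, so that the total and the number of keys are accumulated together. The sketch below mimics that pattern in plain Python on a handful of made-up rows; the data and the variable names are assumptions for illustration, not part of the dataset.

```python
from functools import reduce

# Hypothetical (userId, movieId, rating, year) tuples, shaped like the rating rows.
ratings = [
    (1, 10, 4.0, "2001"),
    (1, 20, 3.5, "2001"),
    (1, 30, 5.0, "2002"),
    (2, 10, 2.0, "2002"),
    (3, 20, 4.5, "2003"),
    (3, 30, 1.0, "2003"),
]

# Step 1: count ratings per user (the role played by reduceByKey on (userId, 1) pairs).
per_user = {}
for user_id, _, _, _ in ratings:
    per_user[user_id] = per_user.get(user_id, 0) + 1

# Step 2: turn each count into (count, 1) and sum both components in one pass.
total, users = reduce(
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
    [(n, 1) for n in per_user.values()],
)

avg_ratings_per_user = total / users
print(avg_ratings_per_user)
```

Swapping the key (movieId or year instead of userId) gives the other averages.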

In [8]:
rddMoviesCached = rddMovies.cache()
rddRatingsCached = rddRatings.cache()
#rddTagsCached = rddTags.cache()

rddMoviesCached.count()
rddRatingsCached.count()
#rddTagsCached.count()

27753444

In [9]:
"Number of movies: " + str(rddMoviesCached.count())

'Number of movies: 58098'

In [10]:
"Number of ratings: " + str(rddRatingsCached.count())

'Number of ratings: 27753444'

In [11]:
"Number of users: " + str(rddRatingsCached.map(lambda x: (x[0])).distinct().count())

'Number of users: 283228'

In [12]:
"Number of genres: " + str(
    rddMoviesCached.\
    flatMap(lambda x: x[2].split('|')).\
    distinct().count()
)

'Number of genres: 20'

In [13]:
res = rddRatingsCached.\
    map(lambda x: (x[0],1)).\
    reduceByKey(lambda a,b: a+b).\
    map(lambda x: (x[1],1)).\
    reduce(lambda a,b: (a[0]+b[0], a[1]+b[1]))
"Number of ratings per user: " + str(res[0]/res[1])

'Number of ratings per user: 97.98976089934611'

In [14]:
res = rddRatingsCached.\
    map(lambda x: (x[1],1)).\
    reduceByKey(lambda a,b: a+b).\
    map(lambda x: (x[1],1)).\
    reduce(lambda a,b: (a[0]+b[0], a[1]+b[1]))
"Number of ratings per movie: " + str(res[0]/res[1])

'Number of ratings per movie: 515.0113010076268'

In [15]:
res = rddMoviesCached.\
    map(lambda x: (len(x[2].split('|')),1)).\
    reduce(lambda a,b: (a[0]+b[0], a[1]+b[1]))
"Number of genres per movie: " + str(res[0]/res[1])

'Number of genres per movie: 1.8263451409687081'

In [16]:
"Range of ratings: " +\
    str(rddRatingsCached.map(lambda x: x[2]).min()) +\
    " to " +\
    str(rddRatingsCached.map(lambda x: x[2]).max())

'Range of ratings: 0.5 to 5.0'

In [17]:
res = rddRatingsCached.\
    map(lambda x: (x[3],1)).\
    reduceByKey(lambda a,b: a+b).\
    map(lambda x: (x[1],1)).\
    reduce(lambda a,b: (a[0]+b[0], a[1]+b[1]))
"Number of ratings per year: " + str(res[0]/res[1])

'Number of ratings per year: 1156393.5'

## 303-2 Compute the average rating for each movie

- Export the result to S3
- Do not start from cached RDDs
- Evaluate:
  - Join-and-Aggregate vs Aggregate-and-Join
  - Best join vs broadcast
- Use Tableau to check the results
  - Download the file from S3 instead of connecting to S3

In [18]:
path_output_avgRatPerMovie = "s3a://"+bucketname+"/spark/avgRatPerMovie-test"
# rdd.coalesce(1).toDF().write.format("csv").mode('overwrite').save(path_output_avgRatPerMovie)

for (id, rdd) in spark.sparkContext._jsc.getPersistentRDDs().items():
    rdd.unpersist()

JavaObject id=o363
JavaObject id=o365
JavaObject id=o367
JavaObject id=o369

### Join-and-Aggregate vs Aggregate-and-Join

In [None]:
rddMoviesKV = rddMovies.map(lambda x: ((x[0]),(x[1])))
rddRatings.\
    map(lambda x: (x[1],x[2])).\
    join(rddMoviesKV).\
    map(lambda x: (x[1][1],(x[1][0],1))).\
    reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])).\
    map(lambda x: (x[0], x[1][0]/x[1][1], x[1][1])).\
    coalesce(1).\
    collect()
    #toDF().write.format("csv").mode('overwrite').save(path_output_avgRatPerMovie)

In [20]:
rddMoviesKV = rddMovies.map(lambda x: ((x[0]),(x[1])))
rddRatings.\
    map(lambda x: (x[1],(x[2],1))).\
    reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])).\
    mapValues(lambda x: (x[0]/x[1], x[1])).\
    join(rddMoviesKV).\
    map(lambda x: (x[1][1],x[1][0][0],x[1][0][1])).\
    coalesce(1).\
    toDF().write.format("csv").mode('overwrite').save(path_output_avgRatPerMovie)

Most of the time is spent just reading the data; nonetheless, aggregating before joining reduces the execution time, because the join then handles one record per movie instead of one record per rating.
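
The effect of the operator order can be illustrated without Spark. The following plain-Python sketch runs both orders on toy data and reports how many records go through the join in each case; the data, the function names, and the assumption that titles are unique are all made up for illustration.

```python
from collections import defaultdict

# Hypothetical toy data: movieId -> title, and (movieId, rating) pairs.
movies = {1: "Movie A", 2: "Movie B"}
ratings = [(1, 4.0), (1, 2.0), (1, 3.0), (2, 5.0), (2, 4.0)]

def join_then_aggregate(ratings, movies):
    # The join emits one record per rating; all of them reach the aggregation.
    joined = [(movies[movie_id], rating) for movie_id, rating in ratings]
    acc = defaultdict(lambda: [0.0, 0])
    for title, rating in joined:
        acc[title][0] += rating
        acc[title][1] += 1
    result = {title: (s / n, n) for title, (s, n) in acc.items()}
    return result, len(joined)

def aggregate_then_join(ratings, movies):
    # Aggregating first collapses the ratings to one (sum, count) pair per movie.
    acc = defaultdict(lambda: [0.0, 0])
    for movie_id, rating in ratings:
        acc[movie_id][0] += rating
        acc[movie_id][1] += 1
    # The join now sees one record per movie.
    joined = [(movies[movie_id], (s / n, n)) for movie_id, (s, n) in acc.items()]
    return dict(joined), len(joined)

res_a, joined_a = join_then_aggregate(ratings, movies)
res_b, joined_b = aggregate_then_join(ratings, movies)
print(res_a == res_b, joined_a, joined_b)
```

Both orders give the same averages, but the second one joins as many records as there are movies rather than as many as there are ratings.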

### Best join vs broadcast

In [22]:
rddMoviesKV = rddMovies.map(lambda x: ((x[0]),(x[1])))
bRddMovies = sc.broadcast(rddMoviesKV.collectAsMap())
rddRatings.\
    map(lambda x: (x[1],(x[2],1))).\
    reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])).\
    mapValues(lambda x: (x[0]/x[1], x[1])).\
    map(lambda x: (x[0],(x[1],bRddMovies.value[x[0]] )) ).\
    map(lambda x: (x[1][1],x[1][0][0],x[1][0][1])).\
    coalesce(1).\
    toDF().write.format("csv").mode('overwrite').save(path_output_avgRatPerMovie)
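
The broadcast variant replaces the shuffle join with a local dictionary lookup on each record. Below is a plain-Python sketch of that idea (no Spark involved); the data and `lookup_join` are hypothetical. It uses `.get` rather than indexing, on the assumption that an id missing from the movies file should be dropped as an inner join would do, whereas indexing with `[...]` would raise a `KeyError`.

```python
# Hypothetical toy data; in the notebook the dictionary would come from
# collectAsMap() and be shipped to the executors as a broadcast variable.
movie_titles = {1: "Movie A", 2: "Movie B"}
avg_by_movie = [(1, (3.0, 3)), (2, (4.5, 2)), (99, (5.0, 1))]  # id 99 has no title

def lookup_join(records, lookup):
    """Enrich each record through a dictionary lookup instead of a shuffle join."""
    out = []
    for movie_id, (avg, n) in records:
        title = lookup.get(movie_id)  # .get avoids a KeyError on unknown ids
        if title is not None:         # mimic inner-join semantics: drop unmatched rows
            out.append((title, avg, n))
    return out

enriched = lookup_join(avg_by_movie, movie_titles)
print(enriched)
```

This only pays off when the lookup table is small enough to fit in the memory of every executor, which is plausible for the movies file (about 58K rows).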

## 303-3 Genres

Make a chart of best-ranked genres, export the result to S3, then use Tableau to check it.

Use cached RDDs.

Two possible workflows:

1. Pre-aggregation (3 shuffles)

  - Aggregate ratings by movieId
  - Join with movies and map to genres
  - Aggregate by genres
  
2. Join & aggregate (2 shuffles)

  - Join with movies and map to genres
  - Aggregate by genres



In [23]:
path_output_avgRatPerGenre = "s3a://"+bucketname+"/spark/avgRatPerGenre-test"

for (id, rdd) in spark.sparkContext._jsc.getPersistentRDDs().items():
    rdd.unpersist()

Which is better?

1. Pre-aggregation (3 shuffles)

  - Aggregate ratings by movieId
    - Input: 724MB (there are 28M ratings, it's ~26B per rating)
    - Output: ~1.5MB (there are 58K movies)
  - Join with movies and map to genres
    - Input: ~1.5MB + 2.7MB (there are 58K movies, it's ~47B per movie)
    - Output: ~6MB (considering 47B per record and 2 genres per movie)
  - Aggregate by genres
    - Input: ~6MB
    - Output: ~1KB (considering 47B per record and that there are 20 genres)
  
2. Join & aggregate (2 shuffles)

  - Join with movies and map to genres
    - Input: 724MB + 2.7MB
    - Output: ~1.4GB (considering ~26B per record and 2 genres per movie)
  - Aggregate by genres
    - Input: ~1.4GB
    - Output: ~1KB
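
These estimates are products of record counts and per-record sizes. A rough sketch of the arithmetic follows, using the counts printed earlier in the notebook. The variable names are made up, sizes use decimal units (1MB = 10^6 B), and the join-first estimate assumes that each joined record weighs about as much as a rating (~26B), which is an assumption rather than a measurement.

```python
n_ratings = 27_753_444   # count of ratings reported earlier
n_movies = 58_098        # count of movies reported earlier
genres_per_movie = 2     # rounded up from ~1.83
rating_bytes = 26        # ~724MB / 28M ratings
movie_bytes = 47         # ~2.7MB / 58K movies

MB = 1e6
GB = 1e9

# Workflow 1: pre-aggregation.
agg_out = n_movies * rating_bytes                     # one record per movie
join_out = n_movies * genres_per_movie * movie_bytes  # one record per (movie, genre)

# Workflow 2: join first.
# ASSUMPTION: each joined record is about as large as a rating record.
join_first_out = n_ratings * genres_per_movie * rating_bytes

print(f"pre-aggregation output: {agg_out / MB:.1f} MB")
print(f"join output (workflow 1): {join_out / MB:.1f} MB")
print(f"join output (workflow 2): {join_first_out / GB:.2f} GB")
print(f"ratio between the two join outputs: {join_first_out / join_out:.0f}x")
```

Under these assumptions the second workflow feeds its aggregation a few hundred times more data than the first one.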

Let's verify it.

In [24]:
rddMoviesKV = rddMovies.map(lambda x: ((x[0]),(x[2]))).flatMapValues(lambda x: x.split('|'))
rddRatings.\
    map(lambda x: (x[1],(x[2],1))).\
    reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])).\
    join(rddMoviesKV).\
    map(lambda x: (x[1][1],(x[1][0][0],x[1][0][1]))).\
    reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])).\
    map(lambda x: (x[0], x[1][0]/x[1][1], x[1][1])).\
    coalesce(1).\
    toDF().write.format("csv").mode('overwrite').save(path_output_avgRatPerGenre)

In [25]:
rddMoviesKV = rddMovies.map(lambda x: ((x[0]),(x[2]))).flatMapValues(lambda x: x.split('|'))
rddRatings.\
    map(lambda x: (x[1],x[2])).\
    join(rddMoviesKV).\
    map(lambda x: (x[1][1],(x[1][0],1))).\
    reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])).\
    map(lambda x: (x[0], x[1][0]/x[1][1], x[1][1])).\
    coalesce(1).\
    toDF().write.format("csv").mode('overwrite').save(path_output_avgRatPerGenre)

The first one is definitely better!