## MovieLens

In [8]:
import pandas as pd
import numpy as np
import os
import warnings
import datetime

pd.set_option('display.float_format', lambda x : '{:,.2f}'.format(x))
warnings.filterwarnings("ignore")
pd.set_option('display.max_columns', None)

In [9]:
if not ('sc' in locals() or 'sc' in globals()):
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession

    conf = SparkConf()
    conf.setMaster('spark://spark-master:7077')
    conf.set('spark.executor.memory', '512m')
    conf.set('spark.app.name', 'basics')

    # getOrCreate expects a SparkConf and returns the active context if one already exists
    sc = SparkContext.getOrCreate(conf=conf)

    spark = SparkSession \
        .builder \
        .getOrCreate()

## Context
### Exercise 1
__We want to find the 25 most-rated movie titles in this data, i.e. count how many times each movie has been rated.__

#### Users
Users: the file is named “u.user” and its pipe-separated columns are:

```python
['user_id', 'age', 'sex', 'occupation', 'zip_code']
```

#### Ratings
Ratings: the file is named “u.data” and its tab-separated columns are:

```python
['user_id', 'movie_id', 'rating', 'unix_timestamp']
```

#### Movies
Movies: the file is named “u.item” and its pipe-separated columns are:

```python
['movie_id', 'title', 'release_date', 'video_release_date', 'imdb_url', ...]  # followed by 19 binary genre columns
```
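As a quick sanity check on the layouts above, the sketch below pairs each field of a raw line with its column name in plain Python, without Spark. The `parse_line` helper is hypothetical, and the sample lines are the ones returned by `take(1)` in the Load data step. It also makes one detail explicit: in u.data the movie id is field 1 (0-based) and the rating is field 2, which is what a (movie_id, rating) pair has to select.

```python
# Column names as listed above (for u.item, only the five named columns).
USER_COLUMNS = ['user_id', 'age', 'sex', 'occupation', 'zip_code']
RATING_COLUMNS = ['user_id', 'movie_id', 'rating', 'unix_timestamp']
MOVIE_COLUMNS = ['movie_id', 'title', 'release_date', 'video_release_date', 'imdb_url']


def parse_line(line, columns, sep):
    """Split one raw line on sep and pair the leading fields with the column names.

    zip() stops at the shorter sequence, so trailing fields without a name
    (the genre flags at the end of u.item) are dropped.
    """
    return dict(zip(columns, line.split(sep)))


# Sample lines as returned by take(1) in the Load data step.
USER_LINE = '1|24|M|technician|85711'
RATING_LINE = '196\t242\t3\t881250949'
MOVIE_LINE = ('1|Toy Story (1995)|01-Jan-1995||'
              'http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)'
              '|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0')

user = parse_line(USER_LINE, USER_COLUMNS, '|')         # u.user is pipe-separated
rating = parse_line(RATING_LINE, RATING_COLUMNS, '\t')  # u.data is tab-separated
movie = parse_line(MOVIE_LINE, MOVIE_COLUMNS, '|')      # u.item is pipe-separated

print(user['occupation'], rating['movie_id'], rating['rating'], movie['title'])
# technician 242 3 Toy Story (1995)
```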

#### Load data
The first step is to see what the data looks like.

In [15]:
userRDD = sc.textFile("../data/u.user")
userRDD.take(1)

['1|24|M|technician|85711']

In [16]:
ratingRDD = sc.textFile("../data/u.data")
ratingRDD.take(1)

['196\t242\t3\t881250949']

In [17]:
movieRDD = sc.textFile("../data/u.item")
movieRDD.take(1)

['1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0']

#### Process

After looking at the shape of the data, we convert the raw lines into pair RDDs that are easier to work with.

In [21]:
# Create an RDD from ratingRDD that only contains the two columns of interest, i.e. (movie_id, rating).
# u.data columns are user_id, movie_id, rating, unix_timestamp, so we keep fields 1 and 2.
RDD_movid_rating = ratingRDD.map(lambda x: (x.split('\t')[1], x.split('\t')[2]))
RDD_movid_rating.take(1)

[('242', '3')]

In [22]:
# Create an RDD from movieRDD that only contains the two columns of interest, i.e. (movie_id, title).
RDD_movid_title = movieRDD.map(lambda x: (x.split('|')[0], x.split('|')[1]))
RDD_movid_title.take(1)

[('1', 'Toy Story (1995)')]

#### Merge

We merge these two pair RDDs on movie_id using the [leftOuterJoin()](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.leftOuterJoin.html) transformation, which produces (movie_id, (rating, title)) tuples.
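The join semantics can be modelled in a few lines of plain Python. This is a minimal sketch, not PySpark's implementation: `left_outer_join` is a hypothetical helper working on in-memory lists, the ratings are made up, and unlike Spark it preserves the order of the left input.

```python
from collections import defaultdict


def left_outer_join(left, right):
    """Plain-Python model of RDD.leftOuterJoin on lists of (key, value) pairs.

    Every (k, v) on the left is paired with each w on the right that shares
    key k, giving (k, (v, w)); if k never appears on the right, w is None.
    """
    right_by_key = defaultdict(list)
    for k, w in right:
        right_by_key[k].append(w)
    out = []
    for k, v in left:
        # .get() does not insert into the defaultdict; missing keys fall back to [None]
        for w in right_by_key.get(k) or [None]:
            out.append((k, (v, w)))
    return out


ratings = [('1', '5'), ('1', '4'), ('9999', '3')]  # hypothetical (movie_id, rating) pairs
titles = [('1', 'Toy Story (1995)')]               # (movie_id, title) pairs
print(left_outer_join(ratings, titles))
# [('1', ('5', 'Toy Story (1995)')), ('1', ('4', 'Toy Story (1995)')), ('9999', ('3', None))]
```

If every movie_id in u.data also appears in u.item, an inner join() would most likely give the same pairs here; leftOuterJoin only differs when a key is missing on the right, where it keeps the row with None instead of dropping it.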

In [24]:
rdd_movid_title_rating = RDD_movid_rating.leftOuterJoin(RDD_movid_title)
rdd_movid_title_rating.take(3)


[(movie_id, (rating, title)), ...]

Use the RDD from the previous step to create a pair RDD of (title, 1) tuples, one per rating.

In [27]:
rdd_title_rating = rdd_movid_title_rating.map(lambda x: (x[1][1], 1))
rdd_title_rating.take(5)

[("Smilla's Sense of Snow (1997)", 1),
 ("Smilla's Sense of Snow (1997)", 1),
 ("Smilla's Sense of Snow (1997)", 1),
 ("Smilla's Sense of Snow (1997)", 1),
 ("Smilla's Sense of Snow (1997)", 1)]

Use the reduceByKey transformation to add up the 1s for each movie title, which gives the number of ratings per title.
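reduceByKey merges all values that share a key with the given function. The plain-Python model below (a hypothetical `reduce_by_key` helper that returns a dict, applied to made-up records) shows what summing the (title, 1) pairs produces.

```python
from functools import reduce


def reduce_by_key(pairs, func):
    """Plain-Python model of RDD.reduceByKey: merge all values of a key with func.

    Spark combines values inside each partition first and then across
    partitions, which is why func should be associative and commutative
    (addition is both).
    """
    grouped = {}
    for k, v in pairs:
        grouped.setdefault(k, []).append(v)
    return {k: reduce(func, vs) for k, vs in grouped.items()}


# (title, 1) pairs as produced by the map step; these records are made up.
title_ones = [('Toy Story (1995)', 1), ('Fargo (1996)', 1), ('Toy Story (1995)', 1)]
counts = reduce_by_key(title_ones, lambda x, y: x + y)
print(counts)  # {'Toy Story (1995)': 2, 'Fargo (1996)': 1}
```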

In [None]:
# Sum the 1s per title -> (title, number_of_ratings)
rdd_title_ratingcnt = rdd_title_rating.reduceByKey(lambda x, y: x + y)

In [None]:
for movie in rdd_title_ratingcnt.takeOrdered(25, lambda x: -x[1]):
    print(movie)

### Exercise 2

__Chain all the previous steps into a single expression__

In [28]:
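top = (
    ratingRDD
    # (movie_id, rating) from the tab-separated u.data lines
    .map(lambda x: (x.split("\t")[1], x.split("\t")[2]))
    # (movie_id, (rating, title)) after joining with the titles from u.item
    .leftOuterJoin(movieRDD.map(lambda x: (x.split("|")[0], x.split("|")[1])))
    # (title, 1) for every rating
    .map(lambda x: (x[1][1], 1))
    # (title, number_of_ratings)
    .reduceByKey(lambda x, y: x + y)
    # the 25 titles with the highest counts
    .takeOrdered(25, lambda x: -x[1])
)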


In [29]:
for movies in top:
    print(movies)

('Star Wars (1977)', 583)
('Contact (1997)', 509)
('Fargo (1996)', 508)
('Return of the Jedi (1983)', 507)
('Liar Liar (1997)', 485)
('English Patient, The (1996)', 481)
('Scream (1996)', 478)
('Toy Story (1995)', 452)
('Air Force One (1997)', 431)
('Independence Day (ID4) (1996)', 429)
('Raiders of the Lost Ark (1981)', 420)
('Godfather, The (1972)', 413)
('Pulp Fiction (1994)', 394)
('Twelve Monkeys (1995)', 392)
('Silence of the Lambs, The (1991)', 390)
('Jerry Maguire (1996)', 384)
('Chasing Amy (1997)', 379)
('Rock, The (1996)', 378)
('Empire Strikes Back, The (1980)', 367)
('Star Trek: First Contact (1996)', 365)
('Back to the Future (1985)', 350)
('Titanic (1997)', 350)
('Mission: Impossible (1996)', 344)
('Fugitive, The (1993)', 336)
('Indiana Jones and the Last Crusade (1989)', 331)
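takeOrdered(n, key) returns the n elements with the smallest key, so negating the count puts the largest counts first. Below is a plain-Python model using heapq.nsmallest (`take_ordered` is a hypothetical helper) on a few pairs copied from the output above. Back to the Future and Titanic are tied at 350, and the RDD API does not specify how ties are ordered; adding the title as a secondary key, as in the second call, is one way to make the result deterministic.

```python
import heapq


def take_ordered(items, n, key):
    """Plain-Python model of RDD.takeOrdered(n, key): the n smallest items by key."""
    return heapq.nsmallest(n, items, key=key)


# A few (title, count) pairs copied from the output above.
counts = [('Titanic (1997)', 350), ('Star Wars (1977)', 583),
          ('Back to the Future (1985)', 350), ('Contact (1997)', 509)]

# Negating the count turns "smallest key" into "largest count".
print(take_ordered(counts, 2, key=lambda x: -x[1]))
# [('Star Wars (1977)', 583), ('Contact (1997)', 509)]

# The title as a secondary key fixes the order of tied counts.
print(take_ordered(counts, 4, key=lambda x: (-x[1], x[0])))
# [('Star Wars (1977)', 583), ('Contact (1997)', 509), ('Back to the Future (1985)', 350), ('Titanic (1997)', 350)]
```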


### Exercise 3

__Now we want to find the 25 highest-rated movies in the same dataset, considering only movies that have been rated at least 100 times.__

We already have the RDD rdd_movid_title_rating:

In [30]:
rdd_movid_title_rating.take(2)

[(movie_id, (rating, title)), ...]

Next, we create an RDD that contains the sum of all the ratings for each movie title.

In [None]:
rdd_title_ratingsum = (rdd_movid_title_rating
                       .map(lambda x: (x[1][1], 
                                       int(x[1][0])
                                      )
                           )
                       .reduceByKey(lambda x, y: x + y)
                      )
rdd_title_ratingsum.take(2)

In [None]:
# Shape after joining the rating sums with the rating counts:
#   (title, (rating_sum, count))
# Target shape, with avg_rating = rating_sum / count:
#   (title, (avg_rating, count))
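Written as a formula (the symbols are introduced here only for illustration): for a movie $m$ with $n_m$ ratings $r_{m,1}, \dots, r_{m,n_m}$,

$$
\bar{r}_m = \frac{1}{n_m} \sum_{i=1}^{n_m} r_{m,i}, \qquad \text{keep } m \text{ only if } n_m \ge 100.
$$

For example, hypothetical ratings 4, 5 and 3 give a rating sum of 12, a count of 3 and an average of 12 / 3 = 4.0.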

Join this data with the RDD rdd_title_ratingcnt created in Exercise 1, then map over the result to divide the rating sum by the rating count.
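Here is a plain-Python sketch of the join-then-divide step on small dicts. `mean_from_join` and `mean_one_pass` are illustrative helpers, and the titles and ratings are made up. The second function shows a common alternative that avoids the join by carrying a (sum, count) pair per title; in PySpark that would look something like `mapValues(lambda r: (r, 1))` followed by `reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))`, which is not the approach this notebook takes.

```python
def mean_from_join(rating_sums, rating_counts):
    """Inner-join two {title: value} dicts and return {title: (avg_rating, count)}.

    Mirrors joining the per-title rating sums with the per-title counts and
    then dividing the sum by the count.
    """
    return {
        title: (rating_sums[title] / rating_counts[title], rating_counts[title])
        for title in rating_sums.keys() & rating_counts.keys()  # titles present in both
    }


def mean_one_pass(title_rating_pairs):
    """Alternative: accumulate (sum, count) per title in a single pass, no join."""
    acc = {}
    for title, rating in title_rating_pairs:
        s, c = acc.get(title, (0, 0))
        acc[title] = (s + rating, c + 1)
    return {title: (s / c, c) for title, (s, c) in acc.items()}


# Made-up (title, rating) records and the matching sums and counts.
pairs = [('Movie A', 4), ('Movie A', 5), ('Movie A', 3), ('Movie B', 2), ('Movie B', 5)]
sums = {'Movie A': 12, 'Movie B': 7}
counts = {'Movie A': 3, 'Movie B': 2}

print(sorted(mean_from_join(sums, counts).items()))
# [('Movie A', (4.0, 3)), ('Movie B', (3.5, 2))]
```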

In [None]:
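# join -> (title, (rating_sum, count)); mapValues -> (title, (avg_rating, count))
rdd_title_ratingmean_rating_count = rdd_title_ratingsum.join(rdd_title_ratingcnt).mapValues(lambda v: (v[0] / v[1], v[1]))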

We could call takeOrdered directly here, but we only want movies with at least 100 ratings, so let's filter the RDD first.
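The filter-then-rank step can be sketched in plain Python as well. `top_rated` is a hypothetical helper working on a {title: (avg_rating, count)} dict with made-up entries; the secondary sort keys (larger count, then title) are an added assumption for deterministic ties, whereas the notebook ranks on the average alone.

```python
import heapq


def top_rated(mean_and_count, n=25, min_count=100):
    """Return the n best titles by average rating among those with >= min_count ratings.

    mean_and_count maps title -> (avg_rating, count). Ties on the average are
    broken by the larger count, then alphabetically by title.
    """
    # Same condition as the filter step: keep titles rated at least min_count times.
    eligible = [(title, v) for title, v in mean_and_count.items() if v[1] >= min_count]
    # nsmallest on negated keys gives the highest average first, then the highest count.
    return heapq.nsmallest(n, eligible, key=lambda x: (-x[1][0], -x[1][1], x[0]))


# Hypothetical {title: (avg_rating, count)} data.
data = {'Movie A': (4.5, 120), 'Movie B': (4.8, 40),
        'Movie C': (4.5, 300), 'Movie D': (3.9, 100)}

for row in top_rated(data, n=3):
    print(row)
# ('Movie C', (4.5, 300))
# ('Movie A', (4.5, 120))
# ('Movie D', (3.9, 100))
```

Movie B has the highest average but only 40 ratings, so the filter removes it; this is the kind of result the 100-rating threshold is there to avoid.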

In [None]:
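# keep titles rated at least 100 times; each element is (title, (avg_rating, count))
rdd_title_rating_rating_count_gt_100 = rdd_title_ratingmean_rating_count.filter(lambda x: x[1][1] >= 100)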

In [None]:
print("rdd_title_rating_rating_count_gt_100: ", rdd_title_rating_rating_count_gt_100.take(1))


In [None]:
print("25 highly rated movies:")
for i in rdd_title_rating_rating_count_gt_100.takeOrdered(25, lambda x:-x[1][0]):
    print(i)