In [1]:
#imports
import pyspark
import os
import pandas as pd
import numpy as np

from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import SQLContext
from pyspark.ml.recommendation import ALS

In [2]:
# Change to the location of data files
dbfs_dir ='s3://dsci6007-f21-mk-finalproject/Netflix/'
train_ratings_filename = dbfs_dir + '/TrainingRatings.txt'
test_ratings_filename = dbfs_dir + '/TestingRatings.txt'
movies_filename = dbfs_dir + '/movie_titles.txt'

### Defining an explicit schema

In [3]:
# MovieID,UserID,Rating
train_ratings_df_schema = StructType(
  [StructField('MovieId', IntegerType()),
   StructField('UserId', IntegerType()),
   StructField('Rating', DoubleType())]
)

# MovieID,UserID,Rating
test_ratings_df_schema = StructType(
  [StructField('MovieId', IntegerType()),
   StructField('UserId', IntegerType()),
   StructField('Rating', DoubleType())]
)
# MovieID,YearOfRelease,Title
movies_df_schema = StructType(
  [StructField('MovieID', IntegerType()),
   StructField('YearOfRelease', IntegerType()),
   StructField('Title', StringType())]
)

#### Load and Cache data

In [4]:
train_raw_ratings_df = sqlContext.read.format('csv').options(header=True, inferSchema=False).schema(train_ratings_df_schema).load(train_ratings_filename)
train_ratings_df = train_raw_ratings_df

test_raw_ratings_df = sqlContext.read.format('csv').options(header=True, inferSchema=False).schema(test_ratings_df_schema).load(test_ratings_filename)
test_ratings_df = test_raw_ratings_df

raw_movies_df = sqlContext.read.format('csv').options(header=True, inferSchema=False).schema(movies_df_schema).load(movies_filename)
movies_df = raw_movies_df.drop('YearOfRelease')

train_ratings_df.cache()
test_ratings_df.cache()
movies_df.cache()

DataFrame[MovieID: int, Title: string]

###  Problem 2: Analyzing the Netflix Data

#### Summary statistics

In [5]:
# test ratings
test_ratings_df.describe().show()

                                                                                

+-------+-----------------+-----------------+------------------+
|summary|          MovieId|           UserId|            Rating|
+-------+-----------------+-----------------+------------------+
|  count|           100477|           100477|            100477|
|   mean| 8701.63431432069|1329963.559849518|3.4794828667257183|
| stddev|5098.027090516451|762504.1354759972|1.0852572463859325|
|    min|                8|                7|               1.0|
|    max|            17742|          2649285|               5.0|
+-------+-----------------+-----------------+------------------+



In [6]:
# train ratings
train_ratings_df.describe().show()



+-------+-----------------+------------------+------------------+
|summary|          MovieId|            UserId|            Rating|
+-------+-----------------+------------------+------------------+
|  count|          3255351|           3255351|           3255351|
|   mean|8724.662781985722|1327058.3376840162|3.4811883572616287|
| stddev|5107.400011271949| 762688.7721136078|1.0828725720369496|
|    min|                8|                 7|               1.0|
|    max|            17742|           2649285|               5.0|
+-------+-----------------+------------------+------------------+



                                                                                

In [7]:
# movies
movies_df.describe().show()

[Stage 6:>                                                          (0 + 1) / 1]

+-------+-----------------+--------------------+
|summary|          MovieID|               Title|
+-------+-----------------+--------------------+
|  count|            17769|               17769|
|   mean|           8886.0|            Infinity|
| stddev|5129.612802151835|                 NaN|
|    min|                2|'Allo 'Allo!: Ser...|
|    max|            17770|                 sex|
+-------+-----------------+--------------------+



                                                                                

In [8]:
train_raw_ratings_count = train_raw_ratings_df.count()
train_ratings_count = train_ratings_df.count()

test_raw_ratings_count = test_raw_ratings_df.count()
test_ratings_count = test_ratings_df.count()

raw_movies_count = raw_movies_df.count()
movies_count = movies_df.count()

print('There are %s train_ratings, %s test_ratings and %s movies in the datasets.' % (train_ratings_count, test_ratings_count, movies_count))

print('Train set Ratings:')
train_ratings_df.show(5)

print('Test set Ratings:')
test_ratings_df.show(5)

print('Movies:')
movies_df.show(5, truncate=False)

There are 3255351 train_ratings, 100477 test_ratings and 17769 movies in the datasets.
Train set Ratings:
+-------+-------+------+
|MovieId| UserId|Rating|
+-------+-------+------+
|      8|1395430|   2.0|
|      8|1205593|   4.0|
|      8|1488844|   4.0|
|      8|1447354|   1.0|
|      8| 306466|   4.0|
+-------+-------+------+
only showing top 5 rows

Test set Ratings:
+-------+-------+------+
|MovieId| UserId|Rating|
+-------+-------+------+
|      8|2149668|   3.0|
|      8|1089184|   3.0|
|      8|2465894|   3.0|
|      8| 534508|   1.0|
|      8| 992921|   4.0|
+-------+-------+------+
only showing top 5 rows

Movies:
+-------+----------------------------+
|MovieID|Title                       |
+-------+----------------------------+
|2      |Isle of Man TT 2004 Review  |
|3      |Character                   |
|4      |Paula Abdul's Get Up & Dance|
|5      |The Rise and Fall of ECW    |
|6      |Sick                        |
+-------+----------------------------+
only showing top 

#### Movies with Highest Average Ratings

In [9]:
# movies and their average ratings in test set

test_movie_ids_with_avg_ratings_df = test_ratings_df.groupBy('MovieId').agg(F.count(test_ratings_df.Rating).alias("rating_count"), F.avg(test_ratings_df.Rating).alias("average_rating"))
print('Rating count per movie and average rating per movie:')
test_movie_ids_with_avg_ratings_df.show(5, truncate=False)

Rating count per movie and average rating per movie:


[Stage 30:>                                                         (0 + 1) / 1]21/12/15 21:01:12 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.


+-------+------------+------------------+
|MovieId|rating_count|average_rating    |
+-------+------------+------------------+
|2366   |47          |2.851063829787234 |
|11317  |35          |3.085714285714286 |
|13289  |1           |4.0               |
|4190   |3           |4.0               |
|9517   |68          |3.3088235294117645|
+-------+------------+------------------+
only showing top 5 rows



                                                                                

In [10]:
# users and their average ratings in test set

test_movie_ids_with_avg_ratings_df = test_ratings_df.groupBy('UserId').agg(F.count(test_ratings_df.Rating).alias("user_count"), F.avg(test_ratings_df.Rating).alias("average_rating"))
print('Rating count per user and average rating per User:')
test_movie_ids_with_avg_ratings_df.show(5, truncate=False)

Rating count per user and average rating per User:
+-------+----------+-----------------+
|UserId |user_count|average_rating   |
+-------+----------+-----------------+
|2358799|2         |3.5              |
|973051 |4         |4.25             |
|1189060|3         |3.0              |
|2376892|9         |3.888888888888889|
|1628484|10        |2.8              |
+-------+----------+-----------------+
only showing top 5 rows



In [11]:
# movies and their average ratings in train set

train_movie_ids_with_avg_ratings_df = train_ratings_df.groupBy('MovieId').agg(F.count(train_ratings_df.Rating).alias("rating_count"), F.avg(train_ratings_df.Rating).alias("average_rating"))
print('Rating count per movie and average rating per movie:')
train_movie_ids_with_avg_ratings_df.show(5, truncate=False)

Rating count per movie and average rating per movie:
+-------+------------+------------------+
|MovieId|rating_count|average_rating    |
+-------+------------+------------------+
|2366   |1643        |3.036518563603165 |
|4190   |37          |3.054054054054054 |
|3220   |1417        |2.9273112208892025|
|481    |114         |2.7982456140350878|
|3149   |162         |3.2037037037037037|
+-------+------------+------------------+
only showing top 5 rows



In [12]:
# users and their average ratings in train set

train_movie_ids_with_avg_ratings_df = train_ratings_df.groupBy('UserId').agg(F.count(train_ratings_df.Rating).alias("rating_count"), F.avg(train_ratings_df.Rating).alias("average_rating"))
print('Rating count per user and average rating per User:')
train_movie_ids_with_avg_ratings_df.show(5, truncate=False)

Rating count per user and average rating per User:
+-------+------------+------------------+
|UserId |rating_count|average_rating    |
+-------+------------+------------------+
|1896167|91          |3.21978021978022  |
|2358799|107         |3.8130841121495327|
|2531111|185         |3.372972972972973 |
|1497891|143         |3.195804195804196 |
|2629660|180         |3.1333333333333333|
+-------+------------+------------------+
only showing top 5 rows



                                                                                

### 2(a): How many distinct items and users are there in the test set?

In [13]:
test_distinct_users = test_ratings_df.select('UserId').distinct().count()
test_distinct_movies = test_ratings_df.select('MovieId').distinct().count()
print("There are {} distinct users and {} distinct movies(items) in the test set.".format(test_distinct_users, test_distinct_movies))

There are 27555 distinct users and 1701 distinct movies(items) in the test set.


### 2(b): estimated average overlap of items for users

##### target user 1

In [14]:
# pick a user from the test set and extract the items this user has rated in the training set
# how many other users in the training set have rated the same movies as user=534508

# users from test set: 1447354, 534508, 992921

print("The first target user I picked from the test set is UserId:534508")
print(" ")

user_id = 534508
movies_rated_by_target_user_df = train_ratings_df.filter("UserId  == 534508")

print("No of movies rated by the target user in the train set:", movies_rated_by_target_user_df.count())
print(" ")
print('movies rated by target user', user_id, ":")
movies_rated_by_target_user_df.show(5, truncate = False)

The first target user I picked from the test set is UserId:534508
 
No of movies rated by the target user in the train set: 101
 
movies rated by target user 534508 :
+-------+------+------+
|MovieId|UserId|Rating|
+-------+------+------+
|111    |534508|2.0   |
|443    |534508|4.0   |
|452    |534508|4.0   |
|718    |534508|5.0   |
|725    |534508|3.0   |
+-------+------+------+
only showing top 5 rows



In [15]:
#list of movies rated by target user in training set
list_movie_rated_by_target_user = movies_rated_by_target_user_df.select('MovieId')
#print(list_movie_rated_by_target_user)

# converting list to array
movies_array = [int(row.MovieId) for row in list_movie_rated_by_target_user.collect()]
#print(len(movies_array))

# now lets get all the users in the train set, who have rated the same movies as 534508 rated

total_overlap_item_user = train_ratings_df[train_ratings_df['MovieId'].isin(movies_array)].count()
print("The total number of users in the train set, who have rated the same movies as the target user rated:", total_overlap_item_user)

print("Overlap of an item for target user:")
overlap_item_user = train_ratings_df[train_ratings_df['MovieId'].isin(movies_array)]
overlap_item_user = overlap_item_user.groupBy('MovieId').agg(F.count(overlap_item_user.UserId).alias("user_count"))
overlap_item_user.show()

The total number of users in the train set, who have rated the same movies as the target user rated: 889999
Overlap of an item for target user:
+-------+----------+
|MovieId|user_count|
+-------+----------+
|   4119|       825|
|   8354|      1925|
|  12293|     20691|
|  14317|      7965|
|   8163|      2483|
|  11888|     15196|
|  14407|     10218|
|   3893|     11799|
|   9481|     12893|
|  12472|       218|
|  16147|     10430|
|   4546|      6189|
|   8596|     23005|
|   5814|     18298|
|  16784|      3521|
|   3170|       793|
|   1305|      8707|
|  10561|      2892|
|  13614|     19316|
|   2251|      3710|
+-------+----------+
only showing top 20 rows



In [16]:
u1 = overlap_item_user.groupBy().avg('user_count').collect()
u1

[Row(avg(user_count)=8811.871287128713)]

In [17]:
overlap_item_user.describe().show()
user_id = 534508 
print("The average overlap of items for first target user", user_id, "is: ", "8812")

+-------+------------------+-----------------+
|summary|           MovieId|       user_count|
+-------+------------------+-----------------+
|  count|               101|              101|
|   mean| 8828.138613861385|8811.871287128713|
| stddev|5445.4343280030525|7279.235960818095|
|    min|               111|               46|
|    max|             17411|            24393|
+-------+------------------+-----------------+

The average overlap of items for first target user 534508 is:  8812


##### target user 2

In [18]:
# pick a user from the test set and extract the items this user has rated in the training set
# how many other users in the training set have rated the same movies as user=534508

# users from test set: 534508, 1447354,992921

print("The second target user I picked from the test set is UserId:1447354")
print(" ")

user_id = 1447354
movies_rated_by_target_user_df = train_ratings_df.filter("UserId  == 1447354")

print("No of movies rated by the target user in the train set:", movies_rated_by_target_user_df.count())
print(" ")
print('movies rated by target user', user_id, ":")
movies_rated_by_target_user_df.show(5, truncate = False)

The second target user I picked from the test set is UserId:1447354
 
No of movies rated by the target user in the train set: 79
 
movies rated by target user 1447354 :
+-------+-------+------+
|MovieId|UserId |Rating|
+-------+-------+------+
|8      |1447354|1.0   |
|417    |1447354|5.0   |
|1140   |1447354|3.0   |
|1305   |1447354|4.0   |
|1367   |1447354|2.0   |
+-------+-------+------+
only showing top 5 rows



In [19]:
#list of movies rated by target user in training set
list_movie_rated_by_target_user = movies_rated_by_target_user_df.select('MovieId')
#print(list_movie_rated_by_target_user)

# converting list to array
movies_array = [int(row.MovieId) for row in list_movie_rated_by_target_user.collect()]
#print(len(movies_array))

# now lets get all the users in the train set, who have rated the same movies as 1447354 rated

total_overlap_item_user = train_ratings_df[train_ratings_df['MovieId'].isin(movies_array)].count()
print("The total number of users in the train set, who have rated the same movies as the target user rated:", total_overlap_item_user)

print("Overlap of an item for target user:")
overlap_item_user = train_ratings_df[train_ratings_df['MovieId'].isin(movies_array)]
overlap_item_user = overlap_item_user.groupBy('MovieId').agg(F.count(overlap_item_user.UserId).alias("user_count"))
overlap_item_user.show()

The total number of users in the train set, who have rated the same movies as the target user rated: 957152
Overlap of an item for target user:
+-------+----------+
|MovieId|user_count|
+-------+----------+
|    417|       815|
|   6110|      5325|
|   5025|      9653|
|   1406|     20631|
|   2356|      1601|
|   8596|     23005|
|      8|      2830|
|  13651|     20902|
|  13614|     19316|
|   1305|      8707|
|  14185|     15690|
|   2660|     20153|
|   9617|     16546|
|   5963|      2502|
|   2866|      8822|
|  10429|     11567|
|  16153|      1163|
|   3743|      2275|
|   6482|     19580|
|   5069|     12869|
+-------+----------+
only showing top 20 rows



In [20]:
u2 = overlap_item_user.groupBy().avg('user_count').collect()
u2

[Row(avg(user_count)=12115.848101265823)]

In [21]:
overlap_item_user.describe().show()
user_id = 1447354
print("The average overlap of items for second target user", user_id, "is: ", "12116")

+-------+------------------+------------------+
|summary|           MovieId|        user_count|
+-------+------------------+------------------+
|  count|                79|                79|
|   mean|8190.7468354430375|12115.848101265823|
| stddev|  5029.74747153311| 6821.628339876057|
|    min|                 8|               409|
|    max|             17174|             25468|
+-------+------------------+------------------+

The average overlap of items for second target user 1447354 is:  12116


##### target user 3

In [22]:
# pick a user from the test set and extract the items this user has rated in the training set
# how many other users in the training set have rated the same movies as user=534508

# users from test set: 1447354, 534508, 992921

print("The first target user I picked from the test set is UserId:992921")
print(" ")

user_id = 992921
movies_rated_by_target_user_df = train_ratings_df.filter("UserId  == 992921")

print("No of movies rated by the target user in the train set:", movies_rated_by_target_user_df.count())
print(" ")
print('movies rated by target user', user_id, ":")
movies_rated_by_target_user_df.show(5, truncate = False)

The first target user I picked from the test set is UserId:992921
 
No of movies rated by the target user in the train set: 79
 
movies rated by target user 992921 :
+-------+------+------+
|MovieId|UserId|Rating|
+-------+------+------+
|305    |992921|3.0   |
|851    |992921|3.0   |
|1123   |992921|3.0   |
|1256   |992921|4.0   |
|1367   |992921|4.0   |
+-------+------+------+
only showing top 5 rows



In [23]:
#list of movies rated by target user in training set
list_movie_rated_by_target_user = movies_rated_by_target_user_df.select('MovieId')
#print(list_movie_rated_by_target_user)

# converting list to array
movies_array = [int(row.MovieId) for row in list_movie_rated_by_target_user.collect()]
#print(len(movies_array))

# now lets get all the users in the train set, who have rated the same movies as 992921 rated

total_overlap_item_user = train_ratings_df[train_ratings_df['MovieId'].isin(movies_array)].count()
print("The total number of users in the train set, who have rated the same movies as the target user rated:", total_overlap_item_user)

print("Overlap of an item for target user:")
overlap_item_user = train_ratings_df[train_ratings_df['MovieId'].isin(movies_array)]
overlap_item_user = overlap_item_user.groupBy('MovieId').agg(F.count(overlap_item_user.UserId).alias("user_count"))
overlap_item_user.show()

The total number of users in the train set, who have rated the same movies as the target user rated: 985708
Overlap of an item for target user:
+-------+----------+
|MovieId|user_count|
+-------+----------+
|   1425|      8552|
|   1615|     18043|
|  12293|     20691|
|   1406|     20631|
|   8596|     23005|
|  14505|      2244|
|   1256|      3722|
|  13651|     20902|
|  13614|     19316|
|   1757|      2625|
|  14185|     15690|
|   2660|     20153|
|   2866|      8822|
|   9617|     16546|
|   6482|     19580|
|  14283|      3330|
|  16948|      8950|
|  12232|     20275|
|  10889|      9579|
|  16082|     16542|
+-------+----------+
only showing top 20 rows



In [24]:
u3 = overlap_item_user.groupBy().avg('user_count').collect()
u3

[Row(avg(user_count)=12477.316455696202)]

In [25]:
overlap_item_user.describe().show()
user_id = 992921 
print("The average overlap of items for third target user", user_id, "is: ", "12477")

+-------+-----------------+------------------+
|summary|          MovieId|        user_count|
+-------+-----------------+------------------+
|  count|               79|                79|
|   mean|8398.556962025317|12477.316455696202|
| stddev|5399.468947645271| 6441.574096016387|
|    min|              305|              1504|
|    max|            17574|             25468|
+-------+-----------------+------------------+

The average overlap of items for third target user 992921 is:  12477


In [26]:
average_overlap_items_three_users = u1[0] + u2[0] + u3[0]
estimated_average_overlap_item_users = 0
for item in average_overlap_items_three_users:
    estimated_average_overlap_item_users += item/3
    
print("The estimated average overlap of item for users is: ", estimated_average_overlap_item_users)

The estimated average overlap of item for users is:  11135.011948030246


### estimated average over lap of users for items

##### target movie 1

In [27]:
# pick a target movie from the test set and extract the users that rated this movie in the training set
# how many other movies in the training set have been rated by the users who also rated target movie

# movies from test set: 481, 2366, 3149 

print("The first target movie I picked from the test set is MovieId: 481")
print(" ")

movie_id = 992921
users_who_rated_target_movie_df = train_ratings_df.filter("MovieId  == 481")

print("No of users who rated the target movie in the train set:", users_who_rated_target_movie_df.count())
print(" ")
print('users who rated target movie', movie_id, ":")
users_who_rated_target_movie_df.show(5, truncate = False)

The first target movie I picked from the test set is MovieId: 481
 
No of users who rated the target movie in the train set: 114
 
users who rated target movie 992921 :
+-------+-------+------+
|MovieId|UserId |Rating|
+-------+-------+------+
|481    |519545 |4.0   |
|481    |491331 |2.0   |
|481    |1300793|3.0   |
|481    |2602249|3.0   |
|481    |1227322|3.0   |
+-------+-------+------+
only showing top 5 rows



In [28]:
#list of users who rated target movie in training set
list_users_who_rated_target_movie = users_who_rated_target_movie_df.select('UserId')
#print(list_movie_rated_by_target_user)

# converting list to array
users_array = [int(row.UserId) for row in list_users_who_rated_target_movie.collect()]
#print(len(users_array))

# now lets get all the movies in the train set rated by uers who have rated the target movie 481

total_overlap_user_item = train_ratings_df[train_ratings_df['UserId'].isin(users_array)].count()
print("The total number of movies in the train set, who have rated the same movies as the target user rated:", total_overlap_user_item)

print("Overlap of users for target item:")
overlap_user_item = train_ratings_df[train_ratings_df['UserId'].isin(users_array)]
overlap_user_item = overlap_user_item.groupBy('UserId').agg(F.count(overlap_user_item.MovieId).alias("movie_count"))
overlap_user_item.show()

The total number of movies in the train set, who have rated the same movies as the target user rated: 34568
Overlap of users for target item:
+-------+-----------+
| UserId|movie_count|
+-------+-----------+
| 421240|         79|
| 912542|         85|
|2289098|         96|
|1900445|        169|
|2397454|         84|
|1932594|        848|
|2315012|        390|
|2436452|        109|
|1918794|        111|
| 863644|        126|
| 682963|        520|
|1275094|        241|
|2260738|         74|
| 507094|        367|
|1227322|        490|
|2009329|         99|
|1659647|        150|
| 731685|        167|
| 251967|         89|
| 769702|        377|
+-------+-----------+
only showing top 20 rows



In [29]:
m1 = overlap_user_item.groupBy().avg('movie_count').collect()
m1

[Row(avg(movie_count)=303.2280701754386)]

In [30]:
overlap_user_item.describe().show()
movie_id = 481 
print("The average overlap of users for first target movie", movie_id, "is: ", "303 ")

+-------+------------------+------------------+
|summary|            UserId|       movie_count|
+-------+------------------+------------------+
|  count|               114|               114|
|   mean|1340480.6140350876| 303.2280701754386|
| stddev| 776819.3275789219|333.59752750213073|
|    min|             27165|                74|
|    max|           2617950|              1757|
+-------+------------------+------------------+

The average overlap of users for first target movie 481 is:  303 


##### target movie 2

In [31]:
# pick a target movie from the test set and extract the users that rated this movie in the training set
# how many other movies in the training set have been rated by the users who also rated target movie

# movies from test set: 481, 2366, 3149 

print("The second target movie I picked from the test set is MovieId: 2366")
print(" ")

movie_id = 3149 
users_who_rated_target_movie_df = train_ratings_df.filter("MovieId  == 2366")

print("No of users who rated the target movie in the train set:", users_who_rated_target_movie_df.count())
print(" ")
print('users who rated target movie', movie_id, ":")
users_who_rated_target_movie_df.show(5, truncate = False)

The second target movie I picked from the test set is MovieId: 2366
 
No of users who rated the target movie in the train set: 1643
 
users who rated target movie 3149 :
+-------+-------+------+
|MovieId|UserId |Rating|
+-------+-------+------+
|2366   |1945809|1.0   |
|2366   |41412  |4.0   |
|2366   |2422606|4.0   |
|2366   |1276913|4.0   |
|2366   |2581970|3.0   |
+-------+-------+------+
only showing top 5 rows



In [32]:
#list of users who rated target movie in training set
list_users_who_rated_target_movie = users_who_rated_target_movie_df.select('UserId')
#print(list_movie_rated_by_target_user)

# converting list to array
users_array = [int(row.UserId) for row in list_users_who_rated_target_movie.collect()]
#print(len(users_array))

# now lets get all the movies in the train set rated by uers who have rated the target movie 2366

total_overlap_user_item = train_ratings_df[train_ratings_df['UserId'].isin(users_array)].count()
print("The total number of movies in the train set, who have rated the same movies as the target user rated:", total_overlap_user_item)

print("Overlap of users for target item:")
overlap_user_item = train_ratings_df[train_ratings_df['UserId'].isin(users_array)]
overlap_user_item = overlap_user_item.groupBy('UserId').agg(F.count(overlap_user_item.MovieId).alias("movie_count"))
overlap_user_item.show()

The total number of movies in the train set, who have rated the same movies as the target user rated: 266348
Overlap of users for target item:
+-------+-----------+
| UserId|movie_count|
+-------+-----------+
|1552084|        184|
|1629521|         82|
|1628484|        205|
|1189060|         78|
|1704384|        105|
| 713980|         78|
| 110938|        267|
|1684416|        270|
| 861862|        100|
| 599480|        129|
|2124531|         75|
| 477387|        141|
| 333426|         92|
|1723385|        118|
|1293986|        131|
| 212883|        117|
|2473973|         94|
|2645399|         94|
|1384003|         83|
|1906611|        218|
+-------+-----------+
only showing top 20 rows



In [33]:
m2 = overlap_user_item.groupBy().avg('movie_count').collect()
m2

[Row(avg(movie_count)=162.11077297626292)]

In [34]:
overlap_user_item.describe().show()
movie_id = 2366
print("The average overlap of users for second target movie", movie_id, "is: ", "162")

+-------+-----------------+------------------+
|summary|           UserId|       movie_count|
+-------+-----------------+------------------+
|  count|             1643|              1643|
|   mean|1335307.347534997|162.11077297626292|
| stddev|760553.7139657538|114.70887749834162|
|    min|             2976|                71|
|    max|          2648853|              1757|
+-------+-----------------+------------------+

The average overlap of users for second target movie 2366 is:  162


##### target movie 3

In [35]:
# pick a target movie from the test set and extract the users that rated this movie in the training set
# how many other movies in the training set have been rated by the users who also rated target movie

# movies from test set: 481, 2366, 3149 

print("The third target movie I picked from the test set is MovieId: 3149 ")
print(" ")

movie_id = 3149 
users_who_rated_target_movie_df = train_ratings_df.filter("MovieId  == 3149 ")

print("No of users who rated the target movie in the train set:", users_who_rated_target_movie_df.count())
print(" ")
print('users who rated target movie', movie_id, ":")
users_who_rated_target_movie_df.show(5, truncate = False)

The third target movie I picked from the test set is MovieId: 3149 
 
No of users who rated the target movie in the train set: 162
 
users who rated target movie 3149 :
+-------+-------+------+
|MovieId|UserId |Rating|
+-------+-------+------+
|3149   |1850680|1.0   |
|3149   |2245579|3.0   |
|3149   |2609870|1.0   |
|3149   |1574266|4.0   |
|3149   |1248029|3.0   |
+-------+-------+------+
only showing top 5 rows



In [36]:
#list of users who rated target movie in training set
list_users_who_rated_target_movie = users_who_rated_target_movie_df.select('UserId')
#print(list_movie_rated_by_target_user)

# converting list to array
users_array = [int(row.UserId) for row in list_users_who_rated_target_movie.collect()]
#print(len(users_array))

# now lets get all the movies in the train set rated by uers who have rated the target movie 3149

total_overlap_user_item = train_ratings_df[train_ratings_df['UserId'].isin(users_array)].count()
print("The total number of movies in the train set, who have rated the same movies as the target user rated:", total_overlap_user_item)

print("Overlap of users for target item:")
overlap_user_item = train_ratings_df[train_ratings_df['UserId'].isin(users_array)]
overlap_user_item = overlap_user_item.groupBy('UserId').agg(F.count(overlap_user_item.MovieId).alias("movie_count"))
overlap_user_item.show()

The total number of movies in the train set, who have rated the same movies as the target user rated: 34851
Overlap of users for target item:
+-------+-----------+
| UserId|movie_count|
+-------+-----------+
| 554587|        208|
|2269379|         88|
|1317009|         89|
| 273576|         72|
|2098006|        158|
| 370735|        197|
|2080458|        102|
|2381792|        163|
|1018465|        146|
|2436452|        109|
|1916315|         87|
| 152178|        116|
| 113983|         89|
|2409015|        113|
|1406700|         90|
| 842706|        104|
|1037245|        542|
|2448620|        112|
|2115291|        115|
|1317589|        133|
+-------+-----------+
only showing top 20 rows



In [37]:
m3 = overlap_user_item.groupBy().avg('movie_count').collect()
m3

[Row(avg(movie_count)=215.12962962962962)]

In [38]:
overlap_user_item.describe().show()
movie_id = 3149
print("The average overlap of users for first target movie", movie_id, "is: ", "215")

+-------+------------------+------------------+
|summary|            UserId|       movie_count|
+-------+------------------+------------------+
|  count|               162|               162|
|   mean|1374304.3580246915|215.12962962962962|
| stddev| 737302.2546868202| 249.0907225505027|
|    min|             86246|                72|
|    max|           2647396|              1757|
+-------+------------------+------------------+

The average overlap of users for first target movie 3149 is:  215


In [39]:
average_overlap_users_three_items = m1[0] + m2[0] + m3[0]
estimated_average_overlap_users_items = 0
for user in average_overlap_users_three_items:
    estimated_average_overlap_users_items += user/3
    
print("The estimated average overlap of users for items is: ", estimated_average_overlap_users_items)

The estimated average overlap of users for items is:  226.8228242604437


### 2(c): Approach Chosen

In [40]:
print("""Collaboratives filtering approach lives from finding many similar users(user-user model) or 
many similar items(item-item model).
User similarities are measured by overlapping items and item similarities are measured by overlapping users.
From the above two statistics, the overlapping items for users is much higher(11135) than overlapping users for items(226). 
Hence, I will implement the collaborative filetering approach using user-user model or user similarities.""")

Collaboratives filtering approach lives from finding many similar users(user-user model) or 
many similar items(item-item model).
User similarities are measured by overlapping items and item similarities are measured by overlapping users.
From the above two statistics, the overlapping items for users is much higher(11135) than overlapping users for items(226). 
Hence, I will implement the collaborative filetering approach using user-user model or user similarities.


## Utility Matrix

### Part 3: Collaborative Filtering Implementation

#### Step 1: Implementation

##### Collaborative Filtering

###### Creating a Training and Validation Set

In [41]:
import pyspark.sql.functions as sql_func
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.ml.evaluation import RegressionEvaluator

In [44]:
# Creating a Training Set

# We'll hold out 80% for training, 20% of our data for validation
seed = 1800009193
(split_80_df, split_20_df) = train_ratings_df.randomSplit([0.8, 0.2], seed=seed)

# Let's cache these datasets for performance
training_df = split_80_df.cache()
validation_df = split_20_df.cache()
test_df = test_ratings_df.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
  training_df.count(), validation_df.count(), test_df.count()))
training_df.show(3)
validation_df.show(3)
test_df.show(3)

21/12/15 21:02:24 WARN CacheManager: Asked to cache already cached data.
21/12/15 21:02:24 WARN CacheManager: Asked to cache already cached data.
                                                                                

Training: 2604904, validation: 650447, test: 100477

+-------+------+------+
|MovieId|UserId|Rating|
+-------+------+------+
|      8|     7|   5.0|
|      8|  1333|   3.0|
|      8|  3321|   1.0|
+-------+------+------+
only showing top 3 rows

+-------+------+------+
|MovieId|UserId|Rating|
+-------+------+------+
|      8|  4706|   5.0|
|      8|  5652|   3.0|
|      8| 13197|   3.0|
+-------+------+------+
only showing top 3 rows

+-------+-------+------+
|MovieId| UserId|Rating|
+-------+-------+------+
|      8|2149668|   3.0|
|      8|1089184|   3.0|
|      8|2465894|   3.0|
+-------+-------+------+
only showing top 3 rows



##### Build the Recommender

##### Alternating Least Squares

In [45]:
# This step is broken in ML Pipelines: https://issues.apache.org/jira/browse/SPARK-14489
from pyspark.ml.recommendation import ALS

# Let's initialize our ALS learner

als = ALS(maxIter=5, regParam=0.1, 
          userCol="UserId", itemCol="MovieId", ratingCol="Rating",
          coldStartStrategy="drop",
          implicitPrefs=False)


# Now let's compute an evaluation metric for our test dataset
from pyspark.ml.evaluation import RegressionEvaluator

# Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="Rating", metricName="rmse")

tolerance = 0.03
ranks = [4, 8, 12]
errors = [0, 0, 0]
models = [0, 0, 0]
err = 0
min_error = float('inf')
best_rank = -1
for rank in ranks:
  # Set the rank here:
  als.rank    #fill 
  # Create the model with these parameters.
  model = als.fit(training_df)
  # Run the model to create a prediction. Predict against the validation_df.
  predict_df = model.transform(validation_df)   #fill

  # Remove NaN values from prediction (due to SPARK-14489)
  predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))

  # Run the previously created RMSE evaluator, reg_eval, on the predicted_ratings_df DataFrame
  error = reg_eval.evaluate(predicted_ratings_df)
  errors[err] = error
  models[err] = model
  print('For rank %s the RMSE is %s' % (rank, error))
  if error < min_error:
    min_error = error
    best_rank = err
  err += 1

als.setRank(ranks[best_rank])
print('The best model was trained with rank %s' % ranks[best_rank])
my_model = models[best_rank]

                                                                                

For rank 4 the RMSE is 0.8637802040072456


                                                                                

For rank 8 the RMSE is 0.8637802040072455




For rank 12 the RMSE is 0.8637802040072456
The best model was trained with rank 8


                                                                                

##### Test the recommender

In [46]:
# Testing Your Model
# In ML Pipelines, this next step has a bug that produces unwanted NaN values. We
# have to filter them out. See https://issues.apache.org/jira/browse/SPARK-14489

predict_df = my_model.transform(test_df)

# Remove NaN values from prediction (due to SPARK-14489)
predicted_test_df = predict_df.filter(predict_df.prediction != float('nan'))

# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_df DataFrame

test_RMSE = reg_eval.evaluate(predicted_test_df)

print('The model had a RMSE on the test set of {0}'.format(test_RMSE))



The model had a RMSE on the test set of 0.8648648275793531


                                                                                

In [62]:
predicted_test_df.show(3)

+-------+-------+------+----------+
|MovieId| UserId|Rating|prediction|
+-------+-------+------+----------+
|     28|2358799|   3.0|   3.78483|
|    156| 973051|   5.0| 3.9910395|
|    851|1189060|   3.0| 3.5878341|
+-------+-------+------+----------+
only showing top 3 rows



#### Step 2: Execution and Evaluation

In [47]:
# Comparing Your Model

# Compute the average rating
avg_rating_df = training_df.agg(F.avg(train_ratings_df.Rating).alias("average"))

# Extract the average rating value. (This is row 0, column 0.)
training_avg_rating = avg_rating_df.collect()[0][0]

print('The average rating for movies in the training set is {0}'.format(training_avg_rating))

# Add a column with the average rating
test_for_avg_df = test_df.withColumn('prediction', F.lit(training_avg_rating))

# Run the previously created RMSE evaluator, reg_eval, on the test_for_avg_df DataFrame
test_avg_RMSE = reg_eval.evaluate(test_for_avg_df)

print("The RMSE on the average set is {0}".format(test_avg_RMSE))

The average rating for movies in the training set is 3.4814234996759956
The RMSE on the average set is 1.0852535809526271


In [48]:
# Generate top 5 movie recommendations for each user
userRecs = model.recommendForAllUsers(5)
userRecs.count()

                                                                                

28978

In [49]:
userRecs_df = userRecs.toPandas()
print(userRecs_df.shape)

                                                                                

(28978, 2)


In [50]:
userRecs_df.head()

Unnamed: 0,UserId,recommendations
0,481,"[(4238, 5.233418941497803), (3033, 5.183670997..."
1,2678,"[(7569, 4.2683258056640625), (3033, 4.23158931..."
2,3595,"[(12293, 4.1894426345825195), (16147, 4.076137..."
3,6460,"[(12544, 4.642346382141113), (8933, 4.53262138..."
4,7284,"[(6522, 5.089906215667725), (8933, 5.008864879..."


### Step 3: Does your approach work for your own preferences?

In [51]:
# Your Movie Ratings

print('Most rated movies:')
print('average rating, movie name, number of reviews, movie ID')
#display(movies_df.orderBy(movies_with_500_ratings_or_more['average'].desc()).take(50))
most_rated_movies_df = train_ratings_df.join(movies_df, "MovieId")
#most_rated_movies_df.orderBy('Rating', ascending = [0,1]).show()
#most_rated_movies_df.groupBy('Title').filter('Rating  >= 4.0').show()
most_rated_movies_average_ratings_df = most_rated_movies_df.groupBy('Title').agg(F.count(most_rated_movies_df.Rating).alias("rating_count"), F.avg(most_rated_movies_df.Rating).alias("average_rating"))
most_rated_movies_average_ratings_df.show()

Most rated movies:
average rating, movie name, number of reviews, movie ID


[Stage 494:>                                                        (0 + 4) / 4]

+--------------------+------------+------------------+
|               Title|rating_count|    average_rating|
+--------------------+------------+------------------+
|       The Big Tease|         927| 3.112189859762675|
|Police Academy 3:...|        2633|3.4135966578047854|
|David Blaine: Fea...|         227|3.4537444933920707|
|Godzilla vs. The ...|         243|3.5185185185185186|
|Martha Stewart Ho...|          26|2.0384615384615383|
|Halloween III: Se...|         601| 2.302828618968386|
|     Parting Glances|         431|3.1531322505800463|
|     Rich and Famous|          87|3.0114942528735633|
| The Opposite of Sex|        6356|  3.29137822529893|
|           Tiger Bay|          98|  2.86734693877551|
|        Mother's Day|         147|2.5850340136054424|
|       Captain Blood|         977|3.9365404298874105|
|        Just 4 Kicks|          42|2.5476190476190474|
|     The Inheritance|          53| 3.150943396226415|
|              Animal|         219|3.1552511415525113|
|        F

                                                                                

#### (3a) My Movie Ratings

In [66]:
from pyspark.sql import Row
my_user_id = 1

#fill
# Note that the movie IDs are the *last* number on each line. A common error was to use the number of ratings as the movie ID.
my_rated_movies = [
     (1425, my_user_id, 5.0),\
     (1615, my_user_id, 4.0),\
     (2660, my_user_id, 4.0),\
     (14185, my_user_id, 3.0),\
     (15690, my_user_id, 4.0),\
     (16082, my_user_id, 3.0),\
     (2866, my_user_id, 2.0),\
     (16948, my_user_id, 4.0),\
     (12232, my_user_id, 1.0),\
     (9579, my_user_id, 5.0)
     # The format of each line is (my_user_id, movie ID, your rating)
     # For example, to give the movie "Star Wars: Episode IV - A New Hope (1977)" a five rating, you would add the following line:
     #   (my_user_id, 260, 5),
]

my_ratings_df = sqlContext.createDataFrame(my_rated_movies, ['MovieId','UserId','Rating'])
print('My movie ratings:')
my_ratings_df.limit(10).show()

My movie ratings:
+-------+------+------+
|MovieId|UserId|Rating|
+-------+------+------+
|   1425|     1|   5.0|
|   2660|     1|   4.0|
|  16082|     1|   3.0|
|  16948|     1|   4.0|
|   1615|     1|   4.0|
|  14185|     1|   3.0|
|  15690|     1|   4.0|
|   2866|     1|   2.0|
|  12232|     1|   1.0|
|   9579|     1|   5.0|
+-------+------+------+



##### (3b) Add Your Movies to Training Dataset

In [67]:
# Add Your Movies to Training Dataset

training_with_my_ratings_df = train_ratings_df.unionAll(my_ratings_df)

print ('The training dataset now has %s more entries than the original training dataset' %
       (training_with_my_ratings_df.count() - train_ratings_df.count()))
assert (training_with_my_ratings_df.count() - train_ratings_df.count()) == my_ratings_df.count()

The training dataset now has 10 more entries than the original training dataset


In [68]:
train_ratings_df.summary().show()



+-------+-----------------+------------------+------------------+
|summary|          MovieId|            UserId|            Rating|
+-------+-----------------+------------------+------------------+
|  count|          3255351|           3255351|           3255351|
|   mean|8724.662781985722|1327058.3376840162|3.4811883572616287|
| stddev|5107.400011271949| 762688.7721136078|1.0828725720369496|
|    min|                8|                 7|               1.0|
|    25%|             3893|            671697|               3.0|
|    50%|             8825|           1322587|               4.0|
|    75%|            13326|           1988919|               4.0|
|    max|            17742|           2649285|               5.0|
+-------+-----------------+------------------+------------------+



                                                                                

In [69]:
training_with_my_ratings_df.summary().show()



+-------+-----------------+------------------+-----------------+
|summary|          MovieId|            UserId|           Rating|
+-------+-----------------+------------------+-----------------+
|  count|          3255361|           3255361|          3255361|
|   mean|8724.664635965106|1327054.2611550607|3.481188415048285|
| stddev|5107.403836424692| 762691.1471884635|1.082872965979906|
|    min|                8|                 1|              1.0|
|    25%|             3893|            671697|              3.0|
|    50%|             8825|           1322587|              4.0|
|    75%|            13326|           1988919|              4.0|
|    max|            17742|           2649285|              5.0|
+-------+-----------------+------------------+-----------------+



                                                                                

##### (3c) Train a Model with Your Ratings

In [70]:
# Reset the parameters for the ALS object.
als.setPredictionCol("prediction")\
   .setMaxIter(5)\
   .setSeed(seed)\
   .setUserCol("UserId")\
   .setItemCol("MovieId")\
   .setRatingCol("Rating")

# Create the model with these parameters.
#my_ratings_model = als.<FILL_IN>
my_ratings_model = als.fit(training_with_my_ratings_df)

                                                                                

##### (3d) Check RMSE for the New Model with Your Ratings

In [71]:
my_predict_df = my_ratings_model.transform(test_df)

# Remove NaN values from prediction (due to SPARK-14489)
predicted_test_my_ratings_df = my_predict_df.filter(my_predict_df.prediction != float('nan'))

# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_my_ratings_df DataFrame

test_RMSE_my_ratings = reg_eval.evaluate(predicted_test_my_ratings_df)
print('The model had a RMSE on the test set of {0}'.format(test_RMSE_my_ratings))



The model had a RMSE on the test set of 0.8659527667037572


                                                                                

In [72]:
predicted_test_my_ratings_df.orderBy('UserId').show()

+-------+------+------+----------+
|MovieId|UserId|Rating|prediction|
+-------+------+------+----------+
|  15496|     7|   5.0| 3.7315006|
|   9528|     7|   5.0|  4.362616|
|   8163|    79|   3.0| 3.3374395|
|   2913|    79|   4.0| 3.8379288|
|  14648|    79|   5.0|  4.281792|
|  12497|    79|   4.0|  3.354691|
|   3165|   199|   5.0|  3.663073|
|   3541|   199|   3.0| 3.9297366|
|   2518|   199|   4.0| 3.6155295|
|   8851|   199|   4.0| 3.9575157|
|  14185|   481|   4.0| 4.4459743|
|    851|   481|   5.0| 4.5181932|
|  14712|   769|   2.0| 2.0519073|
|  16731|   769|   4.0| 3.2350373|
|   6190|   769|   3.0| 2.9281616|
|   6287|   769|   4.0|  2.469206|
|    797|   906|   1.0|  3.286673|
|  14484|   906|   3.0| 3.3645842|
|  11182|  1310|   5.0| 3.3378527|
|   5069|  1310|   2.0| 3.0583959|
+-------+------+------+----------+
only showing top 20 rows



In [73]:
# Generate top 5 movie recommendations for each user
userRecs = my_ratings_model.recommendForAllUsers(5)
userRecs.count()

                                                                                

28979

In [74]:
userRecs_df = userRecs.toPandas()
print(userRecs_df.shape)

                                                                                

(28979, 2)


In [75]:
userRecs_df.head()

Unnamed: 0,UserId,recommendations
0,481,"[(3033, 5.149134635925293), (4238, 5.138109683..."
1,2678,"[(7569, 4.19780158996582), (6991, 4.0455470085..."
2,3595,"[(12293, 4.227561950683594), (3290, 4.09849262..."
3,6460,"[(10080, 4.582267761230469), (359, 4.461654663..."
4,7284,"[(9701, 4.913458347320557), (6522, 4.907984256..."


In [84]:
userRecs_df.loc[userRecs_df['UserId'] == 1]

Unnamed: 0,UserId,recommendations
1150,1,"[(9579, 4.64711332321167), (15183, 3.981461763..."
