## DETAILED DESCRIPTIONS OF DATA FILES
Here are brief descriptions of the data.

ml-data.tar.gz -- Compressed tar file. To rebuild the u data files do this:
gunzip ml-data.tar.gz
tar xvf ml-data.tar
mku.sh

**u.data** -- The full u data set, 100000 ratings by 943 users on 1682 items.
Each user has rated at least 20 movies. Users and items are
numbered consecutively from 1. The data is randomly
ordered. This is a tab separated list of
user id | item id | rating | timestamp.
The time stamps are unix seconds since 1/1/1970 UTC

**u.info** -- The number of users, items, and ratings in the u data set.

**u.item** -- Information about the items (movies); this is a tab separated
list of
movie id | movie title | release date | video release date |
IMDb URL | unknown | Action | Adventure | Animation |
Children's | Comedy | Crime | Documentary | Drama | Fantasy |
Film-Noir | Horror | Musical | Mystery | Romance | Sci-Fi |
Thriller | War | Western |
The last 19 fields are the genres, a 1 indicates the movie
is of that genre, a 0 indicates it is not; movies can be in
several genres at once.
The movie ids are the ones used in the u.data data set.

**u.genre** -- A list of the genres.

**u.user** -- Demographic information about the users; this is a tab
separated list of
user id | age | gender | occupation | zip code
The user ids are the ones used in the u.data data set.

**u.occupation** -- A list of the occupations.

**u1.base** -- The data sets u1.base and u1.test through u5.base and u5.test
**u1.test** are 80%/20% splits of the u data into training and test data.
**u2.base** Each of u1, …, u5 have disjoint test sets; this if for
**u2.test** 5 fold cross validation (where you repeat your experiment
**u3.base with each training and test set and average the results).
**u3.test** These data sets can be generated from u.data by mku.sh.
**u4.base**
**u4.test**
**u5.base**
**u5.test**

**ua.base** -- The data sets ua.base, ua.test, ub.base, and ub.test
**ua.test** split the u data into a training set and a test set with
**ub.base** exactly 10 ratings per user in the test set. The sets
**ub.test** ua.test and ub.test are disjoint. These data sets can
be generated from u.data by mku.sh.

**allbut.pl** -- The script that generates training and test sets where
all but n of a users ratings are in the training data.

**mku.sh** -- A shell script to generate all the u data sets from u.data.

In [44]:
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf 

# Import the required ML functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


import sys

### Following are the parameters of a SparkContext.

**Master** − It is the URL of the cluster it connects to.

**appName** − Name of your job.

**sparkHome** − Spark installation directory.

**pyFiles** − The .zip or .py files to send to the cluster and add to the PYTHONPATH.

**Environment** − Worker nodes environment variables.

**batchSize** − The number of Python objects represented as a single Java object. Set 1 to disable batching, 0 to automatically choose the batch size based on object sizes, or -1 to use an unlimited batch size.

**Serializerve** − RDD serializer.

**Conf** − An object of L{SparkConf} to set all the Spark properties.

**Gateway** − Use an existing gateway and JVM, otherwise initializing a new JVM.

**JSC** − The JavaSparkContext instance.
**profiler_cls** − A class of custom Profiler used to do profiling (the default is pyspark.profiler.BasicProfiler).


In [2]:
spark.stop()

In [3]:
 spark = SparkSession \
 .builder \
 .appName("recommendation_engine") \
 .master("local") \
 .getOrCreate()

In [4]:
ratings = spark.read.csv("./data/ml-100k/ua.base", sep="\t")
#ratings = spark.read.load('data/ml-100k/ratings.parquet', schema=schema)

In [5]:
ratings = ratings.toDF('userId','movieId','rating','timestamp')

In [6]:
ratings.columns

['userId', 'movieId', 'rating', 'timestamp']

In [7]:
ratings = ratings.sort(['userId','movieId'], ascending=True)

In [8]:
# show schema
ratings.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|     5|874965758|
|     1|     10|     3|875693118|
|     1|    100|     5|878543541|
|     1|    101|     2|878542845|
|     1|    102|     2|889751736|
|     1|    103|     1|878542845|
|     1|    104|     1|875241619|
|     1|    105|     2|875240739|
|     1|    106|     4|875241390|
|     1|    107|     4|875241619|
|     1|    108|     5|875240920|
|     1|    109|     5|874965739|
|     1|     11|     2|875072262|
|     1|    110|     1|878542845|
|     1|    111|     5|889751711|
|     1|    112|     1|878542441|
|     1|    113|     5|878542738|
|     1|    114|     5|875072173|
|     1|    115|     5|878541637|
|     1|    116|     3|878542960|
+------+-------+------+---------+
only showing top 20 rows



In [9]:
# count rows
ratings.count()

90570

In [10]:
# Count the total number of ratings in the dataset
numerator = ratings.select("rating").count()

In [11]:
# Count the number of distinct userIds and distinct movieIds
num_users = ratings.select("userId").distinct().count()
print('number of unique users: ', num_users)

number of unique users:  943


In [12]:
num_movies = ratings.select("movieId").distinct().count()
print('number of unique movies: ', num_movies)

number of unique movies:  1680


In [13]:
# Set the denominator equal to the number of users multiplied by the number of movies
denominator = num_users * num_movies
print('number of possible ratings: ', denominator)

number of possible ratings:  1584240


In [14]:
# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

The ratings dataframe is  94.28% empty.


In [15]:
# Filter to show only userIds less than 100
ratings.filter(sf.col("userId") < 100).show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|     5|874965758|
|     1|     10|     3|875693118|
|     1|    100|     5|878543541|
|     1|    101|     2|878542845|
|     1|    102|     2|889751736|
|     1|    103|     1|878542845|
|     1|    104|     1|875241619|
|     1|    105|     2|875240739|
|     1|    106|     4|875241390|
|     1|    107|     4|875241619|
|     1|    108|     5|875240920|
|     1|    109|     5|874965739|
|     1|     11|     2|875072262|
|     1|    110|     1|878542845|
|     1|    111|     5|889751711|
|     1|    112|     1|878542441|
|     1|    113|     5|878542738|
|     1|    114|     5|875072173|
|     1|    115|     5|878541637|
|     1|    116|     3|878542960|
+------+-------+------+---------+
only showing top 20 rows



In [16]:
# Group data by userId, count ratings
ratings.groupBy("userId").count().show()

+------+-----+
|userId|count|
+------+-----+
|   296|  137|
|   467|   34|
|   675|   24|
|   691|   22|
|   829|   54|
|   125|  172|
|   451|   88|
|   800|   18|
|   853|   31|
|   666|  235|
|   870|  259|
|   919|  207|
|   926|   10|
|   124|   14|
|   447|  129|
|    51|   13|
|   591|   74|
|     7|  393|
|   307|  102|
|   475|   10|
+------+-----+
only showing top 20 rows



In [17]:
# Min num ratings for movies
print("Movie with the fewest ratings: ")
ratings.groupBy("movieId").count().select(sf.min("count")).show()

Movie with the fewest ratings: 
+----------+
|min(count)|
+----------+
|         1|
+----------+



In [18]:
# Min num ratings for movies
print("Movie with the fewest ratings: ")
ratings.groupBy("movieId").count().select(sf.avg('count')).show()

Movie with the fewest ratings: 
+------------------+
|        avg(count)|
+------------------+
|53.910714285714285|
+------------------+



In [19]:
# Min num ratings for user
print("User with the fewest ratings: ")
ratings.groupBy("userId").count().select(sf.min("count")).show()

User with the fewest ratings: 
+----------+
|min(count)|
+----------+
|        10|
+----------+



In [20]:
# Avg num ratings per users
print("Avg num ratings per user: ")
ratings.groupBy("userId").count().select(sf.avg("count")).show()

Avg num ratings per user: 
+-----------------+
|       avg(count)|
+-----------------+
|96.04453870625663|
+-----------------+



In [21]:
# Use .printSchema() to see the datatypes of the ratings dataset
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [22]:
# Tell Spark to convert the columns to the proper data types
ratings = ratings.select(ratings.userId.cast("integer"), ratings.movieId.cast("integer"), ratings.rating.cast("double"))

In [23]:
# Call .printSchema() again to confirm the columns are now in the correct format
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



# ALS Model

In [24]:
# Create test and train set
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 1234)

In [25]:
# Create ALS model
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, implicitPrefs = False)

In [26]:
# Confirm that a model called "als" was created
type(als)

pyspark.ml.recommendation.ALS

In [27]:
# Add hyperparameters and their respective values to param_grid
#param_grid = ParamGridBuilder() \
#            .addGrid(als.rank,  [10, 50, 100, 150]) \
#            .addGrid(als.maxIter, [5, 50, 100, 200]) \
#            .addGrid(als.regParam, [.01, .05, .1, .15]) \
#            .build()

param_grid = ParamGridBuilder() \
            .addGrid(als.rank,  [10, 50]) \
            .addGrid(als.maxIter, [5]) \
            .build()

In [28]:
# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  2


In [29]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Confirm cv was built
print(cv)

CrossValidator_a615bdab980c


In [30]:
#Fit cross validator to the 'train' dataset
model = cv.fit(train)

In [31]:
best_model = model.bestModel

In [32]:
# Print best_model
print(type(best_model))

# Complete the code below to extract the ALS model parameters
print("**Best Model**")

# Print "Rank"
print("  Rank:", best_model.rank)


<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 10


In [33]:
df_test_01 = spark.read.csv("./data/ml-100k/ua.test", sep="\t")

In [34]:
df_test_01 = df_test_01.toDF('userId','movieId','rating','timestamp')

In [35]:
df_test_01 = df_test_01.select(df_test_01.userId.cast("integer"), df_test_01.movieId.cast("integer"), df_test_01.rating.cast("double"))

In [36]:
test_predictions = best_model.transform(df_test_01)

In [37]:
test_predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   251|    148|   2.0| 3.0283358|
|   580|    148|   4.0| 3.0808342|
|   633|    148|   1.0| 2.9991403|
|   642|    148|   5.0|   4.01511|
|   406|    148|   3.0| 2.5808716|
|    26|    148|   3.0|  2.611642|
|    44|    148|   4.0| 2.8444238|
|   271|    148|   3.0| 2.8736029|
|   606|    148|   3.0| 3.3448424|
|   916|    148|   2.0|  2.479743|
|   236|    148|   4.0| 3.1556623|
|   602|    148|   4.0| 3.6904817|
|   663|    148|   4.0| 2.9024608|
|   222|    148|   2.0| 2.8818076|
|   601|    148|   3.0| 1.9276849|
|   330|    148|   4.0| 4.1763325|
|   372|    148|   5.0|  3.984272|
|   727|    148|   2.0| 2.9217217|
|   190|    148|   4.0| 3.2250655|
|   224|    148|   3.0| 3.3781059|
+------+-------+------+----------+
only showing top 20 rows



In [38]:
test_predictions.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- prediction: float (nullable = false)



In [39]:
test_predictions = test_predictions.select(test_predictions.userId.cast("integer"),
                               test_predictions.movieId.cast("integer"),
                               test_predictions.rating.cast("double"),
                              test_predictions.prediction.cast("double"))

In [40]:
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

nan


In [42]:
test_predictions

DataFrame[userId: int, movieId: int, rating: double, prediction: double]

In [54]:
# recommendations for all users 
ALS_recommendations = best_model.recommendForAllUsers(10)

In [55]:
ALS_recommendations.registerTempTable("ALS_recs_temp")

In [82]:
ALS_recommendations.select('recommendations')

DataFrame[recommendations: array<struct<movieId:int,rating:float>>]

In [77]:
getrows(ALS_recommendations, rownums=[0, 2]).collect()


NameError: name 'getrows' is not defined

In [86]:
clean_recs = spark.sql("SELECT userId, \
                       moviesIds_and_ratings.movieId AS movieId, \
                       moviesIds_And_ratings.rating AS prediction \
                       FROM ALS_recs_temp  \
                       LATERAL VIEW explode(recommendations) exploded_table \
                        moviesIds_and_ratings")

In [87]:
clean_recs.show()

+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|   471|    838| 5.7406297|
|   471|   1155| 5.1363497|
|   471|    394| 5.1212916|
|   471|   1242| 5.0978255|
|   471|    867| 5.0426164|
|   471|    613| 4.9960318|
|   471|    898|  4.980299|
|   471|   1313|  4.958345|
|   471|    289|  4.943675|
|   471|   1344| 4.8187056|
|   463|   1431|  4.512798|
|   463|   1344|  4.505311|
|   463|   1589| 4.4850698|
|   463|   1242| 4.2059665|
|   463|   1268|  4.190686|
|   463|    887| 4.0957394|
|   463|    963| 4.0705633|
|   463|    838|  4.067269|
|   463|   1137|   4.02338|
|   463|    613| 4.0128913|
+------+-------+----------+
only showing top 20 rows



In [100]:
items_names = spark.read.csv('data/ml-100k/u.item', sep='|')

In [102]:
items_names = items_names.select('_c0', '_c1')

In [104]:
items_names = items_names.toDF('movieId', 'movieName')

In [105]:
items_names.show()

+-------+--------------------+
|movieId|           movieName|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|    GoldenEye (1995)|
|      3|   Four Rooms (1995)|
|      4|   Get Shorty (1995)|
|      5|      Copycat (1995)|
|      6|Shanghai Triad (Y...|
|      7|Twelve Monkeys (1...|
|      8|         Babe (1995)|
|      9|Dead Man Walking ...|
|     10|  Richard III (1995)|
|     11|Seven (Se7en) (1995)|
|     12|Usual Suspects, T...|
|     13|Mighty Aphrodite ...|
|     14|  Postino, Il (1994)|
|     15|Mr. Holland's Opu...|
|     16|French Twist (Gaz...|
|     17|From Dusk Till Da...|
|     18|White Balloon, Th...|
|     19|Antonia's Line (1...|
|     20|Angels and Insect...|
+-------+--------------------+
only showing top 20 rows



In [107]:
# recommended movies by user
clean_recs.join(items_names, ['movieId'], "left").show()


+-------+------+----------+--------------------+
|movieId|userId|prediction|           movieName|
+-------+------+----------+--------------------+
|    838|   471| 5.7406297|In the Line of Du...|
|   1155|   471| 5.1363497|Rendezvous in Par...|
|    394|   471| 5.1212916|Radioland Murders...|
|   1242|   471| 5.0978255|Old Lady Who Walk...|
|    867|   471| 5.0426164|Whole Wide World,...|
|    613|   471| 4.9960318|My Man Godfrey (1...|
|    898|   471|  4.980299| Postman, The (1997)|
|   1313|   471|  4.958345|     Palmetto (1998)|
|    289|   471|  4.943675|        Evita (1996)|
|   1344|   471| 4.8187056|Story of Xinghua,...|
|   1431|   463|  4.512798| Legal Deceit (1997)|
|   1344|   463|  4.505311|Story of Xinghua,...|
|   1589|   463| 4.4850698|  Schizopolis (1996)|
|   1242|   463| 4.2059665|Old Lady Who Walk...|
|   1268|   463|  4.190686|  Bitter Moon (1992)|
|    887|   463| 4.0957394|  Eve's Bayou (1997)|
|    963|   463| 4.0705633|Some Folks Call I...|
|    838|   463|  4.