## Recommendation (collaborative filtering) example
#### Adapted from: https://spark.apache.org/docs/latest/ml-collaborative-filtering.html

In [1]:
# pyspark specific imports
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

# other python imports
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import time

### Load Data

In [2]:
# load ratings
ratings = spark.read.option("sep", ",")\
            .option("header", "true")\
            .csv("gs://80-629bucket/dat/ml-latest/ratings.csv", inferSchema=True)
ratings = ratings.cache()

In [3]:
print(ratings.head())
print(ratings.describe(["userId", "movieId", "rating", "timestamp"]).show())
print('uniquer users', ratings.select('userId').dropDuplicates().count())
print('uniquer items', ratings.select('movieId').dropDuplicates().count())

Row(userId=1, movieId=110, rating=1.0, timestamp=1425941529)
+-------+-----------------+------------------+------------------+--------------------+
|summary|           userId|           movieId|            rating|           timestamp|
+-------+-----------------+------------------+------------------+--------------------+
|  count|         26024289|          26024289|          26024289|            26024289|
|   mean| 135037.090248114|15849.109677040553|3.5280903543608817|1.1712584326913226E9|
| stddev|78176.19722170313| 31085.25753139178|1.0654427636662376|2.0528887028184643E8|
|    min|                1|                 1|               0.5|           789652004|
|    max|           270896|            176275|               5.0|          1501829870|
+-------+-----------------+------------------+------------------+--------------------+

None
uniquer users 270896
uniquer items 45115


In [4]:
# load movie titles
movies = spark.read.option("sep", ",")\
            .option("header", "true")\
            .csv("gs://80-629bucket/dat/ml-latest/movies.csv", inferSchema=True)
movies = movies.cache()

In [5]:
# print first n movies
n=2
print(movies.head(n))

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'), Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy')]


### Pre-Process data

In [6]:
(training, test) = ratings.randomSplit([0.8, 0.2])

### Train matrix factorization model

In [None]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
K=100
als = ALS(regParam=0.1, rank=K, maxIter=20, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", numUserBlocks=10, numItemBlocks=10)

stime = time.time()
model = als.fit(training)
etime = time.time()

In [206]:
print('Elapsed time', etime-stime)

Elapsed time 1415.0819599628448


### Predictions and model exploration

In [None]:
# Evaluate the model by computing the RMSE on train and test datasets
predictions = model.transform(training)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse_train = evaluator.evaluate(predictions)

predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse_test = evaluator.evaluate(predictions)


In [205]:
#Train RMSE =  0.6356150490826092
#Test RMSE =  0.8453643678422643
print("Train RMSE = ", str(rmse_train))
print("Test RMSE = ", str(rmse_test))

Train RMSE =  0.7508403940223557
Test RMSE =  0.8104012335207674


In [None]:
def explore_user(uID):
    userRecs(uID)

In [247]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
# movieRecs = model.recommendForAllItems(10)
userRecs.cache()

DataFrame[userId: int, recommendations: array<struct<movieId:int,rating:float>>]

In [244]:
def movie_names(mid):
    #print(type(movies.where(col("movieId") == mid).select(col("title")).collect()))
    return movies.where(col("movieId") == mid).select(col("title")).collect()[0][0]

def movie_genres(mid):
    #print(type(movies.where(col("movieId") == mid).select(col("title")).collect()))
    return movies.where(col("movieId") == mid).select(col("genres")).collect()[0][0]

In [248]:
uID = 100000

training_user = training.where(training.userId == uID).select("movieId", "rating").collect()
print('Training set for user %d' % uID)
for i,movie in enumerate(training_user):
    #rating = training_user.where(training_user.movieId == movie.movieId).select("rating").collect()
    print('\t', movie_names(movie.movieId), movie_genres(movie.movieId), movie.rating)

print() # skip line

reco_users = userRecs.where(userRecs.userId == uID).select("recommendations.movieId", "recommendations.rating").collect()[0]
print('Recommendations for user %d' % uID)
for i,mid in enumerate(reco_users[0]):
    print('\t %d. %s (%s) (predicted rating: %.3f)' % (i+1,movie_names(mid),movie_genres(mid), reco_users[1][i]))
    #print(


Training set for user 100000
	 Jumanji (1995) Adventure|Children|Fantasy 4.0
	 American President, The (1995) Comedy|Drama|Romance 5.0
	 Babe (1995) Children|Drama 5.0
	 Clueless (1995) Comedy|Romance 5.0
	 Seven (a.k.a. Se7en) (1995) Mystery|Thriller 5.0
	 Apollo 13 (1995) Adventure|Drama|IMAX 4.0
	 Die Hard: With a Vengeance (1995) Action|Crime|Thriller 4.0
	 Net, The (1995) Action|Crime|Thriller 3.0
	 Nell (1994) Drama 4.0
	 Outbreak (1995) Action|Drama|Sci-Fi|Thriller 3.0
	 Pulp Fiction (1994) Comedy|Crime|Drama|Thriller 5.0
	 Stargate (1994) Action|Adventure|Sci-Fi 4.0
	 Santa Clause, The (1994) Comedy|Drama|Fantasy 4.0
	 Shawshank Redemption, The (1994) Crime|Drama 4.0
	 Ace Ventura: Pet Detective (1994) Comedy 1.0
	 Clear and Present Danger (1994) Action|Crime|Drama|Thriller 5.0
	 Lion King, The (1994) Adventure|Animation|Children|Drama|Musical|IMAX 5.0
	 Mask, The (1994) Action|Comedy|Crime|Fantasy 5.0
	 Speed (1994) Action|Romance|Thriller 5.0
	 True Lies (1994) Action|Adventu

In [241]:
# explore model parameters
#a = explode(model.itemFactors.features) #orderBy('features').select("id")
print(type(a))
#sort("id").collect()
 
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType

def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType()))(col)

model_ = model.itemFactors\
                .withColumn("fs", to_array(col("features")))\
                .select(["id"] + [col("fs")[i] for i in range(K)])
print(type(model_))
model_ = model_.sort(col("fs[0]")).select("id")
print(type(model_))

<class 'pyspark.sql.column.Column'>
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
