In [84]:
import pyspark as ps
import pandas as pd
import numpy as np
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
spark = (ps.sql.SparkSession.builder 
        .master("local[4]") 
        .appName("sparkSQL exercise") 
        .getOrCreate()
        )
sc = spark.sparkContext
from sklearn.feature_extraction.text import CountVectorizer
from pyspark.sql.types import *
from pyspark.sql.functions import col, split

## Load Data

In [80]:
# small data set:
ratings_df = spark.read.csv('../data/movies/ratings.csv', header=True, inferSchema=True)

# full data set:
# ratings_df = spark.read.csv('../data/ml-latest/ratings.csv', header=True, inferSchema=True)

ratings_df = ratings_df.drop("timestamp");
print(ratings_df.show(5))
print(ratings_df.describe)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|     31|   2.5|
|     1|   1029|   3.0|
|     1|   1061|   3.0|
|     1|   1129|   2.0|
|     1|   1172|   4.0|
+------+-------+------+
only showing top 5 rows

None
<bound method DataFrame.describe of DataFrame[userId: int, movieId: int, rating: double]>


In [81]:
# small data set:
movies_df = spark.read.csv('../data/movies/movies.csv', header=True, inferSchema=True)

# full data set:
# movies_df = spark.read.csv('../data/ml-latest/movies.csv', header=True, inferSchema=True)

print(movies_df.show(5))
print(movies_df.describe)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

None
<bound method DataFrame.describe of DataFrame[movieId: int, title: string, genres: string]>


### Build ALS model

In [82]:
train_df, test_df, holdout_df = ratings_df.randomSplit([0.6, 0.2, 0.2], seed=42)

In [5]:
seed = 42
iterations = 10
reg_param = 0.1
ranks = [4]
rmse_list = []

for rank in ranks:
    als = ALS(
        itemCol='movieId',
        userCol='userId',
        ratingCol='rating',
        nonnegative=True,
        regParam=reg_param,
        rank=rank,
        seed=seed,
        maxIter=iterations,
        coldStartStrategy="drop"
        )
    
    model = als.fit(train_df)
    predictions = model.transform(test_df)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    rmse_list.append(rmse)
    print(f"Rank = {rank};  RMSE = {rmse}")

Rank = 4;  RMSE = 0.9337591042117053


In [6]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# # Generate top 10 movie recommendations for a specified set of users
# users = ratings.select(als.getUserCol()).distinct().limit(3)
# userSubsetRecs = model.recommendForUserSubset(users, 10)
# # Generate top 10 user recommendations for a specified set of movies
# movies = ratings.select(als.getItemCol()).distinct().limit(3)
# movieSubSetRecs = model.recommendForItemSubset(movies, 10)


In [7]:
movieRecs.show(5)

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1580|[[113, 4.985595],...|
|   5300|[[46, 6.687193], ...|
|   6620|[[477, 5.1323185]...|
|  54190|[[113, 6.956951],...|
|    471|[[156, 5.4956512]...|
+-------+--------------------+
only showing top 5 rows



In [8]:
train_df.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|   1172|   4.0|
|     1|   1263|   2.0|
|     1|   1287|   2.0|
|     1|   1293|   2.0|
|     1|   1339|   3.5|
+------+-------+------+
only showing top 5 rows



In [9]:
seed = 42
# iterations = [5, 10, 15]
iterations = [10]
reg_param = 0.1
rank = 2
rmse_list = []

for iteration in iterations:
    als = ALS(
        itemCol='movieId',
        userCol='userId',
        ratingCol='rating',
        nonnegative=True,
        regParam=reg_param,
        rank=rank,
        seed=seed,
        maxIter=iteration,
        coldStartStrategy="drop"
        )
    
    model = als.fit(train_df)
    predictions = model.transform(test_df)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    rmse_list.append(rmse)
    print(f"Iterations = {iteration};  RMSE = {rmse}")

Iterations = 10;  RMSE = 0.9276785383792665


## Score Model On Holdout

In [88]:
import timeit

In [92]:
import timeit

seed = 42
iterations = 6
reg_param = 0.1
rank = 2

als = ALS(
    itemCol='movieId',
    userCol='userId',
    ratingCol='rating',
    nonnegative=True,
    regParam=reg_param,
    rank=rank,
    seed=seed,
    maxIter=iteration,
    coldStartStrategy="drop"
    )

model = als.fit(train_df.union(test_df))

# start timer
start = timeit.default_timer()

predictions = model.transform(holdout_df)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
# stop timer
stop = timeit.default_timer()

rmse_list.append(rmse)
print(f"Final Holdout Test: Rank = {rank}; Iterations = {iteration};\nRMSE = {rmse:.2f}")
print(f"Time to complete ALS: {stop-start:.2f} seconds")

Final Holdout Test: Rank = 2; Iterations = 10;
RMSE = 0.92
Time to complete ALS: 7.27 seconds


### Content-Based Recommender (Genres)

In [66]:
# print(movies_df.show(5))

In [67]:
movies_df = movies_df.drop("title");

In [68]:
print(movies_df.show(5))

+-------+--------------------+
|movieId|              genres|
+-------+--------------------+
|      1|Adventure|Animati...|
|      2|Adventure|Childre...|
|      3|      Comedy|Romance|
|      4|Comedy|Drama|Romance|
|      5|              Comedy|
+-------+--------------------+
only showing top 5 rows

None


In [69]:
movies_df.columns

['movieId', 'genres']

In [70]:
# temp_df = movies_df.rdd.map(lambda x: (x["movieId"], '|'.join(x["genres"])))\
#                             .toDF(["movieId", "genres"])

In [71]:
temp_df = movies_df.rdd.map(lambda x: (x["movieId"], x["genres"].replace('|', ' ')))\
                            .toDF(["movieId", "genres"])

In [72]:
temp_df = temp_df.withColumn("genres", split(col("genres"), " "))

In [73]:
print(temp_df.show(5))

+-------+--------------------+
|movieId|              genres|
+-------+--------------------+
|      1|[Adventure, Anima...|
|      2|[Adventure, Child...|
|      3|   [Comedy, Romance]|
|      4|[Comedy, Drama, R...|
|      5|            [Comedy]|
+-------+--------------------+
only showing top 5 rows

None


In [74]:
genre_df = temp_df.drop("movieId");

In [76]:
from pyspark.ml.feature import CountVectorizer

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="genres", outputCol="features")

model = cv.fit(temp_df)

result = model.transform(temp_df)
result.show(truncate=False)


+-------+-------------------------------------------------+----------------------------------------+
|movieId|genres                                           |features                                |
+-------+-------------------------------------------------+----------------------------------------+
|1      |[Adventure, Animation, Children, Comedy, Fantasy]|(22,[1,5,9,10,13],[1.0,1.0,1.0,1.0,1.0])|
|2      |[Adventure, Children, Fantasy]                   |(22,[5,9,10],[1.0,1.0,1.0])             |
|3      |[Comedy, Romance]                                |(22,[1,3],[1.0,1.0])                    |
|4      |[Comedy, Drama, Romance]                         |(22,[0,1,3],[1.0,1.0,1.0])              |
|5      |[Comedy]                                         |(22,[1],[1.0])                          |
|6      |[Action, Crime, Thriller]                        |(22,[2,4,6],[1.0,1.0,1.0])              |
|7      |[Comedy, Romance]                                |(22,[1,3],[1.0,1.0])            

In [77]:
from sklearn.metrics.pairwise import cosine_similarity

In [95]:
cos_mat = cosine_similarity(result['features'], result['features'])

ValueError: setting an array element with a sequence.