In [1]:
from pyspark.sql import SparkSession

In [2]:
MAX_MEMORY = '5g'
# The driver and the executors are given the same memory budget.
spark = (
    SparkSession.builder
    .appName('movie-recommendation')
    .config('spark.executor.memory', MAX_MEMORY)
    .config('spark.driver.memory', MAX_MEMORY)
    .getOrCreate()
)

In [5]:
rating_file = r'./data/ml-25m/ratings.csv'
# The first CSV line is the header; column types are inferred from the data.
rating_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(rating_file)
)

In [7]:
# Preview the first 20 rating rows.
rating_df.show(n=20)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
only showing top 20 rows



In [8]:
# Keep only the three columns ALS needs; the timestamp column is dropped.
rating_df = rating_df.select('userid', 'movieid', 'rating')

In [9]:
# Print the column names and types of the projected ratings DataFrame.
rating_df.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- movieid: integer (nullable = true)
 |-- rating: double (nullable = true)



In [10]:
# Summary statistics (count, mean, stddev, min, max) of the rating column.
rating_df.describe('rating').show()

+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|          25000095|
|   mean| 3.533854451353085|
| stddev|1.0607439611423508|
|    min|               0.5|
|    max|               5.0|
+-------+------------------+



In [11]:
# 80/20 train/test split. The explicit seed keeps the split (and therefore the
# reported RMSE) the same every time the notebook is re-run.
train_df, test_df = rating_df.randomSplit([0.8, 0.2], seed=42)

In [12]:
from pyspark.ml.recommendation import ALS

In [16]:
# Alternating Least Squares collaborative-filtering estimator over the
# (userid, movieid, rating) triples.
als = ALS(
    maxIter=5,
    regParam=0.1,
    userCol='userid',
    itemCol='movieid',
    ratingCol='rating',
    # 'drop' removes rows the model cannot score from transform() output,
    # so the evaluation metric is not computed over missing predictions.
    coldStartStrategy='drop',
    # Fixed seed so the fitted model is repeatable between runs.
    seed=42,
)

Exception ignored in: <function JavaWrapper.__del__ at 0x076FACD0>
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\ml\wrapper.py", line 39, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'ALS' object has no attribute '_java_obj'


In [17]:
# Fit the ALS model on the training split.
model = als.fit(dataset=train_df)

In [18]:
# Score the held-out split; this adds a 'prediction' column.
predictions = model.transform(dataset=test_df)

In [19]:
# Preview actual vs. predicted ratings for the first 20 test rows.
predictions.show(n=20)

+------+-------+------+----------+
|userid|movieid|rating|prediction|
+------+-------+------+----------+
|   101|   8638|   5.0| 3.5119193|
|   155|   1580|   2.5| 3.4675193|
|   243|   1580|   3.0| 2.6661637|
|   322|    463|   3.0| 3.2045796|
|   472|   1088|   4.0| 3.4170225|
|   472|   3918|   3.0| 2.3723238|
|   497|   2366|   4.0| 3.8124385|
|   501|   1580|   5.0|   3.87407|
|   597|   2142|   2.0| 3.2140796|
|   606|   4519|   4.0| 3.9513927|
|   606|  68135|   3.5|  3.871873|
|   626|   2122|   2.0|  2.243652|
|   626|   2866|   3.0| 3.3826358|
|   626|   3175|   4.0|  3.450072|
|   626|   3997|   2.0|   2.11784|
|   626|  36525|   4.0| 3.3286457|
|   633|   1591|   5.0| 3.4640694|
|   772|    471|   4.0|  3.476862|
|   772|   1591|   1.0| 2.0172691|
|   772|   1645|   3.0| 2.9905553|
+------+-------+------+----------+
only showing top 20 rows



In [21]:
# Compare the distribution of true ratings with that of the predictions.
predictions.describe('rating', 'prediction').show()

+-------+------------------+------------------+
|summary|            rating|        prediction|
+-------+------------------+------------------+
|  count|           4995895|           4995895|
|   mean|3.5338175642202247| 3.403473807382258|
| stddev| 1.060629769668916|0.6402685908998172|
|    min|               0.5|        -1.5802333|
|    max|               5.0|          6.749661|
+-------+------------------+------------------+



In [22]:
from pyspark.ml.evaluation import RegressionEvaluator
# Root-mean-square error between the 'rating' and 'prediction' columns.
evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='rating',
    metricName='rmse',
)

In [23]:
# RMSE of the model on the held-out predictions.
rmse = evaluator.evaluate(dataset=predictions)

In [24]:
# Show the test-set RMSE computed by the evaluator.
print(rmse)

0.8131680389985763


In [25]:
# Top 3 movie recommendations for every user known to the model.
model.recommendForAllUsers(numItems=3).show()



+------+--------------------+
|userid|     recommendations|
+------+--------------------+
|    26|[{203086, 5.84763...|
|    27|[{177411, 6.34608...|
|    28|[{194434, 7.45642...|
|    31|[{194334, 3.85096...|
|    34|[{194434, 5.57265...|
|    44|[{194434, 6.81767...|
|    53|[{194334, 7.03597...|
|    65|[{194434, 6.13258...|
|    76|[{203086, 6.17230...|
|    78|[{203086, 6.74651...|
|    81|[{159761, 5.21492...|
|    85|[{205453, 5.53288...|
|   101|[{203086, 5.42073...|
|   103|[{203086, 5.55319...|
|   108|[{194434, 5.53796...|
|   115|[{203086, 6.22221...|
|   126|[{203086, 6.54823...|
|   133|[{203086, 5.15062...|
|   137|[{203086, 5.58885...|
|   148|[{194434, 5.96479...|
+------+--------------------+
only showing top 20 rows



In [26]:
# Top 3 users recommended for every movie known to the model.
model.recommendForAllItems(numUsers=3).show()

+-------+--------------------+
|movieid|     recommendations|
+-------+--------------------+
|     12|[{87426, 4.96737}...|
|     26|[{66365, 5.141595...|
|     27|[{30897, 5.297116...|
|     28|[{58082, 5.411728...|
|     31|[{143282, 5.20534...|
|     34|[{128562, 5.70376...|
|     44|[{87426, 5.070844...|
|     53|[{138914, 5.28260...|
|     65|[{87426, 4.952166...|
|     76|[{87426, 5.166475...|
|     78|[{149507, 4.71001...|
|     81|[{7629, 4.839836}...|
|     85|[{48436, 4.926748...|
|    101|[{105801, 4.95114...|
|    103|[{26659, 5.075937...|
|    108|[{86709, 5.558506...|
|    115|[{54404, 5.565475...|
|    126|[{86668, 4.488641...|
|    133|[{108346, 5.08828...|
|    137|[{113441, 5.17785...|
+-------+--------------------+
only showing top 20 rows



In [28]:
from pyspark.sql.types import IntegerType

user_list = [10, 40, 70]
# Build a single-column DataFrame of user ids, named to match the ALS userCol.
users_df = (
    spark.createDataFrame(user_list, schema=IntegerType())
    .toDF('userid')
)

users_df.show(n=20)

+------+
|userid|
+------+
|    10|
|    40|
|    70|
+------+



In [29]:
# Top 5 movie recommendations for just the users listed in users_df.
user_recs = model.recommendForUserSubset(dataset=users_df, numItems=5)



In [30]:
# Row order of a collected DataFrame is not guaranteed, so pick the user
# explicitly instead of taking whichever row happens to come back first.
first_user_row = user_recs.filter(user_recs['userid'] == user_list[0]).first()
movies_list = first_user_row.recommendations

In [31]:
# Turn the list of recommendation rows into a DataFrame (movieid, rating).
recs_df = spark.createDataFrame(data=movies_list)
recs_df.show(n=20)

+-------+------------------+
|movieid|            rating|
+-------+------------------+
| 194434| 6.367295742034912|
| 203086|6.2253618240356445|
| 203882|   6.0559983253479|
| 151989| 5.888882637023926|
| 183947| 5.876765251159668|
+-------+------------------+



In [32]:
movies_file = './data/ml-25m/movies.csv'
# The first CSV line is the header; column types are inferred from the data.
movies_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(movies_file)
)

In [33]:
# Preview the first 5 movies.
movies_df.show(n=5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [43]:
# Expose both DataFrames to Spark SQL under the table names used by the query.
movies_df.createOrReplaceTempView('movies')
recs_df.createOrReplaceTempView('recommendations')

In [41]:
# Check the exact column names of the movies table before joining on them.
movies_df.show(n=1)

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
+-------+----------------+--------------------+
only showing top 1 row



In [44]:
# Join the recommended movie ids with the movie catalogue so each row shows
# title and genres, highest predicted rating first.
query = '''
SELECT *
FROM 
    movies JOIN recommendations
    ON movies.movieId = recommendations.movieid
ORDER BY
    rating desc
'''
recommended_movies = spark.sql(sqlQuery=query)
recommended_movies.show(n=5)

+-------+--------------------+------------------+-------+------------------+
|movieId|               title|            genres|movieid|            rating|
+-------+--------------------+------------------+-------+------------------+
| 194434|   Adrenaline (1990)|(no genres listed)| 194434| 6.367295742034912|
| 203086|Truth and Justice...|             Drama| 203086|6.2253618240356445|
| 203882|Dead in the Water...|            Horror| 203882|   6.0559983253479|
| 151989|    The Thorn (1971)|            Comedy| 151989| 5.888882637023926|
| 183947|NOFX Backstage Pa...|(no genres listed)| 183947| 5.876765251159668|
+-------+--------------------+------------------+-------+------------------+



In [45]:
def get_recommendations(user_id, num_recs):
    """Return the top ``num_recs`` movie recommendations for one user.

    Recommendations are produced by the fitted ALS ``model`` and joined with
    ``movies_df`` so that every row carries the movie title and genres.

    Args:
        user_id: Integer id of the user to recommend for.
        num_recs: Maximum number of movies to recommend.

    Returns:
        A Spark DataFrame with the columns of ``movies_df`` followed by
        ``movieid`` and ``rating`` (the predicted rating), sorted by
        ``rating`` in descending order. It is empty when the model yields
        no recommendations for ``user_id``.
    """
    # Local import keeps the function usable without touching other cells.
    from pyspark.sql import functions as F

    users_df = spark.createDataFrame([user_id], IntegerType()).toDF('userid')
    user_recs_df = model.recommendForUserSubset(users_df, num_recs)

    # Flatten the per-user array of (movieid, rating) structs into one row per
    # movie inside Spark. Unlike collect()[0], this does not raise when the
    # user has no recommendations. The cast keeps 'rating' a double column.
    recs_df = (
        user_recs_df
        .select(F.explode('recommendations').alias('rec'))
        .select(
            F.col('rec.movieid').alias('movieid'),
            F.col('rec.rating').cast('double').alias('rating'),
        )
    )

    # Join against THIS call's recommendations. Running the shared SQL query
    # would read the global 'recommendations' temp view, which holds the rows
    # registered earlier for a different user.
    recommended_movies = (
        movies_df
        .join(recs_df, movies_df['movieId'] == recs_df['movieid'])
        .orderBy(F.col('rating').desc())
    )
    return recommended_movies

In [46]:
# Top 5 recommendations for user 400.
recs = get_recommendations(user_id=400, num_recs=5)

In [None]:
# Bring the (small) recommendation result to the driver as a pandas DataFrame
# so the notebook renders it as a table.
recs.toPandas()