In [1]:
# import necessary libraries
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt 



import pyspark
from pyspark.sql.types import *
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator


#Build our Spark Session and Context
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext
spark, sc


from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import countDistinct, col

import matplotlib.pyplot as plt
%matplotlib inline

In [41]:
schema = StructType([
    StructField("user", IntegerType(), True),
    StructField("movie", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", IntegerType(), True)])


df_fake_testing = spark.read.csv('data/fake_testing.csv', header=True, schema=schema)
df_requests = spark.read.csv('data/requests.csv', header=True, schema=schema)
df_training = spark.read.csv('data/training.csv', header=True, schema=schema)
df_sample = spark.read.csv('data/training.csv', header=True, schema=schema)

In [9]:
df_training.show()

+----+-----+------+---------+
|user|movie|rating|timestamp|
+----+-----+------+---------+
|6040|  858|     4|956703932|
|6040|  593|     5|956703954|
|6040| 2384|     4|956703954|
|6040| 1961|     4|956703977|
|6040| 2019|     5|956703977|
|6040| 1419|     3|956704056|
|6040|  573|     4|956704056|
|6040| 3111|     5|956704056|
|6040|  213|     5|956704056|
|6040| 3505|     4|956704056|
|6040| 1734|     2|956704081|
|6040|  912|     5|956704191|
|6040|  919|     5|956704191|
|6040| 2503|     5|956704191|
|6040|  527|     5|956704219|
|6040|  318|     4|956704257|
|6040| 1252|     5|956704257|
|6040|  649|     5|956704257|
|6040| 3289|     5|956704305|
|6040|  759|     5|956704448|
+----+-----+------+---------+
only showing top 20 rows



In [17]:
df_requests.show()

+----+-----+------+---------+
|user|movie|rating|timestamp|
+----+-----+------+---------+
|4958| 1924|  null|     null|
|4958| 3264|  null|     null|
|4958| 2634|  null|     null|
|4958| 1407|  null|     null|
|4958| 2399|  null|     null|
|4958| 3489|  null|     null|
|4958| 2043|  null|     null|
|4958| 2453|  null|     null|
|5312| 3267|  null|     null|
|5948| 3098|  null|     null|
|5948| 1180|  null|     null|
|3158| 2648|  null|     null|
| 403| 1036|  null|     null|
|3693|  468|  null|     null|
|5950| 1262|  null|     null|
|5950| 3555|  null|     null|
|5950| 3793|  null|     null|
|5950| 3578|  null|     null|
|5950| 3948|  null|     null|
|5950| 3893|  null|     null|
+----+-----+------+---------+
only showing top 20 rows



In [7]:

print('Number of unique users: {}'.format(df_training.select('user').distinct().count()))
print('Number of unique movies: {}'.format(df_training.select('movie').distinct().count()))
print('Number of rating: {}'.format(df_training.count()))

Number of unique users: 5400
Number of unique movies: 3663
Number of rating: 800001


In [33]:
als_model = ALS(userCol='user',
                itemCol='movie',
                ratingCol='rating',
                nonnegative=True,
                #NumFactors=2,
                #implicitPrefs=TRUE,
                #alpha=.5
                regParam=0.1,
                rank=10,
                seed=88
               )

recommender = als_model.fit(df_training)

In [34]:
recommender

ALSModel: uid=ALS_368cd293a1a6, rank=10

## Populate user and item factors

In [38]:
user_factor_df = recommender.userFactors
item_factor_df = recommender.itemFactors

In [48]:
predictions = recommender.transform(df_requests)

In [49]:
predictions.describe().show()

+-------+------------------+------------------+------+---------+----------+
|summary|              user|             movie|rating|timestamp|prediction|
+-------+------------------+------------------+------+---------+----------+
|  count|            200209|            200209|     0|        0|    200209|
|   mean|1511.7512249699064|1930.5866819173964|  null|     null|       NaN|
| stddev|1582.9305639554102|1129.6703496051032|  null|     null|       NaN|
|    min|                 1|                 1|  null|     null|0.31814185|
|    max|              6040|              3952|  null|     null|       NaN|
+-------+------------------+------------------+------+---------+----------+



In [50]:
predictions_df = predictions.toPandas()

In [51]:
predictions_df.head()

Unnamed: 0,user,movie,rating,timestamp,prediction
0,53,148,,,
1,4169,148,,,2.999333
2,5333,148,,,2.511278
3,4387,148,,,2.325281
4,840,148,,,2.645048


In [53]:
predictions_df = predictions_df.drop(columns=['timestamp', 'rating'])

In [57]:
predictions_df.to_csv('v1_binommial_predictions.csv')

In [44]:
predictions_df = predictions.toPandas().fillna(df_training['rating'].mean())

Exception ignored in: <function JavaWrapper.__del__ at 0x7fcf4af44af0>
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 42, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'ALS' object has no attribute '_java_obj'


TypeError: 'Column' object is not callable

In [58]:
predictions_df

Unnamed: 0,user,movie,prediction
0,53,148,
1,4169,148,2.999333
2,5333,148,2.511278
3,4387,148,2.325281
4,840,148,2.645048
...,...,...,...
200204,3371,3910,3.917830
200205,1851,3910,2.894143
200206,5198,3910,3.473501
200207,1584,3910,3.218319


In [37]:
user_factors = user_factor_df.collect()[1]['features']
item_factors = item_factor_df.collect()[1]['features']

TypeError: list indices must be integers or slices, not str