In [1]:
import pyspark 

In [3]:
spark  = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [38]:
ratings = spark.read.json('data/ratings.json')

In [39]:
ratings.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: double, user_id: bigint]

In [40]:
ratings.show(5)

+--------+------+------------+-------+
|movie_id|rating|   timestamp|user_id|
+--------+------+------------+-------+
|     858|     4|9.56678732E8|   6040|
|    2384|     4|9.56678754E8|   6040|
|     593|     5|9.56678754E8|   6040|
|    1961|     4|9.56678777E8|   6040|
|    1419|     3|9.56678856E8|   6040|
+--------+------+------------+-------+
only showing top 5 rows



In [76]:
import pandas as pd 
movies = pd.read_csv('data/movies.dat', sep='::', engine='python', header=None)
movies.head()

Unnamed: 0,0,1,2
0,1,Toy Story (1995),Animation|Children's|Comedy
1,2,Jumanji (1995),Adventure|Children's|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama
4,5,Father of the Bride Part II (1995),Comedy


In [98]:
users = pd.read_csv('data/users.dat', sep='::', engine='python', header=None)
users.head()

Unnamed: 0,0,1,2,3,4
0,1,F,1,10,48067
1,2,M,56,16,70072
2,3,M,25,15,55117
3,4,M,45,7,2460
4,5,M,25,20,55455


In [104]:
users = users.rename({0:'user_id', 
              1:'gender', 
              2:'min_age', 
              3:'occupation', 
              4:'zipcode'}, 
             axis=1)
users.age.value_counts()

25    2096
35    1193
18    1103
45     550
50     496
56     380
1      222
Name: age, dtype: int64

In [41]:
requests = spark.read.json('data/requests.json')

In [42]:
requests.persist()

DataFrame[movie_id: bigint, rating: double, timestamp: double, user_id: bigint]

In [43]:
requests.count()

280260

In [32]:
#import libraries for shitty model 
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel 
ratings.printSchema

<bound method DataFrame.printSchema of DataFrame[{"user_id": 6040: string,  "movie_id": 858: string,  "rating": 4: string,  "timestamp": 956678732.0}: string]>

In [44]:
ratings_df = ratings.toPandas()

In [60]:
ratings_df.sort_values(by='timestamp', ascending=False)

Unnamed: 0,movie_id,rating,timestamp,user_id
719948,892,4,975739919.0,1875
719947,802,4,975739918.0,1875
719946,2303,4,975739907.0,635
719945,2966,4,975739907.0,635
719944,3198,4,975739907.0,635
719943,1884,4,975739907.0,635
719942,838,4,975739906.0,1875
719941,3479,4,975739906.0,1875
719939,2108,4,975739892.0,1875
719940,1680,4,975739892.0,1875


In [61]:
als = ALS(
    rank=11,
    userCol='user_id',
    itemCol='movie_id',
    ratingCol='rating'
)

In [62]:
als_model = als.fit(ratings)

In [63]:
type(als_model)

pyspark.ml.recommendation.ALSModel

In [65]:
preds = als_model.transform(ratings)

In [66]:
preds.show(5)

+--------+------+------------+-------+----------+
|movie_id|rating|   timestamp|user_id|prediction|
+--------+------+------------+-------+----------+
|     148|     5|9.75592024E8|    673| 4.2179713|
|     148|     2|9.65634524E8|   4227|  2.082299|
|     148|     4|9.68683753E8|   3184| 3.4188745|
|     148|     3| 9.6997537E8|   4784| 2.9637852|
|     148|     2|9.74388854E8|   2383| 2.3499334|
+--------+------+------------+-------+----------+
only showing top 5 rows



In [68]:
reg_ev = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

In [69]:
rmse_model_1 = reg_ev.evaluate(preds)

In [70]:
rmse_model_1

0.8091218175453062

In [71]:
request_preds = als_model.transform(requests)

In [78]:
request_preds.show(20)

+--------+------+-------------+-------+----------+
|movie_id|rating|    timestamp|user_id|prediction|
+--------+------+-------------+-------+----------+
|     148|   NaN| 9.77959026E8|     53|       NaN|
|     148|   NaN| 9.76559602E8|   4169| 3.2321188|
|     148|   NaN| 9.89024856E8|   5333| 2.3576462|
|     148|   NaN| 9.77005381E8|   4387| 1.9873172|
|     148|   NaN| 9.66907208E8|   3539| 2.8987255|
|     148|   NaN| 9.76266538E8|    840| 2.5574653|
|     148|   NaN| 9.76841639E8|    216|       NaN|
|     148|   NaN| 9.76191154E8|    482|       NaN|
|     148|   NaN|1.029283935E9|    752| 3.2906346|
|     148|   NaN|1.026978024E9|    424|       NaN|
|     148|   NaN| 9.74150193E8|   2456| 2.7647066|
|     148|   NaN|  9.7014489E8|   3053| 2.8814397|
|     463|   NaN| 9.80596453E8|    970| 2.7192957|
|     463|   NaN| 9.76560887E8|   4169|   2.44903|
|     463|   NaN| 9.78242788E8|     26|       NaN|
|     463|   NaN| 9.76395651E8|    319|       NaN|
|     463|   NaN| 9.76907712E8|

In [93]:
#confirm that Nan in prediction column are cold-start users
user_factors = als_model.userFactors
row_216 = user_factors[user_factors['id']==216]
#.first() takes it out of the spark dataframe type 

In [95]:
row_216.show()
#no values for the cold-start users

+---+--------+
| id|features|
+---+--------+
+---+--------+

