In [1]:
import pyspark as ps
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
from pyspark.ml.recommendation import ALS

In [2]:
movies = pd.read_table('data/movies.dat', delimiter='::', names=['movie', 'name', 'genre'], engine='python')
users = pd.read_table('data/users.dat', delimiter='::', names=['user', 'gender', 'age', 'occupation', 'zip_code'], engine='python')
requests = pd.read_csv('data/requests.csv')
training = pd.read_csv('data/training.csv')

In [5]:
mask = movies.name.apply(lambda x: True if (len(x.split('(')) > 2) else False)
movies[mask]

Unnamed: 0,movie,name,genre
29,30,Shanghai Triad (Yao a yao yao dao waipo qiao) ...,Drama
46,47,Seven (Se7en) (1995),Crime|Thriller
57,58,"Postino, Il (The Postman) (1994)",Drama|Romance
58,59,"Confessional, The (Le Confessionnal) (1995)",Drama|Mystery
67,68,French Twist (Gazon maudit) (1995),Comedy|Romance
...,...,...,...
3794,3864,Godzilla 2000 (Gojira ni-sen mireniamu) (1999),Action|Adventure|Sci-Fi
3797,3867,All the Rage (a.k.a. It's the Rage) (1999),Drama
3822,3892,Anatomy (Anatomie) (2000),Horror
3832,3902,Goya in Bordeaux (Goya en Bodeos) (1999),Drama


In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

from pyspark.ml.recommendation import ALS
spark_df = spark.createDataFrame(training) 
als_model = ALS(
    itemCol='movie',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=0.1,
    rank=50) 

als = als_model.fit(spark_df)

In [7]:
genres = set()
for lst in list(movies.genre):
    lst = lst.split('|')
    
    for genre in lst:
        genre = genre.lower()
        genre = genre.strip()
        
        # genre = '_'.join(genre.split('-'))
        genre = genre.split('\'')[0]
        genres.add(genre)  
genres

def glst(row):
    row = row.lower()
    row = row.strip()
    row = row.replace('\'s', '')
    
    return row.split('|')

movies_df = movies.copy()
# make genre to a dummy variable 
def dummy(col, data):
    data[col] = data.genre.apply(lambda x: 1 if col in glst(x) else 0)

for c in list(genres):
    dummy(c, movies_df)

In [8]:
spark = ps.sql.SparkSession.builder.getOrCreate()
df = spark.createDataFrame(training.drop('timestamp', axis = 1))
train, test = df.randomSplit([0.8, 0.2], seed = 427471138)

In [23]:


# Make spark data frame and train and test split 
# Set up spark 
#### Setting Up Spark Session


# Create First model 
params = {'itemCol': 'movie',
          'userCol': 'user',
          'ratingCol': 'rating',
          'nonnegative': True,
          'regParam': 0.1,
          'rank': 50 }

als_model = ALS(**params)
recommender = als_model.fit(train)
test_pred = recommender.transform(test).toPandas()



In [28]:
test_pred
test = pd.read_csv('data/fake_testing.csv')
spark_df2 = spark.createDataFrame(test) 
recommender.transform(spark_df2).toPandas()

mask = test_pred.prediction.isna()
temp = test_pred[~mask]
mean_squared_error(temp.rating, temp.prediction)

Unnamed: 0,user,movie,actualrating,prediction
0,53,148,2,
1,4169,148,4,3.104105
2,5333,148,1,2.451304
3,4387,148,1,1.976113
4,840,148,1,2.979337
...,...,...,...,...
200204,3371,3910,1,3.777941
200205,1851,3910,3,2.891895
200206,5198,3910,3,3.457453
200207,1584,3910,1,3.217384


In [25]:
# do first RMSE test 
mask = test_pred.prediction.isna()
temp = test_pred[~mask]
mean_squared_error(temp.rating, temp.prediction)

0.7695322320760489

In [12]:
def cv_search(df, regParams, ranks):
    score = []
    train, test = df.randomSplit([0.8, 0.2])
    
        
    for regParam in regParams:
        for rank in ranks:
            params = {'itemCol': 'movie',
            'userCol': 'user',
            'ratingCol': 'rating',
            'nonnegative': True,
            'regParam': regParam,
            'rank': rank }
            als_model = ALS(**params)
            recommender = als_model.fit(train)
            test_pred = recommender.transform(test).toPandas()
            mask = test_pred.prediction.isna()
            temp = test_pred[~mask]
            score.append(f'regparam: {regParam}, rank: {rank}' )
            score.append(mean_squared_error(temp.rating, temp.prediction))
            
    return score

In [13]:
# cv_search(df, [.001,.01, .1], [25, 50, 100, 200])

In [14]:
pvt_training = pd.pivot_table(training, index = 'user', columns = 'movie', values = 'rating')

In [15]:
# rec = ItemItemRecommender(neighborhood_size=75)
# rec.fit(pvt_training)
# rec.pred_one_user(user_id=600)
pvt_training

movie,1,2,3,4,5,6,7,8,9,10,...,3943,3944,3945,3946,3947,3948,3949,3950,3951,3952
user,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
636,,,,,,,,,,,...,,,,,,,,,,
637,5.0,,,,,,,,,,...,,,,,,,,,,
638,,,,,,,,,,,...,,,,,,,,,,
639,,,,,,,,,,,...,,,,,,,,,,
640,,,,,,4.0,,,,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6036,,,,2.0,,3.0,,,,,...,,,,,,,,,,
6037,,,,,,,,,,,...,,,,,,,,,,
6038,,,,,,,,,,,...,,,,,,,,,,
6039,,,,,,,,,,,...,,,,,,,,,,


In [16]:
from sklearn.impute import KNNImputer
from sklearn.metrics.pairwise import cosine_similarity
imp = KNNImputer()
imp.fit(pvt_training)

X = imp.transform(pvt_training)
sim = cosine_similarity(X,X)
sim


array([[1.        , 0.98325593, 0.98749047, ..., 0.98361073, 0.98684473,
        0.9783377 ],
       [0.98325593, 1.        , 0.98478588, ..., 0.98333822, 0.98577175,
        0.98153737],
       [0.98749047, 0.98478588, 1.        , ..., 0.98511678, 0.98738955,
        0.98038007],
       ...,
       [0.98361073, 0.98333822, 0.98511678, ..., 1.        , 0.98543536,
        0.98231828],
       [0.98684473, 0.98577175, 0.98738955, ..., 0.98543536, 1.        ,
        0.98284519],
       [0.9783377 , 0.98153737, 0.98038007, ..., 0.98231828, 0.98284519,
        1.        ]])

In [22]:
sim[0] *5

array([5.        , 4.91627967, 4.93745233, ..., 4.91805367, 4.93422363,
       4.89168852])

In [30]:
X2 = pvt_training.fillna(0)
cosine_similarity(X2,X2)


array([[1.        , 0.17466465, 0.17098663, ..., 0.04698045, 0.06992656,
        0.09093562],
       [0.17466465, 1.        , 0.23531432, ..., 0.07014394, 0.1668649 ,
        0.26487565],
       [0.17098663, 0.23531432, 1.        , ..., 0.06820819, 0.0685784 ,
        0.18678288],
       ...,
       [0.04698045, 0.07014394, 0.06820819, ..., 1.        , 0.16171426,
        0.08653594],
       [0.06992656, 0.1668649 , 0.0685784 , ..., 0.16171426, 1.        ,
        0.21094536],
       [0.09093562, 0.26487565, 0.18678288, ..., 0.08653594, 0.21094536,
        1.        ]])

In [58]:
pvt_training = pvt_training.fillna(0)
sim = cosine_similarity(pvt_training,pvt_training)

In [31]:
sim

array([[1.        , 0.98325593, 0.98749047, ..., 0.98361073, 0.98684473,
        0.9783377 ],
       [0.98325593, 1.        , 0.98478588, ..., 0.98333822, 0.98577175,
        0.98153737],
       [0.98749047, 0.98478588, 1.        , ..., 0.98511678, 0.98738955,
        0.98038007],
       ...,
       [0.98361073, 0.98333822, 0.98511678, ..., 1.        , 0.98543536,
        0.98231828],
       [0.98684473, 0.98577175, 0.98738955, ..., 0.98543536, 1.        ,
        0.98284519],
       [0.9783377 , 0.98153737, 0.98038007, ..., 0.98231828, 0.98284519,
        1.        ]])

In [56]:
pvt_training

movie,1,2,3,4,5,6,7,8,9,10,...,3943,3944,3945,3946,3947,3948,3949,3950,3951,3952
user,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
636,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
637,5.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
638,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
639,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
640,0.0,0.0,0.0,0.0,0.0,4.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6036,0.0,0.0,0.0,2.0,0.0,3.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6037,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6038,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6039,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [67]:
pd.DataFrame(sim)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,5389,5390,5391,5392,5393,5394,5395,5396,5397,5398
0,1.000000,0.174665,0.170987,0.157088,0.000000,0.020390,0.131076,0.021000,0.126666,0.021551,...,0.111710,0.047174,0.183838,0.000000,0.213842,0.192785,0.125157,0.046980,0.069927,0.090936
1,0.174665,1.000000,0.235314,0.230834,0.046894,0.189858,0.100528,0.052834,0.111137,0.122242,...,0.152505,0.247358,0.214006,0.060986,0.165909,0.354850,0.291603,0.070144,0.166865,0.264876
2,0.170987,0.235314,1.000000,0.170137,0.026690,0.087494,0.073155,0.060311,0.132101,0.040049,...,0.243314,0.080705,0.125949,0.039858,0.127226,0.182618,0.117020,0.068208,0.068578,0.186783
3,0.157088,0.230834,0.170137,1.000000,0.065769,0.021692,0.124507,0.173628,0.154658,0.080744,...,0.200709,0.087696,0.401663,0.045211,0.133153,0.212165,0.116557,0.025466,0.049340,0.102460
4,0.000000,0.046894,0.026690,0.065769,1.000000,0.128755,0.098976,0.157864,0.006650,0.105305,...,0.086754,0.039279,0.000000,0.050674,0.096608,0.133385,0.193022,0.000000,0.006682,0.162138
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5394,0.192785,0.354850,0.182618,0.212165,0.133385,0.173980,0.103364,0.190972,0.212391,0.186790,...,0.131294,0.209843,0.186426,0.103431,0.267405,1.000000,0.341462,0.124174,0.219115,0.400968
5395,0.125157,0.291603,0.117020,0.116557,0.193022,0.268235,0.097265,0.108147,0.179449,0.297592,...,0.142309,0.276134,0.129985,0.118749,0.141676,0.341462,1.000000,0.049015,0.252146,0.428425
5396,0.046980,0.070144,0.068208,0.025466,0.000000,0.119295,0.000000,0.041260,0.012359,0.084686,...,0.108837,0.106897,0.040689,0.000000,0.063967,0.124174,0.049015,1.000000,0.161714,0.086536
5397,0.069927,0.166865,0.068578,0.049340,0.006682,0.160267,0.062908,0.041635,0.120970,0.167874,...,0.118776,0.250994,0.053750,0.102168,0.068399,0.219115,0.252146,0.161714,1.000000,0.210945


In [57]:
pd.DataFrame(X)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,3652,3653,3654,3655,3656,3657,3658,3659,3660,3661
0,4.0,4.0,4.0,3.6,3.4,4.2,4.0,3.4,3.2,4.2,...,2.6,2.0,1.4,1.8,3.4,4.0,5.0,3.0,4.2,4.2
1,5.0,3.8,2.8,3.4,3.4,3.8,3.4,3.4,2.8,3.8,...,3.0,2.0,1.0,2.0,3.0,3.0,5.0,3.2,4.2,4.4
2,4.4,4.0,3.6,2.2,2.8,4.2,4.0,3.6,3.6,3.8,...,2.8,2.0,1.6,2.2,3.2,4.2,5.0,3.4,4.4,3.4
3,3.8,3.4,2.6,3.2,2.6,4.4,3.8,2.8,3.0,3.8,...,2.6,2.0,1.4,2.2,3.8,3.6,4.8,3.4,4.4,3.8
4,4.4,3.6,3.2,2.6,3.0,4.0,3.6,2.4,2.4,4.0,...,2.0,2.0,1.6,2.0,4.2,3.8,4.4,3.6,4.0,4.2
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5394,5.0,2.6,2.6,2.0,2.2,3.0,2.8,2.6,2.2,3.0,...,3.0,2.0,1.0,2.4,4.0,3.8,4.4,3.4,4.4,3.6
5395,4.0,3.2,3.4,3.4,3.2,3.8,3.8,3.0,2.4,3.4,...,2.2,2.0,1.2,2.8,3.4,3.8,4.8,3.0,4.8,3.6
5396,4.0,2.4,3.4,3.2,3.2,4.8,4.0,2.8,2.6,4.4,...,2.6,2.0,2.0,2.0,3.2,4.0,4.2,3.6,4.4,3.4
5397,3.8,3.2,2.6,2.8,3.2,3.8,3.2,3.2,3.4,4.0,...,3.2,2.0,1.4,2.2,2.6,4.2,4.4,3.4,4.6,3.6


In [None]:
params = {'itemCol': 'movie',
          'userCol': 'user',
          'ratingCol': 'rating',
          'nonnegative': True,
          'regParam': 0.1,
          'rank': 10 }

als_model = ALS(**params)
recommender = als_model.fit(train)
test_pred = recommender.transform(test).toPandas()


In [59]:
user = pd.read_pickle('data/user_meta.pkl')
movie = pd.read_pickle('data/movie_meta.pkl')
test = pd.read_csv('data/fake_testing.csv')

In [44]:
df = pd.merge(left = training, right = movie, left_on = 'movie', right_on = 'movie')
df = pd.merge(left = df, right = user, left_on = 'user', right_on = 'user')
df.head() 

Unnamed: 0,user,movie,rating,timestamp,adventure,fantasy,horror,musical,romance,sci-fi,...,children,crime,date,name,foreign,new,gender,age,occupation,zip_code
0,6040,858,4,956703932,0,0,0,0,0,0,...,0,1,1972,"Godfather, The",0,0,1,25,6,11106
1,6040,593,5,956703954,0,0,0,0,0,0,...,0,0,1991,"Silence of the Lambs, The",0,0,1,25,6,11106
2,6040,2384,4,956703954,0,0,0,0,0,0,...,1,0,1998,Babe: Pig in the City,0,1,1,25,6,11106
3,6040,1961,4,956703977,0,0,0,0,0,0,...,0,0,1988,Rain Man,0,0,1,25,6,11106
4,6040,2019,5,956703977,0,0,0,0,0,0,...,0,0,1954,Seven Samurai (The Magnificent Seven) (Shichin...,1,0,1,25,6,11106


In [61]:
df

Unnamed: 0,user,movie,rating,timestamp,adventure,fantasy,horror,musical,romance,sci-fi,...,children,crime,date,name,foreign,new,gender,age,occupation,zip_code
0,6040,858,4,956703932,0,0,0,0,0,0,...,0,1,1972,"Godfather, The",0,0,1,25,6,11106
1,6040,593,5,956703954,0,0,0,0,0,0,...,0,0,1991,"Silence of the Lambs, The",0,0,1,25,6,11106
2,6040,2384,4,956703954,0,0,0,0,0,0,...,1,0,1998,Babe: Pig in the City,0,1,1,25,6,11106
3,6040,1961,4,956703977,0,0,0,0,0,0,...,0,0,1988,Rain Man,0,0,1,25,6,11106
4,6040,2019,5,956703977,0,0,0,0,0,0,...,0,0,1954,Seven Samurai (The Magnificent Seven) (Shichin...,1,0,1,25,6,11106
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
799995,3537,3859,1,966883351,0,0,0,0,0,0,...,0,0,2000,"Eyes of Tammy Faye, The",0,1,1,1,10,97402
799996,3537,3861,3,966883293,0,0,0,0,0,0,...,0,0,2000,"Replacements, The",0,1,1,1,10,97402
799997,3537,3857,1,966883408,0,0,0,0,0,0,...,0,0,2000,Bless the Child,0,1,1,1,10,97402
799998,3537,3863,3,966883381,0,0,0,0,0,1,...,0,0,2000,"Cell, The",0,1,1,1,10,97402


In [72]:
spark_df = spark.createDataFrame(df) 
als_model = ALS(
    itemCol='movie',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=0.1,
    rank=50) 

als = als_model.fit(spark_df)


In [73]:
test_df = spark.createDataFrame(test) 

In [74]:
als_test = als.transform(test_df)
als_test.show(10)

+----+-----+------------+----------+
|user|movie|actualrating|prediction|
+----+-----+------------+----------+
|  53|  148|           2|       NaN|
|4169|  148|           4| 3.0613718|
|5333|  148|           1| 2.4647985|
|4387|  148|           1| 2.2179437|
| 840|  148|           1|  2.728621|
| 216|  148|           3|       NaN|
| 482|  148|           2|       NaN|
| 752|  148|           1|  2.855463|
| 424|  148|           4|       NaN|
| 970|  463|           1| 2.7486236|
+----+-----+------------+----------+
only showing top 10 rows



In [75]:
test_df

DataFrame[user: bigint, movie: bigint, actualrating: bigint]

In [76]:
test_pred = als_test.toPandas()

mask = test_pred.prediction.isna()
mean_squared_error(test_pred.loc[mask,'prediction'], test.loc[mask,'actualrating'])

ValueError: Input contains NaN, infinity or a value too large for dtype('float32').

In [62]:
test_pred

Unnamed: 0,user,movie,prediction
0,673,148,4.090493
1,4227,148,1.999880
2,3184,148,3.230587
3,4784,148,2.843972
4,2383,148,2.385995
...,...,...,...
799995,745,3910,1.411141
799996,2507,3910,4.043521
799997,1138,3910,4.205136
799998,1413,3910,4.295969


In [58]:
test_pred

Unnamed: 0,user,movie,prediction
0,673,148,4.090493
1,4227,148,1.999880
2,3184,148,3.230587
3,4784,148,2.843972
4,2383,148,2.385995
...,...,...,...
799995,745,3910,1.411141
799996,2507,3910,4.043521
799997,1138,3910,4.205136
799998,1413,3910,4.295969
