In [1]:
import numpy as np
import pandas as pd
from pyspark.sql.types import IntegerType
from pyspark.ml.recommendation import ALS
import matplotlib.pyplot as plt
import pyspark as ps
from sklearn.model_selection import train_test_split
from noah_cleaning1 import get_frames

In [2]:
spark = (ps.sql.SparkSession.builder 
        .master("local[4]") 
        .appName("sparkSQL exercise") 
        .getOrCreate()
        )
sc = spark.sparkContext

In [3]:
ratings_data = pd.read_csv("training.csv")
ratings_data.head()

Unnamed: 0,user,movie,rating,timestamp
0,6040,858,4,956703932
1,6040,593,5,956703954
2,6040,2384,4,956703954
3,6040,1961,4,956703977
4,6040,2019,5,956703977


In [4]:
movie_data = pd.read_csv("movies.dat",delimiter = "::",names=["movie","title","genre"])

  """Entry point for launching an IPython kernel.


In [5]:
movie_data.head()

Unnamed: 0,movie,title,genre
0,1,Toy Story (1995),Animation|Children's|Comedy
1,2,Jumanji (1995),Adventure|Children's|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama
4,5,Father of the Bride Part II (1995),Comedy


In [6]:
dummy_cols = movie_data.genre.str.get_dummies()

In [7]:
movie_data = pd.concat((movie_data,dummy_cols),axis = 1)

In [8]:
movie_data= movie_data.drop("genre",axis=1)

In [9]:
user_data = pd.read_csv("users.dat",delimiter = "::",names=["user","gender","age","occupation","zipcode"])

  """Entry point for launching an IPython kernel.


In [10]:
user_data.head()

Unnamed: 0,user,gender,age,occupation,zipcode
0,1,F,1,10,48067
1,2,M,56,16,70072
2,3,M,25,15,55117
3,4,M,45,7,2460
4,5,M,25,20,55455


In [11]:
movie_data["year"]=movie_data["title"]
movie_data["year"] = movie_data["year"].apply(lambda x: x[-5:-1])
movie_data.head()

Unnamed: 0,movie,title,Action,Adventure,Animation,Children's,Comedy,Crime,Documentary,Drama,...,Film-Noir,Horror,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western,year
0,1,Toy Story (1995),0,0,1,1,1,0,0,0,...,0,0,0,0,0,0,0,0,0,1995
1,2,Jumanji (1995),0,1,0,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1995
2,3,Grumpier Old Men (1995),0,0,0,0,1,0,0,0,...,0,0,0,0,1,0,0,0,0,1995
3,4,Waiting to Exhale (1995),0,0,0,0,1,0,0,1,...,0,0,0,0,0,0,0,0,0,1995
4,5,Father of the Bride Part II (1995),0,0,0,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,1995


In [12]:
movie_data["title"] = movie_data["title"].apply(lambda x: x[:-7])
movie_data.head()

Unnamed: 0,movie,title,Action,Adventure,Animation,Children's,Comedy,Crime,Documentary,Drama,...,Film-Noir,Horror,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western,year
0,1,Toy Story,0,0,1,1,1,0,0,0,...,0,0,0,0,0,0,0,0,0,1995
1,2,Jumanji,0,1,0,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1995
2,3,Grumpier Old Men,0,0,0,0,1,0,0,0,...,0,0,0,0,1,0,0,0,0,1995
3,4,Waiting to Exhale,0,0,0,0,1,0,0,1,...,0,0,0,0,0,0,0,0,0,1995
4,5,Father of the Bride Part II,0,0,0,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,1995


In [13]:
genres = set()
for i in movie_data['genre'].unique():
    genre_list = i.split("|")
    for item in genre_list:
        genres.add(item)
genres 
#movie_data['genre'].unique()

KeyError: 'genre'

In [None]:
movie_data["genre"] = movie_data["genre"].apply(lambda x: x.split("|"))

In [14]:
movie_data.head()

Unnamed: 0,movie,title,Action,Adventure,Animation,Children's,Comedy,Crime,Documentary,Drama,...,Film-Noir,Horror,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western,year
0,1,Toy Story,0,0,1,1,1,0,0,0,...,0,0,0,0,0,0,0,0,0,1995
1,2,Jumanji,0,1,0,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1995
2,3,Grumpier Old Men,0,0,0,0,1,0,0,0,...,0,0,0,0,1,0,0,0,0,1995
3,4,Waiting to Exhale,0,0,0,0,1,0,0,1,...,0,0,0,0,0,0,0,0,0,1995
4,5,Father of the Bride Part II,0,0,0,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,1995


In [15]:
user_data["gender"] = user_data["gender"].map({"M":1,"F":0})

In [16]:
user_data.head()

Unnamed: 0,user,gender,age,occupation,zipcode
0,1,0,1,10,48067
1,2,1,56,16,70072
2,3,1,25,15,55117
3,4,1,45,7,2460
4,5,1,25,20,55455


In [17]:
movie_rating = pd.merge(ratings_data,movie_data,how="left",left_on ="movie",right_on="movie")

In [18]:
movie_rating.head(10)

Unnamed: 0,user,movie,rating,timestamp,title,Action,Adventure,Animation,Children's,Comedy,...,Film-Noir,Horror,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western,year
0,6040,858,4,956703932,"Godfather, The",1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1972
1,6040,593,5,956703954,"Silence of the Lambs, The",0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,1991
2,6040,2384,4,956703954,Babe: Pig in the City,0,0,0,1,1,...,0,0,0,0,0,0,0,0,0,1998
3,6040,1961,4,956703977,Rain Man,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1988
4,6040,2019,5,956703977,Seven Samurai (The Magnificent Seven) (Shichin...,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1954
5,6040,1419,3,956704056,Walkabout,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1971
6,6040,573,4,956704056,"Ciao, Professore! (Io speriamo che me la cavo )",0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1993
7,6040,3111,5,956704056,Places in the Heart,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1984
8,6040,213,5,956704056,Burnt By the Sun (Utomlyonnye solntsem),0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1994
9,6040,3505,4,956704056,No Way Out,0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,1987


In [19]:
user_rating = pd.merge(ratings_data,user_data,how="left",left_on ="user",right_on="user")

In [20]:
user_rating.sample(10)

Unnamed: 0,user,movie,rating,timestamp,gender,age,occupation,zipcode
733992,1983,2664,5,975055397,1,25,14,92109
434990,3167,293,5,968817474,1,25,2,77056
546691,2419,2396,4,974301874,1,25,0,6096
544520,2406,2791,3,974255599,0,56,12,1520
16749,5915,2124,3,957404449,1,18,4,58102
193598,4658,2532,3,963879029,1,25,4,99163
407657,3336,170,1,967854350,1,35,17,93436
500669,2777,3690,4,973048516,1,18,4,95326
367176,3673,2908,5,966745463,1,25,2,10003
581518,2265,2642,2,974653865,1,56,13,60506


In [21]:
final_train = pd.merge(movie_rating,user_rating,on=["user","movie","rating","timestamp"])

In [22]:
final_train.head()

Unnamed: 0,user,movie,rating,timestamp,title,Action,Adventure,Animation,Children's,Comedy,...,Romance,Sci-Fi,Thriller,War,Western,year,gender,age,occupation,zipcode
0,6040,858,4,956703932,"Godfather, The",1,0,0,0,0,...,0,0,0,0,0,1972,1,25,6,11106
1,6040,593,5,956703954,"Silence of the Lambs, The",0,0,0,0,0,...,0,0,1,0,0,1991,1,25,6,11106
2,6040,2384,4,956703954,Babe: Pig in the City,0,0,0,1,1,...,0,0,0,0,0,1998,1,25,6,11106
3,6040,1961,4,956703977,Rain Man,0,0,0,0,0,...,0,0,0,0,0,1988,1,25,6,11106
4,6040,2019,5,956703977,Seven Samurai (The Magnificent Seven) (Shichin...,1,0,0,0,0,...,0,0,0,0,0,1954,1,25,6,11106


In [82]:
final_train.head()

Unnamed: 0,user,movie,rating,timestamp,title,Action,Adventure,Animation,Children's,Comedy,...,Romance,Sci-Fi,Thriller,War,Western,year,gender,age,occupation,zipcode
0,6040,858,4,956703932,"Godfather, The",1,0,0,0,0,...,0,0,0,0,0,1972,1,25,6,11106
1,6040,593,5,956703954,"Silence of the Lambs, The",0,0,0,0,0,...,0,0,1,0,0,1991,1,25,6,11106
2,6040,2384,4,956703954,Babe: Pig in the City,0,0,0,1,1,...,0,0,0,0,0,1998,1,25,6,11106
3,6040,1961,4,956703977,Rain Man,0,0,0,0,0,...,0,0,0,0,0,1988,1,25,6,11106
4,6040,2019,5,956703977,Seven Samurai (The Magnificent Seven) (Shichin...,1,0,0,0,0,...,0,0,0,0,0,1954,1,25,6,11106


In [23]:
X = final_train[['user','movie','rating']]

In [25]:
training_means = X['rating'].mean()

movie_means = X.groupby('movie')['rating'].mean()
user_means = X.groupby('user')['rating'].mean()

In [26]:
spark_df = spark.createDataFrame(X)

In [27]:
train_df, test_df = spark_df.randomSplit([0.8, 0.2], seed=427471138)

In [28]:
# Create an untrained ALS model.
als_model = ALS(
    itemCol='movie',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=0.1,
    rank=10) 


In [29]:
recommender = als_model.fit(train_df)

In [30]:
y_pred = recommender.transform(test_df)

In [93]:
y_pred.show(5)

+----+-----+------+----------+
|user|movie|rating|prediction|
+----+-----+------+----------+
|2383|  148|     2|  2.551038|
|1069|  148|     2|  2.986098|
|2456|  148|     2| 2.8667297|
|3683|  463|     1| 1.4679407|
|3562|  463|     2|   2.78387|
+----+-----+------+----------+
only showing top 5 rows



In [94]:
y_pred.describe().show()

+-------+------------------+------------------+------------------+----------+
|summary|              user|             movie|            rating|prediction|
+-------+------------------+------------------+------------------+----------+
|  count|            159972|            159972|            159972|    159972|
|   mean|3408.0624734328508| 1850.770822393919|3.5929037581576777|       NaN|
| stddev|1546.8362429895333|1088.9803317800838|1.1196836087607198|       NaN|
|    min|               636|                 1|                 1| 0.2726254|
|    max|              6040|              3952|                 5|       NaN|
+-------+------------------+------------------+------------------+----------+



In [31]:
pd_y_pred = y_pred.toPandas()

In [98]:
#pd_y_pred = pd_y_pred.fillna(3.2)

In [64]:
pd_y_pred.describe()

Unnamed: 0,user,movie,rating,prediction,final
count,159972.0,159972.0,159972.0,159929.0,159972.0
mean,3408.062473,1850.770822,3.592904,3.426136,3.425937
std,1546.836243,1088.980332,1.119684,0.669276,0.669582
min,636.0,1.0,1.0,0.304385,0.304385
25%,2039.0,1023.0,3.0,3.026306,3.026068
50%,3509.0,1798.0,4.0,3.492768,3.492704
75%,4705.0,2759.0,4.0,3.900609,3.900619
max,6040.0,3952.0,5.0,5.518016,5.518016


In [65]:
test = pd_y_pred.apply(lambda x: fill_na(x) if pd.isnull(x.prediction) else x['prediction'], axis = 1)

In [41]:
test.head()

0    2.299468
1    2.956888
2    2.783664
3    1.504709
4    2.750967
dtype: float64

In [42]:
pd_y_pred['final'] = test

In [48]:
def get_frames1(filename,test_file = False):

    ## Reading in the data
    ratings_data = pd.read_csv(filename)

    movie_data = pd.read_csv("../data/movies.dat",
                            delimiter = "::",
                            names=["movie","title","genre"])

    user_data = pd.read_csv("../data/users.dat",
                            delimiter = "::",
                            names=["user","gender","age","occupation","zipcode"])


    ## Adding Movie Genre Dummy Cols
    dummy_cols = movie_data.genre.str.get_dummies()
    movie_data = pd.concat((movie_data,dummy_cols),axis = 1)
    movie_data.drop("genre",axis=1, inplace=True)


    ## Creating seperate year column and title column
    movie_data["year"]=movie_data["title"].apply(lambda x: x[-5:-1])
    movie_data["title"] = movie_data["title"].apply(lambda x: x[:-7])

    ## Mapping M and F in user data to 1 and 0
    user_data["gender"] = user_data["gender"].map({"M":1,"F":0})


    ###################################
    ####### MERGES ###################

    ## DF with movie rating and the movie info
    movie_rating = pd.merge(ratings_data,
                            movie_data,
                            how="left",
                            left_on ="movie",
                            right_on="movie")

    ## DF with movie rating and the user info
    user_rating = pd.merge(ratings_data,
                           user_data,
                           how="left",
                           left_on ="user",
                           right_on="user")

    ## Final DF with both movie info and user info
    if test_file == False:
        final_train = pd.merge(movie_rating,
                               user_rating,
                               on=["user","movie","rating","timestamp"])
    else:
        final_train = pd.merge(movie_rating,
                               user_rating,
                               on=["user","movie"])


    ## Returning frames as dictionary
    frames = {"ratings_data": ratings_data,
              "movie_data": movie_data,
              "user_data": user_data,
              "movie_rating": movie_rating,
              "user_rating": user_rating,
              "total_frame": final_train}
    print("Name of Frames for reference")
    print("ratings_data, movie_data, user_data, movie_rating, user_rating, total_frame")
    return frames



In [49]:
res_dict = get_frames1("requests.csv",test_file =True)

  
  if sys.path[0] == '':


Name of Frames for reference
ratings_data, movie_data, user_data, movie_rating, user_rating, total_frame


In [50]:
test_df = res_dict['total_frame']

In [51]:
req_df = test_df[["user","movie"]]

In [56]:
req_df.head()

Unnamed: 0,user,movie
0,4958,1924
1,4958,3264
2,4958,2634
3,4958,1407
4,4958,2399


In [57]:
testspark_df = spark.createDataFrame(req_df)

In [58]:
y_pred = recommender.transform(testspark_df)

In [128]:
y_pred.show(5)

+----+-----+----------+
|user|movie|prediction|
+----+-----+----------+
|  53|  148|       NaN|
|4169|  148| 3.2171936|
|5333|  148| 2.4626255|
|4387|  148| 2.4067972|
| 840|  148| 2.7947896|
+----+-----+----------+
only showing top 5 rows



In [59]:
req_fin = y_pred.toPandas()

In [60]:
req_fin.head()

Unnamed: 0,user,movie,prediction
0,53,148,
1,4169,148,3.243941
2,5333,148,2.495717
3,4387,148,1.99101
4,840,148,3.026429


In [61]:
test = req_fin.apply(lambda x: fill_na(x) if pd.isnull(x.prediction) else x['prediction'], axis = 1)

KeyError: ('the label [53] is not in the [index]', 'occurred at index 0')

In [None]:
y_pred.toPandas()

In [129]:
pd_y_pred = y_pred.toPandas()

In [130]:
pd_y_pred = pd_y_pred.fillna(3.2)

In [133]:
pd_y_pred['rating']= pd_y_pred['prediction']

In [134]:
pd_y_pred=pd_y_pred.drop('prediction',axis=1)

In [163]:
pd_y_pred.head()

Unnamed: 0,user,movie,rating
0,32,32,32.0
1,4169,148,3.217194
2,5333,148,2.462626
3,4387,148,2.406797
4,840,148,2.79479


In [136]:
pd_y_pred.to_csv("res1.csv",index=False)

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 56982)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 268, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/usr/local/spark/python/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/pysp

In [144]:
pd_y_pred.head()

Unnamed: 0,user,movie,rating
0,53,148,3.2
1,4169,148,3.217194
2,5333,148,2.462626
3,4387,148,2.406797
4,840,148,2.79479


In [151]:
x=pd_y_pred.groupby('user').mean()['rating']

In [172]:
pd_y_pred.head()

AttributeError: 'float' object has no attribute 'head'

In [171]:
pd_y_pred['rating']

TypeError: 'float' object is not subscriptable

In [173]:
pd_y_pred

3.2

In [68]:
def fill_na(n):
    if (n.user not in user_means) and (n.movie not in movie_means):
        final = training_means
    elif n.movie not in movie_means:
        final = user_means.loc[n.user]
    else:
        final = movie_means.loc[n.movie]
    return final

In [178]:
y=pd.read_csv('foobar.csv')

In [179]:
y.head()

Unnamed: 0,user,movie,rating
0,53,148,3.2
1,4169,148,3.354821
2,5333,148,2.633483
3,4387,148,2.472111
4,840,148,2.714116


In [None]:
self.training_means = X['rating'].mean()

self.movie_means = X.groupby('movie')['rating'].mean()
self.user_means = X.groupby('user')['rating'].mean()

In [43]:
pd_y_pred.head()

Unnamed: 0,user,movie,rating,prediction,final
0,2383,148,2,2.299468,2.299468
1,1069,148,2,2.956888,2.956888
2,2456,148,2,2.783664,2.783664
3,3683,463,1,1.504709,1.504709
4,3562,463,2,2.750967,2.750967


In [45]:
final = pd_y_pred[["user","movie","final"]]
final.head()

Unnamed: 0,user,movie,final
0,2383,148,2.299468
1,1069,148,2.956888
2,2456,148,2.783664
3,3683,463,1.504709
4,3562,463,2.750967


In [46]:
final.to_csv("res_kp.csv",index=False)

In [55]:
req_df.head()

Unnamed: 0,user,movie
0,4958,1924
1,4958,3264
2,4958,2634
3,4958,1407
4,4958,2399


In [62]:
req_fin.head()

Unnamed: 0,user,movie,prediction
0,53,148,
1,4169,148,3.243941
2,5333,148,2.495717
3,4387,148,1.99101
4,840,148,3.026429


In [78]:
sum(req_fin.prediction.isna())

95785

In [69]:
test = req_fin.apply(lambda x: fill_na(x) if pd.isnull(x.prediction) else x['prediction'], axis = 1)

In [70]:
test.head()

0    2.785714
1    3.243941
2    2.495717
3    1.991010
4    3.026429
dtype: float64

In [71]:
req_fin['rating']= test

In [79]:
req_fin.describe()

Unnamed: 0,user,movie,prediction,rating
count,200209.0,200209.0,104424.0,200209.0
mean,1511.751225,1930.586682,3.354862,3.467994
std,1582.930564,1129.67035,0.673475,0.628004
min,1.0,1.0,0.492823,0.492823
25%,331.0,1046.0,2.932264,3.095483
50%,752.0,1946.0,3.414784,3.553931
75%,2131.0,2890.0,3.832892,3.924411
max,6040.0,3952.0,5.474791,5.474791


In [81]:
req_fin

Unnamed: 0,user,movie,prediction,rating
0,53,148,,2.785714
1,4169,148,3.243941,3.243941
2,5333,148,2.495717,2.495717
3,4387,148,1.991010,1.991010
4,840,148,3.026429,3.026429
5,216,148,,2.785714
6,482,148,,2.785714
7,752,148,2.930657,2.930657
8,424,148,,2.785714
9,970,463,2.825386,2.825386


In [73]:
fin = req_fin[["user","movie","rating"]]

In [74]:
fin.to_csv("res_kp.csv",index=False)

In [75]:
fin.describe()

Unnamed: 0,user,movie,rating
count,200209.0,200209.0,200209.0
mean,1511.751225,1930.586682,3.467994
std,1582.930564,1129.67035,0.628004
min,1.0,1.0,0.492823
25%,331.0,1046.0,3.095483
50%,752.0,1946.0,3.553931
75%,2131.0,2890.0,3.924411
max,6040.0,3952.0,5.474791
