In [109]:
import numpy as np
import pandas as pd
from pyspark.sql.types import IntegerType
from pyspark.ml.recommendation import ALS
import matplotlib.pyplot as plt
import pyspark as ps
from sklearn.model_selection import train_test_split
from noah_cleaning1 import get_frames

In [2]:
spark = (ps.sql.SparkSession.builder 
        .master("local[4]") 
        .appName("sparkSQL exercise") 
        .getOrCreate()
        )
sc = spark.sparkContext

In [11]:
ratings_data = pd.read_csv("training.csv")
ratings_data.head()

Unnamed: 0,user,movie,rating,timestamp
0,6040,858,4,956703932
1,6040,593,5,956703954
2,6040,2384,4,956703954
3,6040,1961,4,956703977
4,6040,2019,5,956703977


In [50]:
movie_data = pd.read_csv("movies.dat",delimiter = "::",names=["movie","title","genre"])

  """Entry point for launching an IPython kernel.


In [49]:
movie_data.head()

Unnamed: 0,movie,title,genre
0,1,Toy Story (1995),Animation|Children's|Comedy
1,2,Jumanji (1995),Adventure|Children's|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama
4,5,Father of the Bride Part II (1995),Comedy


In [51]:
dummy_cols = movie_data.genre.str.get_dummies()

In [54]:
movie_data = pd.concat((movie_data,dummy_cols),axis = 1)

In [56]:
movie_data= movie_data.drop("genre",axis=1)

In [33]:
user_data = pd.read_csv("users.dat",delimiter = "::",names=["user","gender","age","occupation","zipcode"])

  """Entry point for launching an IPython kernel.


In [34]:
user_data.head()

Unnamed: 0,user,gender,age,occupation,zipcode
0,1,F,1,10,48067
1,2,M,56,16,70072
2,3,M,25,15,55117
3,4,M,45,7,2460
4,5,M,25,20,55455


In [57]:
movie_data["year"]=movie_data["title"]
movie_data["year"] = movie_data["year"].apply(lambda x: x[-5:-1])
movie_data.head()

Unnamed: 0,movie,title,Action,Adventure,Animation,Children's,Comedy,Crime,Documentary,Drama,...,Film-Noir,Horror,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western,year
0,1,Toy Story (1995),0,0,1,1,1,0,0,0,...,0,0,0,0,0,0,0,0,0,1995
1,2,Jumanji (1995),0,1,0,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1995
2,3,Grumpier Old Men (1995),0,0,0,0,1,0,0,0,...,0,0,0,0,1,0,0,0,0,1995
3,4,Waiting to Exhale (1995),0,0,0,0,1,0,0,1,...,0,0,0,0,0,0,0,0,0,1995
4,5,Father of the Bride Part II (1995),0,0,0,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,1995


In [58]:
movie_data["title"] = movie_data["title"].apply(lambda x: x[:-7])
movie_data.head()

Unnamed: 0,movie,title,Action,Adventure,Animation,Children's,Comedy,Crime,Documentary,Drama,...,Film-Noir,Horror,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western,year
0,1,Toy Story,0,0,1,1,1,0,0,0,...,0,0,0,0,0,0,0,0,0,1995
1,2,Jumanji,0,1,0,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1995
2,3,Grumpier Old Men,0,0,0,0,1,0,0,0,...,0,0,0,0,1,0,0,0,0,1995
3,4,Waiting to Exhale,0,0,0,0,1,0,0,1,...,0,0,0,0,0,0,0,0,0,1995
4,5,Father of the Bride Part II,0,0,0,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,1995


In [41]:
genres = set()
for i in movie_data['genre'].unique():
    genre_list = i.split("|")
    for item in genre_list:
        genres.add(item)
genres 
#movie_data['genre'].unique()

{'Action',
 'Adventure',
 'Animation',
 "Children's",
 'Comedy',
 'Crime',
 'Documentary',
 'Drama',
 'Fantasy',
 'Film-Noir',
 'Horror',
 'Musical',
 'Mystery',
 'Romance',
 'Sci-Fi',
 'Thriller',
 'War',
 'Western'}

In [None]:
movie_data["genre"] = movie_data["genre"].apply(lambda x: x.split("|"))

In [62]:
movie_data.head()

Unnamed: 0,movie,title,Action,Adventure,Animation,Children's,Comedy,Crime,Documentary,Drama,...,Film-Noir,Horror,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western,year
0,1,Toy Story,0,0,1,1,1,0,0,0,...,0,0,0,0,0,0,0,0,0,1995
1,2,Jumanji,0,1,0,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1995
2,3,Grumpier Old Men,0,0,0,0,1,0,0,0,...,0,0,0,0,1,0,0,0,0,1995
3,4,Waiting to Exhale,0,0,0,0,1,0,0,1,...,0,0,0,0,0,0,0,0,0,1995
4,5,Father of the Bride Part II,0,0,0,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,1995


In [60]:
user_data["gender"] = user_data["gender"].map({"M":1,"F":0})

In [61]:
user_data.head()

Unnamed: 0,user,gender,age,occupation,zipcode
0,1,0,1,10,48067
1,2,1,56,16,70072
2,3,1,25,15,55117
3,4,1,45,7,2460
4,5,1,25,20,55455


In [63]:
movie_rating = pd.merge(ratings_data,movie_data,how="left",left_on ="movie",right_on="movie")

In [70]:
movie_rating.head(10)

Unnamed: 0,user,movie,rating,timestamp,title,Action,Adventure,Animation,Children's,Comedy,...,Film-Noir,Horror,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western,year
0,6040,858,4,956703932,"Godfather, The",1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1972
1,6040,593,5,956703954,"Silence of the Lambs, The",0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,1991
2,6040,2384,4,956703954,Babe: Pig in the City,0,0,0,1,1,...,0,0,0,0,0,0,0,0,0,1998
3,6040,1961,4,956703977,Rain Man,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1988
4,6040,2019,5,956703977,Seven Samurai (The Magnificent Seven) (Shichin...,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1954
5,6040,1419,3,956704056,Walkabout,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1971
6,6040,573,4,956704056,"Ciao, Professore! (Io speriamo che me la cavo )",0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1993
7,6040,3111,5,956704056,Places in the Heart,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1984
8,6040,213,5,956704056,Burnt By the Sun (Utomlyonnye solntsem),0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1994
9,6040,3505,4,956704056,No Way Out,0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,1987


In [67]:
user_rating = pd.merge(ratings_data,user_data,how="left",left_on ="user",right_on="user")

In [69]:
user_rating.sample(10)

Unnamed: 0,user,movie,rating,timestamp,gender,age,occupation,zipcode
114516,5239,2410,2,961443420,1,35,7,7043
479471,2901,3359,4,971884464,1,25,17,78749
199298,4619,3176,5,964124663,0,25,1,97225
35699,5780,2540,4,958154897,1,18,17,92886
522083,2777,2806,3,973729444,1,18,4,95326
59459,5616,1388,1,959134683,1,45,1,8840
70078,5555,1605,1,959550643,1,1,10,37830
633227,1632,1238,3,974717779,1,25,16,94120
345390,3687,509,2,966316948,0,50,1,62221
418772,3272,3471,5,968204107,1,35,0,8330


In [73]:
final_train = pd.merge(movie_rating,user_rating,on=["user","movie","rating","timestamp"])

In [84]:
final_train.head()

Unnamed: 0,user,movie,rating,timestamp,title,Action,Adventure,Animation,Children's,Comedy,...,Romance,Sci-Fi,Thriller,War,Western,year,gender,age,occupation,zipcode
0,6040,858,4,956703932,"Godfather, The",1,0,0,0,0,...,0,0,0,0,0,1972,1,25,6,11106
1,6040,593,5,956703954,"Silence of the Lambs, The",0,0,0,0,0,...,0,0,1,0,0,1991,1,25,6,11106
2,6040,2384,4,956703954,Babe: Pig in the City,0,0,0,1,1,...,0,0,0,0,0,1998,1,25,6,11106
3,6040,1961,4,956703977,Rain Man,0,0,0,0,0,...,0,0,0,0,0,1988,1,25,6,11106
4,6040,2019,5,956703977,Seven Samurai (The Magnificent Seven) (Shichin...,1,0,0,0,0,...,0,0,0,0,0,1954,1,25,6,11106


In [85]:
X = final_train[['user','movie','rating']]

In [100]:
spark_df = spark.createDataFrame(X)

In [88]:
train_df, test_df = spark_df.randomSplit([0.8, 0.2], seed=427471138)

In [89]:
# Create an untrained ALS model.
als_model = ALS(
    itemCol='movie',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=0.1,
    rank=10) 


In [143]:
recommender = als_model.fit(train_df)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:33457)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:33457)

In [91]:
y_pred = recommender.transform(test_df)

In [93]:
y_pred.show(5)

+----+-----+------+----------+
|user|movie|rating|prediction|
+----+-----+------+----------+
|2383|  148|     2|  2.551038|
|1069|  148|     2|  2.986098|
|2456|  148|     2| 2.8667297|
|3683|  463|     1| 1.4679407|
|3562|  463|     2|   2.78387|
+----+-----+------+----------+
only showing top 5 rows



In [94]:
y_pred.describe().show()

+-------+------------------+------------------+------------------+----------+
|summary|              user|             movie|            rating|prediction|
+-------+------------------+------------------+------------------+----------+
|  count|            159972|            159972|            159972|    159972|
|   mean|3408.0624734328508| 1850.770822393919|3.5929037581576777|       NaN|
| stddev|1546.8362429895333|1088.9803317800838|1.1196836087607198|       NaN|
|    min|               636|                 1|                 1| 0.2726254|
|    max|              6040|              3952|                 5|       NaN|
+-------+------------------+------------------+------------------+----------+



In [95]:
pd_y_pred = y_pred.toPandas()

In [142]:
y_pred.show(5)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:33457)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:33457)

In [98]:
pd_y_pred = pd_y_pred.fillna(3.2)

In [99]:
pd_y_pred.describe()

Unnamed: 0,user,movie,rating,prediction
count,159972.0,159972.0,159972.0,159972.0
mean,3408.062473,1850.770822,3.592904,3.431607
std,1546.836243,1088.980332,1.119684,0.669331
min,636.0,1.0,1.0,0.272625
25%,2039.0,1023.0,3.0,3.032485
50%,3509.0,1798.0,4.0,3.499867
75%,4705.0,2759.0,4.0,3.906292
max,6040.0,3952.0,5.0,5.472911


In [101]:
pd_y_pred

Unnamed: 0,user,movie,rating,prediction
0,2383,148,2,2.551038
1,1069,148,2,2.986098
2,2456,148,2,2.866730
3,3683,463,1,1.467941
4,3562,463,2,2.783870
5,5511,463,2,3.345553
6,3704,471,5,4.367433
7,5074,471,4,3.374620
8,5222,471,4,3.056594
9,1404,471,3,2.816599


In [116]:
def get_frames1(filename,test_file = False):

    ## Reading in the data
    ratings_data = pd.read_csv(filename)

    movie_data = pd.read_csv("../data/movies.dat",
                            delimiter = "::",
                            names=["movie","title","genre"])

    user_data = pd.read_csv("../data/users.dat",
                            delimiter = "::",
                            names=["user","gender","age","occupation","zipcode"])


    ## Adding Movie Genre Dummy Cols
    dummy_cols = movie_data.genre.str.get_dummies()
    movie_data = pd.concat((movie_data,dummy_cols),axis = 1)
    movie_data.drop("genre",axis=1, inplace=True)


    ## Creating seperate year column and title column
    movie_data["year"]=movie_data["title"].apply(lambda x: x[-5:-1])
    movie_data["title"] = movie_data["title"].apply(lambda x: x[:-7])

    ## Mapping M and F in user data to 1 and 0
    user_data["gender"] = user_data["gender"].map({"M":1,"F":0})


    ###################################
    ####### MERGES ###################

    ## DF with movie rating and the movie info
    movie_rating = pd.merge(ratings_data,
                            movie_data,
                            how="left",
                            left_on ="movie",
                            right_on="movie")

    ## DF with movie rating and the user info
    user_rating = pd.merge(ratings_data,
                           user_data,
                           how="left",
                           left_on ="user",
                           right_on="user")

    ## Final DF with both movie info and user info
    if test_file == False:
        final_train = pd.merge(movie_rating,
                               user_rating,
                               on=["user","movie","rating","timestamp"])
    else:
        final_train = pd.merge(movie_rating,
                               user_rating,
                               on=["user","movie"])


    ## Returning frames as dictionary
    frames = {"ratings_data": ratings_data,
              "movie_data": movie_data,
              "user_data": user_data,
              "movie_rating": movie_rating,
              "user_rating": user_rating,
              "total_frame": final_train}
    print("Name of Frames for reference")
    print("ratings_data, movie_data, user_data, movie_rating, user_rating, total_frame")
    return frames



In [117]:
res_dict = get_frames1("requests.csv",test_file =True)

  
  if sys.path[0] == '':


Name of Frames for reference
ratings_data, movie_data, user_data, movie_rating, user_rating, total_frame


In [118]:
test_df = res_dict['total_frame']

In [120]:
req_df = test_df[["user","movie"]]

In [122]:
req_df.head()

Unnamed: 0,user,movie
0,4958,1924
1,4958,3264
2,4958,2634
3,4958,1407
4,4958,2399


In [124]:
testspark_df = spark.createDataFrame(req_df)

In [126]:
y_pred = recommender.transform(testspark_df)

In [128]:
y_pred.show(5)

+----+-----+----------+
|user|movie|prediction|
+----+-----+----------+
|  53|  148|       NaN|
|4169|  148| 3.2171936|
|5333|  148| 2.4626255|
|4387|  148| 2.4067972|
| 840|  148| 2.7947896|
+----+-----+----------+
only showing top 5 rows



In [129]:
pd_y_pred = y_pred.toPandas()

In [130]:
pd_y_pred = pd_y_pred.fillna(3.2)

In [133]:
pd_y_pred['rating']= pd_y_pred['prediction']

In [134]:
pd_y_pred=pd_y_pred.drop('prediction',axis=1)

In [135]:
pd_y_pred.head()

Unnamed: 0,user,movie,rating
0,53,148,3.2
1,4169,148,3.217194
2,5333,148,2.462626
3,4387,148,2.406797
4,840,148,2.79479


In [136]:
pd_y_pred.to_csv("res1.csv",index=False)

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 56982)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 268, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/usr/local/spark/python/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/pysp

In [144]:
pd_y_pred.head()

Unnamed: 0,user,movie,rating
0,53,148,3.2
1,4169,148,3.217194
2,5333,148,2.462626
3,4387,148,2.406797
4,840,148,2.79479


In [151]:
x=pd_y_pred.groupby('user').mean()['rating']

In [152]:
x

user
1       3.200000
2       3.200000
3       3.200000
4       3.200000
5       3.200000
6       3.200000
7       3.200000
8       3.200000
9       3.200000
10      3.200000
11      3.200000
12      3.200000
13      3.200000
14      3.200000
15      3.200000
16      3.200000
17      3.200000
18      3.200000
19      3.200000
20      3.200000
21      3.200000
22      3.200000
23      3.200000
24      3.200000
25      3.200000
26      3.200000
27      3.200000
28      3.200000
29      3.200000
30      3.200000
          ...   
5869    3.123160
5872    3.326855
5874    3.575305
5875    3.736841
5878    3.629150
5880    2.855008
5891    3.510691
5892    3.091177
5902    3.290637
5918    4.142386
5920    3.842805
5923    3.482408
5927    3.414318
5938    3.667717
5948    3.951805
5949    3.253427
5950    3.168464
5956    3.168432
5966    3.296785
5972    3.610217
5985    3.620598
5991    3.170504
5995    3.328423
5996    3.559875
5998    3.005191
6001    3.397735
6002    3.880362
6016    3