### Spark Initialization

In [1]:
import findspark
findspark.init()

In [2]:
import sys
import copy
import csv

from string import atoi
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [3]:
import numpy as np

In [4]:
conf = SparkConf().setAppName("ContentBased")
conf = conf.setMaster("local[*]")

In [5]:
sc  = SparkContext(conf=conf)

### Load Train and Test Data

In [6]:
trainData = sc.textFile("/Users/lakshya/Desktop/INF-553/Project/las_vegas_review_with_text_without_rare_20_res_lemma_data_train.csv",use_unicode=False)
testData = sc.textFile("/Users/lakshya/Desktop/INF-553/Project/las_vegas_review_with_text_without_rare_20_res_lemma_data_test.csv",use_unicode=False)

In [7]:
train_rdd = trainData.mapPartitions(lambda x: csv.reader(x)).map(lambda x: ((x[0], x[1]), float(x[2])))
test_rdd = testData.mapPartitions(lambda x: csv.reader(x)).map(lambda x: ((x[0], x[1]), float(x[2])))

In [8]:
avg_rating = train_rdd.map(lambda x: (x[0][0], x[1])).groupByKey().map(lambda x: (x[0], list(x[1]))).map(lambda x: (x[0], sum(x[1])/len(x[1])))
prod_rating = train_rdd.map(lambda x: (x[0][1], x[1])).groupByKey().map(lambda x: (x[0], list(x[1]))).map(lambda x: (x[0], sum(x[1])/len(x[1])))

In [9]:
train_temp = trainData.mapPartitions(lambda x: csv.reader(x)).map(lambda x: ((x[0], x[1]), 1))

### Load review data

In [10]:
data = sc.textFile("/Users/lakshya/Desktop/INF-553/Project/las_vegas_review_with_text_without_rare_20_res_lemma_data.csv",use_unicode=False)

In [11]:
data.take(6)

['"---1lKK3aKOuomHnwAkAow","A0X1baHPgw9IiBRivu0G9g","5","2010-12-02 ","5","2","1","holy bread you have cometh to the sin consumerism summerlin set we be unworthy of your artisan gift but hopefully there be enough foodie fringe to sustain thee ps be rude be be french a rise by any other name we get a truffle cake for xmas and it be wonderful this seem to be a hub for french people in town most of the folks in line be speak french "',
 '"---1lKK3aKOuomHnwAkAow","AZlnpvILz5cEWJifjr2CSQ","5","2010-11-25 ","3","1","1","i come here for pesto i use pesto like most people use salt it go on veggies bread pasta ect they also make freeze pasta include my husband covet gnocchi they also have a great deli selection include ny favorite boars head they have in house ricotta and mozzarella as well as all sort of pepper and olives there be a little cafe and it be nothing fancy it be however simple and good not much on presentation but big on taste they also have arborio which apparently be hard to get 

In [12]:
train_data = data.mapPartitions(lambda x: csv.reader(x)).map(lambda x: ((x[0], x[1]), (x[2],x[7]))).join(train_temp)

In [13]:
train_data.take(3)

[(('ik3YuDGRcNo9G24U0eG19A', 'q3oJ6bNRV3OoJrwc95GOwg'),
  (('5',
    'i have be want to go here for awhile i must have see it on tv i get the veggie dog red i know what i think to be a portuguese roll i be from chicago so we be quite big on our hotdogs this be nothing like i be use to but it be sooooo good we also get an order of dirty fry and while we be sit there eat somebody come out and say the kitchen make too much french fry would we like another my friend and i be already split the container of fry yes i say container i tell my friend just go ahead and bring those home for herself because i still have an almost full container after two people eat them the hot dog be so humongous that i bring some of that home as well have your appetite ready if you go to this place '),
   1)),
 (('1JW3rbxP2WpnXYOeMyKEWg', 'kZspuWnM0Y-Losvk2Rl0lA'),
  (('1',
    'i ask um amellie i think she say her name be what come with the tzadziki appetizer and she proceed to tell me what tzadziki be not what

### Collect user data from train data (User, Product, Rating)

In [14]:
userReview = train_data.map(lambda x: (x[0][0], x[0][1], x[1][0][0]))

In [15]:
userReviewCollected = userReview.map(lambda x: (x[0], x[1], x[2]))

In [16]:
userReviewCollected.take(5)

[('ik3YuDGRcNo9G24U0eG19A', 'q3oJ6bNRV3OoJrwc95GOwg', '5'),
 ('1JW3rbxP2WpnXYOeMyKEWg', 'kZspuWnM0Y-Losvk2Rl0lA', '1'),
 ('sELpZpITOy9abQB3YY2Ugg', 'tveb-DkZ0lnwgKb_oavl6A', '5'),
 ('1M7qbeQoL8O5r_cUVEuKEw', 'PSUOncuqfqHulYj_fusthw', '5'),
 ('iS4m_LE7f2oEzYl09HiIuw', 'fL-b760btOaGa85OJ9ut3w', '5')]

### Collect product data (Product, Review Text)

In [17]:
prodReview = train_data.map(lambda x: (x[0][1], x[1][0][1])).groupByKey().mapValues(list)

In [18]:
prodReviewCollected = prodReview.map(lambda x: (x[0], " ".join(x[1])))

In [19]:
prodReviewCollected.take(5)

[('ZHRBiaHmqdwyXPNsETqKCw',
  'huge annual event with plenty of panel and photo ops good range of price from discount single day to weeklong package definitely worth a visit  man this convention be funner than watch a klingon knick himself with his batleth battle sword for a star trek gather in las vegas this convention be u s s enterprise huge be always intend to come to this convention since its right at home and present and past intermingle with fan great gather of nerds and geeks alike i get star strike by folks like levar burton marina sirtis brent spiner and jonathan frakes no picard though this round for a nominal fee you could get a pic and auto from each of them and these folks be friendly as ever just pass by there be no divas in the house hell even the original shaft richard roundtree be there shut yo mouthlots of convention space and a number of dealers to get your star trek swag no joke i buy no star trek swag but score big time with some star war memorabilia ive be want f

### Doc2Vec model creation

In [20]:
from collections import namedtuple
from gensim.models.doc2vec import Doc2Vec, TaggedDocument

  _nan_object_mask = _nan_object_array != _nan_object_array


Convert user and product rdd to pandas dataframe

In [22]:
spark = SparkSession(sc)

userPandas = userReviewCollected.toDF().toPandas()
prodPandas = prodReviewCollected.toDF().toPandas()

Create Doc2Vec model on product category text

In [23]:
documents = [TaggedDocument(doc, [i]) for i, doc in enumerate(prodPandas['_2'])]

In [24]:
documents

(9067, 138470)

In [None]:
model = Doc2Vec(documents, size = 2000, min_count = 1, workers=4)

In [None]:
feature_vectors = []
for i in range(0, len(model.docvecs)):
    feature_vectors.append(model.docvecs[i])

Add Doc2Vec vectors to product dataframe

In [25]:
prodPandas['Vector'] = feature_vectors

In [26]:
prodPandas

Unnamed: 0,_1,_2,Vector
0,ZHRBiaHmqdwyXPNsETqKCw,huge annual event with plenty of panel and pho...,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,X11JfeIVH9d6qMrDqA-N7A,i come to scooters to watch las vegas wrangler...,"[0.00353394626436, 0.0, 0.0, 0.0, 0.0, 0.0, 0...."
2,1ROp6NSMYaXPqAjo8ubAtA,thank god for you brother you be an example to...,"[0.0490698325252, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0..."
3,J121PGT4oi5AqVbtMoEfVw,food have no taste i order the asado taco and ...,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,WsIJixo-2iDc-V078c_afQ,raack n roll be a topless adults only comedy s...,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
5,7RidnmyYyH0cfcX2x7buag,my mother live in this community so ive eat he...,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.05750714..."
6,BjPph9SbmEfox2vip326PQ,this be the second time ive be in the last few...,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
7,rfwJFFzW6xW2qYfJh14OTA,oh man this show be a strip show a really real...,"[0.0110202632102, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0..."
8,i_pI9XnOgJ93cq85Ot06OQ,when i think about this place think about smal...,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
9,CxzUdREgfS4ymtXFSREf8Q,so be drive down the road and we see this sign...,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


Product category text not needed

In [27]:
del prodPandas['_2']

### Weighted Linear Combination of product vectors for users

In [None]:
userPandas['Vector'] = [[] for _ in range(len(userPandas))]

In [33]:
userPandas['_3'] = userPandas['_3'].astype(float)

In [34]:
for index, row in userPandas.iterrows():
    vector = np.array(prodPandas.loc[prodPandas['_1'] == row['_2'], 'Vector'].values[0])
    rating = row['_3']
    userPandas.at[index,'Vector'] = rating*vector

In [35]:
userPandas

Unnamed: 0,_1,_2,_3,Vector
0,3egcdazws_x1wW35jgXfNw,gae9LAyt7Qvf_OgAkWASxA,4.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,2JjI8DCyrpabtg36iL5lrw,YN8pgoAGNfk8J3LKmSVJ2w,3.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,eA-KLkSEq9RYt9-RfNZXCA,RYBGhnFtyZGjlREEfcRFrg,4.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,BxDsaVNeWxc5mNyA1HtSHQ,oeW0vIYd3rUnAPgmD4fEFg,5.0,"[0.0469108080909, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0..."
4,jlWYqr9IGLQrSRowtsKf4g,CsbNmQqu9dKFrgcIItevYA,1.0,"[0.00549316759098, 0.0, 0.0, 0.0, 0.0, 0.0, 0...."
5,4r33dXcE1oYZxjONrhxTiA,9gNko6cFCMZbvy1zhJ7-Xg,5.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
6,Bwa7MWEM88FtiNLaXWCkww,X2X3n0PutSNonQQj2cjsCw,4.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
7,cUUCMncwWZm57s95iUX1Gg,dW79jPJVpzlTKGF-1JAsaw,3.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
8,vd5wfQ2L8fszYBHDPza9yw,yPcAdikNrXEsfbHNBQUjkQ,3.0,"[0.0264315048363, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0..."
9,91koVXu54Y6tIiZKKxVmrw,N8FdrKl0y_E_mUOj8VSFuA,5.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


Product and rating column not needed

In [36]:
del userPandas['_2']
del userPandas['_3']

Linear combination of feature vectors

In [37]:
userPandas = userPandas.groupby(['_1']).sum()

Normalize the user feature vectors

In [38]:
from sklearn.preprocessing import Normalizer

In [39]:
for index, row in userPandas.iterrows():
    vector = np.array(row['Vector']).reshape(1, -1)
    transformer = Normalizer().fit(vector)
    userPandas.at[index,'Vector'] = transformer.transform(vector)

In [40]:
userPandas

Unnamed: 0_level_0,Vector
_1,Unnamed: 1_level_1
-0-hVEpwWEcJLJoGq3rE3g,"[[0.00615835412603, 0.0, 0.0, 0.0, 0.000922140..."
-2OB54nQ6FsGLUM-R1KXnA,"[[0.00754647464141, 0.000816293149086, 0.0, 0...."
-ARdx8hOcEWlMDjzwLYZ_g,"[[0.00936276793327, 0.00118454361657, 0.0, 0.0..."
-Pk25bOBsvemFaWKDBVBzA,"[[0.00520853482007, 0.0, 0.0, 0.0, 0.000951302..."
-Q2wBtscwW6JOqlBndji4A,"[[0.00347866771189, 0.000394463441461, 0.0, 0...."
-Q4bjWlbxmb1yKP4U7OODg,"[[0.00748184620648, 0.00111679982221, 0.0, 0.0..."
-SDx-d5jppC4OBBosLVpYw,"[[0.0145795139085, 0.000867873056934, 0.0, 0.0..."
-XgVXGJnOnW0kQEol6O3Pg,"[[0.00924828001642, 0.0, 0.0, 0.0, 0.000691329..."
-Y6tXYPYqeVy37-L5p0rMw,"[[0.00924447837274, 0.0015946375716, 0.0, 0.00..."
-a873HRQxWRRobMNT4xOKg,"[[0.00849843776588, 0.00145316584715, 0.0, 0.0..."


### Create user numpy matrix from feature vectors

In [41]:
user_matrix = np.zeros((len(userPandas), tfidf_prod.shape[1]))
idx = 0
for index, row in userPandas.iterrows():
    vector = np.array(row['Vector'])[0]
    user_matrix[idx] = vector
    idx += 1
    

In [42]:
user_matrix

array([[ 0.00615835,  0.        ,  0.        , ...,  0.        ,
         0.        ,  0.        ],
       [ 0.00754647,  0.00081629,  0.        , ...,  0.        ,
         0.        ,  0.        ],
       [ 0.00936277,  0.00118454,  0.        , ...,  0.        ,
         0.00030528,  0.        ],
       ..., 
       [ 0.01177755,  0.        ,  0.        , ...,  0.        ,
         0.        ,  0.        ],
       [ 0.01024616,  0.00118618,  0.        , ...,  0.        ,
         0.        ,  0.        ],
       [ 0.00568815,  0.00037771,  0.        , ...,  0.        ,
         0.        ,  0.        ]])

### Create product numpy matrix from feature vectors

In [43]:
prod_matrix = np.zeros((len(prodPandas), tfidf_prod.shape[1]))
idx = 0
for index, row in prodPandas.iterrows():
    vector = np.array(row['Vector'])
    prod_matrix[idx] = vector
    idx += 1

In [44]:
prod_matrix

array([[ 0.,  0.,  0., ...,  0.,  0.,  0.],
       [ 0.,  0.,  0., ...,  0.,  0.,  0.],
       [ 0.,  0.,  0., ...,  0.,  0.,  0.],
       ..., 
       [ 0.,  0.,  0., ...,  0.,  0.,  0.],
       [ 0.,  0.,  0., ...,  0.,  0.,  0.],
       [ 0.,  0.,  0., ...,  0.,  0.,  0.]])

### Compute cosine similarity by taking dot product

In [1]:
similarity_matrix = np.dot(user_matrix, prod_matrix.T)

NameError: name 'np' is not defined

In [46]:
similarity_matrix.shape

(987, 3076)

### Flatten similarity matrix to related with user and products

In [47]:
prod = prodPandas['_1'].values
user = userPandas.index.values

In [48]:
zf = similarity_matrix.flatten()
xr = np.repeat(user, prod.size)
yt = np.tile(prod, user.size)
d = np.stack((xr, yt, zf), axis=-1)

In [49]:
d.shape

(3036012, 3)

### Convert similarity matrix to RDD

In [50]:
similarity_rdd = sc.parallelize(d)

In [51]:
similarity_rdd.take(5)

[array([u'-0-hVEpwWEcJLJoGq3rE3g', u'5REYrZfsX3m4E3FTwovp5Q',
        0.5049527633273366], dtype=object),
 array([u'-0-hVEpwWEcJLJoGq3rE3g', u'HWrbZS1mxVRj2Y2VwMmDMg',
        0.20888810243149972], dtype=object),
 array([u'-0-hVEpwWEcJLJoGq3rE3g', u'MvlQo4bev1eqp1q0HYOLHg',
        0.13913650161619856], dtype=object),
 array([u'-0-hVEpwWEcJLJoGq3rE3g', u'2BF45Gr_FiubZqfP-JgaRQ',
        0.09994331031112633], dtype=object),
 array([u'-0-hVEpwWEcJLJoGq3rE3g', u'X9Bql7RrPU5Mab5-hJsI8A',
        0.40960488027192626], dtype=object)]

### Group Users 

In [None]:
userGrouped = similarity_rdd.map(lambda x: (x[0], (x[1], float(x[2])))).groupByKey().mapValues(lambda x: list(x))

In [None]:
userGrouped.take(5)

### Sort businesses for each user on the basis of similarity value

In [None]:
similaritySorted = userGrouped.map(lambda x: (x[0], sorted(x[1], key=lambda y: y[1], reverse=True)))

In [None]:
similaritySorted.take(2)

### Take top N recommendations

In [None]:
numKeep = 300

In [None]:
topProds = similaritySorted.map(lambda x: (x[0], x[1][:numKeep])).mapValues(lambda x: [idx[0] for idx in x])

In [None]:
topProds.take(2)

In [None]:
topProds = topProds.collect()

### Save similarity values

In [None]:
with open('/Users/lakshya/Desktop/INF-553/Project/ReviewBasedSorted.txt', 'w') as f:
    for item in topProds:
        f.write(str(item[0])+",")
        prodList = ""
        for prod in item[1]:
            prodList += str(prod)+","
        prodList = prodList[:-1]
        f.write(prodList+"\n")