In [1]:
from operator import add
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.ml.feature import StopWordsRemover, VectorAssembler
from pyspark.ml.feature import Word2Vec, Word2VecModel
from pyspark.ml.feature import IDF
from pyspark.ml import Pipeline, PipelineModel

from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
#sc = SparkContext(gateway = jg.launch_gateway())

import folium
import html

import pandas as pd
import numpy as np

In [2]:
data_path = '/Volumes/Transcend/dataset/'
model_path = '/Volumes/Transcend/MDS_Yelp/model/'
outout_path = '/Volumes/Transcend/MDS_Yelp/output/'

In [None]:
sc.stop()

In [4]:
sc = SparkContext()
#sc = SparkContext('local')
spark = SparkSession(sc)

In [5]:
business_df = spark.read.parquet(data_path + 'business-small.parquet')
business_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- business_name: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: long (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [6]:
user_df = spark.read.parquet(data_path + 'user-small.parquet')
user_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- yelping_since: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- fans: long (nullable = true)
 |-- average_stars: double (nullable = true)



In [7]:
review_df = spark.read.parquet(data_path + 'review-small.parquet')
review_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: long (nullable = true)
 |-- review_date: string (nullable = true)
 |-- review_text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- cool: long (nullable = true)



In [8]:
# create SQL view for later queries
review_df.createOrReplaceTempView("reviews")

# create review text dataframe
reviews_text = spark.sql("SELECT user_id, review_text FROM reviews")
reviews_text.show(3)

+--------------------+--------------------+
|             user_id|         review_text|
+--------------------+--------------------+
|u0LXt3Uea_GidxRW1...|Who would have gu...|
|u0LXt3Uea_GidxRW1...|Not bad!! Love th...|
|u0LXt3Uea_GidxRW1...|This is currently...|
+--------------------+--------------------+
only showing top 3 rows



In [9]:
# concatenate all reviews per restuarant

reviews_text_rdd = reviews_text.rdd
reviews_by_user_rdd = reviews_text_rdd.map(tuple).reduceByKey(add)  
reviews_by_user_df = spark.createDataFrame(reviews_by_user_rdd)
reviews_by_user_df = reviews_by_user_df \
                            .withColumnRenamed('_1', 'user_id') \
                            .withColumnRenamed('_2', 'text')
reviews_by_user_df.count()

73041

In [9]:
## Example of using Word2vec
from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [Hi, I, heard, about, Spark] => 
Vector: [0.02326762331649661,0.008931299671530724,-0.06394885405898094]

Text: [I, wish, Java, could, use, case, classes] => 
Vector: [-0.009175857529044151,-0.024911361613443917,0.012272004171141555]

Text: [Logistic, regression, models, are, neat] => 
Vector: [-0.005160079896450043,-0.0005152661353349686,0.014656295813620091]



In [10]:
## https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.Word2Vec

## The minimum number of times a token must appear to be included in the word2vec model's vocabulary"

In [None]:
%%time
# usefule link: https://www.tutorialkart.com/apache-spark/spark-mllib-tf-idf/
# create text processing pipeline -- this a lengthy resource-intensive process
# Build the pipeline 
regexTokenizer = RegexTokenizer(gaps = False, pattern = '\w+', inputCol = 'text', outputCol = 'token')
stopWordsRemover = StopWordsRemover(inputCol = 'token', outputCol = 'nostopwrd')
countVectorizer = CountVectorizer(inputCol="nostopwrd", outputCol="rawFeature")
iDF = IDF(inputCol="rawFeature", outputCol="idf_vec")
# TD-IDF Vec
word2Vec = Word2Vec(vectorSize = 100, minCount = 5, inputCol = 'nostopwrd', outputCol = 'word_vec', seed=123)
vectorAssembler = VectorAssembler(inputCols=['idf_vec', 'word_vec'], outputCol='comb_vec')
pipeline = Pipeline(stages=[regexTokenizer, stopWordsRemover, countVectorizer, iDF, word2Vec, vectorAssembler])

# fit the model
pipeline_mdl = pipeline.fit(reviews_by_user_df)

#save the pipeline model
pipeline_mdl.write().overwrite().save(model_path + 'pipe_txt')

In [10]:
# load the text transformation pipeline trained model
pipeline_mdl = PipelineModel.load(model_path + 'pipe_txt')

In [11]:
# transform the review data
reviews_by_user_trf_df = pipeline_mdl.transform(reviews_by_user_df)

In [12]:
# show the transformed review data
reviews_by_user_trf_df.select( 'text', 'nostopwrd', 'idf_vec', 'word_vec', 'comb_vec').show(10)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|           nostopwrd|             idf_vec|            word_vec|            comb_vec|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|A born and bred T...|[born, bred, toro...|(128365,[0,1,2,3,...|[-0.0298230579082...|(128465,[0,1,2,3,...|
|I have never done...|[never, done, one...|(128365,[2,6,11,1...|[-0.0256846508636...|(128465,[2,6,11,1...|
|What a find!  I'm...|[find, m, almost,...|(128365,[2,3,6,11...|[-0.0783052189961...|(128465,[2,3,6,11...|
|Not impressed wit...|[impressed, place...|(128365,[0,1,2,3,...|[-0.0273270801487...|(128465,[0,1,2,3,...|
|Very tight space....|[tight, space, fo...|(128365,[2,39,116...|[-0.0261619807634...|(128465,[2,39,116...|
|While in Toronto ...|[toronto, last, w...|(128365,[0,1,2,3,...|[-0.0836421314076...|(128465,[0,1,2,3,...|
|I loved the decor...|[loved, decor, 

In [13]:
def CosineSim(vec1, vec2): 
    return np.dot(vec1, vec2) / np.sqrt(np.dot(vec1, vec1)) / np.sqrt(np.dot(vec2, vec2))

In [15]:
all_user_vecs = reviews_by_user_trf_df.select('user_id', 'word_vec').rdd.map(lambda x: (x[0], x[1])).collect()

In [16]:
all_user_vecs[0]
# user_id = 'nOTl4aPC4tKHK35T3bNauQ'
# DenseVector() - vector respresentation of all the reviews of User

('BytRWk8X1OelSgwwfXd8Aw',
 DenseVector([-0.0298, -0.0741, -0.0649, 0.0832, 0.0279, -0.0149, -0.0436, -0.0123, -0.0076, -0.0379, -0.0085, -0.0369, -0.0037, -0.0116, 0.0142, -0.0317, -0.0555, -0.0272, -0.0729, 0.0104, -0.0666, -0.0442, 0.0158, 0.0029, 0.0563, -0.0458, -0.0137, -0.0207, -0.0063, 0.0337, 0.0583, 0.0148, 0.0212, -0.0019, -0.031, -0.0436, 0.0617, 0.0352, -0.0151, -0.0433, -0.0319, -0.0212, -0.0246, 0.0944, 0.0214, -0.0364, -0.0363, 0.0447, 0.0029, 0.0008, 0.0107, -0.0229, -0.0219, -0.045, -0.0714, 0.0296, 0.0166, 0.0181, 0.0324, -0.0024, -0.0046, 0.0065, 0.0301, -0.049, -0.0385, 0.0413, 0.0566, 0.0437, -0.0322, 0.0902, -0.0098, -0.0056, 0.0242, 0.0346, 0.0133, -0.0412, -0.0373, 0.0083, -0.0207, 0.0461, -0.0175, -0.0269, 0.0018, -0.0071, -0.0001, 0.0023, 0.0449, 0.0108, 0.0087, 0.0074, 0.013, -0.0001, -0.0442, 0.0266, -0.0158, -0.0175, -0.0496, 0.0423, -0.0899, 0.034]))

In [49]:
def getSimilarUsers(u_ids, all_user_vecs, sim_user_limit=10):
    
    schema = StructType([   
                            StructField("user_id", StringType(), True)
                            ,StructField("score", IntegerType(), True)
                            ,StructField("input_user_id", StringType(), True)
                        ])
    
    similar_user_df = spark.createDataFrame([], schema)
    similar_user_df_all = spark.createDataFrame([], schema)
    
    for u_id in u_ids:
        input_vec = [(r[1]) for r in all_user_vecs if r[0] == u_id]
        if(len(input_vec) < 1):
            print("not in the user_df")
            break
        else:
            #print("-------------------------------")
            #print(u_id)
            input_vec = input_vec[0]
    
        similar_user_rdd = sc.parallelize((i[0], float(CosineSim(input_vec, i[1]))) for i in all_user_vecs)
        
        similar_user_df = spark.createDataFrame(similar_user_rdd) \
            .withColumnRenamed('_1', 'user_id') \
            .withColumnRenamed('_2', 'score') \
            .orderBy("score", ascending = False)
            
        similar_user_df = similar_user_df.dropna()    
        similar_user_df = similar_user_df.filter(col("user_id") != u_id).limit(sim_user_limit)
        similar_user_df = similar_user_df.withColumn('input_user_id', lit(u_id))
        
        similar_user_df = similar_user_df \
                                    .union(similar_user_df)
        
        similar_user_df_all = similar_user_df_all.union(similar_user_df)
    similar_user_df_all = similar_user_df_all.dropDuplicates()    
    return similar_user_df_all

In [17]:
def getUserDetails(user):
    
    a = user.alias("a")
    b = user_df.alias("b")
    
    return a.join(b, col("a.user_id") == col("b.user_id"), 'inner') \
             .select([col('a.'+xx) for xx in a.columns] + [col('b.user_id'), col('b.user_name'),col('b.review_count')])

In [40]:
# test with two users

uids = ['nOTl4aPC4tKHK35T3bNauQ', 'QBac9-Ii6jR-yLsQ5MVTHg']

print('\ninput user details:')
user_df.select('user_id','user_name', 'review_count') \
    .filter(user_df.user_id.isin(uids) == True).show(truncate=False)
    
# get top 10 similar users
sim_users = getUserDetails(getSimilarUsers(uids, all_user_vecs, 10))

print('Top 10 similar Users for each input restaurant are:"')
sim_users_df = sim_users.select('input_user_id', 'a.user_id', 'user_name', 'score','review_count').toPandas()
sim_users_df


input user details:
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|nOTl4aPC4tKHK35T3bNauQ|Katherine|148         |
|QBac9-Ii6jR-yLsQ5MVTHg|Alex     |13          |
+----------------------+---------+------------+



  


Top 10 similar Users for each input restaurant are:"


Unnamed: 0,input_user_id,user_id,user_name,score,review_count
0,QBac9-Ii6jR-yLsQ5MVTHg,J5Eb7LhJaOa20k0ppcOCOg,Alek,0.907664,34
1,nOTl4aPC4tKHK35T3bNauQ,ZBllYKrFzaI0I7v6Wl26Wg,Cecilia,0.963999,135
2,QBac9-Ii6jR-yLsQ5MVTHg,cNhHuEQMIpLH_qc9qGz67A,Jay,0.910774,57
3,QBac9-Ii6jR-yLsQ5MVTHg,_IR48ok0ZkPMWJ2PlRCk0A,Michael,0.907098,82
4,nOTl4aPC4tKHK35T3bNauQ,myrcQ3h2G04Gv-ANG_oqrg,Linda,0.971381,112
5,QBac9-Ii6jR-yLsQ5MVTHg,MpN81tQOL86GaFse-_tTRQ,Amy,0.913263,46
6,QBac9-Ii6jR-yLsQ5MVTHg,kw-YtOKPXrRB2a9wRZlmzQ,Jimmy,0.915453,101
7,nOTl4aPC4tKHK35T3bNauQ,uO1w3qNo21c1bVHHFTYW0w,Joanne,0.972255,221
8,QBac9-Ii6jR-yLsQ5MVTHg,bPUpO-bP6BmAGvSwPyDsng,Michael,0.905848,142
9,nOTl4aPC4tKHK35T3bNauQ,PGx4HvY5joEeqXzam6tO7A,Lisa,0.965808,349


In [55]:
review = pd.read_csv("/Volumes/Transcend/MDS_Yelp/yelp_review.csv")

In [28]:
#uids = ['nOTl4aPC4tKHK35T3bNauQ', 'QBac9-Ii6jR-yLsQ5MVTHg']
review_uid1 = review.loc[review['user_id'].isin(['nOTl4aPC4tKHK35T3bNauQ'])]
review_uid2 = review.loc[review['user_id'].isin(['PGx4HvY5joEeqXzam6tO7A'])]
merged_uid = pd.merge(review_uid1, review_uid2, how = 'inner', on = 'business_id')
merged_uid[['business_id', 'stars_x', 'stars_y', 'text_x', 'text_y', 'date_x', 'date_y']] 
#average the rating if more than 1 review

In [40]:
import fastparquet
review_df = pd.read_parquet(data_path + 'review-small.parquet', engine='fastparquet')

In [41]:
review_uid1 = review.loc[review['user_id'].isin(['nOTl4aPC4tKHK35T3bNauQ'])]
review_uid2 = review.loc[review['user_id'].isin(['vJGLEHyhCs9V-5fAe-xx3w'])]
merged_uid = pd.merge(review_uid1, review_uid2, how = 'inner', on = 'business_id')
merged_uid[['business_id', 'stars_x', 'stars_y', 'text_x', 'text_y', 'date_x', 'date_y']] #average the rating if more than 1 review

Unnamed: 0,business_id,stars_x,stars_y,text_x,text_y,date_x,date_y
0,oQylTvXwGIkKFdCjmafKVg,4,4,"We went here after dinner, simply for drinks. ...",We came here for Mother's day as has become tr...,2013-02-10,2013-06-04
1,5N8R7ALESZ30EoAzVJtabw,5,5,Went in on a whim and was not disappointed. I ...,The Dirty Bird invited me back to give them an...,2016-10-17,2015-10-15
2,u2ETlHOcFdRz4BxcdfsK0Q,3,3,We went here for Summerlicious. For $25 we got...,I came here for a ladies lunch with the girls ...,2012-07-08,2013-06-04


In [45]:
review_uid1 = review.loc[review['user_id'].isin(['nOTl4aPC4tKHK35T3bNauQ'])]
review_uid2 = review.loc[review['user_id'].isin(['ZBllYKrFzaI0I7v6Wl26Wg'])]
merged_uid = pd.merge(review_uid1, review_uid2, how = 'inner', on = 'business_id')
merged_uid[['business_id', 'stars_x', 'stars_y', 'text_x', 'text_y', 'date_x', 'date_y']] #average the rating if more than 1 review

Unnamed: 0,business_id,stars_x,stars_y,text_x,text_y,date_x,date_y
0,cefRDEK5O3t_iUuwnmL27Q,4,4,We booked a reservation at Smith for Summerlic...,"I would've given it 5 stars for the food, but ...",2012-07-12,2014-05-05
1,c78Pat78fVUBFPXYeVvbaQ,5,3,I am surprised at the people complaining about...,Overrated. \n\nMistake: coming to Odd Seoul on...,2016-01-24,2016-03-01


In [42]:
review_uid1 = review.loc[review['user_id'].isin(['nOTl4aPC4tKHK35T3bNauQ'])]
review_uid2 = review.loc[review['user_id'].isin(['VVm-TFCpi9M1-k8ED0l1eA'])]
merged_uid = pd.merge(review_uid1, review_uid2, how = 'inner', on = 'business_id')
merged_uid[['business_id', 'stars_x', 'stars_y', 'text_x', 'text_y', 'date_x', 'date_y']] #average the rating if more than 1 review

Unnamed: 0,business_id,stars_x,stars_y,text_x,text_y,date_x,date_y
0,zgQHtqX0gqMw1nlBZl2VnQ,2,4,"While I really enjoyed the noodles, their sign...",Actual rating 3.5\n\nLet's get my main gripe o...,2013-10-19,2014-11-22
1,kGOr_D-LNpgZ2M9N8TT4QQ,5,4,I went here with my husband and another couple...,La Societe looks as good as you'd imagine a re...,2013-01-27,2012-01-05
2,W5d8iNog90R-qw43m5dGwg,5,5,"I have been craving schnitzel for a LONG time,...",Had lunch here today and I'm totally sold. Si...,2012-10-08,2012-08-12
3,OIdOJaNS8M624F58XGV3PQ,4,3,"Really this deserves a 3-1\/2 star rating, but...",Actually around 3.5\n\nHaven't been here in qu...,2012-11-11,2014-12-22


In [44]:
review_uid1 = review.loc[review['user_id'].isin(['QBac9-Ii6jR-yLsQ5MVTHg'])]
review_uid2 = review.loc[review['user_id'].isin(['eV5usRjY2cDqNKVv8wXroA'])]
merged_uid = pd.merge(review_uid1, review_uid2, how = 'inner', on = 'business_id')
merged_uid[['business_id', 'stars_x', 'stars_y', 'text_x', 'text_y', 'date_x', 'date_y']] #average the rating if more than 1 review

Unnamed: 0,business_id,stars_x,stars_y,text_x,text_y,date_x,date_y
0,r_BrIgzYcwo1NAuG9dLbpg,4,3,This review will be shorter than my first two....,Total cheap eat hipster restaurant. The food w...,2016-07-05,2016-05-02


##### Using combination vector of Word2Vec and IDF

##### top users from user-business-set notebook

In [57]:
# converting to pandas dataframe
user_pd = user_df.toPandas()
user_pd = user_pd.sort_values(by = 'review_count', ascending= False)
uids_top_500 = list(user_pd.user_id.iloc[0:500])
user_pd.head()

Unnamed: 0,user_id,user_name,review_count,yelping_since,useful,funny,cool,fans,average_stars
54601,8RcEwGrFIgkt9WQ35E6SnQ,George,7764,2009-11-06,123,139,113,272,3.49
44835,Xwnf20FKuikiHcSpcEbpKQ,Kenneth,6653,2011-06-10,1444,1142,1167,237,3.32
22639,CxDOIDnH8gp9KXzpBHJYXw,Jennifer,5868,2009-11-09,1241,1968,959,610,3.29
48114,HFECrzYDpgbS5EmTBtj2zQ,Eric,5344,2007-03-28,3905,3876,3847,397,3.93
29361,Hi10sGSZNxQH3NLyWSZ1oA,Fox,4537,2009-05-26,40103,40030,39837,871,3.81


In [37]:
sim_users_all = pd.DataFrame(columns = ['input_user_id', 'user_id', 'user_name', 'score', 'review_count'])

In [61]:
iterator = 0
for uid in uids_top_500:
    user_df.select('user_id','user_name', 'review_count') \
            .filter(user_df.user_id.isin([uid]) == True).show(truncate=False)

    sim_users = getUserDetails(getSimilarUsers([uid], all_user_vecs, 10))
    sim_users_pd = sim_users.select('input_user_id', 'a.user_id', 'user_name', 'score','review_count').toPandas()
    sim_users_all = pd.concat([sim_users_all, sim_users_pd])
    sim_users_all = sim_users_all.drop_duplicates()
    sim_users_all = sim_users_all.sort_values(by = 'input_user_id')
    iterator = iterator + 1
    print(iterator)
    if iterator > 20:
        sim_users_all.to_csv("sim_users_200.csv")

with open("sim_users_200.txt", "wb") as fp:   #Pickling
    pickle.dump(sim_users_all, fp)

sim_users_all.to_csv("sim_users_500.csv")

+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|8RcEwGrFIgkt9WQ35E6SnQ|George   |7764        |
+----------------------+---------+------------+

8RcEwGrFIgkt9WQ35E6SnQ
1


  


1
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|Xwnf20FKuikiHcSpcEbpKQ|Kenneth  |6653        |
+----------------------+---------+------------+

Xwnf20FKuikiHcSpcEbpKQ
1
2
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|CxDOIDnH8gp9KXzpBHJYXw|Jennifer |5868        |
+----------------------+---------+------------+

CxDOIDnH8gp9KXzpBHJYXw
1
3
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|HFECrzYDpgbS5EmTBtj2zQ|Eric     |5344        |
+----------------------+---------+------------+

HFECrzYDpgbS5EmTBtj2zQ
1
4
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|Hi10sGSZNxQH3NLyWSZ1oA|Fox      |4537        |
+-

32
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|9akppeqi5dnaIqPyJ75aCw|Mimi     |2310        |
+----------------------+---------+------------+

9akppeqi5dnaIqPyJ75aCw
1
33
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|IucvvxdQXXhjQ4z6Or6Nrw|Sunil    |2293        |
+----------------------+---------+------------+

IucvvxdQXXhjQ4z6Or6Nrw
1
34
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|iLjMdZi0Tm7DQxX1C1_2dg|Ruggy    |2258        |
+----------------------+---------+------------+

iLjMdZi0Tm7DQxX1C1_2dg
1
35
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|PLOYtrCMUFPHQe2IbYAd5g|Joanna   |2203        

63
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|lvGlIBv4xkHiQBnNOuRYZQ|Jonathan |1687        |
+----------------------+---------+------------+

lvGlIBv4xkHiQBnNOuRYZQ
1
64
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|dHxvSPU6PZqX1LQRXajGDQ|Kimberly |1659        |
+----------------------+---------+------------+

dHxvSPU6PZqX1LQRXajGDQ
1
65
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|-3s52C4zL_DHRK0ULG6qtg|Sara     |1657        |
+----------------------+---------+------------+

-3s52C4zL_DHRK0ULG6qtg
1
66
+----------------------+-----------+------------+
|user_id               |user_name  |review_count|
+----------------------+-----------+------------+
|v-HcprOKPC1_F5_15t4JDw|Christopher|1646

94
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|pRfWy61wU9e1nrCVrOEWLA|Richard  |1434        |
+----------------------+---------+------------+

pRfWy61wU9e1nrCVrOEWLA
1
95
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|9GhiQOLaM6ZGrrFG-SppwQ|Bryan    |1429        |
+----------------------+---------+------------+

9GhiQOLaM6ZGrrFG-SppwQ
1
96
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|CJmtS1Uk1KSsY2MWol_E7g|Asuka    |1397        |
+----------------------+---------+------------+

CJmtS1Uk1KSsY2MWol_E7g
1
97
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|dsX3MiQiTy3OVI8MKn0I4w|Andy     |1385        

125
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|rQLEfpfSjVK-3dBmIMKZDA|Luke     |1254        |
+----------------------+---------+------------+

rQLEfpfSjVK-3dBmIMKZDA
1
126
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|r9Wii37p0kEDIUt2e2_LqQ|Randy    |1252        |
+----------------------+---------+------------+

r9Wii37p0kEDIUt2e2_LqQ
1
127
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|wo632pzj23-Di2RakSugjQ|Jeff     |1248        |
+----------------------+---------+------------+

wo632pzj23-Di2RakSugjQ
1
128
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|NNL1zLTP2J_SOputgoPYeQ|Alex     |1244    

156
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|bVthsfeUU3Bd5AeJLqKT-w|Flora    |1157        |
+----------------------+---------+------------+

bVthsfeUU3Bd5AeJLqKT-w
1
157
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|aIzN19RssPWSw9nDCPXU3A|Jonathan |1154        |
+----------------------+---------+------------+

aIzN19RssPWSw9nDCPXU3A
1
158
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|Q9mA60HnY87C1TW5kjAZ6Q|Evelyn   |1140        |
+----------------------+---------+------------+

Q9mA60HnY87C1TW5kjAZ6Q
1
159
+----------------------+---------+------------+
|user_id               |user_name|review_count|
+----------------------+---------+------------+
|hntJTn1Ev6TXfusbvTdutw|Lucky    |1139    

Py4JJavaError: An error occurred while calling o42506.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 3136.0 failed 1 times, most recent failure: Lost task 7.0 in stage 3136.0 (TID 99908, localhost, executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded
	at net.razorvine.pickle.PickleUtils.readbytes(PickleUtils.java:54)
	at net.razorvine.pickle.Unpickler.load_long_binput(Unpickler.java:655)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:236)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:188)
	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:187)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
	at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:670)
	at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec$$anonfun$5.apply(limit.scala:150)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec$$anonfun$5.apply(limit.scala:149)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3195)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3192)
	at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at net.razorvine.pickle.PickleUtils.readbytes(PickleUtils.java:54)
	at net.razorvine.pickle.Unpickler.load_long_binput(Unpickler.java:655)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:236)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:188)
	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:187)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
	at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:670)
	at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec$$anonfun$5.apply(limit.scala:150)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec$$anonfun$5.apply(limit.scala:149)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 61865)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/anaconda3/lib/python3.6/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/anaconda3/lib/python3.6/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/anaconda3/lib/python3.6/socketserver.py", line 696, in __init__
    self.handle()
  File "/usr/local/Cellar/apache-spark/2.3.1/libexec/python/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/usr/local/Cellar/apache-spark/2.3.1/libexec/python/pyspark/serializers.py", line 685, in read_int
    raise EOFError
EOFError
----------------------------------------


In [None]:
merged_100 = pd.DataFrame(columns = ['business_id', 'stars_x', 'stars_y', 'text_x', 'text_y', 'date_x', 'date_y'])

for i in range(len(sim_users_all)):
    review_uid1 = review.loc[review['user_id'].isin([sim_users_all.input_user_id.iloc[i]])] 
    review_uid2 = review.loc[review['user_id'].isin([sim_users_all.user_id.iloc[i]])] 
    merged = pd.merge(review_uid1, review_uid2, how = 'inner', on = 'business_id')
    merged = merged[['user_id_x', 'user_id_y', 'business_id', 'stars_x', 'stars_y', 'text_x', 'text_y', 'date_x', 'date_y']]
    merged_100 = pd.concat([merged_100, merged])
    
with open("user_simil_business_ratings.txt", "wb") as fp:   #Pickling
    pickle.dump(merged_100, fp)

merged_100.to_csv("user_simil_business_ratings_500.csv")

In [None]:
# use this similarity matrix, recommend highly rated businesses from similar users
# high similarity can be seen, even if they have not reviewed a business in common.
# using similar model of users for businesses, can identify highly similar businesses
    # these businesses can be similar in ratings, category of business