In [11]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import matplotlib.pyplot as plt
import pandas as pd

                                                                                

In [12]:
spark = SparkSession.builder.appName('YelpRec').getOrCreate()
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '5g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '4'), ('spark.cores.max', '4'), ('spark.driver.memory','8g')])


In [13]:
dataDir = "gs://msca-bdp-student-gcs/group2/yelp-datasample2"
business = spark.read.json(dataDir + "/sample_business")
checkin = spark.read.json(dataDir + "/sample_checkin")
review = spark.read.json(dataDir + "/sample_review")
tip = spark.read.json(dataDir + "/sample_tip")
user = spark.read.json(dataDir + "/sample_user")

                                                                                

In [14]:
business.show(5)

+-------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+
|            address|          attributes|         business_id|          categories|               city|               hours|is_open|     latitude|     longitude|                name|postal_code|review_count|stars|state|
+-------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+
|      2141 Caton St|{null, null, null...|FR5xx5YEzxAnoMdqG...|Fitness & Instruc...|        New Orleans|{0:0-0:0, 0:0-0:0...|      1|   29.9991154|   -90.0607732|     Anytime Fitness|      70122|           6|  3.0|   LA|
|5413 Neshaminy Blvd|{null, null, u'no...|p184f-Zvf4ToPwLba...|Chinese, Restaura...|           Bensalem|{11:30-23:0,

In [15]:
business.take(1)

[Row(address='2141 Caton St', attributes=Row(AcceptsInsurance=None, AgesAllowed=None, Alcohol=None, Ambience=None, BYOB=None, BYOBCorkage=None, BestNights=None, BikeParking='True', BusinessAcceptsBitcoin=None, BusinessAcceptsCreditCards='True', BusinessParking="{'garage': False, 'street': False, 'validated': False, 'lot': True, 'valet': False}", ByAppointmentOnly='False', Caters=None, CoatCheck=None, Corkage=None, DogsAllowed=None, DriveThru=None, GoodForDancing=None, GoodForKids='False', GoodForMeal=None, HairSpecializesIn=None, HappyHour=None, HasTV=None, Music=None, NoiseLevel=None, OutdoorSeating=None, RestaurantsAttire=None, RestaurantsDelivery=None, RestaurantsGoodForGroups=None, RestaurantsPriceRange2=None, RestaurantsReservations=None, RestaurantsTableService=None, RestaurantsTakeOut=None, Smoking=None, WheelchairAccessible=None, WiFi=None), business_id='FR5xx5YEzxAnoMdqGl7vRQ', categories='Fitness & Instruction, Gyms, Weight Loss Centers, Health & Medical, Trainers, Active Lif

In [16]:
business.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: string (nullable = true)
 |    |-- GoodForKids: string (nul

## Recommendation system

In [25]:
from pyspark import SparkContext
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import col
from pyspark.sql import SQLContext
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import StructType,StructField,IntegerType
import matplotlib.pyplot as plt
%matplotlib inline

In [17]:
df_business = business.select("business_id","name", "stars", 
                                 "review_count", "attributes", 
                                 "categories", "city").withColumnRenamed("stars", "stars_restaurant")

#df_business = df_business.filter((df_business['city'] == 'Toronto') & (df_business.categories.contains('Restaurants'))).drop('city')
df_business = df_business.filter(df_business.categories.contains('Restaurants')).drop('city')

In [19]:
df_review = review.join(df_business, on='business_id', how='inner')

In [20]:
df_review.select(['business_id', 'user_id', 'stars']).show()

+--------------------+--------------------+-----+
|         business_id|             user_id|stars|
+--------------------+--------------------+-----+
|PWGcA2s9sf4iMb4dc...|StZTVDuFzahNvjl5q...|  5.0|
|Fbtcj6j9-byxq68AC...|SdsUs5x9RU6UXBdAR...|  5.0|
|SKcKQOqjMCUYsQHuJ...|UWEamXSwBYCYz_iAd...|  1.0|
|yR_EfNX8NF_JQTo2L...|em25aY8vM7SXDMTlC...|  5.0|
|2rOBCDR32E8GXAya7...|73MRrt4TT5PUWkNwo...|  4.0|
|dqhpX2YseNDqoWQCQ...|9jYJxYIt0eueKR6g4...|  2.0|
|SCjUeZfjafJ88fWWQ...|dHDOOMOkrn4jEJsXF...|  1.0|
|SLuak4P9JyB_T6Dsk...|ZiKLC4zrw21aq6cVE...|  5.0|
|DsPomqfQZ44v5wD-M...|ZoNfQYo2AY9KVZ6yt...|  4.0|
|SCjUeZfjafJ88fWWQ...|pG9RRevErfNk-MCgL...|  5.0|
|_NiLHTix7qYc3CTcR...|yRRqo-LWmxz9oa1Ek...|  5.0|
|DsPomqfQZ44v5wD-M...|U2bCFCjqMbkO0Bp2S...|  2.0|
|Fzrpwup-gVia-806_...|_y9Zm4YaZyMiRou_z...|  4.0|
|-sTrihdzACrsOSu1F...|yG1GzPk_U4D7caYQT...|  5.0|
|0lxPrv9a3zo68r8ds...|6HxW1iSqG0nS4Obvl...|  5.0|
|_tvR2q9UJmFAxf7FE...|3BoVJJRVBgfqKIbPI...|  4.0|
|t24lsA2rPTTAiWEq6...|TPZvG0UXOkS1Nxi2G...|  3.0|


In [21]:
df_review.count()

                                                                                

385

In [22]:
df_review.take(1)

[Row(business_id='PWGcA2s9sf4iMb4dcLz6Hg', cool=0, date='2021-11-10 20:54:26', funny=0, review_id='sTvp0m9uss5BU6rGBh1IwQ', stars=5.0, text='I have been eating here since 2008 and I have to say they are one of the most consistent restaurants in the entire valley.  The food is always great, service excellent, and the beer is cold.  This is an extremely well managed restaurant.  Alway great...every time!', useful=0, user_id='StZTVDuFzahNvjl5qu6l7Q', name='Goodwood Barbecue Company', stars_restaurant=4.0, review_count=247, attributes=Row(AcceptsInsurance=None, AgesAllowed=None, Alcohol="u'full_bar'", Ambience="{'touristy': False, 'hipster': False, 'romantic': False, 'divey': False, 'intimate': False, 'trendy': False, 'upscale': False, 'classy': True, 'casual': True}", BYOB=None, BYOBCorkage=None, BestNights="{'monday': False, 'tuesday': False, 'friday': False, 'wednesday': False, 'thursday': False, 'sunday': False, 'saturday': True}", BikeParking='True', BusinessAcceptsBitcoin=None, Busin

In [23]:
df_review.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- stars_restaurant: double (nullable = true)
 |-- review_count: long (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = 

In [26]:
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in ['business_id', 'user_id']]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(df_review).transform(df_review)
transformed.select(['business_id', 'user_id','business_id_index', 'user_id_index'])

                                                                                

DataFrame[business_id: string, user_id: string, business_id_index: double, user_id_index: double]

In [34]:
transformed.show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+----------------+------------+--------------------+--------------------+-----------------+-------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|                name|stars_restaurant|review_count|          attributes|          categories|business_id_index|user_id_index|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+----------------+------------+--------------------+--------------------+-----------------+-------------+
|PWGcA2s9sf4iMb4dc...|   0|2021-11-10 20:54:26|    0|sTvp0m9uss5BU6rGB...|  5.0|I have been eatin...|     0|StZTVDuFzahNvjl5q...|Goodwood Barbecue...|             4.0|         247|{null, null, u'fu...|Sandwiches, Cockt...|             

In [27]:
(training, test) = transformed.randomSplit([0.8, 0.2])

In [31]:
als=ALS(maxIter=5,
        regParam=0.09,
        rank=25,
        userCol="user_id_index",
        itemCol="business_id_index",
        ratingCol="stars",
        coldStartStrategy="drop",
        nonnegative=True)

model=als.fit(training)

                                                                                

In [33]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="stars",predictionCol="prediction")
predictions.show()

[Stage 235:>                                                        (0 + 1) / 1]

+-----------+----+----+-----+---------+-----+----+------+-------+----+----------------+------------+----------+----------+-----------------+-------------+----------+
|business_id|cool|date|funny|review_id|stars|text|useful|user_id|name|stars_restaurant|review_count|attributes|categories|business_id_index|user_id_index|prediction|
+-----------+----+----+-----+---------+-----+----+------+-------+----+----------------+------------+----------+----------+-----------------+-------------+----------+
+-----------+----+----+-----+---------+-----+----+------+-------+----+----------------+------------+----------+----------+-----------------+-------------+----------+



                                                                                

In [32]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="stars",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))

                                                                                

IllegalArgumentException: requirement failed: Nothing has been added to this summarizer.