In [1]:
# from google.colab import drive
# drive.mount('/content/drive')

Mounted at /content/drive


In [1]:
!pip install kaggle
!mkdir ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d yelp-dataset/yelp-dataset
!unzip yelp-dataset.zip

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Downloading yelp-dataset.zip to /content
100% 4.07G/4.07G [00:40<00:00, 101MB/s]
100% 4.07G/4.07G [00:40<00:00, 107MB/s]
Archive:  yelp-dataset.zip
  inflating: Dataset_User_Agreement.pdf  
  inflating: yelp_academic_dataset_business.json  
  inflating: yelp_academic_dataset_checkin.json  
  inflating: yelp_academic_dataset_review.json  
  inflating: yelp_academic_dataset_tip.json  
  inflating: yelp_academic_dataset_user.json  


In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=3591afdce7b574640bcc1b20fe5550b0776486411105ec6d0d9a65b7fc39fdea
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


# Importing Libraries

In [3]:
%matplotlib inline
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, MapType
import pyspark.sql.functions as F
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
# from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
from pyspark.sql import Window
from pyspark.ml.evaluation import RegressionEvaluator
sns.set_theme(style="whitegrid", palette="pastel")

In [4]:
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score

In [5]:
# conf = SparkConf().set("spark.kryoserializer.buffer.max", "4g")
spark = SparkSession.builder.getOrCreate()
spark_context = spark.sparkContext
sqlContext = SQLContext(spark_context)



# Data Loading

## Loading Businesses Dataset

In [6]:
businesses = spark.read.json("yelp_academic_dataset_business.json")

## Loading Reviews Dataset

In [7]:
reviews =  spark.read.json("yelp_academic_dataset_review.json")

## Loading Users Dataset

In [8]:
users =  spark.read.json("yelp_academic_dataset_user.json")

# Data Cleaning

## Cleaning Businesses Dataset

**Renaming and Dropping Columns**

In [9]:
#change name for starts to avoid duplicates
businesses=businesses.withColumnRenamed("stars", "Restaurant_stars")
businesses=businesses.withColumnRenamed("name", "Restaurant_name")
businesses=businesses.filter(F.col('categories').rlike('Restaurants'))
businesses = businesses.select('*','attributes.*','hours.*')
columns_to_drop = ['address','postal_code','review_count','attributes','hours']
businesses = businesses.drop(*columns_to_drop)

**Renaming and Filtering Columns**

In [10]:
businesses = businesses.withColumn('categories', F.regexp_replace(F.col("categories"), "(,?\ ?Restaurants,?)", ""))
businesses = businesses.withColumn('categories', F.regexp_replace(F.col("categories"), "( ?)", ""))
businesses = businesses.filter(F.col("is_open").contains("1"))

In [11]:
businesses = businesses.filter(F.col('state')=='PA')

## Cleaning Reviews Dataset

**Dropping Unrequired Columns**

In [12]:
columns_to_drop = ['cool','funny','average_stars']
reviews = reviews.drop(*columns_to_drop)

## Cleaning Users Dataset

**Dropping and renaming columns**

In [13]:
columns_to_drop = ['elite','useful','yelping_since','review_count','average_stars']
users = users.drop(*columns_to_drop)
users=users.withColumnRenamed("name", "user_name")

# Data Transformation and Merging

Converting Ids(uuid/hex) to int

In [14]:
w = Window().orderBy('business_id')
businesses= businesses.withColumn("business_id_int", F.row_number().over(w))
w = Window().orderBy('user_id')
users= users.withColumn("user_id_int", F.row_number().over(w))

In [15]:
#joining three tables into one table in case need for future
df = reviews.join(businesses,on ='business_id', how = 'inner')
df = df.join(users,on ='user_id', how = 'inner')

In [16]:
# df.show(5) # Takes too much time to execute, so commenting it for now

+--------------------+--------------------+-------------------+--------------------+-----+--------------------+------+--------------------+------------+-------+-------------+--------------+--------------------+----------------+-----+----------------+-----------+-----------+--------------------+----+-----------+--------------------+-----------+----------------------+--------------------------+--------------------+-----------------+------+---------+-------+-------------------+-----------+---------+--------------+-----------+--------------------+-----------------+---------+-----+--------------------+----------+-----------+--------------+-----------------+-------------------------+-------------------+------------------------+----------------------+-----------------------+-----------------------+------------------+-------+--------------------+-------+----------+-------+----------+---------+----------+----------+----------+---------------+---------------+---------------+----------------+----

# **ASL Model**

In [17]:
# Importing ALS Library
from pyspark.ml.recommendation import ALS

**Selecting Required Columns**

In [18]:
ratings = df.select('user_id_int','business_id_int','Restaurant_name','user_name','stars')

In [19]:
train_df, test_df = ratings.randomSplit([.8,.2],seed=1)
als = ALS(maxIter=10, regParam=0.3, userCol="user_id_int", itemCol="business_id_int", ratingCol="stars",
          coldStartStrategy="drop", rank=10, nonnegative = True)

In [51]:
model = als.fit(df)
predictions = model.transform(df)

In [21]:
predictions.show(5)

+-----------+---------------+--------------------+---------+-----+----------+
|user_id_int|business_id_int|     Restaurant_name|user_name|stars|prediction|
+-----------+---------------+--------------------+---------+-----+----------+
|         44|           1070|Dragon & Phoenix ...|     Bert|  4.0|  3.567086|
|         44|           7572|     JJ Thai Cuisine|     Bert|  5.0| 4.3269286|
|         44|           7639|        Vetri Cucina|     Bert|  4.0|  4.421284|
|         44|           7680|            El Limon|     Bert|  5.0|  4.299594|
|        127|           1378|The Farm and Fish...|  Michael|  1.0|0.97495496|
+-----------+---------------+--------------------+---------+-----+----------+
only showing top 5 rows



In [22]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='stars')
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.3014098322040726


In [53]:
userRecs = model.recommendForAllUsers(3)

In [24]:
userRecs.show(5)

+-----------+--------------------+
|user_id_int|     recommendations|
+-----------+--------------------+
|          5|[{643, 3.7245593}...|
|         19|[{554, 2.2800214}...|
|         31|[{554, 4.4678254}...|
|         41|[{3671, 1.139868}...|
|         44|[{554, 5.274683},...|
+-----------+--------------------+
only showing top 5 rows



In [55]:
userRecs_DF = (userRecs
  .select("user_id_int", F.explode("recommendations")
  .alias("recommendation"))
  .select("user_id_int", "recommendation.*")
)
userRecs_DF2 = userRecs_DF.join(users.select('user_id_int','user_name'), on='user_id_int', how ='inner').join(businesses.select('business_id_int','Restaurant_name'), on='business_id_int', how ='inner')
userRecs_DF2_pd = userRecs_DF2.toPandas()

In [26]:
userRecs_DF2_pd.sort_values(['user_name','rating'],ascending=[True,False]).head(20)

Unnamed: 0,business_id_int,user_id_int,rating,user_name,Restaurant_name
17,554,48,5.577898,Amber,Frog Commissary
16,1912,48,5.396627,Amber,Sunny Chang's Pizza & More
15,3671,48,5.38192,Amber,The Chilly Banana
5,554,19,2.280021,Andrew,Frog Commissary
4,4859,19,2.212214,Andrew,Chef Jeff’s Hot Meals To-Go
3,6083,19,2.191591,Andrew,Cherish Philly
8,554,31,4.467825,Anthony,Frog Commissary
7,610,31,4.245423,Anthony,Academic Bistro
6,3068,31,4.216107,Anthony,Umi Sushi And Seafood
14,554,44,5.274683,Bert,Frog Commissary


In [42]:
def get_business_id(user_id):
    result = df.filter(df.user_id == user_id).select("user_id_int").collect()
    if result:
        return result[0].user_id_int
    else:
        return None

# Recommendations for Desirae

In [57]:
userId = get_business_id("6O4seIFz_buDGYXCOIT03A")
userRecs_DF2_pd[userRecs_DF2_pd["user_id_int"] == userId][["user_name","Restaurant_name","rating"]]

Unnamed: 0,user_name,Restaurant_name,rating
46803,Desirae,Steel Penny Cafe,5.619937
46804,Desirae,Otolith Sustainable Seafood,5.552305
46805,Desirae,The Chilly Banana,5.521576


# Recommendations for Aaron

In [58]:
userId = get_business_id("A3DrdXmkNb1I6x-lSbj96g")
userRecs_DF2_pd[userRecs_DF2_pd["user_id_int"] == userId][["user_name","Restaurant_name","rating"]]

Unnamed: 0,user_name,Restaurant_name,rating
70068,Aaron,Sunny Chang's Pizza & More,1.044408
70069,Aaron,Steel Penny Cafe,1.038287
70070,Aaron,Academic Bistro,1.024865


# Recommendations for Brett

In [59]:
userId = get_business_id("pou3BbKsIozfH50rxmnMew")
userRecs_DF2_pd[userRecs_DF2_pd["user_id_int"] == userId][["user_name","Restaurant_name","rating"]]

Unnamed: 0,user_name,Restaurant_name,rating
743433,Brett,Academic Bistro,4.982213
743434,Brett,Otolith Sustainable Seafood,4.95285
743435,Brett,El Primo Produce,4.901747
