In [1]:
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt 
import seaborn as sns 
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local[*]").appName('recommendation').getOrCreate()

In [3]:
la = pd.read_csv("../scraper/la_restaurant.csv")
sd = pd.read_csv('../scraper/san_diego_restaurant.csv')
nyc = pd.read_csv('../scraper/NYC_restaurant.csv')
sf = pd.read_csv('../scraper/sf_restaurant.csv')
oc = pd.read_csv('../scraper/OC_restaurant.csv')


In [4]:
df = pd.concat([la,sd,oc,sf,nyc])

# Data Cleaning, Wrangling and Prepping

In [5]:
df['rating'] = df['rating']/10
df = df[~df['restaurant'].str.contains('http')]

### Cleaning rating, user, and restuarant data for ALS

In [6]:
from pyspark.sql.functions import monotonically_increasing_id, col, avg

In [7]:
def clean_for_als(df, min_reviews):
    
    df_spark = spark.createDataFrame(df)

    users = df_spark.select('user_name').distinct()
    users = users.coalesce(1)
    users = users.withColumn("user_id", monotonically_increasing_id()).persist()
    

    rest = df_spark.select('restaurant').distinct()
    rest = rest.coalesce(1)
    rest = rest.withColumn('rest_id', monotonically_increasing_id()).persist()
    
    df_ids = df_spark.join(users,"user_name","left").join(rest,'restaurant','left')
    final_df = df_ids.select(col('user_id'), col('rest_id'),col('rating'))
    
    final_df = final_df.groupBy(['user_id','rest_id']).avg()
    final_df = final_df.select(col('user_id'), col('rest_id'),col('avg(rating)').alias('rating'))
    
    users_spark = final_df.groupBy('user_id').count().filter(col('count')>=min_reviews).select('user_id')
    users_spark_list = list(users_spark.toPandas()['user_id'])
    
    final_df = final_df[final_df.user_id.isin(users_spark_list)]

    return final_df

In [None]:
final_df = clean_for_als(df,6)

In [None]:
final_df.count()

## Exploratory Data Analysis
---
### 1) Numerical Data 

### Ratings descriptive statistics across all restaurants

In [None]:
df['rating'].describe()

### The amount of reviews per restuarant

In [None]:
df['restaurant'].value_counts().describe()

In [None]:
review_counts = df['restaurant'].value_counts()


fig, axes = plt.subplots(nrows=1,ncols=2, figsize=(13,5))
axes[0].hist(df['rating'])
axes[0].set_title('Ratings Distribution')
axes[0].set_ylabel('Number of Ratings')
axes[0].set_xlabel('Rating')

axes[1].hist(review_counts, bins=18)
axes[1].set_title('Review Counts Distribution')
axes[1].set_ylabel('Number of Restaurants')
axes[1].set_xlabel('Number of Reviews')


plt.show()

- Ratings and review counts are skewd to the left. The average rating is is a 4.26 while the max is a 5. More popular restuarants get the majority of the reviews. This is confirmed by the the mean for review counts by restaurant is 70 despite a 56 count being the 75th percentile. 75 percent of restuarants have less than 75 percent of total reviews.

### 2) Categorical Plots

In [None]:
avg_rating_restaurant = df.groupby('restaurant')['rating'].mean()

In [None]:
top_by_count = df['restaurant'].value_counts().head(20)


In [None]:
avg_rating_restaurant[top_by_count.index].plot.bar()
plt.title('Average Rating per Most Reviewed Fast Food Restaurant')
plt.ylabel('Average Rating')
plt.show()

In [None]:
top_by_count.plot.bar()
plt.title('Number of Reviews Per Most Reviewed Restaurant')
plt.ylabel('Count')
plt.show()

# ALS

In [11]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


In [None]:
print("**Best Model**")
print(f"RMSE =  {rmse}")
print(f" Rank: {best2.rank}")
print(f" MaxIter: {best2._java_obj.parent().getMaxIter()}")
print(f" RegParam: {best2._java_obj.parent().getRegParam()}")

In [12]:
def grid_search(df,min_reviews):
    models = {}
    rmses = {}
    for i in min_reviews:
        final = clean_for_als(df,i)
        (train,test) = final.randomSplit([0.8,0.2])

        als = ALS(userCol="user_id" , itemCol="rest_id", ratingCol="rating",coldStartStrategy="drop",
                  nonnegative = True, implicitPrefs = False) 

        param_grid = ParamGridBuilder().addGrid(als.rank,[15,18,20]).addGrid(als.maxIter, [18,19]).addGrid(als.regParam, [.12,.13]).build()

        evaluator = RegressionEvaluator(metricName='rmse',labelCol='rating',predictionCol='prediction')

        cv = CrossValidator(estimator = als, estimatorParamMaps = param_grid, evaluator = evaluator, numFolds = 5)

        model = cv.fit(train)
        
        best = model.bestModel
        predictions = best.transform(test)
        rmse = evaluator.evaluate(predictions)
        
        models[i] = best
        rmses[i] = rmse
        
    return models, rmses


In [13]:
min_reviews = [4,5,6,7,8,9]

models, rmses = grid_search(df,min_reviews)

In [14]:
rmses

{4: 1.13989692922892,
 5: 1.1377484693801205,
 6: 1.1454884849460252,
 7: 1.1791340256260716,
 8: 1.1503140768512947,
 9: 1.169544231246566}

5 minimum review yielded the lowest rmse errors 

In [17]:
key_min = min(rmses.keys(), key=(lambda k: rmses[k]))
best = models[key_min]

In [19]:
print("**Best Model**")
print(f"RMSE =  {rmses[key_min]}")
print(f" Rank: {best.rank}")
print(f" MaxIter: {best._java_obj.parent().getMaxIter()}")
print(f" RegParam: {best._java_obj.parent().getRegParam()}")

**Best Model**
RMSE =  1.1377484693801205
 Rank: 20
 MaxIter: 19
 RegParam: 0.13


In [21]:
final_df = clean_for_als(df, key_min)

In [22]:
final_df.show(5)

+-------+-------+------+
|user_id|rest_id|rating|
+-------+-------+------+
|  50717|    251|   5.0|
|  58914|    435|   5.0|
|  51780|    158|   5.0|
|   7420|    477|   5.0|
|  21945|    218|   4.0|
+-------+-------+------+
only showing top 5 rows

