# Yelp star predictions using Spark
This notebook implements linear regression to predict star rating based on information about the business in Yelp. It uses Spark (specifically pyspark) to read data, engineer features, create a pipeline, fit a model and evaluate it's performance. The data set used can be found here: https://www.yelp.com/dataset/

In [1]:
import pyspark as ps

spark = ps.sql.SparkSession.builder \
            .appName("yelp-review") \
            .getOrCreate()

Read the JSON

In [2]:
rest_df = spark.read.json('data/restaurants/')

In [3]:
rest_df.show()

+--------------------+--------+--------------------+--------------------+-----------+--------+------------------+--------------+----------+-----------+-------------+------------+-----+-----+
|Accepts Credit Cards| Alcohol|            Good For|             Parking|Price Range|Take-out|Takes Reservations|          city|  latitude|  longitude|neighborhoods|review_count|stars|state|
+--------------------+--------+--------------------+--------------------+-----------+--------+------------------+--------------+----------+-----------+-------------+------------+-----+-----+
|                true|full_bar|[false,false,fals...|[false,false,fals...|          1|    true|             false|      Braddock| 40.408735|-79.8663507|           []|          11|  4.5|   PA|
|                true|full_bar|[false,false,fals...|[false,true,false...|          1|    true|             false|      Carnegie| 40.415517| -80.067534|  [Greentree]|          15|  4.0|   PA|
|                true|    none|[false,false,f

In [4]:
rest_df.printSchema()

root
 |-- Accepts Credit Cards: string (nullable = true)
 |-- Alcohol: string (nullable = true)
 |-- Good For: struct (nullable = true)
 |    |-- breakfast: boolean (nullable = true)
 |    |-- brunch: boolean (nullable = true)
 |    |-- dessert: boolean (nullable = true)
 |    |-- dinner: boolean (nullable = true)
 |    |-- latenight: boolean (nullable = true)
 |    |-- lunch: boolean (nullable = true)
 |-- Parking: struct (nullable = true)
 |    |-- garage: boolean (nullable = true)
 |    |-- lot: boolean (nullable = true)
 |    |-- street: boolean (nullable = true)
 |    |-- valet: boolean (nullable = true)
 |    |-- validated: boolean (nullable = true)
 |-- Price Range: long (nullable = true)
 |-- Take-out: boolean (nullable = true)
 |-- Takes Reservations: boolean (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- neighborhoods: array (nullable = true)
 |    |-- element: string (containsNull = 

In [5]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

In [6]:
rest_df.createOrReplaceTempView("rest")

In [7]:
rest_df.columns

['Accepts Credit Cards',
 'Alcohol',
 'Good For',
 'Parking',
 'Price Range',
 'Take-out',
 'Takes Reservations',
 'city',
 'latitude',
 'longitude',
 'neighborhoods',
 'review_count',
 'stars',
 'state']

In [8]:
res = spark.sql("select distinct `Accepts Credit Cards` from rest")
res.show()

+--------------------+
|Accepts Credit Cards|
+--------------------+
|               false|
|                true|
+--------------------+



In [9]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.mllib.linalg import VectorUDT, Vectors

In [10]:
rest_df.dtypes

[('Accepts Credit Cards', 'string'),
 ('Alcohol', 'string'),
 ('Good For',
  'struct<breakfast:boolean,brunch:boolean,dessert:boolean,dinner:boolean,latenight:boolean,lunch:boolean>'),
 ('Parking',
  'struct<garage:boolean,lot:boolean,street:boolean,valet:boolean,validated:boolean>'),
 ('Price Range', 'bigint'),
 ('Take-out', 'boolean'),
 ('Takes Reservations', 'boolean'),
 ('city', 'string'),
 ('latitude', 'double'),
 ('longitude', 'double'),
 ('neighborhoods', 'array<string>'),
 ('review_count', 'bigint'),
 ('stars', 'double'),
 ('state', 'string')]

In [11]:
for col, dtype in rest_df.dtypes:
    if dtype in ['boolean', 'bigint', 'long']:
        rest_df = rest_df.withColumn(col, rest_df[col].cast('double'))

In [12]:
rest_df = rest_df.withColumn('Accepts Credit Cards', rest_df['Accepts Credit Cards'].cast('boolean').cast('double'))

In [13]:
rest_df.show()

+--------------------+--------+--------------------+--------------------+-----------+--------+------------------+--------------+----------+-----------+-------------+------------+-----+-----+
|Accepts Credit Cards| Alcohol|            Good For|             Parking|Price Range|Take-out|Takes Reservations|          city|  latitude|  longitude|neighborhoods|review_count|stars|state|
+--------------------+--------+--------------------+--------------------+-----------+--------+------------------+--------------+----------+-----------+-------------+------------+-----+-----+
|                 1.0|full_bar|[false,false,fals...|[false,false,fals...|        1.0|     1.0|               0.0|      Braddock| 40.408735|-79.8663507|           []|        11.0|  4.5|   PA|
|                 1.0|full_bar|[false,false,fals...|[false,true,false...|        1.0|     1.0|               0.0|      Carnegie| 40.415517| -80.067534|  [Greentree]|        15.0|  4.0|   PA|
|                 1.0|    none|[false,false,f

In [15]:
# fix boolean structs to be Vector Columns of 1s and 0s

def ary_xform(ary):
    return [float(int(x)) if x is not None else 0.0 for x in ary]

udf_bool_ary = F.udf(ary_xform, ArrayType(DoubleType()))
udf_ary_2_vec = F.udf(lambda x: Vectors.dense(x), VectorUDT())

# e.g.
rest_df = rest_df.withColumn('Parking', udf_bool_ary('Parking'))
rest_df = rest_df.withColumn('Parking', udf_ary_2_vec('Parking'))

In [16]:
rest_df = rest_df.withColumn('Good For', udf_bool_ary('Good For'))
rest_df = rest_df.withColumn('Good For', udf_ary_2_vec('Good For'))

In [17]:
rest_df = rest_df.withColumn('Good For', udf_bool_ary('Good For'))
rest_df = rest_df.withColumn('Good For', udf_ary_2_vec('Good For'))

In [18]:
rest_df.show()

+--------------------+--------+--------------------+--------------------+-----------+--------+------------------+--------------+----------+-----------+-------------+------------+-----+-----+
|Accepts Credit Cards| Alcohol|            Good For|             Parking|Price Range|Take-out|Takes Reservations|          city|  latitude|  longitude|neighborhoods|review_count|stars|state|
+--------------------+--------+--------------------+--------------------+-----------+--------+------------------+--------------+----------+-----------+-------------+------------+-----+-----+
|                 1.0|full_bar|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|        1.0|     1.0|               0.0|      Braddock| 40.408735|-79.8663507|           []|        11.0|  4.5|   PA|
|                 1.0|full_bar|[0.0,0.0,0.0,1.0,...|[0.0,1.0,0.0,0.0,...|        1.0|     1.0|               0.0|      Carnegie| 40.415517| -80.067534|  [Greentree]|        15.0|  4.0|   PA|
|                 1.0|    none|[0.0,0.0,0.0,0

In [19]:
train, test = rest_df.randomSplit([7.0,3.0])

In [20]:
train.show()

+--------------------+-------------+--------------------+--------------------+-----------+--------+------------------+------------+----------+-----------+-------------+------------+-----+-----+
|Accepts Credit Cards|      Alcohol|            Good For|             Parking|Price Range|Take-out|Takes Reservations|        city|  latitude|  longitude|neighborhoods|review_count|stars|state|
+--------------------+-------------+--------------------+--------------------+-----------+--------+------------------+------------+----------+-----------+-------------+------------+-----+-----+
|                 0.0|beer_and_wine|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|        1.0|     1.0|               0.0|   Ettlingen|   48.9535|    8.38004|           []|         3.0|  3.5|   BW|
|                 0.0|beer_and_wine|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|        1.0|     1.0|               0.0|   Karlsruhe|   49.0018|     8.3939|           []|         6.0|  4.5|   BW|
|                 0.0|beer_and

In [21]:
from pyspark.ml.feature import StringIndexer

alc_indexer = StringIndexer(inputCol="Alcohol", outputCol="alcoholIndex")
state_indexer = StringIndexer(inputCol="state", outputCol="stateIndex")
city_indexer = StringIndexer(inputCol="city", outputCol="cityIndex")


In [22]:
from pyspark.ml.feature import OneHotEncoder

alc_ohe = OneHotEncoder(inputCol="alcoholIndex", outputCol="alc_ohe")
state_ohe = OneHotEncoder(inputCol="stateIndex", outputCol="state_ohe")
city_ohe = OneHotEncoder(inputCol="cityIndex", outputCol="city_ohe")
price_ohe = OneHotEncoder(inputCol="Price Range", outputCol="price_ohe")


In [23]:
from pyspark.ml.feature import CountVectorizer

neighborhod_cv = CountVectorizer(inputCol='neighborhoods', outputCol='neighborhoods_cv')

In [24]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["alcoholIndex", "stateIndex", "cityIndex", "alc_ohe", "state_ohe","city_ohe","neighborhoods_cv","price_ohe","review_count","latitude","longitude"],
    outputCol="features")



In [25]:
lr = LinearRegression(maxIter=5, labelCol="stars", featuresCol="features", predictionCol="prediction")

In [26]:
pipeline = Pipeline(stages=[alc_indexer, state_indexer, city_indexer,alc_ohe,state_ohe,city_ohe,price_ohe,neighborhod_cv,assembler,lr])


In [27]:
m = pipeline.fit(train)

In [28]:
t = m.transform(train)
t.show()

+--------------------+-------------+--------------------+--------------------+-----------+--------+------------------+------------+----------+-----------+-------------+------------+-----+-----+------------+----------+---------+---------+--------------+-----------------+-------------+----------------+--------------------+------------------+
|Accepts Credit Cards|      Alcohol|            Good For|             Parking|Price Range|Take-out|Takes Reservations|        city|  latitude|  longitude|neighborhoods|review_count|stars|state|alcoholIndex|stateIndex|cityIndex|  alc_ohe|     state_ohe|         city_ohe|    price_ohe|neighborhoods_cv|            features|        prediction|
+--------------------+-------------+--------------------+--------------------+-----------+--------+------------------+------------+----------+-----------+-------------+------------+-----+-----+------------+----------+---------+---------+--------------+-----------------+-------------+----------------+---------------

In [29]:
rest_df.show()

+--------------------+--------+--------------------+--------------------+-----------+--------+------------------+--------------+----------+-----------+-------------+------------+-----+-----+
|Accepts Credit Cards| Alcohol|            Good For|             Parking|Price Range|Take-out|Takes Reservations|          city|  latitude|  longitude|neighborhoods|review_count|stars|state|
+--------------------+--------+--------------------+--------------------+-----------+--------+------------------+--------------+----------+-----------+-------------+------------+-----+-----+
|                 1.0|full_bar|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|        1.0|     1.0|               0.0|      Braddock| 40.408735|-79.8663507|           []|        11.0|  4.5|   PA|
|                 1.0|full_bar|[0.0,0.0,0.0,1.0,...|[0.0,1.0,0.0,0.0,...|        1.0|     1.0|               0.0|      Carnegie| 40.415517| -80.067534|  [Greentree]|        15.0|  4.0|   PA|
|                 1.0|    none|[0.0,0.0,0.0,0

In [30]:
from pyspark.ml.evaluation import RegressionEvaluator

In [31]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="stars", metricName="rmse")

In [32]:
m = pipeline.fit(test)
evaluator.evaluate(m.transform(test))

0.6009113789014233

In [33]:
m = pipeline.fit(train)
evaluator.evaluate(m.transform(train))

0.615426212198504

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

output = ParamGridBuilder().build()
cv = CrossValidator(estimator = pipeline, numFolds=3, evaluator=evaluator, estimatorParamMaps=output, handleInvalid='keep')
cvModel = cv.fit(rest_df)