# Random Forest Analysis on Yelp Dataset

## Step: Initiate the Spark session

In [1]:
# Initialise findspark before pyspark is imported in the next cell.
# NOTE(review): presumably this relies on SPARK_HOME (or a default install
# location) to find Spark — confirm for the target environment.
import findspark

findspark.init()

In [2]:
#importing library
import pyspark
from pyspark.sql import SparkSession

In [3]:
# Unified entry point to the cluster: obtain a SparkSession via the builder's
# getOrCreate(), using default settings (no app name or config is set here).
spark = SparkSession.builder.getOrCreate()

In [4]:
# Convenience handles derived from the session.
# `sc` is the session's underlying SparkContext; `sqlContext` is just another
# name for the SparkSession itself (no separate SQLContext object is created).
sc = spark.sparkContext
sqlContext = spark

In [5]:
#import functions package
from pyspark.sql import functions as fn

In [6]:
#import models
from pyspark.ml import feature
from pyspark.ml import regression
from pyspark.ml import Pipeline, PipelineModel
import os

## Step: Import the Dataset and clean the dataframe

In [157]:
# Load the user reviews data from CSV.
# `.csv()` selects Spark's built-in CSV source on its own, so the legacy
# `.format('com.databricks.spark.csv')` call that used to precede it was
# overridden and has been removed. `encoding` is the documented name of the
# character-set option.
# No schema is inferred here: every column is read as a string and the numeric
# columns are cast explicitly further down the notebook.
df_all = (
    spark.read
    .option('header', 'true')
    .option('encoding', 'UTF-8')
    .csv('df_all.csv')
)

In [158]:
# Identifier / rating / vote columns carried through to df_ratings unchanged.
id_and_rating_cols = ['review_id', 'business_id', 'user_id', 'review_rating', 'review_rating_f',
                      'user_avg_rating', 'business_rating', 'useful', 'cool', 'funny']
# Per-user compliment counters that are collapsed into a single average.
compliment_cols = ['compliment_profile', 'compliment_cool', 'compliment_funny', 'compliment_plain',
                   'compliment_writer', 'compliment_more', 'compliment_photos', 'compliment_note',
                   'compliment_cute', 'compliment_list', 'compliment_hot']

# Remove rows that have a NULL in any of the columns used below.
df_all = df_all.na.drop(subset=id_and_rating_cols + compliment_cols)

# Add the compliment counters together left to right.
compliment_total = fn.col(compliment_cols[0])
for compliment_name in compliment_cols[1:]:
    compliment_total = compliment_total + fn.col(compliment_name)

# Select the relevant columns plus the mean of the 11 compliment counters.
df_ratings = df_all.select(
    *id_and_rating_cols,
    (compliment_total / 11).alias('compliment_average'),
)

In [159]:
# Check the dataset schema and preview a few rows after cleaning.
# Only a handful of rows are shown: 100 rows of 11 columns floods the notebook
# output without telling the reader anything a short preview does not.
df_ratings.printSchema()
df_ratings.show(5)

root
 |-- review_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- review_rating: string (nullable = true)
 |-- review_rating_f: string (nullable = true)
 |-- user_avg_rating: string (nullable = true)
 |-- business_rating: string (nullable = true)
 |-- useful: string (nullable = true)
 |-- cool: string (nullable = true)
 |-- funny: string (nullable = true)
 |-- compliment_average: double (nullable = true)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+------------------+-------------------+
|           review_id|         business_id|             user_id|       review_rating|     review_rating_f|     user_avg_rating|  business_rating|              useful|                cool|             funny| compliment_average|
+--------------------+--------------------+--------------------+------

In [160]:
# Cast the numeric columns (still strings after the CSV load) to double,
# one column at a time so the column order is left untouched.
for numeric_col in ('review_rating', 'review_rating_f', 'user_avg_rating',
                    'business_rating', 'useful', 'cool', 'funny'):
    df_ratings = df_ratings.withColumn(numeric_col, fn.col(numeric_col).cast('double'))


In [161]:
# Drop rows that have a NULL in any column used downstream.
# This must run after the casts: with Spark's default (non-ANSI) settings a
# string that cannot be parsed as a number becomes NULL when cast to double.
# 'review_rating_f' is included because it is the column that becomes the
# training label; it was previously missing from this list, which would let a
# NULL label reach the models.
df_ratings = df_ratings.na.drop(subset=['review_id', 'business_id', 'user_id',
                                        'review_rating', 'review_rating_f',
                                        'user_avg_rating', 'business_rating',
                                        'useful', 'cool', 'funny', 'compliment_average'])
df_ratings.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- review_rating: double (nullable = true)
 |-- review_rating_f: double (nullable = true)
 |-- user_avg_rating: double (nullable = true)
 |-- business_rating: double (nullable = true)
 |-- useful: double (nullable = true)
 |-- cool: double (nullable = true)
 |-- funny: double (nullable = true)
 |-- compliment_average: double (nullable = true)



In [162]:
# Rename the dependent variable to 'label'. The classifiers and evaluator
# below are created without an explicit label column, so they rely on this name.
df_ratings = df_ratings.withColumnRenamed('review_rating_f','label')

In [163]:
# Confirm that 'review_rating_f' now appears as 'label' in the schema.
df_ratings.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- review_rating: double (nullable = true)
 |-- label: double (nullable = true)
 |-- user_avg_rating: double (nullable = true)
 |-- business_rating: double (nullable = true)
 |-- useful: double (nullable = true)
 |-- cool: double (nullable = true)
 |-- funny: double (nullable = true)
 |-- compliment_average: double (nullable = true)



In [164]:
# Preview a few rows of the cleaned, typed dataset. A short preview keeps the
# notebook readable; 100 rows adds bulk without adding information.
df_ratings.show(5)

+--------------------+--------------------+--------------------+-------------+-----+---------------+---------------+------+----+-----+-------------------+
|           review_id|         business_id|             user_id|review_rating|label|user_avg_rating|business_rating|useful|cool|funny| compliment_average|
+--------------------+--------------------+--------------------+-------------+-----+---------------+---------------+------+----+-----+-------------------+
|xqFpx6FkEpjow6JAh...|XqDeiaPSG0-fBbOXD...|efaUGV60LFI4v6bWP...|          5.0|  1.0|           4.08|            3.5|   1.0| 1.0|  1.0|                0.0|
|fpq7iwxvRdG9vssv0...|zC7ldegnDoXg-Wln5...|efaUGV60LFI4v6bWP...|          5.0|  1.0|           4.08|            5.0|   0.0| 0.0|  0.0|                0.0|
|jDOVgU7ICogRtpLrX...|O7RMINvCcGVNTMlD7...|efaUGV60LFI4v6bWP...|          5.0|  1.0|           4.08|            4.5|   0.0| 0.0|  0.0|                0.0|
|NEP1fCrzA7i50c875...|oYwLxROH5RihyFxrd...|hPHsKqUwO_RKJNxkB...|      

In [165]:
# Count the rows in each label class (0/1) to see how balanced the target is.
label_counts = df_ratings.groupBy('label').agg(fn.count('*'))
label_counts.show()

+-----+--------+
|label|count(1)|
+-----+--------+
|  0.0|  210674|
|  1.0|  444808|
+-----+--------+



## Step: Split the dataframe and fit Logistic Regression and Random Forest classifiers

In [166]:
# Split the dataset into training (60%), validation (30%) and testing (10%).
# A fixed seed makes the split, and every metric computed from it, repeatable
# across kernel restarts; without it each run trains and evaluates on
# different rows.
training_df, validation_df, testing_df = df_ratings.randomSplit([0.6, 0.3, 0.1], seed=42)

In [167]:
from pyspark.ml.feature import VectorAssembler
# Import Vector Assembler

In [168]:
# Assemble the predictor columns into a single 'features' vector.
# The predictors are listed by name instead of being sliced by position
# (columns[5:11]): a positional slice silently changes meaning if a column is
# added or reordered upstream, and could pull 'label' or the raw
# 'review_rating' into the feature set.
feature_cols = ['user_avg_rating', 'business_rating', 'useful',
                'cool', 'funny', 'compliment_average']
va = VectorAssembler().setInputCols(feature_cols).setOutputCol('features')

In [169]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
#Import logistic regression and random forest classifier models

In [170]:
# Logistic regression estimator with regularisation strength 0.1.
# The feature and label column names are spelled out; they are the same
# values the estimator uses when they are omitted.
lr = LogisticRegression(featuresCol='features', labelCol='label', regParam=0.1)

In [171]:
# List the six predictor columns (positions 5-10 of training_df) that are used
# as model features. This only displays the names; it does not subset the data.
training_df.columns[5:11]

['user_avg_rating',
 'business_rating',
 'useful',
 'cool',
 'funny',
 'compliment_average']

In [172]:
# Fit the assembler + logistic regression pipeline on the training split.
# Note that `lr_pipeline` holds the *fitted* pipeline model, not the estimator.
lr_pipeline = Pipeline().setStages([va, lr]).fit(training_df)

In [173]:
# Random forest classifier with default hyper-parameters.
# The seed is fixed so that the bootstrap samples and feature subsets, and
# therefore the reported AUC and feature importances, are reproducible
# from run to run.
rf = RandomForestClassifier(seed=42)

In [174]:
# Fit the assembler + random forest pipeline on the training split.
# Note that `rf_pipeline` holds the *fitted* pipeline model, not the estimator.
rf_pipeline = Pipeline().setStages([va, rf]).fit(training_df)

In [175]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Applying Binary Classification Evaluator

In [176]:
# Evaluator for binary classifiers. The column names and metric are written
# out explicitly; they match what the evaluator uses when nothing is passed.
bce = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',
                                    labelCol='label',
                                    metricName='areaUnderROC')

In [177]:
# Score the fitted random forest pipeline on the validation split and display
# the evaluator's metric as the cell output.
rf_validation_predictions = rf_pipeline.transform(validation_df)
bce.evaluate(rf_validation_predictions)

0.9021520576134945

In [178]:
# Pull the last stage out of the fitted pipeline: the fitted logistic
# regression model (the first stage is the VectorAssembler).
lr_model = lr_pipeline.stages[-1]

## Step: Calculate the weights of individual features

In [179]:
import pandas as pd

In [186]:
# Logistic regression coefficients paired with their feature names, sorted
# from most negative to most positive.
# The names are read back from the VectorAssembler that built the 'features'
# vector, so each coefficient lines up with the column it belongs to.
# Re-slicing df_ratings.columns by position could drift out of sync with the
# assembler.
pd.DataFrame(list(zip(va.getInputCols(), lr_model.coefficients.toArray())),
            columns = ['column', 'weight']).sort_values('weight')

Unnamed: 0,column,weight
2,useful,-0.079718
4,funny,-0.054745
5,compliment_average,4.7e-05
3,cool,0.103749
1,business_rating,0.73301
0,user_avg_rating,0.745241


In [187]:
# Pull the last stage out of the fitted pipeline: the fitted random forest
# model (the first stage is the VectorAssembler).
rf_model = rf_pipeline.stages[-1]

In [188]:
# Random forest feature importances paired with their feature names, sorted
# from least to most important.
# The names are read back from the VectorAssembler that built the 'features'
# vector, so each importance lines up with the column it belongs to.
# Re-slicing df_ratings.columns by position could drift out of sync with the
# assembler.
pd.DataFrame(list(zip(va.getInputCols(), rf_model.featureImportances.toArray())),
            columns = ['column', 'weight']).sort_values('weight')

Unnamed: 0,column,weight
4,funny,0.008764
5,compliment_average,0.009043
3,cool,0.022607
2,useful,0.03087
0,user_avg_rating,0.4055
1,business_rating,0.523215


In [189]:
# Number of decision trees in the fitted forest.
len(rf_model.trees)

20

In [190]:
# Print the full split structure of the first tree in the forest.
# "feature N" in the output is the N-th input column of the VectorAssembler.
print(rf_model.trees[0].toDebugString)

DecisionTreeClassificationModel (uid=dtc_4beee059f92a) of depth 5 with 63 nodes
  If (feature 0 <= 3.55)
   If (feature 2 <= 1.0)
    If (feature 0 <= 2.15)
     If (feature 1 <= 4.5)
      If (feature 0 <= 1.0)
       Predict: 0.0
      Else (feature 0 > 1.0)
       Predict: 0.0
     Else (feature 1 > 4.5)
      If (feature 4 <= 0.0)
       Predict: 1.0
      Else (feature 4 > 0.0)
       Predict: 0.0
    Else (feature 0 > 2.15)
     If (feature 1 <= 3.5)
      If (feature 1 <= 2.5)
       Predict: 0.0
      Else (feature 1 > 2.5)
       Predict: 0.0
     Else (feature 1 > 3.5)
      If (feature 4 <= 0.0)
       Predict: 1.0
      Else (feature 4 > 0.0)
       Predict: 1.0
   Else (feature 2 > 1.0)
    If (feature 0 <= 3.09)
     If (feature 3 <= 0.0)
      If (feature 0 <= 2.15)
       Predict: 0.0
      Else (feature 0 > 2.15)
       Predict: 0.0
     Else (feature 3 > 0.0)
      If (feature 4 <= 0.0)
       Predict: 0.0
      Else (feature 4 > 0.0)
       Predict: 0.0
    Else (fea