# Yelp Data Sentiment Analysis 

#### Perform Sentiment Analysis using Logistic Regression

In [95]:
import os.path
from pathlib import Path
from pyspark.sql import SparkSession
from boto3.session import Session
import seaborn as sns
import pandas
import matplotlib.pyplot as plt
from pyspark.sql.functions import when
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline, Model
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

%matplotlib inline

### Set up Amazon S3 Credentials (Note:- Our access and secret keys are removed for security purposes)

In [2]:
ACCESS_KEY = 'MYACCESSKEY'
SECRET_KEY = 'MYSECRETKEY'
BUCKET_NAME = 'yelpbigdata'
PREFIX = 'root'
MAX_FILES_READ = 4

In [3]:
session = Session(aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY)

In [4]:
s3 = session.resource('s3')

In [5]:
my_bucket = s3.Bucket(BUCKET_NAME)
print(my_bucket)

s3.Bucket(name='yelpbigdata')


In [6]:
spark = SparkSession.builder.appName('BDF').getOrCreate()

In [8]:
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", ACCESS_KEY)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", SECRET_KEY)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3-eu-west-1.amazonaws.com")

### Download the Data from the S3 Bucket

In [9]:
my_bucket.download_file('yelp_business.csv',"business.csv")
my_bucket.download_file('yelp_review.csv',"review.csv")
my_bucket.download_file('yelp_tip.csv',"tip.csv")
my_bucket.download_file('yelp_user.csv',"user.csv")

#### Review File cleaning

In [66]:
df_review = spark.read.format("com.databricks.spark.csv").option("wholeFile", "true").option("multiline","true").option("header", "true").option("inferSchema", "true").option("delimiter", ",").option("encoding", "ISO-8859-1").option("charset", "ISO-8859-1").option("quote", "\"").option("escape", "\"").load("/content/review.csv")

In [67]:
# Delete the quotation marks, commas, \n from text column
from pyspark.sql.functions import *
df_review.select('text').show()
df_review = df_review.withColumn('text', regexp_replace('text', ',', ''))
df_review = df_review.withColumn('text', regexp_replace('text', '""', ''))
df_review = df_review.withColumn('text', regexp_replace('text', '\r\n', ''))
df_review = df_review.withColumn('text', regexp_replace('text', '\n', ''))

+--------------------+
|                text|
+--------------------+
|As someone who ha...|
|I am actually hor...|
|I love Deagan's. ...|
|Dismal, lukewarm,...|
|Oh happy day, fin...|
|This is definitel...|
|Really good place...|
|Awesome office an...|
|Most delicious au...|
|I have been here ...|
|Maria is VERY goo...|
|ORDER In (Deliver...|
|We purchased new ...|
|Everything that m...|
|Called for a 5:15...|
|If I could give l...|
|10pm on a super b...|
|A close friend wa...|
|Tried to have my ...|
|My husband and I ...|
+--------------------+
only showing top 20 rows



#### Review-Table

In [68]:
df_review=df_review.drop('_c0')
df_review.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- useful: integer (nullable = true)
 |-- funny: integer (nullable = true)
 |-- cool: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- date: timestamp (nullable = true)



#### Data Consistency Check

In [69]:
df_review.select('date').show()

+-------------------+
|               date|
+-------------------+
|2015-04-15 05:21:16|
|2013-12-07 03:16:52|
|2015-12-05 03:18:11|
|2011-05-27 05:30:52|
|2017-01-14 21:56:57|
|2013-05-07 07:25:25|
|2015-11-05 23:11:05|
|2017-07-18 18:31:54|
|2015-02-16 06:48:47|
|2009-10-13 04:16:41|
|2013-12-28 21:02:55|
|2015-10-17 01:38:13|
|2015-07-03 21:48:51|
|2016-06-11 22:00:11|
|2015-05-26 10:36:47|
|2017-08-07 21:36:36|
|2015-02-02 06:28:00|
|2018-02-01 19:15:00|
|2017-06-28 00:39:18|
|2018-03-04 01:03:53|
+-------------------+
only showing top 20 rows



#### Drop unnecessary columns

In [70]:
df_review = df_review.drop('review_id','text','date')

#### Business-Table

In [71]:
df_business = spark.read.format("com.databricks.spark.csv").option("wholeFile", "true").option("multiline","true").option("header", "true").option("inferSchema", "true").option("delimiter", ",").option("encoding", "ISO-8859-1").option("charset", "ISO-8859-1").option("quote", "\"").option("escape", "\"").load("/content/business.csv")
df_business = df_business.withColumnRenamed('stars', 'business_stars')

#### As there are many columns select the ones we need instead of dropping

In [73]:
df_business = df_business.select('business_id','state','city','latitude','longitude',"business_stars","review_count")

In [116]:
df_business.show()

+--------------------+-----+---------------+--------+----------+--------------+------------+
|         business_id|state|           city|latitude| longitude|business_stars|review_count|
+--------------------+-----+---------------+--------+----------+--------------+------------+
|f9NumwFMBDn751xgF...|   NC|      Cornelius|35.46272| -80.85261|           3.5|        36.0|
|Yzvjg0SayhoZgCljU...|   AZ|     Scottsdale| 33.5694|-111.89026|           5.0|         4.0|
|XNoUzKckATkOD1hP6...|   QC|       Montreal|45.47998| -73.58007|           5.0|         5.0|
|6OAZjbxqM5ol29BuH...|   NV|North Las Vegas|36.21973|-115.12773|           2.5|         3.0|
|51M2Kk903DFYI6gnB...|   AZ|           Mesa|33.42807|-111.72665|           4.5|        26.0|
|cKyLV5oWZJ2NudWgq...|   AZ|        Gilbert| 33.3504|-111.82714|           4.5|        38.0|
|oiAlXZPIFm2nBCt0D...|   NV|      Las Vegas|36.06398|-115.24146|           3.5|        81.0|
|ScYkbYNkDgCneBrD9...|   AZ|           Mesa|33.39388|-111.68223|      

#### Tip-Table

In [75]:
df_tip = spark.read.format("com.databricks.spark.csv").option("wholeFile", "true").option("multiline","true").option("header", "true").option("inferSchema", "true").option("delimiter", ",").option("encoding", "ISO-8859-1").option("charset", "ISO-8859-1").option("quote", "\"").option("escape", "\"").load("/content/tip.csv")

#### Only take necessary columns

In [76]:
df_tip = df_tip.select('user_id','business_id','compliment_count')

#### User Data

In [77]:
df_user = spark.read.format("com.databricks.spark.csv").option("wholeFile", "true").option("multiline","true").option("header", "true").option("inferSchema", "true").option("delimiter", ",").option("encoding", "ISO-8859-1").option("charset", "ISO-8859-1").option("quote", "\"").option("escape", "\"").load("/content/user.csv")

#### Again preprocess and take the columns we need

In [78]:
df_user = df_user.select('user_id','average_stars')

In [117]:
df_user.show()

+--------------------+-------------+
|             user_id|average_stars|
+--------------------+-------------+
|ntlvfPzc8eglqvk92...|         3.57|
|FOBRPlBHa3WPHFB5q...|         3.84|
|zZUnPeh2hEp0WydbA...|         3.44|
|QaELAmRcDc5TfJEyl...|         3.08|
|xvu8G900tezTzbbfq...|         4.37|
|z5_82komKV3mI4ASG...|         2.88|
|ttumcu6hWshk_EJVW...|          4.0|
|f4_MRNHvN-yRn7EA8...|         3.63|
|UYACF30806j2mfbB5...|         3.75|
|QG13XBbgHWydzThRB...|          4.1|
|f6YuZP6iennHFVlnF...|          3.8|
|I_6wY8_RsewziNnKh...|         3.63|
|q-v8elVPvKz0KvK69...|         3.37|
|HwPGLzF_uXB3MF8bc...|          4.5|
|y4UuVowA9i3zj2hHy...|         4.17|
|1WBxJ2r3A2QYfRSEz...|         3.82|
|-TT5e-YQU9xLb1JAG...|         3.91|
|6bbHSJ0PrgSxh7e5n...|         2.21|
|4VmuXuSRhv5UxYUy3...|         3.88|
|pVU2DdtBFppBAX5G5...|         3.79|
+--------------------+-------------+
only showing top 20 rows



### Join the tables

In [81]:
temp_table_1 = df_review.join(df_business, on='business_id', how='left_outer')
temp_table_2 = temp_table_1.join(df_user, on='user_id', how='left_outer')
fin_table = temp_table_2.join(df_tip, on=['business_id', 'user_id',], how='left_outer')

In [83]:
fin_table.count()
fin_table.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- useful: integer (nullable = true)
 |-- funny: integer (nullable = true)
 |-- cool: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- business_stars: double (nullable = true)
 |-- review_count: double (nullable = true)
 |-- average_stars: double (nullable = true)
 |-- compliment_count: integer (nullable = true)



### Perform Logistic Regression

#### Do the same basic steps as in svm

In [85]:
fin_table = fin_table.filter(fin_table.stars.isNotNull())

#### Now convert the reviews having star rating > 3 to 1 i.e positive and 0 otherwise i.e negative

In [87]:
fin_df = fin_table.withColumn('stars',when(fin_table.stars <= 3, '0').otherwise('1'))

In [88]:
fin_df.groupby(fin_df.stars).count().show()

+-----+------+
|stars| count|
+-----+------+
|    0|343758|
|    1|674889|
+-----+------+



#### As we can see the number of reviews with 1 label are almost twice as compared to 0 i.e (1,2,3) combined

In [91]:
l_indexer = StringIndexer(inputCol="stars", outputCol="label",handleInvalid='keep').fit(fin_df)
l_indexer.labels

['1', '0']

In [92]:
vect_AssF = VectorAssembler(inputCols=["si_state", "si_city", "si_compliment_count", "useful", "funny", "cool", "latitude", "longitude", "business_stars", "review_count", "average_stars"], outputCol="features", handleInvalid='keep')

In [93]:
lab_indexer = StringIndexer(inputCol="stars", outputCol="label",handleInvalid='keep')
ct_indexer = StringIndexer(inputCol="city", outputCol="si_city",handleInvalid='keep')
st_indexer = StringIndexer(inputCol="state", outputCol="si_state",handleInvalid='keep')
cct_indexer = StringIndexer(inputCol="compliment_count", outputCol="si_compliment_count",handleInvalid='keep')

### LR Model creation

In [103]:
log_R = LogisticRegression(labelCol="label", featuresCol="features")

In [104]:
og_label = IndexToString(inputCol="prediction", outputCol="predicted_Label", labels=lab_indexer.fit(fin_df).labels)

#### Generate Feature_Vector

In [105]:
log_RPipe = Pipeline(stages=[lab_indexer, st_indexer, ct_indexer, cct_indexer, vect_AssF, log_R, og_label])

#### Prepare data for training split it 80/20 Train test

In [106]:
data_split = fin_df.randomSplit([0.8, 0.2])
test_df = data_split[1]
train_df = data_split[0]

#### LR Model training and testing

In [107]:
log_RModel = log_RPipe.fit(train_df)

In [108]:
output_pred = log_RModel.transform(test_df)

In [111]:
output_pred.show()

+--------------------+--------------------+-----+------+-----+----+-----+----------+--------+----------+--------------+------------+-------------+----------------+-----+--------+-------+-------------------+--------------------+--------------------+--------------------+----------+---------------+
|         business_id|             user_id|stars|useful|funny|cool|state|      city|latitude| longitude|business_stars|review_count|average_stars|compliment_count|label|si_state|si_city|si_compliment_count|            features|       rawPrediction|         probability|prediction|predicted_Label|
+--------------------+--------------------+-----+------+-----+----+-----+----------+--------+----------+--------------+------------+-------------+----------------+-----+--------+-------+-------------------+--------------------+--------------------+--------------------+----------+---------------+
|-2ToCaDFpTNmmg3QF...|jDUEWiD94qQxXIUcL...|    0|     6|    3|   0|   NV| Las Vegas|36.15466|-115.15922|     

### Check the Accuracy

In [112]:
bin_CE = BinaryClassificationEvaluator()
auc_acc = bin_CE.evaluate(output_pred)

print("Accuracy = %g" % auc_acc)

Accuracy = 0.82632
