In [None]:
from pyspark import SparkContext
# Reuse the running SparkContext when this cell is executed again; building a
# second SparkContext in the same process raises a ValueError.
sc = SparkContext.getOrCreate()

In [None]:
from pyspark import SQLContext
# SQL entry point bound to the SparkContext `sc`; the cells below use it to
# read JSON files and to query registered temp tables.
sqlContext = SQLContext(sparkContext=sc)

In [None]:
from pyspark.mllib.linalg import Vectors
import json
from pyspark.sql.functions import *


Load the data

In [None]:
# Review and business records, read from the local filesystem as JSON.
mydir = 'file:/home/ubuntu/review.json'
mydir2 = 'file:/home/ubuntu/business.json'
df1 = sqlContext.read.json(mydir)
df2 = sqlContext.read.json(mydir2)

# Attach the business record to every review, then drop the business-side copy
# of the join key and the review-side stars column.
reviews_with_business = df1.join(df2, df1.business_id == df2.business_id)
df = reviews_with_business.drop(df2.business_id).drop(df1.stars)

Filter for Chinese cuisine

In [None]:
# Keep only the joined rows whose categories array contains "Chinese".
asian = df.where(array_contains(df.categories, "Chinese"))
asian.registerTempTable("asian")

# Narrow projection of the filtered rows.
asian_sample = sqlContext.sql("select business_id,text,stars from asian")
# Register the projection itself under its own name, so the "asian_sample"
# table and the asian_sample DataFrame refer to the same data.
asian_sample.registerTempTable("asian_sample")

# Business id and review text only, used by the sentence-filtering cells.
asian_text = asian_sample.select('business_id', 'text')

Filter the reviews for service-related aspects

In [None]:
# Keywords that mark a sentence as service-related, normalised to lowercase.
words = list(map(str.lower, ['servic', 'time', 'price', 'lunch', 'friendli']))

from pyspark.sql.types import *

def intersect(row, aspect_words=None):
    """Tell whether a sentence mentions at least one aspect keyword.

    Args:
        row: Sentence text to inspect.
        aspect_words: Optional iterable of lowercase keywords. When omitted,
            the notebook-level ``words`` list is used.

    Returns:
        True if any whitespace-separated token of ``row`` equals one of the
        keywords after lowercasing and trimming surrounding punctuation,
        otherwise False.
    """
    import string  # local import keeps this cell self-contained

    keywords = set(words if aspect_words is None else aspect_words)
    # Trim leading/trailing punctuation so "price," or "lunch!" still match.
    tokens = set(token.lower().strip(string.punctuation) for token in row.split())
    # NOTE(review): matching is exact, so stem-like keywords (e.g. 'servic')
    # only hit tokens spelled exactly that way -- confirm whether sentences
    # should be stemmed before this check.
    return bool(tokens & keywords)

Filter sentences based on aspects and aggregate the sentences for each restaurant

In [None]:
import nltk.data
# Punkt sentence splitter, referenced inside the mapped function below.
sent_detector = nltk.data.load('tokenizers/punkt/english.pickle')


def _aspect_sentences(record):
    """Reduce one (business id, review text) record to its aspect sentences.

    The review is split into sentences and only those accepted by
    ``intersect`` are kept, joined by single spaces. A missing review text is
    treated as an empty string.
    """
    # Index access instead of tuple-parameter unpacking, which Python 3 rejects.
    business, text = record[0], record[1]
    sentences = sent_detector.tokenize((text or "").strip())
    return (business, " ".join(sent for sent in sentences if intersect(sent)))


def _join_reviews(left, right):
    """Concatenate two filtered reviews of the same business.

    A space separates them so the last word of one review is not glued to the
    first word of the next; surrounding whitespace from empty operands is
    trimmed.
    """
    return (left + " " + right).strip()


# One row per business: all of its aspect sentences merged into one string.
summarised = (
    asian_text.rdd
    .map(_aspect_sentences)
    .reduceByKey(_join_reviews)
    .toDF()
    .withColumnRenamed('_1', 'BusinessId')
    .withColumnRenamed('_2', 'Review')
)

In [None]:
from textblob import TextBlob, Word, Blobber
from textblob.classifiers import NaiveBayesClassifier
from textblob.taggers import NLTKTagger
def _polarity(record):
    """Map a (business id, review text) record to (business id, TextBlob polarity)."""
    return (record[0], TextBlob(record[1]).sentiment.polarity)


# Score the aggregated text of each business and expose the score as `pol1`.
asian_sample_rdd = summarised.rdd
asian_sample_polarity = asian_sample_rdd.map(_polarity)
asian_pol1 = (
    asian_sample_polarity.toDF()
    .withColumnRenamed('_1', 'business_id')
    .withColumnRenamed('_2', 'pol1')
)

Filter the reviews for food-related aspects

In [None]:
# Keywords that mark a sentence as food-related, normalised to lowercase.
words = list(map(str.lower, ['noodl', 'dish', 'chicken', 'fri', 'rice', 'soup', 'sauc', 'beef', 'pork', 'tast']))
from pyspark.sql.types import *
def intersect(row, aspect_words=None):
    """Tell whether a sentence mentions at least one aspect keyword.

    Args:
        row: Sentence text to inspect.
        aspect_words: Optional iterable of lowercase keywords. When omitted,
            the notebook-level ``words`` list is used.

    Returns:
        True if any whitespace-separated token of ``row`` equals one of the
        keywords after lowercasing and trimming surrounding punctuation,
        otherwise False.
    """
    import string  # local import keeps this cell self-contained

    keywords = set(words if aspect_words is None else aspect_words)
    # Trim leading/trailing punctuation so "rice," or "soup!" still match.
    tokens = set(token.lower().strip(string.punctuation) for token in row.split())
    # NOTE(review): matching is exact, so stem-like keywords (e.g. 'sauc')
    # only hit tokens spelled exactly that way -- confirm whether sentences
    # should be stemmed before this check.
    return bool(tokens & keywords)

Filter sentences based on aspects and aggregate the sentences for each restaurant

In [None]:
import nltk.data
# Punkt sentence splitter, referenced inside the mapped function below.
sent_detector = nltk.data.load('tokenizers/punkt/english.pickle')


def _aspect_sentences(record):
    """Reduce one (business id, review text) record to its aspect sentences.

    The review is split into sentences and only those accepted by
    ``intersect`` are kept, joined by single spaces. A missing review text is
    treated as an empty string.
    """
    # Index access instead of tuple-parameter unpacking, which Python 3 rejects.
    business, text = record[0], record[1]
    sentences = sent_detector.tokenize((text or "").strip())
    return (business, " ".join(sent for sent in sentences if intersect(sent)))


def _join_reviews(left, right):
    """Concatenate two filtered reviews of the same business.

    A space separates them so the last word of one review is not glued to the
    first word of the next; surrounding whitespace from empty operands is
    trimmed.
    """
    return (left + " " + right).strip()


# One row per business: all of its aspect sentences merged into one string.
summarised = (
    asian_text.rdd
    .map(_aspect_sentences)
    .reduceByKey(_join_reviews)
    .toDF()
    .withColumnRenamed('_1', 'BusinessId')
    .withColumnRenamed('_2', 'Review')
)

Sentiment analysis using TextBlob

In [None]:
def _polarity(record):
    """Map a (business id, review text) record to (business id, TextBlob polarity)."""
    return (record[0], TextBlob(record[1]).sentiment.polarity)


# Score the aggregated text of each business and expose the score as `pol2`.
asian_sample_rdd = summarised.rdd
asian_sample_polarity = asian_sample_rdd.map(_polarity)
asian_pol2 = (
    asian_sample_polarity.toDF()
    .withColumnRenamed('_1', 'business_id')
    .withColumnRenamed('_2', 'pol2')
)

Merge the topics for each restaurant

In [None]:
# Distinct (business_id, stars) pairs from the asian_sample table.
asian_rating = sqlContext.sql("select distinct business_id,stars from asian_sample")

# Attach pol1. The rating-side join key is dropped, so the business_id column
# that survives is the one coming from asian_pol1.
temp = asian_rating.join(
    asian_pol1, (asian_rating.business_id == asian_pol1.business_id), 'left'
).drop(asian_rating.business_id)

# Attach pol2, keeping temp's business_id column.
asian_final = temp.join(
    asian_pol2, (temp.business_id == asian_pol2.business_id), 'outer'
).drop(asian_pol2.business_id)

# Replace missing polarity scores with 0.
asian_final = asian_final.withColumn('pol1', when(isnull(asian_final.pol1), 0).otherwise(asian_final.pol1))
asian_final = asian_final.withColumn('pol2', when(isnull(asian_final.pol2), 0).otherwise(asian_final.pol2))
# Separate statement: sharing a line with the assignment above is a syntax error.
asian_final = asian_final.filter(asian_final.business_id.isNotNull())

# Keep only the rows where both polarity scores are non-zero.
asian_final.registerTempTable("asian_final1")
asian_final1 = sqlContext.sql("select * from asian_final1 where not pol1 ==0 and not pol2==0")

Linear Regression Model

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)

# Regress the star rating on the two polarity scores.
training = asian_final1
numericCols = ['pol1', 'pol2']

# Pack the numeric predictors into the single vector column the estimator reads.
assembler = VectorAssembler(outputCol="features", inputCols=numericCols)
training2 = assembler.transform(training)

lr = LinearRegression(labelCol="stars", featuresCol="features", maxIter=10)
# Fit the model
lrModel = lr.fit(training2)

Beta coefficients

In [None]:
# Coefficients of the fitted model; the features vector was assembled from
# pol1 and pol2, in that order.
lrModel.coefficients

R-squared

In [None]:
# R-squared as reported by the summary object of the fitted model.
lrModel.summary.r2

p-values

In [None]:
# p-values as reported by the summary object of the fitted model.
# NOTE(review): presumably only available for some solver settings and may
# include an entry for the intercept -- confirm against the Spark docs.
lrModel.summary.pValues