In [103]:
from pyspark.sql import SparkSession

In [104]:
review = 'gs://big_data_project_bucket1/yelp_dataset/yelp_academic_dataset_review.json'
business = 'gs://big_data_project_bucket1/yelp_dataset/yelp_academic_dataset_business.json'
tip = 'gs://big_data_project_bucket1/yelp_dataset/yelp_academic_dataset_tip.json'
user = 'gs://big_data_project_bucket1/yelp_dataset/yelp_academic_dataset_user.json'
checkin = 'gs://big_data_project_bucket1/yelp_dataset/yelp_academic_dataset_checkin.json'

In [105]:
spark = SparkSession.builder.getOrCreate()

In [106]:
dfr = spark.read.json(review)

                                                                                

In [107]:
dfr.show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|buF9druCkbuXLX526...|   1|2014-10-11 03:34:02|    1|lWC-xP3rd6obsecCY...|  4.0|Apparently Prides...|     3|ak0TdVmGKo4pwqdJS...|
|RA4V8pr014UyUbDvI...|   0|2015-07-03 20:38:25|    0|8bFej1QE5LXp4O05q...|  4.0|This store is pre...|     1|YoVfDbnISlW0f7abN...|
|_sS2LBIGNT5NQb6PD...|   0|2013-05-28 20:38:06|    0|NDhkzczKjLshODbqD...|  5.0|I called WVM on t...|     0|eC5evKn1TWDyHCyQA...|
|0AzLzHfOJgL7ROwhd...|   1|2010-01-08 02:29:15|    1|T5fAqjjFooT4V0OeZ...|  2.0|I've stayed at ma...|     1|SFQ1jcnGguO0LYWnb...|
|8zehGz9jnxPqXtOc7...|   0|2011-07-28 18:05:01|    0|sjm_uUcQVxab_EeLC...|  4.0|The food i

In [108]:
dfb = spark.read.json(business)

                                                                                

In [109]:
import pyspark.sql.functions as F

In [110]:
df1 = dfr.alias("a")
df2 = dfb.alias("b")

#df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)


df_join = df1.join(df2, F.col("a.business_id")==F.col("b.business_id")).select("a.business_id", 
                                                                           "a.review_id", 
                                                                           "a.stars", 
                                                                           "a.text", 
                                                                           "b.categories")

In [111]:
df_res = df_join.where(df_join.categories.contains("Restaurants"))
df_res.count()

                                                                                

5574795

In [112]:
from pyspark.sql.functions import when

In [113]:
import pyspark.sql.functions as F

In [114]:
dfclas = df_res.withColumn('stars_rev_class', F.when(dfr.stars>=4,1.0).otherwise(0.0))

In [115]:
dfclas.show(5)

+--------------------+--------------------+-----+--------------------+--------------------+---------------+
|         business_id|           review_id|stars|                text|          categories|stars_rev_class|
+--------------------+--------------------+-----+--------------------+--------------------+---------------+
|buF9druCkbuXLX526...|lWC-xP3rd6obsecCY...|  4.0|Apparently Prides...|Restaurants, Wine...|            1.0|
|0AzLzHfOJgL7ROwhd...|T5fAqjjFooT4V0OeZ...|  2.0|I've stayed at ma...|Hotels, Hotels & ...|            0.0|
|8zehGz9jnxPqXtOc7...|sjm_uUcQVxab_EeLC...|  4.0|The food is alway...| Restaurants, French|            1.0|
|xGXzsc-hzam-VArK6...|J4a2TuhDasjn2k3wW...|  1.0|This place used t...|Restaurants, Bars...|            0.0|
|EXOsmAB1s71WePlQk...|28gGfkLs3igtjVy61...|  2.0|The setting is pe...|         Restaurants|            0.0|
+--------------------+--------------------+-----+--------------------+--------------------+---------------+
only showing top 5 rows



In [116]:
dfclass_trial = dfclas.limit(200000)

In [117]:
dfclass_trial.toPandas().to_csv('trial_df_200k.csv', index=False)

                                                                                

In [118]:
%%time
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline

# Spark NLP requires the input dataframe or column to be converted to document. 
document_assembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document") \
    .setCleanupMode("shrink")
# Split sentence to tokens(array)
tokenizer = Tokenizer() \
  .setInputCols(["document"]) \
  .setOutputCol("token")
# clean unwanted characters and garbage
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")
# remove stopwords
stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)
# stem the words to bring them to the root form.
stemmer = Stemmer() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("stem")
# Finisher is the most important annotator. Spark NLP adds its own structure when we convert each row in the dataframe to document. Finisher helps us to bring back the expected structure viz. array of tokens.
finisher = Finisher() \
    .setInputCols(["stem"]) \
    .setOutputCols(["tokens"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)
# We build a ml pipeline so that each phase can be executed in sequence. This pipeline can also be used to test the model. 
nlp_pipeline = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher])
# train the pipeline
nlp_model = nlp_pipeline.fit(dfclass_trial)
# apply the pipeline to transform dataframe.
processed_df  = nlp_model.transform(dfclass_trial)

tokens_df = processed_df.select("text", "stars", "stars_rev_class", "tokens", "business_id", "review_id")
tokens_df.show(1)

[Stage 1819:>                                                       (0 + 1) / 1]

+--------------------+-----+---------------+--------------------+--------------------+--------------------+
|                text|stars|stars_rev_class|              tokens|         business_id|           review_id|
+--------------------+-----+---------------+--------------------+--------------------+--------------------+
|Apparently Prides...|  4.0|            1.0|[appar, pride, os...|buF9druCkbuXLX526...|lWC-xP3rd6obsecCY...|
+--------------------+-----+---------------+--------------------+--------------------+--------------------+
only showing top 1 row

CPU times: user 104 ms, sys: 45.4 ms, total: 149 ms
Wall time: 33.7 s


                                                                                

In [119]:
train, test = tokens_df.randomSplit(weights=[0.8,0.2], seed=200)

In [120]:
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="tokens", outputCol="token_vector", minDF=3.0)
# train the model
cv_model = cv.fit(train)
# transform the data. Output column name will be features.
train_vec = cv_model.transform(train)

                                                                                

In [121]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes 
from pyspark.ml.classification import DecisionTreeClassifier

In [122]:



# Train a RandomForest model.
#rf = RandomForestClassifier(labelCol="stars_rev_class", featuresCol="token_vector", numTrees=10)
lr = LogisticRegression(maxIter=100, 
                        labelCol="stars_rev_class", featuresCol="token_vector")


# Train model.  This also runs the indexers.
#model = rf.fit(train_vec)

lr_model = lr.fit(train_vec)

# # Make predictions.

# test_vec = cv_model.transform(test)
# predictions = model.transform(test_vec)



In [None]:

nb = NaiveBayes(modelType="multinomial", 
                        labelCol="stars_rev_class", featuresCol="token_vector")

nb_model = lr.fit(train_vec)



In [None]:

dtc = DecisionTreeClassifier(labelCol="stars_rev_class", featuresCol="token_vector")
dtc_model = dtc.fit(train_vec)

                                                                                

In [None]:

rf = RandomForestClassifier(labelCol="stars_rev_class", featuresCol="token_vector", numTrees=10)
rf_model = rf.fit(train_vec)

22/03/08 20:30:48 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1027.0 KiB
22/03/08 20:31:20 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1075.8 KiB

In [None]:

from pyspark.sql.types import IntegerType, DoubleType
from pyspark.mllib.evaluation import MulticlassMetrics

models = {"lr": lr_model, "nb": nb_model, "dtc": dtc_model, "rf": rf_model}

pred_list = []
pred_df_list = []
test_vec = cv_model.transform(test)

for m in models:
    
    label = m
    print(label)
    predictions = models[m].transform(test_vec)
    print(predictions.columns)
    predictions = predictions.withColumn("prediction", predictions["prediction"].cast(DoubleType()))
    predictions = predictions.withColumn("stars_rev_class", predictions["stars_rev_class"].cast(DoubleType()))
    results = predictions.select(['prediction', 'stars_rev_class'])
    predictionAndLabels=results.rdd
    metrics = MulticlassMetrics(predictionAndLabels)
    cm = metrics.confusionMatrix().toArray()
    print(cm)
    accuracy = (cm[0][0]+cm[1][1])/cm.sum()
    precision = (cm[0][0])/(cm[0][0]+cm[1][0])
    recall = (cm[0][0])/(cm[0][0]+cm[0][1])

    f1_score = 2 * (precision * recall) / (precision + recall)

    print(precision, recall, f1_score, accuracy)
    pred_list.append({"label": label, "precision": precision, "recall": recall,
                      "f1_score": f1_score, "accuracy": accuracy})
    pred_df_list.append(predictions)
    
    
    
    

lr
['text', 'stars', 'stars_rev_class', 'tokens', 'business_id', 'review_id', 'token_vector', 'rawPrediction', 'probability', 'prediction']


                                                                                

[[10292.  3151.]
 [ 2708. 23933.]]
0.7916923076923077 0.7656029160157702 0.7784290738569754 0.8538319528989123
nb
['text', 'stars', 'stars_rev_class', 'tokens', 'business_id', 'review_id', 'token_vector', 'rawPrediction', 'probability', 'prediction']


                                                                                

[[ 9282.  4083.]
 [ 7366. 19353.]]
0.5575444497837578 0.6945005611672278 0.618531969479892 0.7143748128929248
rf
['text', 'stars', 'stars_rev_class', 'tokens', 'business_id', 'review_id', 'token_vector', 'rawPrediction', 'probability', 'prediction']


[Stage 2360:>                                                       (0 + 1) / 1]

[[  590. 12991.]
 [   27. 26476.]]
0.9562398703403565 0.043443045431117 0.08311029722496127 0.6752320127731763


                                                                                

In [None]:
import json

#pred_tuple_list

#pred_tuple_list.append((label, precision, recall, f1_score, accuracy, predictions))

#metrics_vals = {'precision': precision, 'recall': recall, 'f1_score': f1_score, 
#                'accuracy': accuracy}


    
with open('metrics_vals_0308_200k.json', 'w') as file:
     file.write(json.dumps(pred_list, indent=4)) 


In [None]:
#predictions.toPandas().to_csv('predictions200k.csv')

In [132]:
pred_list

[{'label': 'lr',
  'precision': 0.7916923076923077,
  'recall': 0.7656029160157702,
  'f1_score': 0.7784290738569754,
  'accuracy': 0.8538319528989123},
 {'label': 'nb',
  'precision': 0.7749785508150691,
  'recall': 0.7434343434343434,
  'f1_score': 0.758878790193233,
  'accuracy': 0.8424807903402854},
 {'label': 'dtc',
  'precision': 0.5575444497837578,
  'recall': 0.6945005611672278,
  'f1_score': 0.618531969479892,
  'accuracy': 0.7143748128929248},
 {'label': 'rf',
  'precision': 0.9562398703403565,
  'recall': 0.043443045431117,
  'f1_score': 0.08311029722496127,
  'accuracy': 0.6752320127731763}]

In [130]:
pred_df_list

[DataFrame[text: string, stars: double, stars_rev_class: double, tokens: array<string>, business_id: string, review_id: string, token_vector: vector, rawPrediction: vector, probability: vector, prediction: double],
 DataFrame[text: string, stars: double, stars_rev_class: double, tokens: array<string>, business_id: string, review_id: string, token_vector: vector, rawPrediction: vector, probability: vector, prediction: double],
 DataFrame[text: string, stars: double, stars_rev_class: double, tokens: array<string>, business_id: string, review_id: string, token_vector: vector, rawPrediction: vector, probability: vector, prediction: double],
 DataFrame[text: string, stars: double, stars_rev_class: double, tokens: array<string>, business_id: string, review_id: string, token_vector: vector, rawPrediction: vector, probability: vector, prediction: double]]

In [133]:
train_vec.show(3)

[Stage 2365:>                                                       (0 + 1) / 1]

+--------------------+-----+---------------+--------------------+--------------------+--------------------+--------------------+
|                text|stars|stars_rev_class|              tokens|         business_id|           review_id|        token_vector|
+--------------------+-----+---------------+--------------------+--------------------+--------------------+--------------------+
|! Best Korean foo...|  5.0|            1.0|[best, korean, fo...|XgRQCwVbRCxYV76x_...|oKXs7Uo3T_0AwHAnH...|(27547,[0,5,11,13...|
|! Love this place...|  5.0|            1.0|[love, place, mar...|a0UVUJGOcZOxYsl-n...|zzkEpRUBxXWeefIoP...|(27547,[2,4,7,11,...|
|!! So we've tried...|  5.0|            1.0|[weve, tri, verac...|yHjaVbz6wjL1ahml1...|PDAVI86WbvxVXyZHP...|(27547,[0,1,4,9,1...|
+--------------------+-----+---------------+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows



                                                                                

In [135]:
train_vec.count()

                                                                                

159916

In [None]:
train_vec.toPandas().to_csv('train_vec200k.csv')

                                                                                

In [140]:
len(cv_model.vocabulary)

27547

In [144]:
list(lr_model.coefficients)

[-0.11119117840590159,
 0.18387574304993218,
 -0.0007085294354838757,
 -0.2122295465461596,
 0.930861288549735,
 0.006843297592824651,
 0.05111849603426329,
 -0.06307506934865298,
 -0.11256552351343045,
 0.051361106241721924,
 -0.05895297964413289,
 0.1231372516337148,
 0.1693851708236671,
 -0.01716914451483657,
 0.9860318463270628,
 0.3248856245420474,
 0.015103745297022705,
 -0.04310685351958978,
 0.04795500169161896,
 0.07222168327883531,
 0.04160870322361366,
 0.3516418800923916,
 0.09991702109940072,
 1.6680114210540324,
 -0.07349695274197462,
 -0.011946916073025622,
 -0.14301064819099396,
 0.39170661143543456,
 1.0566138346869167,
 -0.12544307014726314,
 0.2272145352293205,
 0.10242086411170997,
 0.05891270323509785,
 0.07691039432866068,
 0.04635269587890129,
 -0.13876233966965856,
 -0.054426843248787195,
 -0.16753008533188618,
 -0.5102322618233258,
 -0.10617532253327812,
 0.20218399936935597,
 -0.26090431076677795,
 0.7713127198395333,
 0.4381181431866891,
 -0.09605047625271528

In [178]:
list_coefficients = list(lr_model.coefficients)
#list_coefficients = [abs(val) for val in list_coefficients]

In [179]:
#zip(*heap.nlargest(2, enumerate(a), key=operator.itemgetter(1)))[0]

In [None]:
highest_coefficients = sorted(range(len(list_coefficients)), key=lambda i: list_coefficients[i])[-20:]

In [None]:
lowest_coefficients = sorted(range(len(list_coefficients)), key=lambda i: list_coefficients[i])[20:]

In [None]:
for h in highest_coefficients:
    print(cv_model.vocabulary[h])

+year
cantmiss
cor
sixpack
groggi
arw
wideei
homespun
housesmok
scovil
hyperact
acceler
mono
rougher
olden
culantro
cong
twostori
kampachi
autopilot


In [None]:
for l in lowest_coefficients:
    print(cv_model.vocabulary[l])

audaci
sinclair
hovel
ogunquit
polaroid
antidot
ironwork
soliloqui
caf
studious
cacophoni
betterth
deform
againbecaus
badmouth
crisco
bedsid
ocasion
rightsiz
gallega
mem
oban
fredericksburg
somet
webbervil
brocollini
insubstanti
restauranta
tra
fishit
fliet
curacao
fishiest
twoitem
laboratori
gratine
muchlaud
licha
descriptor
coagul
squiggl
midjuli
postscript
deport
althought
rando
tightwad
leer
dup
desicc
ineptitud
placesi
lifter
powderhous
mayuri
unaccustom
backfir
liquer
vesuviu
identif
lunchabl
oneyear
pornographi
decentbut
putrid
swimsuit
tangl
amateurish
guayan
edi
nonattent
insati
extran
monik
libanai
ramshackl
cadr
unfathom
creolecajun
littletono
coffeedessert
until
highenergi
experiencei
chitown
blandest
illog
perchanc
punt
mul
selfprofess
fez
butttt
dumont
therein
avant
yeep
capsaicin
satchel
están
knowth
aspara
expressli
kneel
barheight
more
hepat
citywid
loudspeak
nonrush
lenoir
contentedli
jingl
soif
unglov
unforun
slackjaw
readytoeat
inact
gothic
apprentic
badit
pne
teai


In [None]:
for l in lowest_coefficients:
    print(cv_model.vocabulary[l])

audaci
sinclair
hovel
ogunquit
polaroid
antidot
ironwork
soliloqui
caf
studious
cacophoni
betterth
deform
againbecaus
badmouth
crisco
bedsid
ocasion
rightsiz
gallega
mem
oban
fredericksburg
somet
webbervil
brocollini
insubstanti
restauranta
tra
fishit
fliet
curacao
fishiest
twoitem
laboratori
gratine
muchlaud
licha
descriptor
coagul
squiggl
midjuli
postscript
deport
althought
rando
tightwad
leer
dup
desicc
ineptitud
placesi
lifter
powderhous
mayuri
unaccustom
backfir
liquer
vesuviu
identif
lunchabl
oneyear
pornographi
decentbut
putrid
swimsuit
tangl
amateurish
guayan
edi
nonattent
insati
extran
monik
libanai
ramshackl
cadr
unfathom
creolecajun
littletono
coffeedessert
until
highenergi
experiencei
chitown
blandest
illog
perchanc
punt
mul
selfprofess
fez
butttt
dumont
therein
avant
yeep
capsaicin
satchel
están
knowth
aspara
expressli
kneel
barheight
more
hepat
citywid
loudspeak
nonrush
lenoir
contentedli
jingl
soif
unglov
unforun
slackjaw
readytoeat
inact
gothic
apprentic
badit
pne
teai


In [None]:
1+1

2

In [193]:
2+2

4