### This script is written to test our Naive Bayes, Random Forest Classifier and Linear SVC model to easier testing

#### We have used non-San Francisco reviews are training and testing dataset and San Francisco Hotel reviews as prediction dataset

#### These working model were then adapted into machine_learning.py script

In [49]:
import pandas as pd
import re
from pyspark.sql import SparkSession, functions, types
from pyspark.sql.functions import col, when, coalesce
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, StringIndexer, IndexToString
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, LinearSVC
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

### Use Spark to read in training and prediction json files

In [60]:
# set up Spark stuff
spark = SparkSession.builder \
    .appName("Machine Learning Models") \
    .master("local[*]") \
    .config('spark.driver.memory', '8g') \
    .config('spark.executor.memory', '8g') \
    .config('spark.network.timeout', '600000') \
    .config('spark.sql.broadcastTimeout', '600000') \
    .getOrCreate()

In [61]:
# Schema for review_comments.json
review_comments_schema = types.StructType([
    types.StructField('comment_id', types.StringType()),
    types.StructField('review', types.StringType()),
    types.StructField('review_text', types.StringType()),
])

In [62]:
training_spark_df = spark.read.load("./../filtered_data/training_data.json", format="json", schema = review_comments_schema).cache()
prediction_spark_df = spark.read.load("./../filtered_data/validation_data.json", format="json", schema = review_comments_schema).cache()

In [63]:
print("Dimensions of training_spark_df after reading : ")
print((training_spark_df.count(), len(training_spark_df.columns)))

print("Dimensions of prediction_spark_df after reading : ")
print((prediction_spark_df.count(), len(prediction_spark_df.columns)))

Dimensions of training_spark_df after reading : 


                                                                                

(131760, 3)
Dimensions of prediction_spark_df after reading : 
(15274, 3)


In [None]:
training_spark_df.show()

In [None]:
prediction_spark_df.show()

In [66]:
# remove null from all the columns before running the model
training_spark_df = training_spark_df.na.drop()

prediction_spark_df = prediction_spark_df.na.drop()

In [67]:
print("Dimensions of training_spark_df after removing null values : ")
print((training_spark_df.count(), len(training_spark_df.columns)))

print("Dimensions of prediction_spark_df after removing null values  : ")
print((prediction_spark_df.count(), len(prediction_spark_df.columns)))

Dimensions of training_spark_df after removing null values : 
(131673, 3)
Dimensions of prediction_spark_df after removing null values  : 
(15261, 3)


In [68]:
# balancing
negative_training_reviews = training_spark_df.filter(training_spark_df.review == "negative")
positive_training_reviews = training_spark_df.filter(training_spark_df.review == "positive")

negative_training_reviews_counts = negative_training_reviews.count()
original_training_data_count = training_spark_df.count()
sampled_positive_training_reviews = positive_training_reviews.sample((negative_training_reviews_counts/original_training_data_count) + 0.1)

balanced_training_data_spark_df = sampled_positive_training_reviews.union(negative_training_reviews)

In [69]:
print("Dimensions of balanced_training_data_spark_df: ")
print((balanced_training_data_spark_df.count(), len(balanced_training_data_spark_df.columns)))

print("Dimensions of prediction_spark_df: ")
print((prediction_spark_df.count(), len(prediction_spark_df.columns)))

# Notice dimensions of training data is reduced

Dimensions of balanced_training_data_spark_df: 
(45691, 3)
Dimensions of prediction_spark_df: 
(15261, 3)


In [70]:
train_split, test_split = balanced_training_data_spark_df.randomSplit(weights = [0.80, 0.20], seed = 13)

### Train Multinomial Naive Bayes model

Taken help to write NLP pipeline from this article https://medium.datadriveninvestor.com/nlp-with-pyspark-9e5f1fca7adf

In [71]:
# feature engineering on review_text for NLP
tokenizer = Tokenizer(inputCol = "review_text", 
                      outputCol = "words")

stopWordsRemover = StopWordsRemover(inputCol = "words", 
                                    outputCol = "words_without_stopwords")

vectorizer = CountVectorizer(inputCol = "words_without_stopwords", 
                                  outputCol = "features")

# We have strings(poitive or negative) in reviews which are our expected converting to index
labelEncoder = StringIndexer(inputCol = 'review', 
                             outputCol = 'reviewIndexed').fit(train_split)

# Use NaiveBayes multinomial model on features and reviewIndexed 
naive_bayes = NaiveBayes(modelType = "multinomial", 
                         featuresCol = 'features', 
                         labelCol = 'reviewIndexed')

# Convert prediction which are indexed format to labels
labelConverter = IndexToString(inputCol = "prediction", 
                               outputCol = "predicted_review_label", 
                               labels = labelEncoder.labels)

# make pipeline
pipeline = Pipeline(
    stages = [
        tokenizer,
        stopWordsRemover, 
        vectorizer,
        labelEncoder, 
        naive_bayes, 
        labelConverter
    ])

# fit training data which is non-ny hotel reviews
naive_bayes_model = pipeline.fit(train_split)

# make predictions on validating data which is ny-hotel reviews
nb_predictions = naive_bayes_model.transform(prediction_spark_df)

23/08/02 22:39:56 WARN DAGScheduler: Broadcasting large task binary with size 1234.2 KiB
23/08/02 22:39:58 WARN DAGScheduler: Broadcasting large task binary with size 1217.2 KiB
                                                                                

In [72]:
# score on validations data set / predictions
evaluator = MulticlassClassificationEvaluator(labelCol="reviewIndexed", predictionCol="prediction", metricName="accuracy")
nb_accuracy = evaluator.evaluate(nb_predictions)
print("Accuracy of NaiveBayes on predictions data is = %g"% (nb_accuracy))

23/08/02 22:39:58 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB

Accuracy of NaiveBayes on predictions data is = 0.931001


                                                                                

In [73]:
nd_metric = MulticlassMetrics(nb_predictions['reviewIndexed','prediction'].rdd)

print("Accuracy : ", nd_metric.accuracy)
print("Precision : ", nd_metric.precision(0.0))
print("f1Score : ", nd_metric.fMeasure(0.0))
print("recall : ", nd_metric.recall(0.0))
print("false poitive rate : ", nd_metric.falsePositiveRate(0.0))
print("true poitive rate : ", nd_metric.truePositiveRate(0.0))

23/08/02 22:39:59 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
23/08/02 22:40:00 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB

Accuracy :  0.9310005897385493
Precision :  0.9460645737241482
f1Score :  0.9602446483180428
recall :  0.9748562667688769
false poitive rate :  0.32716606498194944
true poitive rate :  0.9748562667688769


                                                                                

In [74]:
nb_test_split_predictions = naive_bayes_model.transform(test_split)
# score on validations data set / predictions
nb_training_evaluator = MulticlassClassificationEvaluator(labelCol="reviewIndexed", predictionCol="prediction", metricName="accuracy")
nb_training_accuracy = nb_training_evaluator.evaluate(nb_test_split_predictions)
print("Accuracy of NaiveBayes on test dataset = %g"% (nb_training_accuracy))

23/08/02 22:40:01 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB


Accuracy of NaiveBayes on test dataset = 0.907288


In [75]:
nd_test_split_metric = MulticlassMetrics(nb_test_split_predictions['reviewIndexed','prediction'].rdd)

print("Accuracy : ", nd_test_split_metric.accuracy)
print("Precision : ", nd_test_split_metric.precision(0.0))
print("f1Score : ", nd_test_split_metric.fMeasure(0.0))
print("recall : ", nd_test_split_metric.recall(0.0))
print("false poitive rate : ", nd_test_split_metric.falsePositiveRate(0.0))
print("true poitive rate : ", nd_test_split_metric.truePositiveRate(0.0))

23/08/02 22:40:02 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
23/08/02 22:40:02 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB

Accuracy :  0.9072884983665653
Precision :  0.9293488638533511
f1Score :  0.9220422468504309
recall :  0.9148496240601504
false poitive rate :  0.10402024177677818
true poitive rate :  0.9148496240601504




In [76]:
nb_train_split_predictions = naive_bayes_model.transform(train_split)
# score on validations data set / predictions
nb_training_evaluator = MulticlassClassificationEvaluator(labelCol="reviewIndexed", predictionCol="prediction", metricName="accuracy")
nb_training_accuracy = nb_training_evaluator.evaluate(nb_train_split_predictions)
print("Accuracy of NaiveBayes on training dataset = %g"% (nb_training_accuracy))

23/08/02 22:40:04 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB

Accuracy of NaiveBayes on training dataset = 0.928397




In [77]:
nd_train_split_metric = MulticlassMetrics(nb_train_split_predictions['reviewIndexed','prediction'].rdd)

print("Accuracy : ", nd_train_split_metric.accuracy)
print("Precision : ", nd_train_split_metric.precision(0.0))
print("f1Score : ", nd_train_split_metric.fMeasure(0.0))
print("recall : ", nd_train_split_metric.recall(0.0))
print("false poitive rate : ", nd_train_split_metric.falsePositiveRate(0.0))
print("true poitive rate : ", nd_train_split_metric.truePositiveRate(0.0))

23/08/02 22:40:06 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
23/08/02 22:40:06 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB

Accuracy :  0.928396805563101
Precision :  0.9385492700729927
f1Score :  0.939792608834681
recall :  0.9410392461805873
false poitive rate :  0.09008828250401284
true poitive rate :  0.9410392461805873




In [78]:
# nb_prediction_spark_df = nb_predictions.select("comment_id",
#                                             "review",
#                                             "review_text",
#                                             "reviewIndexed",
#                                             "prediction",
#                                             "predicted_review_label"
#                                            )
# # nb_prediction_spark_df.show()

In [79]:
# # converting results to pandas and merge with original ny-hotel-english-data
# nb_prediction_pandas_df = nb_prediction_spark_df.toPandas()

In [80]:
# nb_prediction_pandas_df['comment_id'] = nb_prediction_pandas_df['comment_id'].astype('int64')
# nb_prediction_on_ny_hotel_english_reviews = validating_ny_hotel_pd.merge(nb_prediction_pandas_df, on = 'comment_id', how = 'inner')
# nb_prediction_on_ny_hotel_english_reviews = nb_prediction_on_ny_hotel_english_reviews.drop("review_text", axis = 1)

# # nb_prediction_on_ny_hotel_english_reviews
# # 40007 rows × 26 columns

In [81]:
# nb_prediction_on_ny_hotel_english_reviews

In [82]:
# Export Results as csv
#nb_prediction_pandas_df.to_csv('./../predicted_data/nb_prediction_on_ny_hotel_english_reviews.csv.gz', index = False, compression = 'gzip')

### Train Random Forest Classifier model

In [None]:
# feature engineering on review_text for NLP
tokenizer = Tokenizer(inputCol = "review_text", 
                      outputCol = "words")

stopWordsRemover = StopWordsRemover(inputCol = "words", 
                                    outputCol = "words_without_stopwords")

vectorizer = CountVectorizer(inputCol = "words_without_stopwords", 
                                  outputCol = "features")

# We have strings(poitive or negative) in reviews which are our expected converting to index
labelEncoder = StringIndexer(inputCol = 'review', 
                             outputCol = 'reviewIndexed').fit(train_split)

# Use NaiveBayes multinomial model on features and reviewIndexed 
random_forest = RandomForestClassifier(numTrees=100, maxDepth=30, featuresCol = 'features', 
                         labelCol = 'reviewIndexed')

# Convert prediction which are indexed format to labels
labelConverter = IndexToString(inputCol = "prediction", 
                               outputCol = "predicted_review_label", 
                               labels = labelEncoder.labels)

# make pipeline
pipeline = Pipeline(
    stages = [
        tokenizer,
        stopWordsRemover, 
        vectorizer,
        labelEncoder, 
        random_forest, 
        labelConverter
    ])

# fit training data which is non-ny hotel reviews
random_forest_classifier_model = pipeline.fit(train_split)

# make predictions on validating data which is ny-hotel reviews
rfc_predictions = random_forest_classifier_model.transform(prediction_spark_df)

In [84]:
# score on validations data set / predictions
rfc_evaluator = MulticlassClassificationEvaluator(labelCol="reviewIndexed", predictionCol="prediction", metricName="accuracy")
rfc_accuracy = rfc_evaluator.evaluate(rfc_predictions)
print("Accuracy of Random Forest Classifer on prediction data = %g"% (rfc_accuracy))

23/08/02 22:54:11 WARN DAGScheduler: Broadcasting large task binary with size 19.2 MiB

Accuracy of Random Forest Classifer on prediction data = 0.926676


                                                                                

In [85]:
rfc_metric = MulticlassMetrics(rfc_predictions['reviewIndexed','prediction'].rdd)

print("Accuracy : ", rfc_metric.accuracy)
print("Precision : ", rfc_metric.precision(0.0))
print("f1Score : ", rfc_metric.fMeasure(0.0))
print("recall : ", rfc_metric.recall(0.0))
print("false poitive rate : ", rfc_metric.falsePositiveRate(0.0))
print("true poitive rate : ", rfc_metric.truePositiveRate(0.0))

23/08/02 22:54:16 WARN DAGScheduler: Broadcasting large task binary with size 19.2 MiB
23/08/02 22:54:18 WARN DAGScheduler: Broadcasting large task binary with size 19.2 MiB

Accuracy :  0.9266758403774327
Precision :  0.9764915935128701
f1Score :  0.9591173139454167
recall :  0.9423504917797401
false poitive rate :  0.23723723723723725
true poitive rate :  0.9423504917797401


                                                                                

In [86]:
rfc_test_split_predictions = random_forest_classifier_model.transform(test_split)
rfc_evaluator = MulticlassClassificationEvaluator(labelCol="reviewIndexed", predictionCol="prediction", metricName="accuracy")
rfc_accuracy = rfc_evaluator.evaluate(rfc_test_split_predictions)
print("Accuracy of Random Forest Classifer on prediction data = %g"% (rfc_accuracy))

23/08/02 22:54:22 WARN DAGScheduler: Broadcasting large task binary with size 19.2 MiB

Accuracy of Random Forest Classifer on prediction data = 0.83767




In [87]:
rfc_test_split_metric = MulticlassMetrics(rfc_test_split_predictions['reviewIndexed','prediction'].rdd)

print("Accuracy : ", rfc_test_split_metric.accuracy)
print("Precision : ", rfc_test_split_metric.precision(0.0))
print("f1Score : ", rfc_test_split_metric.fMeasure(0.0))
print("recall : ", rfc_test_split_metric.recall(0.0))
print("false poitive rate : ", rfc_test_split_metric.falsePositiveRate(0.0))
print("true poitive rate : ", rfc_test_split_metric.truePositiveRate(0.0))

23/08/02 22:54:27 WARN DAGScheduler: Broadcasting large task binary with size 19.2 MiB
23/08/02 22:54:28 WARN DAGScheduler: Broadcasting large task binary with size 19.2 MiB

Accuracy :  0.8376703841387856
Precision :  0.9683024632423143
f1Score :  0.8755935422602089
recall :  0.799086038449417
false poitive rate :  0.06558672461477677
true poitive rate :  0.799086038449417


                                                                                

In [88]:
rfc_train_split_predictions = random_forest_classifier_model.transform(train_split)
# score on validations data set / predictions
rfc_training_evaluator = MulticlassClassificationEvaluator(labelCol="reviewIndexed", predictionCol="prediction", metricName="accuracy")
rfc_training_accuracy = rfc_training_evaluator.evaluate(rfc_train_split_predictions)
print("Accuracy of Random Forest Classifer on training dataset is %g"% (rfc_training_accuracy))

23/08/02 22:54:34 WARN DAGScheduler: Broadcasting large task binary with size 19.2 MiB

Accuracy of Random Forest Classifer on training dataset is 0.886864


                                                                                

In [89]:
rfc_train_split_metric = MulticlassMetrics(rfc_train_split_predictions['reviewIndexed','prediction'].rdd)

print("Accuracy : ", rfc_train_split_metric.accuracy)
print("Precision : ", rfc_train_split_metric.precision(0.0))
print("f1Score : ", rfc_train_split_metric.fMeasure(0.0))
print("recall : ", rfc_train_split_metric.recall(0.0))
print("false poitive rate : ", rfc_train_split_metric.falsePositiveRate(0.0))
print("true poitive rate : ", rfc_train_split_metric.truePositiveRate(0.0))

23/08/02 22:54:43 WARN DAGScheduler: Broadcasting large task binary with size 19.2 MiB
23/08/02 22:54:44 WARN DAGScheduler: Broadcasting large task binary with size 19.2 MiB

Accuracy :  0.8868636931602107
Precision :  0.9938868613138686
f1Score :  0.912751115486939
recall :  0.8438625711740326
false poitive rate :  0.012185141402200601
true poitive rate :  0.8438625711740326


                                                                                

### Train SVC model

In [None]:
# feature engineering on review_text for NLP
tokenizer = Tokenizer(inputCol = "review_text", 
                      outputCol = "words")

stopWordsRemover = StopWordsRemover(inputCol = "words", 
                                    outputCol = "words_without_stopwords")

vectorizer = CountVectorizer(inputCol = "words_without_stopwords", 
                                  outputCol = "features")

# We have strings(poitive or negative) in reviews which are our expected converting to index
labelEncoder = StringIndexer(inputCol = 'review', 
                             outputCol = 'reviewIndexed').fit(train_split)

# Use NaiveBayes multinomial model on features and reviewIndexed 
svc = LinearSVC(maxIter=20,
                regParam = 0.01,
                featuresCol = 'features', 
                labelCol = 'reviewIndexed')

# Convert prediction which are indexed format to labels
labelConverter = IndexToString(inputCol = "prediction", 
                               outputCol = "predicted_review_label", 
                               labels = labelEncoder.labels)

# make pipeline
pipeline = Pipeline(
    stages = [
        tokenizer,
        stopWordsRemover, 
        vectorizer,
        labelEncoder, 
        svc, 
        labelConverter
    ])

# fit training data which is non-ny hotel reviews
svc_model = pipeline.fit(train_split)

# make predictions on validating data which is ny-hotel reviews
svc_predictions = svc_model.transform(prediction_spark_df)

In [91]:
# score on validations data set / predictions
svc_evaluator = MulticlassClassificationEvaluator(labelCol="reviewIndexed", 
                                                  predictionCol="prediction", 
                                                  metricName="accuracy")
svc_accuracy = svc_evaluator.evaluate(svc_predictions)
print("Accuracy of Linear SVC on prediction data = %g"% (svc_accuracy))

23/08/02 22:55:17 WARN DAGScheduler: Broadcasting large task binary with size 1982.7 KiB

Accuracy of Linear SVC on prediction data = 0.928314


                                                                                

In [92]:
svc_metric = MulticlassMetrics(svc_predictions['reviewIndexed','prediction'].rdd)

print("Accuracy : ", svc_metric.accuracy)
print("Precision : ", svc_metric.precision(0.0))
print("f1Score : ", svc_metric.fMeasure(0.0))
print("recall : ", svc_metric.recall(0.0))
print("false poitive rate : ", svc_metric.falsePositiveRate(0.0))
print("true poitive rate : ", svc_metric.truePositiveRate(0.0))

23/08/02 22:55:18 WARN DAGScheduler: Broadcasting large task binary with size 1978.6 KiB
23/08/02 22:55:18 WARN DAGScheduler: Broadcasting large task binary with size 1990.2 KiB

Accuracy :  0.9283140030142193
Precision :  0.9386252045826514
f1Score :  0.9584472804618657
recall :  0.9791246313828962
false poitive rate :  0.3473684210526316
true poitive rate :  0.9791246313828962


                                                                                

In [93]:
svc_test_split_predictions = svc_model.transform(test_split)
# score on validations data set / predictions
svc_training_evaluator = MulticlassClassificationEvaluator(labelCol="reviewIndexed", 
                                                  predictionCol="prediction", 
                                                  metricName="accuracy")
svc_training_accuracy = svc_training_evaluator.evaluate(svc_test_split_predictions)
print("Accuracy of Linear SVC on training data is = %g"% (svc_training_accuracy))

23/08/02 22:55:20 WARN DAGScheduler: Broadcasting large task binary with size 1996.7 KiB


Accuracy of Linear SVC on training data is = 0.913259




In [94]:
svc_test_split_metric = MulticlassMetrics(svc_test_split_predictions['reviewIndexed','prediction'].rdd)

print("Accuracy : ", svc_test_split_metric.accuracy)
print("Precision : ", svc_test_split_metric.precision(0.0))
print("f1Score : ", svc_test_split_metric.fMeasure(0.0))
print("recall : ", svc_test_split_metric.recall(0.0))
print("false poitive rate : ", svc_test_split_metric.falsePositiveRate(0.0))
print("true poitive rate : ", svc_test_split_metric.truePositiveRate(0.0))

23/08/02 22:55:20 WARN DAGScheduler: Broadcasting large task binary with size 1996.4 KiB
23/08/02 22:55:21 WARN DAGScheduler: Broadcasting large task binary with size 2008.1 KiB

Accuracy :  0.9132589838909542
Precision :  0.931640252052702
f1Score :  0.9268617021276596
recall :  0.9221319221319221
false poitive rate :  0.09983268265476855
true poitive rate :  0.9221319221319221


                                                                                

In [95]:
svc_train_split_predictions = svc_model.transform(train_split)
# score on validations data set / predictions
svc_training_evaluator = MulticlassClassificationEvaluator(labelCol="reviewIndexed", 
                                                  predictionCol="prediction", 
                                                  metricName="accuracy")
svc_training_accuracy = svc_training_evaluator.evaluate(svc_train_split_predictions)
print("Accuracy of Linear SVC on training data is = %g"% (svc_training_accuracy))

23/08/02 22:55:22 WARN DAGScheduler: Broadcasting large task binary with size 1996.7 KiB

Accuracy of Linear SVC on training data is = 0.993915




In [96]:
svc_train_split_metric = MulticlassMetrics(svc_train_split_predictions['reviewIndexed','prediction'].rdd)

print("Accuracy : ", svc_train_split_metric.accuracy)
print("Precision : ", svc_train_split_metric.precision(0.0))
print("f1Score : ", svc_train_split_metric.fMeasure(0.0))
print("recall : ", svc_train_split_metric.recall(0.0))
print("false poitive rate : ", svc_train_split_metric.falsePositiveRate(0.0))
print("true poitive rate : ", svc_train_split_metric.truePositiveRate(0.0))

23/08/02 22:55:24 WARN DAGScheduler: Broadcasting large task binary with size 1996.4 KiB
23/08/02 22:55:24 WARN DAGScheduler: Broadcasting large task binary with size 2008.1 KiB

Accuracy :  0.993915358287608
Precision :  0.9972171532846715
f1Score :  0.9949023713076328
recall :  0.992598310780129
false poitive rate :  0.004123850730124392
true poitive rate :  0.992598310780129


                                                                                

### Next step is analysis on obtained predictions to return user friendly results

* Generate WordCloud
* Perform analysis by grouping relevant data