In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [0]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Yelp_NLP").getOrCreate()

In [0]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/dataviz-curriculum/day_2/yelp_reviews.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("yelp_reviews.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+--------+--------------------+
|   class|                text|
+--------+--------------------+
|positive|Wow... Loved this...|
|negative|  Crust is not good.|
|negative|Not tasty and the...|
|positive|Stopped by during...|
|positive|The selection on ...|
|negative|Now I am getting ...|
|negative|Honeslty it didn'...|
|negative|The potatoes were...|
|positive|The fries were gr...|
|positive|      A great touch.|
|positive|Service was very ...|
|negative|  Would not go back.|
|negative|The cashier had n...|
|positive|I tried the Cape ...|
|negative|I was disgusted b...|
|negative|I was shocked bec...|
|positive| Highly recommended.|
|negative|Waitress was a li...|
|negative|This place is not...|
|negative|did not like at all.|
+--------+--------------------+
only showing top 20 rows



In [0]:
# Import functions that will be used in NLP process (pipeline)
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [0]:
from pyspark.sql.functions import length

# Create a length column to be used as a future feature
data_df = df.withColumn('length', length(df['text']))
data_df.show()

+--------+--------------------+------+
|   class|                text|length|
+--------+--------------------+------+
|positive|Wow... Loved this...|    24|
|negative|  Crust is not good.|    18|
|negative|Not tasty and the...|    41|
|positive|Stopped by during...|    87|
|positive|The selection on ...|    59|
|negative|Now I am getting ...|    46|
|negative|Honeslty it didn'...|    37|
|negative|The potatoes were...|   111|
|positive|The fries were gr...|    25|
|positive|      A great touch.|    14|
|positive|Service was very ...|    24|
|negative|  Would not go back.|    18|
|negative|The cashier had n...|    99|
|positive|I tried the Cape ...|    59|
|negative|I was disgusted b...|    62|
|negative|I was shocked bec...|    50|
|positive| Highly recommended.|    19|
|negative|Waitress was a li...|    38|
|negative|This place is not...|    51|
|negative|did not like at all.|    20|
+--------+--------------------+------+
only showing top 20 rows



In [0]:
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="token_text", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

Note that the StringIndexer encodes a string column to a column of table indexes.

We're working with positive and negative game reviews, which will be converted to 0 and 1. This will form our labels, which we'll delve into in the ML unit. The label is what we’re trying to predict: will the review’s given text let us know if it was positive or negative?

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

We’ll create a feature vector containing the output from the IDFModel (the last stage in the pipeline) and the length. This will combine all the raw features to train the ML model that we’ll be using. 

In [0]:
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline

data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

Import the pipeline from pyspark.ml, and then store a list of the stages created earlier. It’s important to list the stages in the order they need to be executed becausethe output from one stage will then be passed off to another stage.

In [0]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [0]:
# Show label and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[33933,69...|
|  1.0|(262145,[15889,13...|
|  1.0|(262145,[25570,63...|
|  0.0|(262145,[6286,272...|
|  0.0|(262145,[6979,255...|
|  1.0|(262145,[24417,24...|
|  1.0|(262145,[12084,48...|
|  1.0|(262145,[3645,963...|
|  0.0|(262145,[53777,10...|
|  0.0|(262145,[138356,2...|
|  0.0|(262145,[24113,25...|
|  1.0|(262145,[68867,13...|
|  1.0|(262145,[24417,36...|
|  0.0|(262145,[18098,24...|
|  1.0|(262145,[24417,25...|
|  1.0|(262145,[24417,25...|
|  0.0|(262145,[31704,21...|
|  1.0|(262145,[25570,27...|
|  1.0|(262145,[12329,15...|
|  1.0|(262145,[8287,139...|
+-----+--------------------+
only showing top 20 rows



**Training data** is the data that will be passed to our NLP model that will train our model to predict results. The testing data is used to test our predictions. We can do this with the randomSplit method, which takes in a list of the percent of data we want split into each group. Standard conventions use 70% with training and 30% with testing.

In [0]:
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3], 21)

The array supplied to randomSplit is the percentage of the data that will be broken into training and testing respectively. So 70% to training and 30% to testing. The second number supplied is called a seed. The seed assures that this random split will also be the same, this is good for when we want to make sure we all have similar results.

In [0]:
from pyspark.ml.classification import NaiveBayes

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

Import ML model "NaiveBayes". **Naive Bayes** is a group of classifier algorithms based on Bayes’ theorem. Bayes theorem provides a way to determine the probability of an event based on new conditions or information that might be related to the event.

In [0]:
# Transform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|negative| "As for the ""mains|    19|  1.0|["as, for, the, "...|      ["as, ""mains]|(262144,[16332,10...|(262144,[16332,10...|(262145,[16332,10...|[-235.05580770267...|[0.03900923601832...|       1.0|
|negative|"I don't know wha...|    85|  1.0|["i, don't, know,...|["i, know, big, d...|(262144,[8478,158...|(262144,[8478,158...|(262145,[8478,158...|[-876.28450835078...|[5.82709985215757.

This prediction column will indicate with a 1.0 if the model thinks this review is negative and 0.0 if it thinks it’s positive.

In [0]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuray of model at predicting reviews was: %f" % acc)

Accuray of model at predicting reviews was: 0.718984


**MulticlassClassificationEvaluator** displays how accurate our model is in determining if a review will be positive or negative based solely on the text within a review.

total number of correct predictions for X
/
total number of predictions made