In [1]:
# install packages not native to Google CoLab - PySpark & Java

import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.<enter version>'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Get:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:9 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:10 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:11 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [82.3 kB]
Hit:12 http://ppa.launchpad.net/cran/

In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Yelp_NLP").getOrCreate()

In [3]:
# Read in Yelp data from Amazon S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/dataviz-curriculum/day_2/yelp_reviews.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("yelp_reviews.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+--------+--------------------+
|   class|                text|
+--------+--------------------+
|positive|Wow... Loved this...|
|negative|  Crust is not good.|
|negative|Not tasty and the...|
|positive|Stopped by during...|
|positive|The selection on ...|
|negative|Now I am getting ...|
|negative|Honeslty it didn'...|
|negative|The potatoes were...|
|positive|The fries were gr...|
|positive|      A great touch.|
|positive|Service was very ...|
|negative|  Would not go back.|
|negative|The cashier had n...|
|positive|I tried the Cape ...|
|negative|I was disgusted b...|
|negative|I was shocked bec...|
|positive| Highly recommended.|
|negative|Waitress was a li...|
|negative|This place is not...|
|negative|did not like at all.|
+--------+--------------------+
only showing top 20 rows



In [4]:
# Import functions needed in our NLP process/pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [5]:
# Import the length function which acts as our udf for finding the length of words for an input data point
from pyspark.sql.functions import length

# Create a length column to be used as a future feature - data is derived from the length function
data_df = df.withColumn('length', length(df['text']))

data_df.show()

+--------+--------------------+------+
|   class|                text|length|
+--------+--------------------+------+
|positive|Wow... Loved this...|    24|
|negative|  Crust is not good.|    18|
|negative|Not tasty and the...|    41|
|positive|Stopped by during...|    87|
|positive|The selection on ...|    59|
|negative|Now I am getting ...|    46|
|negative|Honeslty it didn'...|    37|
|negative|The potatoes were...|   111|
|positive|The fries were gr...|    25|
|positive|      A great touch.|    14|
|positive|Service was very ...|    24|
|negative|  Would not go back.|    18|
|negative|The cashier had n...|    99|
|positive|I tried the Cape ...|    59|
|negative|I was disgusted b...|    62|
|negative|I was shocked bec...|    50|
|positive| Highly recommended.|    19|
|negative|Waitress was a li...|    38|
|negative|This place is not...|    51|
|negative|did not like at all.|    20|
+--------+--------------------+------+
only showing top 20 rows



In [6]:
# Create all the features/functions for transformations later in the data set


# Create a feature that will encode a positive or negative score as translated from 
# if the text of the game review results in an overall positive or negative review score
pos_neg_to_num = StringIndexer(inputCol='class', outputCol='label')

# Create a tokenizer feature
tokenizer = Tokenizer(inputCol='text', outputCol='token_text')

# Create a Stop Word Removal filter feature
stopremove = StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')

# Create feature that hashes to create term frequency values for words
hashingTF = HashingTF(inputCol='stop_tokens', outputCol='hash_token')

# Create a feature to apply the HashingTF Term Frequency results to an IDF (Inverse Document Frequency evaluator)
# To determine TF-IDF
idf = IDF(inputCol='hash_token', outputCol='idf_token')


In [7]:
# Create a feature vector that contains the output from the IDFModel and the length

# Our code will combine all of the raw features needed to train the Machine Learning Model that we'll be using



# Import necessary features for Vector Assembly
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')


In [8]:
# Create and run our data processing Pipeline



# Import the Pipeline feature
from pyspark.ml import Pipeline

# Create/Format the Pipeline in chronological order as to ensure that previous cells feed into subsequent cells
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [11]:
# Now We need to fit the outcome with our original DataFrame and then transform it


# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)

cleaned = cleaner.transform(data_df)

cleaned.select(['label', 'class', 'length', 'features', 'text']).show(truncate=False)

+-----+--------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------+
|label|class   |length|features                                                                                                                                                                                                                                                                     |text                                                                                                           |
+-----+--------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [12]:
# Now lets run our Machine Learning model on the data

# We need to break the data into Training Data and Testing Data
                    # Testing Data: Data passed into our NLP model that trains it to predict results
                                    # Conventionally, Training Data is allocated 70% of the overall data
                    # Testing Data: Data used to test predictions
                                    # Conventionally, Testing Data is allocated 30% of the overall data
                    # Seed Number: An Overall Arbitrary number used to identify a particular testing instance
                            # AS LONG AS THE SAME SEED IS USED, THE RESULT WILL BE THE SAME EACH TIME
                            # USING A SEED NUMBER ENSURES REPRODUCIBLE RESULTS

# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3], 21)

In [14]:
# We'll use a - Naive Bayes - Machine Learning Mode
# Naive Bayes is a group of classifier algorithms based on Bayes' theorem
# Bayes' theorem provides a way to determine the probability of an event based on new conditions
# Or based on information that might be related to the event


# Import Naive Bayes ML (Machine Learning) model
from pyspark.ml.classification import NaiveBayes

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

# Transform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

# ^ The import outputs/columns we're looking for are the prediction column(s) to the far right of the resulting DataFrame
# The prediction column will indicate with a 1.0 if the model thinks this review is negative
# The prediction column will indicate with a 0.0 if the model thinks this review is positive


+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|negative|"The burger... I ...|    86|  0.0|["the, burger...,...|["the, burger...,...|(262144,[20298,21...|(262144,[20298,21...|(262145,[20298,21...|[-820.60780566975...|[0.99999999999995...|       0.0|
|negative|              #NAME?|     6|  0.0|            [#name?]|            [#name?]|(262144,[197050],...|(262144,[197050],...|(262145,[197050,2...|[-73.489435340867...|[0.07515735596910.

In [15]:
# Now we need to evaluate for ourselves how well the model performs in its assigned task: prediction

# To do so we'll input the BinaryClassificationEvaluator, which displays how accurate our model is
# At determining if a review will be positive or negative based solely on the text within the review

#  The BinaryClassificationEvaluator uses two arguments: labelCol and rawPredictionCol
              # labelCol takes the resulting labels from using StringIndexer (from pos_neg_to_num feature)
                              # labelCol then converts our positive and negative strings to integers
              # rawPredictionCol takes in numerical predictions from the output of running the Naive Bayes model

# The performance of a model can be measured based on the difference between its predicted values and actual values
# This is what the BinaryClassificationEvaluator does

# import BinaryClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

acc_eval = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='prediction')
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.700298
