In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [0]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NaiveBayes").getOrCreate()

In [3]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/dataviz-curriculum/day_2/yelp_reviews.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("yelp_reviews.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+--------+--------------------+
|   class|                text|
+--------+--------------------+
|positive|Wow... Loved this...|
|negative|  Crust is not good.|
|negative|Not tasty and the...|
|positive|Stopped by during...|
|positive|The selection on ...|
|negative|Now I am getting ...|
|negative|Honeslty it didn'...|
|negative|The potatoes were...|
|positive|The fries were gr...|
|positive|      A great touch.|
|positive|Service was very ...|
|negative|  Would not go back.|
|negative|The cashier had n...|
|positive|I tried the Cape ...|
|negative|I was disgusted b...|
|negative|I was shocked bec...|
|positive| Highly recommended.|
|negative|Waitress was a li...|
|negative|This place is not...|
|negative|did not like at all.|
+--------+--------------------+
only showing top 20 rows



In [4]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
data_df = df.withColumn('length', length(df['text']))
data_df.show()

+--------+--------------------+------+
|   class|                text|length|
+--------+--------------------+------+
|positive|Wow... Loved this...|    24|
|negative|  Crust is not good.|    18|
|negative|Not tasty and the...|    41|
|positive|Stopped by during...|    87|
|positive|The selection on ...|    59|
|negative|Now I am getting ...|    46|
|negative|Honeslty it didn'...|    37|
|negative|The potatoes were...|   111|
|positive|The fries were gr...|    25|
|positive|      A great touch.|    14|
|positive|Service was very ...|    24|
|negative|  Would not go back.|    18|
|negative|The cashier had n...|    99|
|positive|I tried the Cape ...|    59|
|negative|I was disgusted b...|    62|
|negative|I was shocked bec...|    50|
|positive| Highly recommended.|    19|
|negative|Waitress was a li...|    38|
|negative|This place is not...|    51|
|negative|did not like at all.|    20|
+--------+--------------------+------+
only showing top 20 rows



### Feature Transformations


In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set

#we're making all of this but we're not calling them yet

#indexer convers values to numbers so it will convert positive to one number and negative to one number
pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')

#make a tokenizer convert inputs into a numeric vectors
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")

#make stopwords remover
stopremove = StopWordsRemover(inputCol='token_text',outputCol='tokens_without_stop_words')

#convert the tokens without stop words
hashingTF = HashingTF(inputCol="tokens_without_stop_words", outputCol='hash_token')


idf = IDF(inputCol='hash_token', outputCol='idf_token')

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors which are the numbers the model will be learning from
features = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')


In [0]:
# Create a and run a data processing Pipeline, this will do one step at a time in order 
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf,features])

In [18]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)
cleaned.show()

+--------+--------------------+------+-----+--------------------+-------------------------+--------------------+--------------------+--------------------+
|   class|                text|length|label|          token_text|tokens_without_stop_words|          hash_token|           idf_token|            features|
+--------+--------------------+------+-----+--------------------+-------------------------+--------------------+--------------------+--------------------+
|positive|Wow... Loved this...|    24|  0.0|[wow..., loved, t...|     [wow..., loved, p...|(262144,[33933,69...|(262144,[33933,69...|(262145,[33933,69...|
|negative|  Crust is not good.|    18|  1.0|[crust, is, not, ...|           [crust, good.]|(262144,[150903,1...|(262144,[150903,1...|(262145,[150903,1...|
|negative|Not tasty and the...|    41|  1.0|[not, tasty, and,...|     [tasty, texture, ...|(262144,[63367,11...|(262144,[63367,11...|(262145,[63367,11...|
|positive|Stopped by during...|    87|  0.0|[stopped, by, dur...|     

In [27]:
# Show label and resulting features
use_for_model= cleaned.select(['label', 'features'])
use_for_model.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[33933,69...|
|  1.0|(262145,[150903,1...|
|  1.0|(262145,[63367,11...|
|  0.0|(262145,[6286,272...|
|  0.0|(262145,[6979,911...|
|  1.0|(262145,[24661,34...|
|  1.0|(262145,[101702,2...|
|  1.0|(262145,[3645,855...|
|  0.0|(262145,[53777,13...|
|  0.0|(262145,[138356,2...|
|  0.0|(262145,[24113,20...|
|  1.0|(262145,[172477,1...|
|  1.0|(262145,[36200,40...|
|  0.0|(262145,[18098,83...|
|  1.0|(262145,[89493,95...|
|  1.0|(262145,[86431,10...|
|  0.0|(262145,[31704,21...|
|  1.0|(262145,[27707,65...|
|  1.0|(262145,[12329,61...|
|  1.0|(262145,[8287,208...|
+-----+--------------------+
only showing top 20 rows



In [0]:
# impoting our model wich is naivebayes
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set we are giving 75% for training and 25% for testing
training, testing = use_for_model.randomSplit([0.75, 0.25])

# Create a Naive Bayes model and fit training data
model = NaiveBayes()
classifier = model.fit(training)

In [29]:
# Tranform the model with the testing data
test_results = classifier.transform(testing)
test_results.show(5)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(262145,[14,31463...|[-443.91527844789...|[0.00950475889819...|       1.0|
|  0.0|(262145,[14,32970...|[-335.62519857118...|[0.23060072505452...|       1.0|
|  0.0|(262145,[14,56428...|[-465.06763583831...|[0.99999999999998...|       0.0|
|  0.0|(262145,[14,89717...|[-392.79600374413...|[1.89944578980408...|       1.0|
|  0.0|(262145,[78,5377,...|[-609.31341359553...|[0.9999997971447,...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [30]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.729222
