In [1]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.0.3'
# spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Wait                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [2 In0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Connecting to                                                                               Get:4 https://developer.download.nvidia.com/comp

In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MovieReviews").getOrCreate()

In [8]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
from pyspark.sql import SQLContext
url = "https://databootcamp-washu-reviews-project.s3.us-east-2.amazonaws.com/IMDB_Dataset.csv"
filename = "IMDB_Dataset.csv"
# add the file to the spark context
spark.sparkContext.addFile(url)
# use sql context to add more options when reading csv
sqlContext = SQLContext(spark)
df = sqlContext.read.format('csv').option('header','true').option('maxColumns','2').option('escape','"').load(SparkFiles.get(filename))
# Show DataFrame
df.show()

+--------------------+---------+
|              review|sentiment|
+--------------------+---------+
|One of the other ...| positive|
|A wonderful littl...| positive|
|I thought this wa...| positive|
|Basically there's...| negative|
|Petter Mattei's "...| positive|
|Probably my all-t...| positive|
|I sure would like...| positive|
|This show was an ...| negative|
|Encouraged by the...| negative|
|If you like origi...| positive|
|Phil the Alien is...| negative|
|I saw this movie ...| negative|
|So im not a big f...| negative|
|The cast played S...| negative|
|This a fantastic ...| positive|
|Kind of drawn in ...| negative|
|Some films just s...| positive|
|This movie made i...| negative|
|I remember this f...| positive|
|An awful film! It...| negative|
+--------------------+---------+
only showing top 20 rows



In [10]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
df = df.withColumn('length', length(df['review']))
df.show()

+--------------------+---------+------+
|              review|sentiment|length|
+--------------------+---------+------+
|One of the other ...| positive|  1761|
|A wonderful littl...| positive|   998|
|I thought this wa...| positive|   926|
|Basically there's...| negative|   748|
|Petter Mattei's "...| positive|  1317|
|Probably my all-t...| positive|   656|
|I sure would like...| positive|   726|
|This show was an ...| negative|   934|
|Encouraged by the...| negative|   681|
|If you like origi...| positive|   176|
|Phil the Alien is...| negative|   578|
|I saw this movie ...| negative|   937|
|So im not a big f...| negative|  2227|
|The cast played S...| negative|   662|
|This a fantastic ...| positive|   275|
|Kind of drawn in ...| negative|   830|
|Some films just s...| positive|   720|
|This movie made i...| negative|  1322|
|I remember this f...| positive|   639|
|An awful film! It...| negative|   741|
+--------------------+---------+------+
only showing top 20 rows



In [11]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set

# adds column to represent sentiment numarically, 1 = positive, 0 = negative
pos_neg_to_num = StringIndexer(inputCol='sentiment', outputCol='label')
# breaks each review into its individual words
tokenizer = Tokenizer(inputCol="review", outputCol="token_text")
# removes stop words using deafault list
stopremove = StopWordsRemover(inputCol='token_text', outputCol='stop_removed_tokens')
# calculates the frequency of words in each reiview 
hashingTF = HashingTF(inputCol="stop_removed_tokens", outputCol='hash_token')
# calculates the frequency of words accross whole set
idf = IDF(inputCol='hash_token', outputCol='idf_token')

# TF-IDF is a statistical measure that evaluates how relevant a word is to a document in a collection of documents. 
# This is done by multiplying two metrics: how many times a word appears in a document(hashingTF), and the inverse document frequency of the word across a set of documents(idf).

In [12]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors, combining the columns into one feature vector
vectorized = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [13]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, vectorized])

In [14]:
# fit the data to the pipline, then transform the data
data_cleaner = data_prep_pipeline.fit(df)
cleaned_df = data_cleaner.transform(df)

In [17]:
# Show label and resulting features
cleaned_df.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(262145,[3280,436...|
|  1.0|(262145,[120,521,...|
|  1.0|(262145,[1043,139...|
|  0.0|(262145,[6512,853...|
|  1.0|(262145,[2751,392...|
|  1.0|(262145,[5381,158...|
|  1.0|(262145,[1889,545...|
|  0.0|(262145,[2437,825...|
|  0.0|(262145,[8538,149...|
|  1.0|(262145,[25629,31...|
|  0.0|(262145,[1326,172...|
|  0.0|(262145,[5451,702...|
|  0.0|(262145,[3493,427...|
|  0.0|(262145,[5670,761...|
|  1.0|(262145,[6558,215...|
|  0.0|(262145,[2101,243...|
|  1.0|(262145,[2701,161...|
|  0.0|(262145,[329,3535...|
|  1.0|(262145,[1009,378...|
|  0.0|(262145,[3924,156...|
+-----+--------------------+
only showing top 20 rows



In [18]:
from pyspark.ml.classification import NaiveBayes

# split the data up into training, and testing sets
training, testing = cleaned_df.randomSplit([0.75, 0.25])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [19]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+--------------------+---------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|              review|sentiment|length|label|          token_text| stop_removed_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------------------+---------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|!!!! POSSIBLE MIL...| negative|   791|  0.0|[!!!!, possible, ...|[!!!!, possible, ...|(262144,[6512,142...|(262144,[6512,142...|(262145,[6512,142...|[-4505.8785988956...|[0.99999999999999...|       0.0|
|"A Gentleman's Ga...| negative|  1220|  0.0|["a, gentleman's,...|["a, gentleman's,...|(262144,[991,5760...|(262144,[991,5760...|(262145,[991,5760...|[-6025.9654839835...|[0.9999999999

In [20]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.821284
