## MSBX 5420 Assignment 3
This assignment is about Spark Machine Learning and Spark Streaming. First two tasks focus on machine learning, and the third one combines machine learning and streaming analysis. We will use IMDB reviews data for the whole assignment.

### Task 1 - Topic Modeling on Moive Reviews with Spark ML
First of all, let's load the data. The data structure is very simple.One column is review text, and another column is the label of review sentiment (positive or negative). Same as exercise, we can load .csv.gz file directly from spark.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[4]').config("spark.executor.memory", "1g").config("spark.driver.memory", "2g").appName('spark_ml_imdb').getOrCreate()
#for cluster - change kernel to PySpark
#spark = SparkSession.builder.master('spark://spark-master:7077').appName('spark_ml_imdb').getOrCreate()

In [2]:
reviews = spark.read.options(inferSchema = True, multiLine = True, escape = '\"').csv('data/IMDB_Reviews.csv.gz', header=True)
#for cluster
#reviews = spark.read.options(inferSchema = True, multiLine = True, escape = '\"').csv('s3://msbx5420-spr21/zhiyiwang/IMDB_Reviews.csv.gz', header=True)
reviews.show()

+--------------------+---------+
|              review|sentiment|
+--------------------+---------+
|One of the other ...| positive|
|A wonderful littl...| positive|
|I thought this wa...| positive|
|Basically there's...| negative|
|Petter Mattei's "...| positive|
|Probably my all-t...| positive|
|I sure would like...| positive|
|This show was an ...| negative|
|Encouraged by the...| negative|
|If you like origi...| positive|
|Phil the Alien is...| negative|
|I saw this movie ...| negative|
|So im not a big f...| negative|
|The cast played S...| negative|
|This a fantastic ...| positive|
|Kind of drawn in ...| negative|
|Some films just s...| positive|
|This movie made i...| negative|
|I remember this f...| positive|
|An awful film! It...| negative|
+--------------------+---------+
only showing top 20 rows



First, we should clean up the review texts. Besides those special characters we have tried to remove in exercise, here we also need to remove the html tags in the text.

In [3]:
import pyspark.sql.functions as fn
import pyspark.ml.feature as ft

#remove html tags in the text with regular expression
reviews = reviews.withColumn('review', fn.regexp_replace(fn.col("review"), '<[^>]+>', ' '))
#remove special characters and line breaks in the text with regular expression
reviews = reviews.withColumn('review', fn.regexp_replace(fn.col("review"), '([^\s\w_]|_)+', ' ')).withColumn('review', fn.regexp_replace(fn.col("review"), '[\n\r]', ' '))
reviews.take(1)

[Row(review='One of the other reviewers has mentioned that after watching just 1 Oz episode you ll be hooked  They are right  as this is exactly what happened with me   The first thing that struck me about Oz was its brutality and unflinching scenes of violence  which set in right from the word GO  Trust me  this is not a show for the faint hearted or timid  This show pulls no punches with regards to drugs  sex or violence  Its is hardcore  in the classic use of the word   It is called OZ as that is the nickname given to the Oswald Maximum Security State Penitentary  It focuses mainly on Emerald City  an experimental section of the prison where all the cells have glass fronts and face inwards  so privacy is not high on the agenda  Em City is home to many Aryans  Muslims  gangstas  Latinos  Christians  Italians  Irish and more so scuffles  death stares  dodgy dealings and shady agreements are never far away   I would say the main appeal of the show is due to the fact that it goes where 

Now let's create tokenizer to start the data processing.

In [4]:
tokenizer = ft.RegexTokenizer(inputCol='review', outputCol='review_tok', pattern='\s+|[,.\"/!]')
tokenizer.transform(reviews).select('review_tok').take(1)

[Row(review_tok=['one', 'of', 'the', 'other', 'reviewers', 'has', 'mentioned', 'that', 'after', 'watching', 'just', '1', 'oz', 'episode', 'you', 'll', 'be', 'hooked', 'they', 'are', 'right', 'as', 'this', 'is', 'exactly', 'what', 'happened', 'with', 'me', 'the', 'first', 'thing', 'that', 'struck', 'me', 'about', 'oz', 'was', 'its', 'brutality', 'and', 'unflinching', 'scenes', 'of', 'violence', 'which', 'set', 'in', 'right', 'from', 'the', 'word', 'go', 'trust', 'me', 'this', 'is', 'not', 'a', 'show', 'for', 'the', 'faint', 'hearted', 'or', 'timid', 'this', 'show', 'pulls', 'no', 'punches', 'with', 'regards', 'to', 'drugs', 'sex', 'or', 'violence', 'its', 'is', 'hardcore', 'in', 'the', 'classic', 'use', 'of', 'the', 'word', 'it', 'is', 'called', 'oz', 'as', 'that', 'is', 'the', 'nickname', 'given', 'to', 'the', 'oswald', 'maximum', 'security', 'state', 'penitentary', 'it', 'focuses', 'mainly', 'on', 'emerald', 'city', 'an', 'experimental', 'section', 'of', 'the', 'prison', 'where', 'all

Then remove stopwords in the text.

In [5]:
stopwords = ft.StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='review_stop')
stopwords.transform(tokenizer.transform(reviews)).select('review_stop').take(1)

[Row(review_stop=['one', 'reviewers', 'mentioned', 'watching', '1', 'oz', 'episode', 'll', 'hooked', 'right', 'exactly', 'happened', 'first', 'thing', 'struck', 'oz', 'brutality', 'unflinching', 'scenes', 'violence', 'set', 'right', 'word', 'go', 'trust', 'show', 'faint', 'hearted', 'timid', 'show', 'pulls', 'punches', 'regards', 'drugs', 'sex', 'violence', 'hardcore', 'classic', 'use', 'word', 'called', 'oz', 'nickname', 'given', 'oswald', 'maximum', 'security', 'state', 'penitentary', 'focuses', 'mainly', 'emerald', 'city', 'experimental', 'section', 'prison', 'cells', 'glass', 'fronts', 'face', 'inwards', 'privacy', 'high', 'agenda', 'em', 'city', 'home', 'many', 'aryans', 'muslims', 'gangstas', 'latinos', 'christians', 'italians', 'irish', 'scuffles', 'death', 'stares', 'dodgy', 'dealings', 'shady', 'agreements', 'never', 'far', 'away', 'say', 'main', 'appeal', 'show', 'due', 'fact', 'goes', 'shows', 'wouldn', 'dare', 'forget', 'pretty', 'pictures', 'painted', 'mainstream', 'audien

Now same as what we did in the exercise, let's create `CountVectorizer` to transform the text into term frequency vector. 

In [7]:
tf = ft.CountVectorizer(inputCol=stopwords.getOutputCol(), outputCol='review_tf')
tokenized = stopwords.transform(tokenizer.transform(reviews))
tf.fit(tokenized).transform(tokenized).select('review_tf').take(1)

[Row(review_tf=SparseVector(101111, {2: 1.0, 10: 1.0, 13: 2.0, 17: 2.0, 28: 1.0, 32: 1.0, 35: 3.0, 39: 1.0, 45: 2.0, 46: 1.0, 50: 1.0, 53: 1.0, 54: 2.0, 57: 1.0, 83: 1.0, 85: 1.0, 91: 1.0, 93: 1.0, 97: 1.0, 101: 2.0, 108: 1.0, 121: 1.0, 128: 3.0, 138: 2.0, 160: 1.0, 161: 1.0, 169: 1.0, 174: 1.0, 184: 1.0, 191: 2.0, 195: 1.0, 217: 1.0, 235: 1.0, 249: 1.0, 250: 1.0, 251: 1.0, 264: 1.0, 278: 1.0, 286: 2.0, 302: 1.0, 316: 1.0, 324: 1.0, 370: 1.0, 386: 1.0, 409: 2.0, 438: 1.0, 448: 4.0, 453: 1.0, 459: 1.0, 480: 1.0, 501: 1.0, 514: 1.0, 526: 1.0, 534: 2.0, 535: 1.0, 567: 2.0, 582: 1.0, 685: 1.0, 707: 3.0, 754: 1.0, 779: 1.0, 826: 1.0, 921: 1.0, 939: 1.0, 1073: 3.0, 1092: 1.0, 1108: 1.0, 1146: 1.0, 1181: 1.0, 1210: 1.0, 1292: 1.0, 1293: 1.0, 1313: 1.0, 1348: 1.0, 1462: 1.0, 1475: 1.0, 1496: 1.0, 1619: 1.0, 1897: 1.0, 1917: 1.0, 1936: 1.0, 2019: 1.0, 2115: 1.0, 2206: 1.0, 2328: 1.0, 2345: 1.0, 2375: 1.0, 2422: 1.0, 2472: 1.0, 2591: 1.0, 2790: 1.0, 2794: 1.0, 2863: 1.0, 2899: 1.0, 2971: 6.0, 30

Then we use the `LDA` model to do topic modeling. We create the model here with 30 topics.

In [8]:
import pyspark.ml.clustering as clus
lda = clus.LDA(k=30, optimizer='online', maxIter=10, featuresCol=tf.getOutputCol())

Now let's build the pipeline to train the topic model from the raw data. It will take a while to run.

In [9]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[tokenizer, stopwords, tf, lda])
reviews_train, reviews_test = reviews.randomSplit([0.7, 0.3], seed=200)
pipeline_model = pipeline.fit(reviews_train)
topics = pipeline_model.transform(reviews)
topics.select('topicDistribution').take(5)

[Row(topicDistribution=DenseVector([0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.9946, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002])),
 Row(topicDistribution=DenseVector([0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.9894, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004])),
 Row(topicDistribution=DenseVector([0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.9895, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004])),
 Row(topicDistribution=DenseVector([0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005,

Let's see if we have properly discovered the topics. This is just the same code we display topics in the exercise - we will reuse it several times here.

In [10]:
#Code to extract topics from models
vectorized_model = pipeline_model.stages[2]
topic_model = pipeline_model.stages[3]
vocab = vectorized_model.vocabulary
topic_words_list = topic_model.describeTopics(20)
topic_words_rdd = topic_words_list.rdd
topics_words = topic_words_rdd.map(lambda row: row['termIndices']).map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
        print(word)
    print("*"*25)

topic: 0
*************************
film
movie
story
good
like
one
really
first
quite
end
ever
bad
swedish
world
another
without
new
little
hell
get
*************************
topic: 1
*************************
film
movie
one
make
love
good
school
find
bad
plot
know
may
acting
many
story
little
character
well
time
also
*************************
topic: 2
*************************
manu
arjun
show
one
film
get
like
original
around
days
people
scene
time
model
happy
good
watch
looking
series
see
*************************
topic: 3
*************************
movie
like
one
good
movies
see
people
ever
time
make
watching
even
think
get
funny
really
way
bad
watch
story
*************************
topic: 4
*************************
film
movie
like
films
see
one
even
really
good
bad
much
better
seen
people
ve
life
think
art
well
end
*************************
topic: 5
*************************
film
movie
one
even
first
know
ll
never
think
see
real
well
life
actors
like
good
many
love
beach
story
******

How do you think about the topics? Do they make sense? If you think the topics we get from the movie reviews should be better, let's continue to see what we can do to make them better.

One possible reason is that we have many words that do not show up frequently. That is, they are very specific words to certain movies but don't occur across reviews. Such words are not very meaningful and they do not represent common themes in those reviews. So here we limit the frequency of words to at least 5 and run LDA with pipeline again.

In [11]:
#filter the countvectorizer
tf = ft.CountVectorizer(inputCol=stopwords.getOutputCol(), outputCol='review_tf', minDF=5)

In [12]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[tokenizer, stopwords, tf, lda])
reviews_train, reviews_test = reviews.randomSplit([0.7, 0.3], seed=200)
pipeline_model = pipeline.fit(reviews_train)
topics = pipeline_model.transform(reviews)
topics.select('topicDistribution').take(5)

[Row(topicDistribution=DenseVector([0.0002, 0.1895, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0627, 0.0002, 0.0002, 0.0002, 0.0002, 0.7426, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002])),
 Row(topicDistribution=DenseVector([0.0004, 0.4049, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.5847, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004])),
 Row(topicDistribution=DenseVector([0.0004, 0.8426, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.147, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004])),
 Row(topicDistribution=DenseVector([0.0005, 0.9863, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0006, 0.0005, 0.0005, 

In [13]:
#Code to extract topics from models
vectorized_model = pipeline_model.stages[2]
topic_model = pipeline_model.stages[3]
vocab = vectorized_model.vocabulary
topic_words_list = topic_model.describeTopics(20)
topic_words_rdd = topic_words_list.rdd
topics_words = topic_words_rdd.map(lambda row: row['termIndices']).map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)

topic: 0
*************************
film
show
like
one
good
really
life
little
movie
also
well
watch
get
us
ll
children
story
bad
new
barney
*************************
topic: 1
*************************
movie
film
one
like
good
story
time
see
bad
even
great
really
well
movies
people
get
acting
also
plot
much
*************************
topic: 2
*************************
film
people
movie
bed
many
one
garde
man
inspector
even
find
new
romy
times
also
old
must
funny
suspicion
way
*************************
topic: 3
*************************
one
movie
life
characters
say
great
tom
best
new
story
family
time
make
movies
get
real
good
still
performance
long
*************************
topic: 4
*************************
movie
one
good
batman
like
film
really
bad
time
even
movies
see
great
well
also
love
kids
acting
people
make
*************************
topic: 5
*************************
movie
film
one
good
see
story
even
time
really
first
know
make
way
also
little
much
like
get
scenes
plot
********

It is expected that the topics are getting better but still not very satisfying. Some words may be very specific to some reviews. Also, there are lots of words shown in different topics many times; possibly they are too common so they shouldn't be that important. Let's take one more step to use TF-IDF vector rather than TF vector. To build IF-IDF, we first create TF with CountVectorizer then create IDF from TF vector. Then we run LDA model with IF-IDF vector.

In [14]:
#use tf-idf vector
tf = ft.CountVectorizer(inputCol=stopwords.getOutputCol(), outputCol="review_tf", vocabSize=10000)
idf = ft.IDF(inputCol=tf.getOutputCol(), outputCol="review_tfidf", minDocFreq=5)

In [15]:
#[Your Code] to create a LDA model (30 topics) and put everything together into a ML pipeline to fit LDA

pipeline = Pipeline(stages=[tokenizer, stopwords, tf, idf, lda])
reviews_train, reviews_test = reviews.randomSplit([0.7, 0.3], seed=200)
pipeline_model = pipeline.fit(reviews_train)
topics = pipeline_model.transform(reviews)
topics.select('topicDistribution').take(5)

[Row(topicDistribution=DenseVector([0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.6821, 0.0002, 0.312, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002, 0.0002])),
 Row(topicDistribution=DenseVector([0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.1927, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.7956, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004])),
 Row(topicDistribution=DenseVector([0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.7296, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.2598, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004])),
 Row(topicDistribution=DenseVector([0.0005, 0.1676, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.6834, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 

In [16]:
#Code to extract topics from models
tf_model = pipeline_model.stages[2]
topic_model = pipeline_model.stages[4]
vocab = tf_model.vocabulary
topic_words_list = topic_model.describeTopics(20)
topic_words_rdd = topic_words_list.rdd
topics_words = topic_words_rdd.map(lambda row: row['termIndices']).map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)

topic: 0
*************************
film
one
movie
story
like
really
pinhead
good
flynn
quite
little
merchant
hell
life
end
hellraiser
well
time
see
love
*************************
topic: 1
*************************
movie
one
movies
good
really
like
ever
see
film
even
great
bad
people
10
first
made
seen
worst
say
time
*************************
topic: 2
*************************
armstrong
earl
film
sullivan
bobby
one
movie
good
japanese
sickness
though
war
travolta
way
like
cowboy
man
ever
urban
well
*************************
topic: 3
*************************
film
movie
well
velvet
one
like
great
horse
much
think
made
good
time
way
life
even
day
watch
make
things
*************************
topic: 4
*************************
movie
film
one
like
good
time
even
life
see
bad
make
watch
much
mathieu
old
something
really
funny
plot
movies
*************************
topic: 5
*************************
scooby
one
doo
show
movie
film
best
shaggy
well
really
story
life
characters
better
episode
even


The topics should be more reasonable now. You should believe they can still be further improved by cleaning up the text and tuning the hyperparameters but let's stop here for assignment. If you want to try yourself beyond the assignment, you can change the model configuration to see if you can get any further improvement.

### Task 2 - Movie Review Sentiment Analysis with Spark ML
The second task we are going to prediction. Let's continue with the reviews data and now we can do sentiment analysis with the TF-IDF. So with the TF-IDF vector, we can train and predict review sentiment.

In [17]:
#first let's confirm the potential labels
#(it is possible sentiment can be neutral so we should make sure if that's the case)
reviews.select('sentiment').distinct().show()

+---------+
|sentiment|
+---------+
| positive|
| negative|
+---------+



In [18]:
#let's create the binary numerical lable from postive/negative
reviews = reviews.withColumn('sentiment_label', fn.when(fn.col('sentiment')=='positive', 1.0).otherwise(0.0))
reviews.show()

+--------------------+---------+---------------+
|              review|sentiment|sentiment_label|
+--------------------+---------+---------------+
|One of the other ...| positive|            1.0|
|A wonderful littl...| positive|            1.0|
|I thought this wa...| positive|            1.0|
|Basically there s...| negative|            0.0|
|Petter Mattei s  ...| positive|            1.0|
|Probably my all t...| positive|            1.0|
|I sure would like...| positive|            1.0|
|This show was an ...| negative|            0.0|
|Encouraged by the...| negative|            0.0|
|If you like origi...| positive|            1.0|
|Phil the Alien is...| negative|            0.0|
|I saw this movie ...| negative|            0.0|
|So im not a big f...| negative|            0.0|
|The cast played S...| negative|            0.0|
|This a fantastic ...| positive|            1.0|
|Kind of drawn in ...| negative|            0.0|
|Some films just s...| positive|            1.0|
|This movie made i..

In [19]:
#split the training and testing set, with 80/20
reviews_train, reviews_test = reviews.randomSplit([0.8, 0.2], seed=200)

In [20]:
import pyspark.ml.classification as cl
from pyspark.ml import Pipeline

#craete logistic gression model and then build the pipeline to train the model
lr = cl.LogisticRegression(maxIter=10, labelCol='sentiment_label', featuresCol=idf.getOutputCol())

In [21]:
#[Your Code] to build a ML pipeline and train logistic regression model
pipeline = Pipeline(stages=[tokenizer, stopwords, tf, idf, lr])
lr_model = pipeline.fit(reviews_train)

In [22]:
#make predictions with pipeline model
predictions = lr_model.transform(reviews_test)

In [23]:
import pyspark.ml.evaluation as ev
#model evaluation for binary classification
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', labelCol='sentiment_label')
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderPR'}))

0.9477311761170099
0.9440730582447


The prediction performance looks acceptable. Here note that TF-IDF is a long vector (here we select top 10000 words, but still a large number), so let's try something different. As mentioned in the class, another way to model text is word embedding with the Word2Vec model. So next we create word vector and use it to predict sentiment.

In [25]:
#create word2vec model
word2vec = ft.Word2Vec(vectorSize=100, minCount=5, inputCol=stopwords.getOutputCol(), outputCol="review_word2vec")
#craete logistic gression model and then build the pipeline to train the model
lr = cl.LogisticRegression(maxIter=10, labelCol='sentiment_label', featuresCol=word2vec.getOutputCol())

In [26]:
#same logistic regression model, but take output from word2vec model
#[Your Code] to create a logistic regression model and build pipeline with word2vec to train logistic regession; then make predictions and evaluate model (areaUnderROC and areaUnderPR)
pipeline = Pipeline(stages=[tokenizer, stopwords, word2vec, lr])
lr_model = pipeline.fit(reviews_train)
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', labelCol='sentiment_label')
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderPR'}))

0.9477311761170099
0.9440730582447


Check the prediction performance with only 100 features; word2vec model is a very useful representation of words and it reduces dimensionality significantly. 

In the end, let's try an alternative model. In classfication, Support Vector Machine (SVM) is commonly used and let's see how we can use it here. We will keep the orginal configuration of word2vec model (vector size is 100) here for SVM.

In [27]:
#same word2vec model configuration is adopted here
word2vec = ft.Word2Vec(vectorSize=100, minCount=5, inputCol=stopwords.getOutputCol(), outputCol="review_word2vec")
#create svm with LinearSVC, with features from word2vec model outputCol
svm = cl.LinearSVC(maxIter=10, labelCol='sentiment_label', featuresCol=word2vec.getOutputCol())

In [28]:
#build the ml pipeline and train the model; then make predictions 
#[Your Code] to build the ML pipeline to train SVM model; then make predictions (no need to evaluate, the evaluation is slightly different here so provided below)
pipeline = Pipeline(stages=[tokenizer, stopwords, word2vec, svm])
svm_model = pipeline.fit(reviews_train)
predictions = svm_model.transform(reviews_test)

In [29]:
#model evaluation, here slightly different for LinearSVC
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='sentiment_label')
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderPR'}))

0.9295114005209375
0.9270642267120186


### Task 3 - Combine Spark ML and Streaming Analysis
Now we have our sentiment prediction model with acceptable predictive performance. The last task is to combine this machine learning mode with spark streaming. That is, with a data stream, we will use the trained model to make real-time predictions. We will still use IMDB reviews and here we will create a simulated data stream from files in a folder, then receive the review stream and predict sentiment using spark structured streaming.

We will first create multiple files so that we can simulate a data stream to read files incrementally from a folder. To do that, we use the code below - it creates a folder 'review_stream', turn the spark dataframe into pandas dataframe (this is not a recommended approach but here we use it for convenience and conciseness), split the data into 100 smaller ones, and then save each smaller csv file into the folder.

In [30]:
reviews_test_stream = reviews_test.withColumn('review_id', fn.monotonically_increasing_id())

In [31]:
#if you use cluster, do not run the code below, just stream from s3 directory with code for cluster in the next cell 

import pandas as pd
import numpy as np
import os

#code for local mode
#here we just create a local directory to simulate the data stream
if not os.path.exists('review_stream'):
    os.mkdir('review_stream')

#here we use pandas at local docker to speed up, with spark it is slow
df = reviews_test_stream.orderBy('review_id').toPandas()
#here we write a small number of rows into each csv file and read data stream from these files
#we can split the dataframe into 100 smaller ones so each one has 100 rows
for i,chunk in enumerate(np.array_split(df, 100)):
    chunk.to_csv('./review_stream/reviews_{}.csv'.format(i), index=False)

Now we can read from the stream. For convenience, we just take the existing spark dataframe to reuse the schema.

In [32]:
#read from data stream in a folder
streaming_review = spark.readStream.schema(reviews_test_stream.schema).option("maxFilesPerTrigger", 1).csv('./review_stream')
#for cluster
#streaming_review = spark.readStream.schema(reviews_test_stream.schema).option("maxFilesPerTrigger", 1).csv('s3://msbx5420-spr21/zhiyiwang/review_stream')

Here from the data stream, we want to know two results. First, how many positive or negative reviews we have received in real time? Second, how many positive or negative reviews in each time window (so we know whether there is a peak of positive or negative review in certain time)? So we will do some calculations, and to capture time window, we will use the current timestamp to create 'processing_time' (the time we receive the data) and apply window on this timestamp.

In [33]:
streaming_review_time = streaming_review.withColumn('processing_time', fn.current_timestamp())

In [34]:
#we can just take the pipeline model we have trained to make prediction
#[Your Code] to use the last pipeline model with SVM to make predictions on 'streaming_review_time'; save the result as 'streaming_sentiment'
streaming_sentiment = svm_model.transform(streaming_review_time)


In [35]:
#create a predicted text label from 'predicted column' is the prediction
streaming_sentiment = streaming_sentiment.withColumn('predicted', fn.when(fn.col('prediction')==1.0, 'positive').otherwise('negative'))

#here we do transformations to get the results we need
#first, get total number of positive and negative reviews we have received
streaming_sentiment_count = streaming_sentiment.groupBy('predicted').count()

#second, still number of positive and negative reviews we received, but by time window (60 seconds)
streaming_sentiment_window_count = streaming_sentiment.groupBy(fn.window('processing_time', '60 seconds'), 'predicted').count()

In [36]:
#now we have two streaming dataframe results
print(streaming_sentiment_count.isStreaming)
print(streaming_sentiment_window_count.isStreaming)

True
True


In [37]:
#now we can define query to start streaming analysis and set the result table as 'sentiment'
query_sentiment = (streaming_sentiment_count.writeStream.format("memory").queryName("sentiment").outputMode("complete").start())

In [38]:
#define another query for the result table of windowed positive and negative review counts
#[Your Code] to define the second query, name of result table is 'sentiment_window'
query_sentiment_window = (streaming_sentiment_window_count.writeStream.format("memory").queryName("sentiment_window").outputMode("complete").start())

In [39]:
#query the first result table to monitor real time results
spark.sql('select * from sentiment').show()

+---------+-----+
|predicted|count|
+---------+-----+
| positive| 2001|
| negative| 1736|
+---------+-----+



In [40]:
#query the second result table to monitor real time results, order the results by window
spark.sql('select * from sentiment_window order by window').show(truncate=False)

+------------------------------------------+---------+-----+
|window                                    |predicted|count|
+------------------------------------------+---------+-----+
|[2021-03-22 00:02:00, 2021-03-22 00:03:00]|negative |122  |
|[2021-03-22 00:02:00, 2021-03-22 00:03:00]|positive |181  |
|[2021-03-22 00:03:00, 2021-03-22 00:04:00]|positive |543  |
|[2021-03-22 00:03:00, 2021-03-22 00:04:00]|negative |467  |
|[2021-03-22 00:04:00, 2021-03-22 00:05:00]|positive |646  |
|[2021-03-22 00:04:00, 2021-03-22 00:05:00]|negative |465  |
|[2021-03-22 00:05:00, 2021-03-22 00:06:00]|positive |546  |
|[2021-03-22 00:05:00, 2021-03-22 00:06:00]|negative |565  |
|[2021-03-22 00:06:00, 2021-03-22 00:07:00]|negative |117  |
|[2021-03-22 00:06:00, 2021-03-22 00:07:00]|positive |85   |
+------------------------------------------+---------+-----+



In [41]:
#stop query to finish streaming analysis
query_sentiment.stop()

In [42]:
#stop query to finish streaming analysis
query_sentiment_window.stop()