## SPARK STREAMING AND MACHINE LEARNING WITH SPARK

### Topic Modeling on Moive Reviews with Spark ML


In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').config("spark.driver.memory", "2g").appName('spark_ml_imdb').getOrCreate()

In [2]:
reviews = spark.read.options(inferSchema = True, multiLine = True, escape = '\"').csv('IMDB_Reviews.csv.gz', header=True)
reviews.show()

+--------------------+---------+
|              review|sentiment|
+--------------------+---------+
|One of the other ...| positive|
|A wonderful littl...| positive|
|I thought this wa...| positive|
|Basically there's...| negative|
|Petter Mattei's "...| positive|
|Probably my all-t...| positive|
|I sure would like...| positive|
|This show was an ...| negative|
|Encouraged by the...| negative|
|If you like origi...| positive|
|Phil the Alien is...| negative|
|I saw this movie ...| negative|
|So im not a big f...| negative|
|The cast played S...| negative|
|This a fantastic ...| positive|
|Kind of drawn in ...| negative|
|Some films just s...| positive|
|This movie made i...| negative|
|I remember this f...| positive|
|An awful film! It...| negative|
+--------------------+---------+
only showing top 20 rows



First, we should clean up the review texts. Besides those special characters we have tried to remove in exercise, here we also need to remove the html tags in the text.

In [3]:
import pyspark.sql.functions as fn
import pyspark.ml.feature as ft

#remove html tags in the text with regular expression
reviews = reviews.withColumn('review', fn.regexp_replace(fn.col("review"), '<[^>]+>', ' '))
#remove special characters and line breaks in the text with regular expression
reviews = reviews.withColumn('review', fn.regexp_replace(fn.col("review"), '([^\s\w_]|_)+', ' ')).withColumn('review', fn.regexp_replace(fn.col("review"), '[\n\r]', ' '))
reviews.take(1)

[Row(review='One of the other reviewers has mentioned that after watching just 1 Oz episode you ll be hooked  They are right  as this is exactly what happened with me   The first thing that struck me about Oz was its brutality and unflinching scenes of violence  which set in right from the word GO  Trust me  this is not a show for the faint hearted or timid  This show pulls no punches with regards to drugs  sex or violence  Its is hardcore  in the classic use of the word   It is called OZ as that is the nickname given to the Oswald Maximum Security State Penitentary  It focuses mainly on Emerald City  an experimental section of the prison where all the cells have glass fronts and face inwards  so privacy is not high on the agenda  Em City is home to many Aryans  Muslims  gangstas  Latinos  Christians  Italians  Irish and more so scuffles  death stares  dodgy dealings and shady agreements are never far away   I would say the main appeal of the show is due to the fact that it goes where 

Now let's create tokenizer to start the data processing.

In [4]:
tokenizer = ft.RegexTokenizer(inputCol='review', outputCol='review_tok', pattern='\s+|[,.\"/!]')
tokenizer.transform(reviews).select('review_tok').take(1)

[Row(review_tok=['one', 'of', 'the', 'other', 'reviewers', 'has', 'mentioned', 'that', 'after', 'watching', 'just', '1', 'oz', 'episode', 'you', 'll', 'be', 'hooked', 'they', 'are', 'right', 'as', 'this', 'is', 'exactly', 'what', 'happened', 'with', 'me', 'the', 'first', 'thing', 'that', 'struck', 'me', 'about', 'oz', 'was', 'its', 'brutality', 'and', 'unflinching', 'scenes', 'of', 'violence', 'which', 'set', 'in', 'right', 'from', 'the', 'word', 'go', 'trust', 'me', 'this', 'is', 'not', 'a', 'show', 'for', 'the', 'faint', 'hearted', 'or', 'timid', 'this', 'show', 'pulls', 'no', 'punches', 'with', 'regards', 'to', 'drugs', 'sex', 'or', 'violence', 'its', 'is', 'hardcore', 'in', 'the', 'classic', 'use', 'of', 'the', 'word', 'it', 'is', 'called', 'oz', 'as', 'that', 'is', 'the', 'nickname', 'given', 'to', 'the', 'oswald', 'maximum', 'security', 'state', 'penitentary', 'it', 'focuses', 'mainly', 'on', 'emerald', 'city', 'an', 'experimental', 'section', 'of', 'the', 'prison', 'where', 'all

Then remove stopwords in the text.

In [5]:
stopwords = ft.StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='review_stop')
stopwords.transform(tokenizer.transform(reviews)).select('review_stop').take(1)

[Row(review_stop=['one', 'reviewers', 'mentioned', 'watching', '1', 'oz', 'episode', 'll', 'hooked', 'right', 'exactly', 'happened', 'first', 'thing', 'struck', 'oz', 'brutality', 'unflinching', 'scenes', 'violence', 'set', 'right', 'word', 'go', 'trust', 'show', 'faint', 'hearted', 'timid', 'show', 'pulls', 'punches', 'regards', 'drugs', 'sex', 'violence', 'hardcore', 'classic', 'use', 'word', 'called', 'oz', 'nickname', 'given', 'oswald', 'maximum', 'security', 'state', 'penitentary', 'focuses', 'mainly', 'emerald', 'city', 'experimental', 'section', 'prison', 'cells', 'glass', 'fronts', 'face', 'inwards', 'privacy', 'high', 'agenda', 'em', 'city', 'home', 'many', 'aryans', 'muslims', 'gangstas', 'latinos', 'christians', 'italians', 'irish', 'scuffles', 'death', 'stares', 'dodgy', 'dealings', 'shady', 'agreements', 'never', 'far', 'away', 'say', 'main', 'appeal', 'show', 'due', 'fact', 'goes', 'shows', 'wouldn', 'dare', 'forget', 'pretty', 'pictures', 'painted', 'mainstream', 'audien

Now same as what we did in the exercise, let's create `CountVectorizer` to transform the text into term frequency vector. 

In [6]:
tf = ft.CountVectorizer(inputCol=stopwords.getOutputCol(), outputCol='review_tf')
tokenized = stopwords.transform(tokenizer.transform(reviews))
tf.fit(tokenized).transform(tokenized).select('review_tf').take(1)

[Row(review_tf=SparseVector(101111, {2: 1.0, 10: 1.0, 13: 2.0, 17: 2.0, 28: 1.0, 32: 1.0, 35: 3.0, 39: 1.0, 45: 2.0, 46: 1.0, 50: 1.0, 53: 1.0, 54: 2.0, 57: 1.0, 83: 1.0, 85: 1.0, 91: 1.0, 93: 1.0, 97: 1.0, 101: 2.0, 108: 1.0, 121: 1.0, 128: 3.0, 138: 2.0, 160: 1.0, 161: 1.0, 169: 1.0, 174: 1.0, 184: 1.0, 191: 2.0, 195: 1.0, 217: 1.0, 235: 1.0, 249: 1.0, 250: 1.0, 251: 1.0, 264: 1.0, 278: 1.0, 286: 2.0, 302: 1.0, 316: 1.0, 324: 1.0, 370: 1.0, 386: 1.0, 409: 2.0, 438: 1.0, 448: 4.0, 453: 1.0, 459: 1.0, 480: 1.0, 501: 1.0, 514: 1.0, 526: 1.0, 534: 2.0, 535: 1.0, 567: 2.0, 582: 1.0, 685: 1.0, 707: 3.0, 754: 1.0, 779: 1.0, 826: 1.0, 921: 1.0, 939: 1.0, 1073: 3.0, 1092: 1.0, 1108: 1.0, 1146: 1.0, 1181: 1.0, 1210: 1.0, 1292: 1.0, 1293: 1.0, 1313: 1.0, 1348: 1.0, 1462: 1.0, 1475: 1.0, 1496: 1.0, 1619: 1.0, 1897: 1.0, 1917: 1.0, 1936: 1.0, 2019: 1.0, 2115: 1.0, 2206: 1.0, 2328: 1.0, 2345: 1.0, 2375: 1.0, 2422: 1.0, 2472: 1.0, 2591: 1.0, 2790: 1.0, 2794: 1.0, 2863: 1.0, 2899: 1.0, 2971: 6.0, 30

Then we use the `LDA` model to do topic modeling. We create the model here with 30 topics.

In [7]:
import pyspark.ml.clustering as clus
lda = clus.LDA(k=30, optimizer='online', maxIter=10, featuresCol=tf.getOutputCol())

Now let's build the pipeline to train the topic model from the raw data. It will take a while to run.

In [9]:
from pyspark.ml import Pipeline

#[Your Code] to build a ML pipeline to fit LDA
pipeline = Pipeline(stages=[tokenizer,stopwords,tf, lda])
pipeline_model = pipeline.fit(reviews)


topics = pipeline_model.transform(reviews)

topics.select('topicDistribution')

DataFrame[topicDistribution: vector]

Let's see if we have properly discovered the topics. This is just the same code we display topics in the exercise - we will reuse it several times here.

In [10]:
#Code to extract topics from models
vectorized_model = pipeline_model.stages[2]
topic_model = pipeline_model.stages[3]
vocab = vectorized_model.vocabulary
topic_words_list = topic_model.describeTopics(20)
topic_words_rdd = topic_words_list.rdd
topics_words = topic_words_rdd.map(lambda row: row['termIndices']).map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)

topic: 0
*************************
film
one
like
good
movie
really
great
time
well
best
much
story
see
made
watch
even
get
movies
people
two
*************************
topic: 1
*************************
film
movie
good
one
story
really
like
time
well
much
even
acting
love
see
great
get
first
way
m
watch
*************************
topic: 2
*************************
film
movie
one
trivialboring
well
great
best
like
much
really
also
time
better
two
seki
first
director
characters
see
man
*************************
topic: 3
*************************
film
like
plot
movie
know
one
see
ve
well
really
m
characters
real
character
ending
say
seen
even
also
time
*************************
topic: 4
*************************
film
movie
one
like
story
much
good
even
time
films
life
also
better
really
see
many
seen
get
way
love
*************************
topic: 5
*************************
film
one
movie
like
bad
time
people
even
good
well
story
also
get
best
love
life
characters
great
sex
doug
************

How do you think about the topics? Do they make sense? If you think the topics we get from the movie reviews should be better, let's continue to see what we can do to make them better.

One possible reason is that we have many words that do not show up frequently. That is, they are very specific words to certain movies but don't occur across reviews. Such words are not very meaningful and they do not represent common themes in those reviews. So here we limit the frequency of words to at least 5 and run LDA with pipeline again.

In [11]:
#filter the countvectorizer
tf = ft.CountVectorizer(inputCol=stopwords.getOutputCol(), outputCol='review_tf', minDF=5)

In [12]:
pipeline = Pipeline(stages=[tokenizer,stopwords,tf, lda])
pipeline_model = pipeline.fit(reviews)


topics = pipeline_model.transform(reviews)

topics.select('topicDistribution')

DataFrame[topicDistribution: vector]

In [13]:
#Code to extract topics from models
vectorized_model = pipeline_model.stages[2]
topic_model = pipeline_model.stages[3]
vocab = vectorized_model.vocabulary
topic_words_list = topic_model.describeTopics(20)
topic_words_rdd = topic_words_list.rdd
topics_words = topic_words_rdd.map(lambda row: row['termIndices']).map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)

topic: 0
*************************
film
one
movie
like
ponyo
good
time
much
well
even
really
great
story
people
also
two
see
love
best
man
*************************
topic: 1
*************************
film
movie
one
see
time
good
story
like
really
even
much
well
character
watch
great
think
seen
films
many
director
*************************
topic: 2
*************************
movie
like
one
people
good
time
film
puerto
even
watch
really
way
much
life
movies
make
know
get
m
comedy
*************************
topic: 3
*************************
movie
film
one
good
even
like
love
see
really
time
story
life
acting
also
get
way
bad
much
films
make
*************************
topic: 4
*************************
film
movie
one
good
like
othello
characters
make
get
din
films
scene
seen
well
story
see
first
ever
time
made
*************************
topic: 5
*************************
movie
lincoln
like
really
people
also
film
even
good
way
one
first
well
story
much
films
character
man
part
great
*********

It is expected that the topics are getting better but still not very satisfying. Some words may be very specific to some reviews. Also, there are lots of words shown in different topics many times; possibly they are too common so they shouldn't be that important. Let's take one more step to use TF-IDF vector rather than TF vector. To build IF-IDF, we first create TF with CountVectorizer then create IDF from TF vector. Then we run LDA model with IF-IDF vector.

In [17]:
#use tf-idf vector
tf = ft.CountVectorizer(inputCol=stopwords.getOutputCol(), outputCol="review_tf", vocabSize=10000)
idf = ft.IDF(inputCol=tf.getOutputCol(), outputCol="review_tfidf", minDocFreq=5)

In [19]:
pipeline = Pipeline(stages=[tokenizer,stopwords,tf,idf,lda])
pipeline_model = pipeline.fit(reviews)


topics = pipeline_model.transform(reviews)

topics.select('topicDistribution')


DataFrame[topicDistribution: vector]

In [20]:
#Code to extract topics from models
tf_model = pipeline_model.stages[2]
topic_model = pipeline_model.stages[4]
vocab = tf_model.vocabulary
topic_words_list = topic_model.describeTopics(20)
topic_words_rdd = topic_words_list.rdd
topics_words = topic_words_rdd.map(lambda row: row['termIndices']).map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)

topic: 0
*************************
film
one
stewart
story
tarzan
western
good
movie
fulci
well
chan
great
like
jackie
best
man
much
scott
dean
made
*************************
topic: 1
*************************
movie
film
one
like
good
great
dance
musical
even
better
love
story
superman
time
see
best
much
get
make
people
*************************
topic: 2
*************************
movie
one
good
time
well
film
characters
story
life
two
much
komodo
best
acting
cobra
really
like
way
first
donna
*************************
topic: 3
*************************
movie
film
really
well
way
book
good
people
one
work
think
also
team
bad
plot
great
new
lots
acting
another
*************************
topic: 4
*************************
film
one
like
show
movie
people
see
even
really
first
season
ford
bad
good
time
well
family
episode
series
episodes
*************************
topic: 5
*************************
film
one
well
good
really
also
never
movie
horror
much
friends
young
made
version
great
might
fil

The topics should be more reasonable now. You should believe they can still be further improved by cleaning up the text and tuning the hyperparameter.We can change the model configuration to see if you can get any further improvement.

# Movie Review Sentiment Analysis with Spark ML


In [21]:
#first let's confirm the potential labels
reviews.select('sentiment').distinct().show()

+---------+
|sentiment|
+---------+
| positive|
| negative|
+---------+



In [22]:
#let's create the binary numerical lable from postive/negative
reviews = reviews.withColumn('sentiment_label', fn.when(fn.col('sentiment')=='positive', 1.0).otherwise(0.0))
reviews.show()

+--------------------+---------+---------------+
|              review|sentiment|sentiment_label|
+--------------------+---------+---------------+
|One of the other ...| positive|            1.0|
|A wonderful littl...| positive|            1.0|
|I thought this wa...| positive|            1.0|
|Basically there s...| negative|            0.0|
|Petter Mattei s  ...| positive|            1.0|
|Probably my all t...| positive|            1.0|
|I sure would like...| positive|            1.0|
|This show was an ...| negative|            0.0|
|Encouraged by the...| negative|            0.0|
|If you like origi...| positive|            1.0|
|Phil the Alien is...| negative|            0.0|
|I saw this movie ...| negative|            0.0|
|So im not a big f...| negative|            0.0|
|The cast played S...| negative|            0.0|
|This a fantastic ...| positive|            1.0|
|Kind of drawn in ...| negative|            0.0|
|Some films just s...| positive|            1.0|
|This movie made i..

In [23]:
#split the training and testing set, with 80/20
reviews_train, reviews_test = reviews.randomSplit([0.8, 0.2], seed=200)

In [24]:
import pyspark.ml.classification as cl
from pyspark.ml import Pipeline

lr = cl.LogisticRegression(maxIter=10, labelCol='sentiment_label', featuresCol=idf.getOutputCol())

In [42]:

pipeline = Pipeline(stages=[tokenizer,stopwords,tf,idf,lr])
lr_model = pipeline.fit(reviews_train)


In [43]:
#make predictions with pipeline model
predictions = lr_model.transform(reviews_test)

In [44]:
predictions.columns

['review',
 'sentiment',
 'sentiment_label',
 'review_tok',
 'review_stop',
 'review_tf',
 'review_tfidf',
 'rawPrediction',
 'probability',
 'prediction']

In [46]:
import pyspark.ml.evaluation as ev
#model evaluation for binary classification
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', labelCol='sentiment_label')
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderPR'}))

0.9338446403526336
0.9273869863834877


The prediction performance looks acceptable. Here note that TF-IDF is a long vector (here we select top 10000 words, but still a large number), so let's try something different. As mentioned in the class, another way to model text is word embedding with the Word2Vec model. So next we create word vector and use it to predict sentiment.

In [47]:
#create word2vec model
word2vec = ft.Word2Vec(vectorSize=100, minCount=5, inputCol=stopwords.getOutputCol(), outputCol="review_word2vec")

In [50]:
#same logistic regression model, but take output from word2vec model
lr = cl.LogisticRegression(maxIter=10, labelCol='sentiment_label', featuresCol=word2vec.getOutputCol())
pipeline = Pipeline(stages=[tokenizer,stopwords,word2vec,lr])
lr_model = pipeline.fit(reviews_train)
predictions = lr_model.transform(reviews_test)

In [51]:
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', labelCol='sentiment_label')
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderPR'}))

0.9336958725706269
0.9304295574003703


Check the prediction performance with only 100 features; word2vec model is a very useful representation of words and it reduces dimensionality significantly. 

In the end, let's try an alternative model. In classfication, Support Vector Machine (SVM) is commonly used and let's see how we can use it here. We will keep the orginal configuration of word2vec model (vector size is 100) here for SVM.

In [52]:
#same word2vec model configuration is adopted here
word2vec = ft.Word2Vec(vectorSize=100, minCount=5, inputCol=stopwords.getOutputCol(), outputCol="review_word2vec")
#create svm with LinearSVC, with features from word2vec model outputCol
svm = cl.LinearSVC(maxIter=10, labelCol='sentiment_label', featuresCol=word2vec.getOutputCol())

In [53]:
#build the ml pipeline and train the model; then make predictions 

pipeline = Pipeline(stages=[tokenizer,stopwords,word2vec,svm])
svm_model = pipeline.fit(reviews_train)
predictions = svm_model.transform(reviews_test)

In [54]:
#model evaluation, here slightly different for LinearSVC
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='sentiment_label')
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderPR'}))

0.933331116008816
0.9304339792841155


### SPARK STREAMING FOR REAL TIME PREDICTIONS
Now we have our sentiment prediction model with acceptable predictive performance.That is, with a data stream, we will use the trained model to make real-time predictions. We will still use IMDB reviews and here we will create a simulated data stream from files in a directory, then receive the review stream and predict sentiment using spark structured streaming.

We will first use `streaming_producer.ipynb` specifically for this so that we can simulate a data stream to send files incrementally into a directory and we can read stream from the same directory. To do that, we use `streaming_producer.ipynb` - it will send random reviews in the form of csv files into `review_stream` directory. Then this spark application will read review stream data from `review_stream` directory.

In [175]:
reviews_test_stream = reviews_test.select('review')

In [176]:
import pandas as pd
import numpy as np
import os

#here we save the data as csv to allow streaming_producer.ipynb to take random reviews from it
df = reviews_test_stream.toPandas()
df.to_csv('review_test.csv', index=False)

Now we can read from the stream. For convenience, we just take the existing spark dataframe to reuse the schema.

In [None]:
#run streaming_producer.ipynb notebook (but make sure you try to work on the code below first), then go back here to run subsequent code

In [190]:
#read from data stream in a folder
streaming_path = './review_stream'
if not os.path.exists(streaming_path):
    os.makedirs(streaming_path)
streaming_review = spark.readStream.schema(reviews_test_stream.schema).option("maxFilesPerTrigger", 1).csv(streaming_path)

In [191]:
streaming_review

DataFrame[review: string]

Here from the data stream, we want to know three results. First, how many positive or negative reviews we have received in real time? Second, how many positive or negative reviews in each time window of 60 seconds? Third, we are interested in the positive and negative reviews in each time window of 60 seconds, but with sliding window for every 30 seconds. So we will do some calculations, and to capture time window, we will use the current timestamp to create 'processing_time' (the time we receive the data) and apply window on this timestamp.

In [192]:
streaming_review_time = streaming_review.withColumn('processing_time', fn.current_timestamp())

In [193]:

streaming_sentiment = svm_model.transform(streaming_review_time)


In [194]:
streaming_sentiment = streaming_sentiment.withColumn('predicted', fn.when(fn.col('prediction')==1.0, 'positive').otherwise('negative'))

In [195]:

sentiment_count = streaming_sentiment.groupBy(streaming_sentiment.predicted).count()
sentiment_window_count = streaming_sentiment.groupBy(fn.window('processing_time','60 seconds'),streaming_sentiment.predicted).count()
sentiment_sliding_window_count = streaming_sentiment.groupBy(fn.window('processing_time','60 seconds','30 seconds'),streaming_sentiment.predicted).count()

In [196]:
print(sentiment_count.isStreaming)

True


In [197]:
sentiment_count

DataFrame[predicted: string, count: bigint]

In [198]:
print(sentiment_count.isStreaming)
print(sentiment_window_count.isStreaming)
print(sentiment_sliding_window_count.isStreaming)

True
True
True


In [199]:
query_sentiment = (
    sentiment_count
    .writeStream
    .format('memory')
    .queryName('sentiment')
    .outputMode('complete')
    .start()
)
query_sentiment_window = (
    sentiment_window_count
    .writeStream
    .format('memory')
    .queryName('sentiment_window')
    .outputMode('complete')
    .start()
)
query_sentiment_sliding_window = (
    sentiment_sliding_window_count
    .writeStream
    .format('memory')
    .queryName('sentiment_sliding_window')
    .outputMode('complete')
    .start()
)

In [200]:
import time
start_time = time.time()
current_time = time.time()
while current_time - start_time <= 120:
    spark.sql('select * from sentiment').show()
    spark.sql('select * from sentiment_window order by window').show(truncate=False)
    spark.sql('select * from sentiment_sliding_window order by window').show(truncate=False)
    current_time = time.time()
    time.sleep(10)

+---------+-----+
|predicted|count|
+---------+-----+
+---------+-----+

+------+---------+-----+
|window|predicted|count|
+------+---------+-----+
+------+---------+-----+

+------+---------+-----+
|window|predicted|count|
+------+---------+-----+
+------+---------+-----+

+---------+-----+
|predicted|count|
+---------+-----+
| positive|   63|
| negative|   55|
+---------+-----+

+------------------------------------------+---------+-----+
|window                                    |predicted|count|
+------------------------------------------+---------+-----+
|{2023-03-19 18:41:00, 2023-03-19 18:42:00}|positive |63   |
|{2023-03-19 18:41:00, 2023-03-19 18:42:00}|negative |55   |
+------------------------------------------+---------+-----+

+------------------------------------------+---------+-----+
|window                                    |predicted|count|
+------------------------------------------+---------+-----+
|{2023-03-19 18:41:00, 2023-03-19 18:42:00}|positive |18   |
|{202

In [201]:
query_sentiment.stop()
query_sentiment_window.stop()
query_sentiment_sliding_window.stop()