# Sentiment Analysis of Movie Reviews
## Data Set Description:
IMDB dataset is obtained from Kaggle URL: https://www.kaggle.com/datasets/lakshmi25npathi/imdb-dataset-of-50k-movie-reviews?select=IMDB+Dataset.csv

It has 50K highly polar movie reviews. Each row in the data has a review and a sentiment (positive and negative) about a movie.

## Classification Problem:
My goal is to build a machine learning classifier that can predict whether a review about a movie is positive or negative based on the review text only. 



# Colab Setup for PySpark

In [1]:
!pip install --ignore-installed -q pyspark==3.2.1

[K     |████████████████████████████████| 281.4 MB 34 kB/s 
[K     |████████████████████████████████| 198 kB 24.2 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [20]:
import os
import sys
import requests
from operator import add

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, IntegerType
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LinearSVC
from pyspark.ml import Pipeline
from pyspark.ml.feature import ChiSqSelector
from pyspark.mllib.evaluation import MulticlassMetrics
import time

spark = SparkSession.builder.master("local[*]").getOrCreate()


In [None]:
# Computation of time elapsed
# Start time
begin_time = time.time()
# End time
end_time = time.time() - begin_time
print("Total execution time to learn SVM model on training data: ", end_time)

# Pyscript

In [None]:
%%writefile run_Term_Project.py

from __future__ import print_function
import os
import sys
import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, IntegerType
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LinearSVC
from pyspark.ml import Pipeline
from pyspark.ml.feature import ChiSqSelector
from pyspark.mllib.evaluation import MulticlassMetrics
import time

# Function to predict the sentiment(positive, negative) of text for different classifiers
def getPrediction(text, model):
    # Check if text is one review or a list of reviews
    if (isinstance(text, str)):
        review = [text]
    else:
        review = text

    # Create a dataframe of review list
    df_new_data = spark.createDataFrame(review, StringType()) 

    # Rename the dataframe column
    df_new_data = df_new_data.withColumnRenamed("value", "review")

    # Predict sentiment using the SVM model
    predict = model.transform(df_new_data)
    predict = predict.select("review", "prediction")
    return predict

# main() starts here
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file> <output> ", file=sys.stderr)
        exit(-1)

    # Create spark context   
    spark = SparkSession \
        .builder \
        .appName("Sentiment Analysis of IMDB Dataset") \
        .getOrCreate()

    # Set your file path here 
    data_file = "/Users/munibasiddiqi/Desktop/BUCS777/Homework_Assignments/Term_Project/IMDB_Dataset.csv"
    
    # Colab path
    #data_file = "IMDB_Dataset.csv"

    # Google cloud path
    #data_file = "gs://met-cs-777-data/TrainingData.txt.bz2"

    # Upload data
    spark_df = spark.read.format("csv").option("header", "true").option("escape","\"").option("multiLine","true").load(data_file)

    # Add label column to dataframe (Positive sentiment = 1, Negative sentiment = 0)
    df = spark_df.withColumn("label", F.when(F.col("sentiment")=="positive",1).otherwise(0)).cache()

    # Split the dataset into training and test set
    df_train, df_test = df.randomSplit(weights=[0.7, 0.3], seed=100)

    # Check if the dataset is balanced or imbalanced
    print("Taining Data")
    df_train.groupby("label").count().show()
    print("Test Data")
    df_test.groupby("label").count().show()

    ############################### Data Preprocessing #######################################
    
    # Converts text to lowercase and split text on non-word character
    regexTokenizer = RegexTokenizer(inputCol="review", outputCol="words", pattern="\\W")
    
    # Remove stopwords
    remover = StopWordsRemover(inputCol = regexTokenizer.getOutputCol(), outputCol="filtered")

    # Remove stopWords=["br", 'm', 've', 're', 'll', 'd']
    remover2 = StopWordsRemover(inputCol= remover.getOutputCol(), outputCol="token",stopWords=["br", 'm', 've', 're', 'll', 'd'])

    # Extracts a vocabulary from document collections and generates a CountVectorizerModel
    # During the fitting process, CountVectorizer will select the top vocabSize words ordered 
    # by term frequency across the corpus.
    countVectorizer = CountVectorizer(inputCol= remover2.getOutputCol(), outputCol="rawFeatures", vocabSize=5000)
    
    # The IDF Model takes feature vectors and scales each feature. 
    # Intuitively, it down-weights features which appear frequently in a corpus
    idf = IDF(inputCol= countVectorizer.getOutputCol(), outputCol="featuresIDF")

    # Chi-Squared feature selection. It operates on labeled data with categorical features. 
    # ChiSqSelector uses the Chi-Squared test of independence to decide which features to choose. 
    selector = ChiSqSelector(numTopFeatures=500, featuresCol=idf.getOutputCol(),
                         outputCol="features", labelCol="label")


############################### LOGISTIC REGRESSION ######################################

    ##################### Training Model ####################

    # Start time
    begin_time = time.time()

    # LogisticRegression classifier
    classifier_logreg = LogisticRegression(maxIter=20)

    # Chain indexers and classifier_logreg in a Pipeline
    pipeline_logreg = Pipeline(stages=[regexTokenizer, remover, remover2, countVectorizer, idf, classifier_logreg])
    
    # Train model. 
    model_logreg = pipeline_logreg.fit(df_train)

    # Print the coefficients and intercept for linear SVC
    print("Logistic Regression Model")
    print("First 10 Coefficients: " + str(model_logreg.stages[5].coefficients[:10]))
    print("Intercept: " + str(model_logreg.stages[5].intercept))

    # Top 20 vocabulary words
    #pipeline_logreg.getStages()
    vocabulary = model_logreg.stages[3].vocabulary
    print("Top twenty vocabulary words", vocabulary[0:20])

    # End time
    end_time = time.time() - begin_time
    print("Total execution time to train logistic regression model on the train data: ", end_time)
   
    # Create a dataframe of top 20 vocabulary words to save as csv file
    df_top20 = spark.createDataFrame(vocabulary[0:20], StringType())

    # Store this result in a single file on the cluster
    df_top20.coalesce(1).write.format("csv").option("header",True).save(sys.argv[1]+'.top20_words_IDF')

    ##################### Model Testing ####################

    # Start time
    begin_time = time.time()

    # Make predictions.
    predictions_logreg = model_logreg.transform(df_test).cache()

    # End time
    end_time = time.time() - begin_time
    print("Total execution time to test logistic regression model on the test data: ", end_time)

    ##################### Model evaluation ####################

    # Start time
    begin_time = time.time()
    
    # Covert dataframe to RDD for Model evaluation
    predictionAndLabels_logreg = predictions_logreg.select("label",  "prediction").rdd.map(lambda x : (float(x[0]), float(x[1]))).cache()

    # Instantiate metrics object
    metrics_logreg = MulticlassMetrics(predictionAndLabels_logreg)

    # Statistics by class
    #labels = data.map(lambda lp: lp.label).distinct().collect()
    print("Summary statistics for Logistic regression classifier")
    labels = [0.0, 1.0]
    for label in sorted(labels):
        print("Class %s precision = %s" % (label, metrics_logreg.precision(label)))
        print("Class %s recall = %s" % (label, metrics_logreg.recall(label)))
        print("Class %s F1 Measure = %s" % (label, metrics_logreg.fMeasure(label, beta=1.0)))
    
    print("Accuracy = %s" % metrics_logreg.accuracy)
    print("Confusion Matrix")
    print(metrics_logreg.confusionMatrix().toArray().astype(int))

    # End time
    end_time = time.time() - begin_time
    print("Total execution time to evalute the performance of logistic regression model on test data: ", end_time)

    # Create a dataframe to store the summary of results
    data = [("Accuracy", str(metrics_logreg.accuracy)),("Confusion Matrix",str(metrics_logreg.confusionMatrix().toArray()))]
    df = spark.createDataFrame(data)

    # Store this result in a single file on the cluster
    df.coalesce(1).write.format("csv").option("header",True).save(sys.argv[1]+'.logreg_statistics')



 ############################### SUPPORT VECTOR MACHINE ######################################

    ##################### Training Model ####################

    # Start time
    begin_time = time.time()

    # SVM classifier
    classifier_lsvc = LinearSVC(maxIter=20)

    # Fit the model
    #lsvcModel = classifier.fit(df_train)

    # Chain indexers and classifier_lsvc in a Pipeline
    pipeline_lsvc = Pipeline(stages=[regexTokenizer, remover, remover2, countVectorizer, idf, classifier_lsvc])

    # Train model. 
    model_lsvc = pipeline_lsvc.fit(df_train)

    # Print the coefficients and intercept for linear SVC
    print("Support Vector Machine Model")
    print("First 10 Coefficients: " + str(model_lsvc.stages[5].coefficients[:10]))
    print("Intercept: " + str(model_lsvc.stages[5].intercept))

    # End time
    end_time = time.time() - begin_time
    print("Total execution time to train SVM model on the train data: ", end_time)

    ##################### Model Testing ####################

    # Start time
    begin_time = time.time()

    # Make predictions.
    predictions_lsvc = model_lsvc.transform(df_test).cache()

    # End time
    end_time = time.time() - begin_time
    print("Total execution time to test SVM model on the test data: ", end_time)

    ##################### Model evaluation ####################

    # Start time
    begin_time = time.time()
    
    # Covert dataframe to RDD for Model evaluation
    predictionAndLabels_lsvc = predictions_lsvc.select("label",  "prediction").rdd.map(lambda x : (float(x[0]), float(x[1]))).cache()

    # Instantiate metrics object
    metrics_lsvc = MulticlassMetrics(predictionAndLabels_lsvc)


    # Statistics by class
    #labels = data.map(lambda lp: lp.label).distinct().collect()
    labels = [0.0, 1.0]
    for label in sorted(labels):
        print("Class %s precision = %s" % (label, metrics_lsvc.precision(label)))
        print("Class %s recall = %s" % (label, metrics_lsvc.recall(label)))
        print("Class %s F1 Measure = %s" % (label, metrics_lsvc.fMeasure(label, beta=1.0)))
    
    print("Accuracy = %s" % metrics_lsvc.accuracy)
    print(metrics_lsvc.confusionMatrix().toArray().astype(int))

    # End time
    end_time = time.time() - begin_time
    print("Total execution time to evalute the performance of SVM model on test data: ", end_time)

    # Create a dataframe to store the summary of results
    data = [("Accuracy", str(metrics_lsvc.accuracy)),("Confusion Matrix",str(metrics_lsvc.confusionMatrix().toArray()))]
    df = spark.createDataFrame(data)

    # Store this result in a single file on the cluster
    df.coalesce(1).write.format("csv").option("header",True).save(sys.argv[1]+'.SVM_statistics')


########################## Predicting Sentiments on New Data (Reviews)  ############################

   # A list of reviews 
    new_data = ['This movie was horrible, plot was boring, acting was okay.',
                'The film really sucked. I want my money back',
                'What a beautiful movie. Great plot, great acting.',
                'Harry Potter was a good movie.'
                ]

    ################# Prediction using logistic regression model ###############

    # Call to getPrediction function with a list of reviews and logistic regression model
    predict = getPrediction(new_data, model_logreg)
    print("Prediction using Logistic Regression model:")
    predict.show()

    # Store this result in a single file on the cluster
    predict.coalesce(1).write.format("csv").option("header",True).save(sys.argv[1]+'.logreg_prediction')

    ##################### Prediction using SVM model model ####################

    # Call to getPrediction function with a list of reviews and SVM model
    predict = getPrediction(new_data, model_lsvc)
    print("Prediction using SVM model:")
    predict.show()

    # Store this result in a single file on the cluster
    predict.coalesce(1).write.format("csv").option("header",True).save(sys.argv[1]+'.SVM_prediction')

    # Stop spark context   
    spark.stop()


# Load the data

In [21]:
# Set your file path here 
data_file = "/Users/munibasiddiqi/Desktop/BUCS777/Homework_Assignments/Term_Project/IMDB_Dataset.csv"

In [22]:
# Upload data into a dataframe
spark_df = spark.read.format("csv").option("header", "true").option("escape","\"").option("multiLine","true").load(data_file)

In [23]:
# Top 10 rows
spark_df.show(10)

+--------------------+---------+
|              review|sentiment|
+--------------------+---------+
|One of the other ...| positive|
|A wonderful littl...| positive|
|I thought this wa...| positive|
|Basically there's...| negative|
|Petter Mattei's "...| positive|
|Probably my all-t...| positive|
|I sure would like...| positive|
|This show was an ...| negative|
|Encouraged by the...| negative|
|If you like origi...| positive|
+--------------------+---------+
only showing top 10 rows



In [24]:
# Add label column to dataframe (Positive sentiment = 1, Negative sentiment = 0)
df = spark_df.withColumn("label", F.when(F.col("sentiment")=="positive",1).otherwise(0)).cache()

In [25]:
df.show()

+--------------------+---------+-----+
|              review|sentiment|label|
+--------------------+---------+-----+
|One of the other ...| positive|    1|
|A wonderful littl...| positive|    1|
|I thought this wa...| positive|    1|
|Basically there's...| negative|    0|
|Petter Mattei's "...| positive|    1|
|Probably my all-t...| positive|    1|
|I sure would like...| positive|    1|
|This show was an ...| negative|    0|
|Encouraged by the...| negative|    0|
|If you like origi...| positive|    1|
|Phil the Alien is...| negative|    0|
|I saw this movie ...| negative|    0|
|So im not a big f...| negative|    0|
|The cast played S...| negative|    0|
|This a fantastic ...| positive|    1|
|Kind of drawn in ...| negative|    0|
|Some films just s...| positive|    1|
|This movie made i...| negative|    0|
|I remember this f...| positive|    1|
|An awful film! It...| negative|    0|
+--------------------+---------+-----+
only showing top 20 rows



In [26]:
# Split the dataset into training and test set
print("Training Data")
df_train, df_test = df.randomSplit(weights=[0.7, 0.3], seed=100)

Training Data


In [27]:
# Count
df_train.count()

34993

In [28]:
df_test.count()

15007

In [29]:
# Check if the dataset is balanced or imbalanced
df_train.groupby("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|17450|
|    0|17543|
+-----+-----+



In [30]:
df_test.groupby("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1| 7550|
|    0| 7457|
+-----+-----+



In [92]:
df_test.filter("label == 1").count()

7550

# Tokenize - Converts to lowercase and split text on non-word character

In [31]:
regexTokenizer = RegexTokenizer(inputCol="review", outputCol="words", pattern="\\W")
df_regexTokenized = regexTokenizer.transform(df_train)

countTokens = udf(lambda words: len(words), IntegerType())
df_regexTokenized.select("review", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(2)

#regexTokenized.select("words").show(truncate=0)
df_regexTokenized.show()

+--------------------+--------------------+------+
|              review|               words|tokens|
+--------------------+--------------------+------+
|\b\b\b\bA Turkish...|[a, turkish, bath...|   635|
|!!! Spoiler alert...|[spoiler, alert, ...|   177|
+--------------------+--------------------+------+
only showing top 2 rows

+--------------------+---------+-----+--------------------+
|              review|sentiment|label|               words|
+--------------------+---------+-----+--------------------+
|\b\b\b\bA Turkish...| positive|    1|[a, turkish, bath...|
|!!! Spoiler alert...| negative|    0|[spoiler, alert, ...|
|!!!! MILD SPOILER...| negative|    0|[mild, spoilers, ...|
|!!!! POSSIBLE MIL...| negative|    0|[possible, mild, ...|
|!!!!! OF COURSE T...| negative|    0|[of, course, ther...|
|!!!!! POSSIBLE SP...| negative|    0|[possible, spoile...|
|" Domino " has be...| positive|    1|[domino, has, bee...|
|" I have wrestled...| positive|    1|[i, have, wrestle...|
|" It had

# Remove stopwords

In [32]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
df_stopwords_removed = remover.transform(df_regexTokenized)
df_stopwords_removed.show(2)
df_stopwords_removed.select("filtered").show(2, truncate=0)


+--------------------+---------+-----+--------------------+--------------------+
|              review|sentiment|label|               words|            filtered|
+--------------------+---------+-----+--------------------+--------------------+
|\b\b\b\bA Turkish...| positive|    1|[a, turkish, bath...|[turkish, bath, s...|
|!!! Spoiler alert...| negative|    0|[spoiler, alert, ...|[spoiler, alert, ...|
+--------------------+---------+-----+--------------------+--------------------+
only showing top 2 rows

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [77]:

remover = StopWordsRemover(inputCol="filtered", outputCol="token",stopWords=["br", 'm', 've', 're', 'll', 'd'])
df_stopwords_removed2 = remover.transform(df_stopwords_removed)
df_stopwords_removed2.show(2)
df_stopwords_removed2.select("token").show(1, truncate=0)

+--------------------+---------+-----+--------------------+--------------------+--------------------+
|              review|sentiment|label|               words|            filtered|               token|
+--------------------+---------+-----+--------------------+--------------------+--------------------+
|\b\b\b\bA Turkish...| positive|    1|[a, turkish, bath...|[turkish, bath, s...|[turkish, bath, s...|
|!!! Spoiler alert...| negative|    0|[spoiler, alert, ...|[spoiler, alert, ...|[spoiler, alert, ...|
+--------------------+---------+-----+--------------------+--------------------+--------------------+
only showing top 2 rows

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [34]:
df_stopwords_removed2.select("token") \
    .withColumn("tokens", countTokens(col("token"))).show(2)

+--------------------+------+
|               token|tokens|
+--------------------+------+
|[turkish, bath, s...|   313|
|[spoiler, alert, ...|    81|
+--------------------+------+
only showing top 2 rows



# Extracts a vocabulary from document collections and generates a CountVectorizerModel.During the fitting process, CountVectorizer will select the top vocabSize words ordered by term frequency across the corpus.

In [35]:
# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="token", outputCol="rawFeatures", vocabSize=5000)
cv_model = cv.fit(df_stopwords_removed2)

df_featurizedData = cv_model.transform(df_stopwords_removed2)

df_featurizedData.show(2)
df_featurizedData.select("rawFeatures").show(2,truncate=0)

+--------------------+---------+-----+--------------------+--------------------+--------------------+--------------------+
|              review|sentiment|label|               words|            filtered|               token|         rawFeatures|
+--------------------+---------+-----+--------------------+--------------------+--------------------+--------------------+
|\b\b\b\bA Turkish...| positive|    1|[a, turkish, bath...|[turkish, bath, s...|[turkish, bath, s...|(5000,[0,1,2,3,4,...|
|!!! Spoiler alert...| negative|    0|[spoiler, alert, ...|[spoiler, alert, ...|[spoiler, alert, ...|(5000,[0,1,2,3,6,...|
+--------------------+---------+-----+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

# Top 10 vocabulary words

In [36]:
cv_model.vocabulary[0:10]

['movie',
 'film',
 'one',
 'like',
 'good',
 'time',
 'even',
 'story',
 'really',
 'see']

In [76]:
df_top10 = spark.createDataFrame(cv_model.vocabulary[0:10], StringType())
df_top10.withColumnRenamed("value", "Top 10 vocabulary words").show(truncate=0)

+-----------------------+
|Top 10 vocabulary words|
+-----------------------+
|movie                  |
|film                   |
|one                    |
|like                   |
|good                   |
|time                   |
|even                   |
|story                  |
|really                 |
|see                    |
+-----------------------+



In [38]:
cv = CountVectorizer(inputCol="token", outputCol="rawFeatures", vocabSize=5000, minDF=100, maxDF= 0.75)
cv_model = cv.fit(df_stopwords_removed2)

cv_model.vocabulary[0:10]

['movie',
 'film',
 'one',
 'like',
 'good',
 'time',
 'even',
 'story',
 'really',
 'see']

# IDF: IDF is an Estimator which is fit on a dataset and produces an IDFModel. The IDFModel takes feature vectors (generally created from HashingTF or CountVectorizer) and scales each feature. Intuitively, it down-weights features which appear frequently in a corpus.

In [91]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(df_featurizedData)
df_rescaledData = idfModel.transform(df_featurizedData)
df_rescaledData.cache()
df_rescaledData.select("sentiment", "label", "features").show(10)
#df_rescaledData.show(2)



+---------+-----+--------------------+
|sentiment|label|            features|
+---------+-----+--------------------+
| positive|    1|(5000,[0,1,2,3,4,...|
| negative|    0|(5000,[0,1,2,3,6,...|
| negative|    0|(5000,[0,3,4,7,11...|
| negative|    0|(5000,[1,3,5,6,17...|
| negative|    0|(5000,[1,2,9,11,1...|
| negative|    0|(5000,[0,1,3,7,15...|
| positive|    1|(5000,[0,1,3,7,8,...|
| positive|    1|(5000,[1,2,3,5,7,...|
| negative|    0|(5000,[0,8,16,18,...|
| negative|    0|(5000,[1,3,10,25,...|
+---------+-----+--------------------+
only showing top 10 rows



# Data Preprocessing

In [40]:
#tokenizer = Tokenizer(inputCol="text", outputCol="words")
# Converts text to lowercase and split text on non-word character
regexTokenizer = RegexTokenizer(inputCol="review", outputCol="words", pattern="\\W")
    
# Remove stopwords
remover = StopWordsRemover(inputCol = regexTokenizer.getOutputCol(), outputCol="filtered")

# Remove stopWords=["br", 'm', 've', 're', 'll', 'd']
remover2 = StopWordsRemover(inputCol= remover.getOutputCol(), outputCol="token",stopWords=["br", 'm', 've', 're', 'll', 'd'])

# Extracts a vocabulary from document collections and generates a CountVectorizerModel
# During the fitting process, CountVectorizer will select the top vocabSize words ordered 
# by term frequency across the corpus.
countVectorizer = CountVectorizer(inputCol= remover2.getOutputCol(), outputCol="rawFeatures", vocabSize=5000)
    
# The IDF Model takes feature vectors and scales each feature. 
# Intuitively, it down-weights features which appear frequently in a corpus
idf = IDF(inputCol= countVectorizer.getOutputCol(), outputCol="features")

# Logistic Regression Classifier
## Model Training

In [41]:
# LogisticRegression classifier
classifier_logreg = LogisticRegression(maxIter=20)

# Chain indexers and classifier_logreg in a Pipeline
pipeline_logreg = Pipeline(stages=[regexTokenizer, remover, remover2, countVectorizer, idf, classifier_logreg])
# Train model. 
model_logreg = pipeline_logreg.fit(df_train)

# Print the coefficients and intercept for linear SVC
print("Coefficients: " + str(model_logreg.stages[5].coefficients[:10]))
print("Intercept: " + str(model_logreg.stages[5].intercept))

Coefficients: [-0.07463579  0.01510059 -0.09580708  0.06750367  0.29349161  0.26400688
 -0.18612552  0.0903201  -0.04318012  0.31925709]
Intercept: -0.1429045478912475


In [323]:
pipeline_logreg.getStages()
vocabulary = model_logreg.stages[3].vocabulary
print(vocabulary[:200])

['movie', 'film', 'one', 'like', 'good', 'time', 'even', 'story', 'really', 'see', 'well', 'much', 'get', 'bad', 'people', 'great', 'also', 'first', 'made', 'make', 'way', 'movies', 'characters', 'think', 'films', 'watch', 'character', 'two', 'many', 'seen', 'life', 'love', 'never', 'acting', 'show', 'plot', 'best', 'know', 'little', 'ever', 'man', 'better', 'end', 'scene', 'still', 'say', 'scenes', 'something', 'go', 'back', 'real', 'watching', 'thing', 'director', 'actors', 'years', 'doesn', 'though', 'didn', 'funny', 'another', '10', 'work', 'old', 'actually', 'look', 'nothing', 'going', 'makes', 'find', 'lot', 'new', 'every', 'part', 'us', 'things', 'world', 'cast', 'horror', 'want', 'quite', 'pretty', 'around', 'seems', 'young', 'take', 'however', 'long', 'got', 'big', 'enough', 'thought', 'fact', 'give', 'series', 'comedy', 'right', 'must', 'music', 'action', 'may', 'without', 'come', 'guy', 'always', 'isn', 'saw', 'point', 'original', 'gets', 'done', 'times', 'almost', 'role', '

## Model Testing

In [42]:
# Make predictions.
predictions_logreg = model_logreg.transform(df_test).cache()

selected = predictions_logreg.select("label", "sentiment", "prediction")
selected.show(10)

+-----+---------+----------+
|label|sentiment|prediction|
+-----+---------+----------+
|    0| negative|       0.0|
|    1| positive|       1.0|
|    1| positive|       1.0|
|    1| positive|       0.0|
|    1| positive|       1.0|
|    1| positive|       1.0|
|    0| negative|       0.0|
|    1| positive|       0.0|
|    0| negative|       0.0|
|    0| negative|       0.0|
+-----+---------+----------+
only showing top 10 rows



## Model evaluation

In [43]:
# Covert dataframe to RDD for Model evaluation
predictionAndLabels_logreg = predictions_logreg.select("label",  "prediction").rdd.map(lambda x : (float(x[0]), float(x[1]))).cache()
predictionAndLabels_logreg.take(40)

[(0.0, 0.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 0.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (0.0, 0.0),
 (1.0, 0.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (1.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (1.0, 1.0),
 (0.0, 0.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 0.0),
 (1.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (1.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 0.0),
 (0.0, 0.0),
 (1.0, 1.0),
 (0.0, 0.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0)]

In [44]:
# Model evaluation
from pyspark.mllib.evaluation import MulticlassMetrics
# Instantiate metrics object
metrics_logreg = MulticlassMetrics(predictionAndLabels_logreg)

# Overall statistics
print("Summary statistics for Logistic regression classifier")
print("Precision = %s" % metrics_logreg.precision(1.0))
print("Recall = %s" % metrics_logreg.recall(1.0))
print("F1 Score = %s" % metrics_logreg.fMeasure(1.0))
print("Accuracy = %s" % metrics_logreg.accuracy)

# Statistics by class
#labels = data.map(lambda lp: lp.label).distinct().collect()
labels = [0.0, 1.0]
for label in sorted(labels):
    print("Class %s precision = %s" % (label, metrics_logreg.precision(label)))
    print("Class %s recall = %s" % (label, metrics_logreg.recall(label)))
    print("Class %s F1 Measure = %s" % (label, metrics_logreg.fMeasure(label, beta=1.0)))

print(metrics_logreg.confusionMatrix().toArray().astype(int))



Summary statistics for Logistic regression classifier
Precision = 0.8560264900662252
Recall = 0.858414132022845
F1 Score = 0.8572186484514889
Accuracy = 0.8565336176450989
Class 0.0 precision = 0.8570470698672389
Class 0.0 recall = 0.8546402781492377
Class 0.0 F1 Measure = 0.8558419819216606
Class 1.0 precision = 0.8560264900662252
Class 1.0 recall = 0.858414132022845
Class 1.0 F1 Measure = 0.8572186484514889
[[6391 1087]
 [1066 6463]]


# SVM
## Training Model

In [45]:
# SVM classifier
classifier_lsvc = LinearSVC(maxIter=20)

# Fit the model
#lsvcModel = classifier.fit(df_train)

# Chain indexers and classifier_lsvc in a Pipeline
pipeline_lsvc = Pipeline(stages=[regexTokenizer, remover, remover2, countVectorizer, idf, classifier_lsvc])

# Train model. 
model_lsvc = pipeline_lsvc.fit(df_train)

# Print the coefficients and intercept for linear SVC
print("Coefficients: " + str(model_lsvc.stages[5].coefficients[:10]))
print("Intercept: " + str(model_lsvc.stages[5].intercept))

Coefficients: [-0.02944016  0.0002544   0.00930135 -0.00202794  0.08904828  0.04703349
 -0.07170383  0.00976485 -0.00446598  0.10477262]
Intercept: -0.07833200139903072


In [46]:
pipeline_lsvc.getStages()
vocabulary = model_lsvc.stages[3].vocabulary
print(vocabulary[:200])

['movie', 'film', 'one', 'like', 'good', 'time', 'even', 'story', 'really', 'see', 'well', 'much', 'get', 'bad', 'people', 'great', 'also', 'first', 'made', 'make', 'way', 'movies', 'characters', 'think', 'films', 'watch', 'character', 'two', 'many', 'seen', 'life', 'love', 'never', 'acting', 'show', 'plot', 'best', 'know', 'little', 'ever', 'man', 'better', 'end', 'scene', 'still', 'say', 'scenes', 'something', 'go', 'back', 'real', 'watching', 'thing', 'director', 'actors', 'years', 'doesn', 'though', 'didn', 'funny', 'another', '10', 'work', 'old', 'actually', 'look', 'nothing', 'going', 'makes', 'find', 'lot', 'new', 'every', 'part', 'us', 'things', 'world', 'cast', 'horror', 'want', 'quite', 'pretty', 'around', 'seems', 'young', 'take', 'however', 'long', 'got', 'big', 'enough', 'thought', 'fact', 'give', 'series', 'comedy', 'right', 'must', 'music', 'action', 'may', 'without', 'come', 'guy', 'always', 'isn', 'saw', 'point', 'original', 'gets', 'done', 'times', 'almost', 'role', '

## Model Testing

In [47]:
# Make predictions.
predictions_lsvc = model_lsvc.transform(df_test).cache()

selected = predictions_lsvc.select("label", "sentiment", "prediction")
selected.show(10)

+-----+---------+----------+
|label|sentiment|prediction|
+-----+---------+----------+
|    0| negative|       0.0|
|    1| positive|       1.0|
|    1| positive|       1.0|
|    1| positive|       0.0|
|    1| positive|       1.0|
|    1| positive|       1.0|
|    0| negative|       0.0|
|    1| positive|       0.0|
|    0| negative|       0.0|
|    0| negative|       0.0|
+-----+---------+----------+
only showing top 10 rows



# Model evaluation

In [48]:
# Model evaluation
from pyspark.mllib.evaluation import MulticlassMetrics

# Covert dataframe to RDD for Model evaluation
predictionAndLabels_lsvc = predictions_lsvc.select("label",  "prediction").rdd.map(lambda x : (float(x[0]), float(x[1]))).cache()
predictionAndLabels_lsvc.take(40)

# Instantiate metrics object
metrics_lsvc = MulticlassMetrics(predictionAndLabels_lsvc)

# Overall statistics
print("Summary statistics for SVM classier")
print("Precision = %s" % metrics_lsvc.precision(1.0))
print("Recall = %s" % metrics_lsvc.recall(1.0))
print("F1 Score = %s" % metrics_lsvc.fMeasure(1.0))
print("Accuracy = %s" % metrics_lsvc.accuracy)

# Statistics by class
#labels = data.map(lambda lp: lp.label).distinct().collect()
labels = [0.0, 1.0]
for label in sorted(labels):
    print("Class %s precision = %s" % (label, metrics_lsvc.precision(label)))
    print("Class %s recall = %s" % (label, metrics_lsvc.recall(label)))
    print("Class %s F1 Measure = %s" % (label, metrics_lsvc.fMeasure(label, beta=1.0)))

print(metrics_lsvc.confusionMatrix().toArray().astype(int))

Summary statistics for SVM classier
Precision = 0.8794701986754967
Recall = 0.8825093035619351
F1 Score = 0.8809871301578877
Accuracy = 0.8804557872992603
Class 0.0 precision = 0.8814536676947834
Class 0.0 recall = 0.8783910196445276
Class 0.0 F1 Measure = 0.8799196787148594
Class 1.0 precision = 0.8794701986754967
Class 1.0 recall = 0.8825093035619351
Class 1.0 F1 Measure = 0.8809871301578877
[[6573  910]
 [ 884 6640]]


# Data Preprocessing

In [85]:
# Converts text to lowercase and split text on non-word character
regexTokenizer = RegexTokenizer(inputCol="review", outputCol="words", pattern="\\W")
    
# Remove stopwords
remover = StopWordsRemover(inputCol = regexTokenizer.getOutputCol(), outputCol="filtered")

# Remove stopWords=["br", 'm', 've', 're', 'll', 'd']
remover2 = StopWordsRemover(inputCol= remover.getOutputCol(), outputCol="token",stopWords=["br", 'm', 've', 're', 'll', 'd'])

# Extracts a vocabulary from document collections and generates a CountVectorizerModel
# During the fitting process, CountVectorizer will select the top vocabSize words ordered 
# by term frequency across the corpus.
countVectorizer = CountVectorizer(inputCol= remover2.getOutputCol(), outputCol="rawFeatures", vocabSize=5000)
    
# The IDF Model takes feature vectors and scales each feature. 
# Intuitively, it down-weights features which appear frequently in a corpus
idf_features = IDF(inputCol= countVectorizer.getOutputCol(), outputCol="featuresIDF")

# Feature Selection

In [86]:
# Chi-Squared feature selection. It operates on labeled data with categorical features. 
# ChiSqSelector uses the Chi-Squared test of independence to decide which features to choose. 
selector = ChiSqSelector(numTopFeatures=500, featuresCol=idf_features.getOutputCol(),
                         outputCol="features", labelCol="label")

## Logistic Regression - Feature Selection

In [51]:
classifier_logreg = LogisticRegression(maxIter=20)

print("Logistic Regression Classifier output with top %d features selected using ChiSqSelector " % selector.getNumTopFeatures())

# Chain indexers and classifier_logreg in a Pipeline
pipeline_logreg_feature_selection = Pipeline(stages=[regexTokenizer, remover, remover2, countVectorizer, idf_features, selector, classifier_logreg])

# Train model. 
model_logreg_feature_selection = pipeline_logreg_feature_selection.fit(df_train)

# Make predictions.
predictions_logreg_feature_selection = model_logreg_feature_selection.transform(df_test).cache()

#selected = predictions_logreg.select("label","sentiment" , "probability", "prediction")
#selected.show(2)

predictionAndLabels_logreg_feature_selection = predictions_logreg_feature_selection.select("label",  "prediction").rdd.map(lambda x : (float(x[0]), float(x[1]))).cache()
#predictionAndLabels_logreg.take(40)

# Model evaluation

# Instantiate metrics object
metrics_logreg_feature_selection = MulticlassMetrics(predictionAndLabels_logreg_feature_selection)

# Overall statistics
print("Summary statistics for Logistic Regression classifier with feature reduction")
print("Precision = %s" % metrics_logreg_feature_selection.precision(1.0))
print("Recall = %s" % metrics_logreg_feature_selection.recall(1.0))
print("F1 Score = %s" % metrics_logreg_feature_selection.fMeasure(1.0))
print("Accuracy = %s" % metrics_logreg_feature_selection.accuracy)

# Statistics by class
#labels = data.map(lambda lp: lp.label).distinct().collect()
labels = [0.0, 1.0]
for label in sorted(labels):
    print("Class %s precision = %s" % (label, metrics_logreg_feature_selection.precision(label)))
    print("Class %s recall = %s" % (label, metrics_logreg_feature_selection.recall(label)))
    print("Class %s F1 Measure = %s" % (label, metrics_logreg_feature_selection.fMeasure(label, beta=1.0)))

print(metrics_logreg_feature_selection.confusionMatrix().toArray().astype(int))

Logistic Regression Classifier output with top 500 features selected using ChiSqSelector 
Summary statistics for Logistic Regression classifier with feature reduction
Precision = 0.8882119205298014
Recall = 0.8742015382609829
F1 Score = 0.8811510413244859
Accuracy = 0.8794562537482509
Class 0.0 precision = 0.8705913906396674
Class 0.0 recall = 0.8849509269356598
Class 0.0 F1 Measure = 0.8777124315554655
Class 1.0 precision = 0.8882119205298014
Class 1.0 recall = 0.8742015382609829
Class 1.0 F1 Measure = 0.8811510413244859
[[6492  844]
 [ 965 6706]]


## SVM - Feature Selection

In [81]:
classifier_lsvc = LinearSVC(maxIter=20)

print("SVM output with top %d features selected using ChiSqSelector" % selector.getNumTopFeatures())

# Chain indexers and classifier_logreg in a Pipeline
pipeline_lsvc__feature_selection = Pipeline(stages=[regexTokenizer, remover, remover2, countVectorizer, idf_features, selector, classifier_lsvc])

# Train model. 
model_lsvc_feature_selection = pipeline_lsvc__feature_selection.fit(df_train)

# Make predictions.
predictions_lsvc__feature_selection = model_lsvc_feature_selection.transform(df_test).cache()

#selected = predictions_lsvc.select("label", "sentiment", "prediction")
#selected.show(10)
predictionAndLabels_lsvc_feature_selection = predictions_lsvc__feature_selection.select("label",  "prediction").rdd.map(lambda x : (float(x[0]), float(x[1]))).cache()
#predictionAndLabels_lsvc.take(40)

# Model evaluation

# Instantiate metrics object
metrics_lsvc_feature_selection = MulticlassMetrics(predictionAndLabels_lsvc_feature_selection)

# Overall statistics
print("Summary statistics for SVM classifier with feature reduction")
print("Precision = %s" % metrics_lsvc_feature_selection.precision(1.0))
print("Recall = %s" % metrics_lsvc_feature_selection.recall(1.0))
print("F1 Score = %s" % metrics_lsvc_feature_selection.fMeasure(1.0))
print("Accuracy = %s" % metrics_lsvc_feature_selection.accuracy)

# Statistics by class
#labels = data.map(lambda lp: lp.label).distinct().collect()
labels = [0.0, 1.0]
for label in sorted(labels):
    print("Class %s precision = %s" % (label, metrics_lsvc_feature_selection.precision(label)))
    print("Class %s recall = %s" % (label, metrics_lsvc_feature_selection.recall(label)))
    print("Class %s F1 Measure = %s" % (label, metrics_lsvc_feature_selection.fMeasure(label, beta=1.0)))

print(metrics_lsvc_feature_selection.confusionMatrix().toArray().astype(int))

SVM output with top 500 features selected using ChiSqSelector




Summary statistics for SVM classifier with feature reduction
Precision = 0.8921854304635761
Recall = 0.8710720289667658
F1 Score = 0.8815023228423738
Accuracy = 0.8793229826081163
Class 0.0 precision = 0.8663001206919673
Class 0.0 recall = 0.8880945834478966
Class 0.0 F1 Measure = 0.8770619781413347
Class 1.0 precision = 0.8921854304635761
Class 1.0 recall = 0.8710720289667658
Class 1.0 F1 Measure = 0.8815023228423738
[[6460  814]
 [ 997 6736]]


In [53]:
def getPrediction(text, model):
    # Check if text is one review or a list of reviews
    if (isinstance(text, str)):
        review = [text]
    else:
        review = text

    # Create a dataframe of review list
    df_new_data = spark.createDataFrame(review, StringType()) 

    # Rename the dataframe column
    df_new_data = df_new_data.withColumnRenamed("value", "review")

    # Predict sentiment using the SVM model
    predict = model.transform(df_new_data)
    predict = predict.select("review", "prediction")
    return predict

# Prediction Using the Four Models
## model_lsvc, model_logreg, model_lsvc_feature_selection, model_logreg_feature_selection

In [54]:
# model_lsvc, model_logreg, model_lsvc_features, model_logreg_features
# Call to getPrediction function with just one review text
predict = getPrediction('Harry Potter was a good movie.', model_lsvc)
type(predict)
predict.show()

+--------------------+----------+
|              review|prediction|
+--------------------+----------+
|Harry Potter was ...|       1.0|
+--------------------+----------+



In [58]:
# model_lsvc, model_logreg, model_lsvc_features, model_logreg_features
# Call to getPrediction function with just one review text
predict = getPrediction('Harry Potter was a good movie.', model_lsvc_feature_selection)
type(predict)
predict.show()

+--------------------+----------+
|              review|prediction|
+--------------------+----------+
|Harry Potter was ...|       1.0|
+--------------------+----------+



In [56]:
# Call to getPrediction function with a list of reviews
new_data = ['This movie was horrible, plot was boring, acting was okay.',
            'The film really sucked. I want my money back',
            'What a beautiful movie. Great plot, great acting.',
            'Harry Potter was a good movie.'
            'It was horrible and great at the sametime.'
            ]

predict = getPrediction(new_data, model_logreg_feature_selection)
predict.show(truncate=0)

+--------------------+----------+
|              review|prediction|
+--------------------+----------+
|This movie was ho...|       0.0|
|The film really s...|       0.0|
|What a beautiful ...|       1.0|
|Harry Potter was ...|       0.0|
+--------------------+----------+



In [63]:
# Call to getPrediction function with a list of reviews
new_data = ['This movie was horrible, plot was boring, acting was okay.',
            'The film really sucked. I want my money back',
            'What a beautiful movie. Great plot, great acting.',
            'Harry Potter was a good movie.'
            ]

predict = getPrediction(new_data, model_logreg_feature_selection)
predict.show(truncate=0)

+----------------------------------------------------------+----------+
|review                                                    |prediction|
+----------------------------------------------------------+----------+
|This movie was horrible, plot was boring, acting was okay.|0.0       |
|The film really sucked. I want my money back              |0.0       |
|What a beautiful movie. Great plot, great acting.         |1.0       |
|Harry Potter was a good movie.                            |1.0       |
+----------------------------------------------------------+----------+



# Gradient-boosted tree classifier

In [60]:
from pyspark.ml.classification import GBTClassifier

classifier_gbt = GBTClassifier(maxIter=10, featuresCol="rawFeatures", labelCol="label", predictionCol="prediction")

# Chain indexers and classifier_logreg in a Pipeline
pipeline_gbt = Pipeline(stages=[regexTokenizer, remover, remover2, countVectorizer, classifier_gbt])

# Train model.  This also runs the indexers.
model_gbt = pipeline_gbt.fit(df_train)

# Make predictions.
predictions_gbt = model_gbt.transform(df_test).cache()

# Select example rows to display.
#predictions_gbt.select("review", "label", "prediction").show(5)
#selected = predictions_gbt.select("label", "prediction")
#selected.show(10)

predictionAndLabels_gbt = predictions_gbt.select("label",  "prediction").rdd.map(lambda x : (float(x[0]), float(x[1]))).cache()
#predictionAndLabels_lsvc.take(40)

# Model evaluation

# Instantiate metrics object
metrics_gbt = MulticlassMetrics(predictionAndLabels_gbt)

# Overall statistics
print("Summary Stats")
print("Precision = %s" % metrics_gbt.precision(1.0))
print("Recall = %s" % metrics_gbt.recall(1.0))
print("F1 Score = %s" % metrics_gbt.fMeasure(1.0))
print("Accuracy = %s" % metrics_gbt.accuracy)

# Statistics by class
#labels = data.map(lambda lp: lp.label).distinct().collect()
labels = [0.0, 1.0]
for label in sorted(labels):
    print("Class %s precision = %s" % (label, metrics_gbt.precision(label)))
    print("Class %s recall = %s" % (label, metrics_gbt.recall(label)))
    print("Class %s F1 Measure = %s" % (label, metrics_gbt.fMeasure(label, beta=1.0)))

print(metrics_gbt.confusionMatrix().toArray().astype(int))



Summary Stats
Precision = 0.8654304635761589
Recall = 0.7140203256474702
F1 Score = 0.7824681156816956
Accuracy = 0.7579129739454921
Class 0.0 precision = 0.6490545795896473
Class 0.0 recall = 0.8265027322404371
Class 0.0 F1 Measure = 0.7271088409824983
Class 1.0 precision = 0.8654304635761589
Class 1.0 recall = 0.7140203256474702
Class 1.0 F1 Measure = 0.7824681156816956
[[4840 1016]
 [2617 6534]]


In [62]:
# Call to getPrediction function with a list of reviews
new_data = ['this movie was horrible, plot was boring, acting was okay.',
            'the film really sucked. i want my money back',
            'what a beautiful movie. great plot, great acting.',
            'Harry Potter was a great movie'
            ]

predict = getPrediction(new_data, model_gbt)
predict.show()

+--------------------+----------+
|              review|prediction|
+--------------------+----------+
|this movie was ho...|       0.0|
|the film really s...|       1.0|
|what a beautiful ...|       1.0|
|Harry Potter was ...|       1.0|
+--------------------+----------+

