In [None]:
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash

# Sentiment Analysis ESPRIT

## Part-1: Initials (Preparing Train and Test Data **Frames**)

> ### I.  Importing Modules

In [None]:
# general purpose modules
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import os
import sys

# pyspark modules
import pyspark
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

# spark nlp modules
import sparknlp



> ### II.  Starting a Pyspark Session

In [None]:
spark = sparknlp.start()

print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

> ### III.  Retrieving Train and Test Data

In [None]:
comments_train = spark.read.options(delimiter=';').csv('train data product reviews.csv', inferSchema=True, header=True)
comments_train.show(truncate=True, n=5)
comments_train.count(), comments_train.select('label').distinct().count()

In 'label' column we have 0's and 1's only. Let's rearrange this data frame as *df_train*.

In [None]:
# Keep only the review text and its sentiment label for training.
df_train = comments_train.select('text', 'label')
df_train.show(truncate=True, n=5)
# Label distribution of the training frame built above.
df_train.groupBy('label').count().show()

In [None]:
df_train.describe().show()

In [None]:
def balance_check(df, col='label'):
  """
  Prints how many rows carry the positive ('1') and the negative ('0') label,
  together with each class's share of those rows.

  df: data frame exposing `df[col]`, `where(...)` and `count()`
  col: string, name of the label column to inspect
  """
  label_col = df[col]
  positive = df.where(label_col == '1').count()
  negative = df.where(label_col == '0').count()
  total = positive + negative
  # A frame without any 0/1 labels would otherwise raise ZeroDivisionError.
  pos_percent = 100 * positive/total if total else 0.0
  neg_percent = 100 * negative/total if total else 0.0
  print(f'Positive Comments: {positive} which is %{pos_percent}')
  print(f'Negative Comments: {negative} which is %{neg_percent}')

balance_check(df_train)

Given the distribution of the comments in the training data, we have relatively unbalanced data (~ 0.28 - 0.72). Before deciding whether to apply a downsizing or upsizing technique, let's first check whether we have duplications in the training data.

In [None]:
import pyspark.sql.functions as funcs
df_train.groupBy(df_train.text)\
    .count()\
    .where(funcs.col('count') > 1)\
    .select(funcs.sum('count'))\
    .show()

Let's drop the duplicated rows and keep only the first occurrences.

In [None]:
df_train = df_train.dropDuplicates((['text']))
balance_check(df_train)

After removing the duplications, the distribution of comments in the training data changed slightly to the positive (more balanced ~ 0.33 - 0.67). For now, we keep the data in this distribution and do not apply any downsizing or upsizing technique (or generation), but we use the F1 score as a performance metric to avoid being biased by the data distribution.

Now we are going to maintain a *df_test* similar to *df_train*.

In [None]:
comments_test = spark.read.options(delimiter=';').csv('test data product reviews.csv', inferSchema=True, header=True)
comments_test.show(truncate=True, n=5)
comments_test.count()

We are going to use *regex* to describe patterns to obtain a clean data frame with columns text and label.

In [None]:
# Pattern: optional leading double quotes, then the 0/1 label digit (group 1),
# then the remainder of the line (group 2).
regex_pattern = r'"*([01])(.+)'
raw_line = col('label,text')
# The label is cast to integer here so that every downstream metric/evaluator cell
# receives a numeric label, as the training frame does. Rows that do not match the
# pattern yield an empty string, which the cast turns into null.
comments_test = comments_test.withColumn('text', regexp_extract(raw_line, regex_pattern, 2))\
                 .withColumn('label', regexp_extract(raw_line, regex_pattern, 1).cast(IntegerType()))
df_test = comments_test.select('text', 'label')
df_test.show(truncate=True, n=5)
df_test.count(), df_test.select('label').distinct().count()

In [None]:
import pyspark.sql.functions as funcs
df_test.groupBy(df_test.text)\
    .count()\
    .where(funcs.col('count') > 1)\
    .select(funcs.sum('count'))\
    .show()

In [None]:
balance_check(df_test)

Apparently we do not have duplications in test data. And our test data is balanced.

Now that we have both *df_train* and *df_test* in our targeted composition, we can progress with the **sentiment analysis**.

## Part-2: Spark NLP Sentiment Analysis

In [None]:
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score

> ### I.  Logistic Regression and Naive Bayes with CountVectorizer

>> i. Building Pipeline



In [None]:
!wget -q https://raw.githubusercontent.com/mahavivo/vocabulary/master/lemmas/AntBNC_lemmas_ver_001.txt
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, OneHotEncoder, StringIndexer, VectorAssembler, SQLTransformer
from pyspark.ml.classification import LogisticRegression, NaiveBayes

In [None]:
%%time

document_assembler = DocumentAssembler()\
      .setInputCol("text")\
      .setOutputCol("document")

sentence = SentenceDetector()\
      .setInputCols("document")\
      .setOutputCol("sentence")
    
tokenizer = Tokenizer()\
      .setInputCols(["sentence"])\
      .setOutputCol("token")
      
normalizer = Normalizer()\
      .setInputCols(["token"])\
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

stemmer = Stemmer()\
      .setInputCols(["cleanTokens"])\
      .setOutputCol("stem")

finisher = Finisher()\
      .setInputCols(["stem"])\
      .setOutputCols(["token_features"])\
      .setOutputAsArray(True)\
      .setCleanAnnotations(False)

label_strIdx = StringIndexer(inputCol='label', outputCol='target')
logReg = LogisticRegression(maxIter=5, regParam=0.01)
naiveBayes = NaiveBayes(smoothing=150)
countVectors = CountVectorizer(inputCol="token_features", outputCol="features", vocabSize=10000, minDF=5)


>> ii. Forming Pipelines

In [None]:
# Pipeline for Logistic Regression with CountVectorizer
# `normalizer` must precede `stopwords_cleaner`: the cleaner reads the "normalized"
# column, which only the normalizer produces.
nlp_pipeline_cv_lr = Pipeline(
    stages=[document_assembler,
            sentence,
            tokenizer,
            normalizer,
            stopwords_cleaner,
            stemmer,
            finisher,
            countVectors,
            logReg
            ])

# Pipeline for Naive Bayes with CountVectorizer (same preprocessing, different classifier)
nlp_pipeline_cv_nb = Pipeline(
    stages=[document_assembler,
            sentence,
            tokenizer,
            normalizer,
            stopwords_cleaner,
            stemmer,
            finisher,
            countVectors,
            naiveBayes
            ])


>> iii. Logistic Regression with CountVectorizer

>>> a. Applying LogReg

In [None]:
modelLR = nlp_pipeline_cv_lr.fit(df_train)
pred_lr = modelLR.transform(df_test)
pred_lr = pred_lr.withColumn('label', pred_lr.label.cast(IntegerType()))
pred_lr.filter(pred_lr['prediction'] == 0)\
    .select("text","probability","label","prediction")\
    .orderBy("probability", ascending=False)\
    .show(n = 10, truncate = 30)

>>> b. Model Performance

In [None]:
# Converting pred_lr to pandas data frame in order to using sklearn metrics library
df_lr = pred_lr.select('text','label','prediction').toPandas()
print(classification_report(df_lr.label, df_lr.prediction))
print(accuracy_score(df_lr.label, df_lr.prediction))

# Evaluation within the Spark Universe is also possible (for scaling issues)
evaluator = MulticlassClassificationEvaluator(labelCol='label',predictionCol="prediction")
evaluator.evaluate(pred_lr)


>> iv. Naive Bayes with CountVectorizer

>>> a. Applying Naive Bayes

In [None]:
modelNB = nlp_pipeline_cv_nb.fit(df_train)
pred_nb = modelNB.transform(df_test)
pred_nb = pred_nb.withColumn('label', pred_nb.label.cast(IntegerType()))
pred_nb.filter(pred_nb['prediction'] == 0)\
    .select("text","probability","label","prediction")\
    .orderBy("probability", ascending=False)\
    .show(n = 10, truncate = 30)

>>> b. Model Performance

In [None]:
# Converting pred_nb to pandas data frame in order to using sklearn metrics library
df_nb = pred_nb.select('text','label','prediction').toPandas()
print(classification_report(df_nb.label, df_nb.prediction))
print(accuracy_score(df_nb.label, df_nb.prediction))

# Evaluation within the Spark Universe is also possible (for scaling issues)
evaluator = MulticlassClassificationEvaluator(labelCol='label',predictionCol="prediction")
evaluator.evaluate(pred_nb)

> ### II.  TFIDF Logistic Regression and Naive Bayes

>> i. Building Pipeline



In [None]:
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, OneHotEncoder, StringIndexer, VectorAssembler, SQLTransformer
from pyspark.ml.classification import LogisticRegression, NaiveBayes

In [None]:
document_assembler = DocumentAssembler()\
      .setInputCol("text")\
      .setOutputCol("document")

sentence = SentenceDetector()\
      .setInputCols("document")\
      .setOutputCol("sentence")

tokenizer = Tokenizer()\
      .setInputCols(["sentence"])\
      .setOutputCol("token")

normalizer = Normalizer()\
      .setInputCols(["token"])\
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)


stemmer = Stemmer()\
      .setInputCols(["cleanTokens"])\
      .setOutputCol("stem")

finisher = Finisher()\
      .setInputCols(["stem"])\
      .setOutputCols(["token_features"])\
      .setOutputAsArray(True)\
      .setCleanAnnotations(False)

hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
label_strIdx = StringIndexer(inputCol='label', outputCol='target')
logReg = LogisticRegression(maxIter=5, regParam=0.01)
naiveBayes = NaiveBayes(smoothing=140)

>> ii. Forming Pipelines

In [None]:
# Pipeline for Logistic Regression with TFIDF
nlp_pipeline_tf_lr = Pipeline(
    stages=[document_assembler,
            sentence,
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher,
            hashingTF,
            idf,
            logReg
            ])

# Pipeline for Naive Bayes with TFIDF
nlp_pipeline_tf_nb = Pipeline(
    stages=[document_assembler,
            sentence,
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher,
            hashingTF,
            idf,
            naiveBayes
            ])


>> iii. Logistic Regression with TFIDF

>>> a. Applying LogReg

In [None]:
modelLR = nlp_pipeline_tf_lr.fit(df_train)
pred_tf_lr = modelLR.transform(df_test)
pred_tf_lr = pred_tf_lr.withColumn('label', pred_tf_lr.label.cast(IntegerType()))
pred_tf_lr.filter(pred_tf_lr['prediction'] == 0)\
    .select("text","probability","label","prediction")\
    .orderBy("probability", ascending=False)\
    .show(n = 10, truncate = 30)

>>> b. Model Performance

In [None]:
# Converting pred_tf_lr to pandas data frame in order to using sklearn metrics library
df_tf_lr = pred_tf_lr.select('text','label','prediction').toPandas()
print(classification_report(df_tf_lr.label, df_tf_lr.prediction))
print(accuracy_score(df_tf_lr.label, df_tf_lr.prediction))

# Evaluation within the Spark Universe is also possible (for scaling issues)
evaluator = MulticlassClassificationEvaluator(labelCol='label',predictionCol="prediction")
evaluator.evaluate(pred_tf_lr)


>> iv. Naive Bayes with TFIDF

>>> a. Applying Naive Bayes

In [None]:
modelNB = nlp_pipeline_tf_nb.fit(df_train)
pred_tf_nb = modelNB.transform(df_test)
pred_tf_nb = pred_tf_nb.withColumn('label', pred_tf_nb.label.cast(IntegerType()))
pred_tf_nb.filter(pred_tf_nb['prediction'] == 0)\
    .select("text","probability","label","prediction")\
    .orderBy("probability", ascending=False)\
    .show(n = 10, truncate = 30)

>>> b. Model Performance

In [None]:
# Converting pred_tf_nb to a pandas data frame in order to use the sklearn metrics library
df_tf_nb = pred_tf_nb.select('text','label','prediction').toPandas()
# Report the TFIDF Naive Bayes predictions collected above (not the earlier df_lr frame).
print(classification_report(df_tf_nb.label, df_tf_nb.prediction))
print(accuracy_score(df_tf_nb.label, df_tf_nb.prediction))

# Evaluation within the Spark Universe is also possible (for scaling issues)
evaluator = MulticlassClassificationEvaluator(labelCol='label',predictionCol="prediction")
evaluator.evaluate(pred_tf_nb)

> ### IV. Logistic Regression with TFIDF

>> i. Building Pipeline



In [None]:
!wget -q https://raw.githubusercontent.com/mahavivo/vocabulary/master/lemmas/AntBNC_lemmas_ver_001.txt

In [None]:
from pyspark.ml.feature import HashingTF, IDF

In [None]:

%%time

document_assembler = DocumentAssembler()\
      .setInputCol("text")\
      .setOutputCol("document")
    
tokenizer = Tokenizer()\
      .setInputCols(["document"])\
      .setOutputCol("token")
      
normalizer = Normalizer()\
      .setInputCols(["token"])\
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

stemmer = Stemmer()\
      .setInputCols(["cleanTokens"])\
      .setOutputCol("stem")

finisher = Finisher()\
      .setInputCols(["stem"])\
      .setOutputCols(["token_features"])\
      .setOutputAsArray(True)\
      .setCleanAnnotations(False)

hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
label_strIdx = StringIndexer(inputCol='label', outputCol='target')
logReg = LogisticRegression(maxIter=5, regParam=0.01)

# End-to-end TFIDF + Logistic Regression pipeline built from the annotators defined in this cell.
# The stemmer is used because the finisher reads the "stem" column, and no lemmatizer
# has been defined at this point in the notebook. No sentence detector is included
# because this cell's tokenizer reads "document" directly.
nlp_pipeline_tf_lr = Pipeline(
    stages=[document_assembler,
            tokenizer,
            normalizer,
            stopwords_cleaner,
            stemmer,
            finisher,
            hashingTF,
            idf,
            logReg
            ])

Please note that I also built pipelines with **Lemmatizer instead of Stemmer** and/or **without Normalizer**. Within all variants this annotator configuration performed best. Due to the readability of the code, I intentionally did not add any other pipeline variants.

In [None]:
nlp_pipeline_tf = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher,
            hashingTF,
            idf])


In [None]:
nlp_model_tf = nlp_pipeline_tf.fit(df_train)
processed_tf_train = nlp_model_tf.transform(df_train)
processed_tf_train.count()


In [None]:
# Reuse the pipeline model already fitted on df_train; fitting the same pipeline on the
# same data a second time only repeats the work.
processed_tf_test = nlp_model_tf.transform(df_test)
processed_tf_test.count()

>> ii. Applying Logistic Regression

In [None]:
lr = LogisticRegression(maxIter=5, regParam=0.01)

lrModel = lr.fit(processed_tf_train)

predictions_tf = lrModel.transform(processed_tf_test)

In [None]:
predictions_tf.select("text","probability","label","prediction")\
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

In [None]:
y_true = predictions_tf.select("label")
y_true = y_true.toPandas()

y_pred = predictions_tf.select("prediction")
y_pred = y_pred.toPandas()

print(classification_report(y_true.label, y_pred.prediction))
print(accuracy_score(y_true.label, y_pred.prediction))

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

evaluator.evaluate(predictions_tf)

In [None]:
document_assembler = DocumentAssembler()\
      .setInputCol("text")\
      .setOutputCol("document")

sentence = SentenceDetector()\
      .setInputCols("document")\
      .setOutputCol("sentence")

tokenizer = Tokenizer()\
      .setInputCols(["sentence"])\
      .setOutputCol("token")

normalizer = Normalizer()\
      .setInputCols(["token"])\
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)


# Lemmatize the normalized, stop-word-free tokens. Reading "token" here would bypass
# the normalizer and stop-word cleaner that precede this stage in the pipeline.
lemmatizer = Lemmatizer() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("lemma") \
    .setDictionary("./AntBNC_lemmas_ver_001.txt", value_delimiter ="\t", key_delimiter = "->")
    

finisher = Finisher()\
      .setInputCols(["lemma"])\
      .setOutputCols(["token_features"])\
      .setOutputAsArray(True)\
      .setCleanAnnotations(False)

hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
label_strIdx = StringIndexer(inputCol='label', outputCol='target')
logReg = LogisticRegression(maxIter=5, regParam=0.01)

nlp_pipeline_tf_lr = Pipeline(
    stages=[document_assembler,
            sentence,
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            lemmatizer, 
            finisher,
            hashingTF,
            idf,
            label_strIdx,
            logReg
            ])

> ### V.  LogReg with Spark NLP Bert Embeddings

In [None]:
document_assembler = DocumentAssembler()\
      .setInputCol("text")\
      .setOutputCol("document")
    
tokenizer = Tokenizer()\
      .setInputCols(["document"])\
      .setOutputCol("token")
    
normalizer = Normalizer()\
      .setInputCols(["token"])\
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

bert_embeddings = BertEmbeddings\
      .pretrained('bert_base_cased', 'en')\
      .setInputCols(["document",'cleanTokens'])\
      .setOutputCol("bert")\
      .setCaseSensitive(False)\

embeddingsSentence = SentenceEmbeddings()\
      .setInputCols(["document", "bert"])\
      .setOutputCol("sentence_embeddings")\
      .setPoolingStrategy("AVERAGE")
    
embeddings_finisher = EmbeddingsFinisher()\
      .setInputCols(["sentence_embeddings"])\
      .setOutputCols(["finished_sentence_embeddings"])\
      .setOutputAsVector(True)\
      .setCleanAnnotations(False)

In [None]:
nlp_pipeline_bert = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            bert_embeddings,
            embeddingsSentence,
            embeddings_finisher])

In [None]:
nlp_model_bert = nlp_pipeline_bert.fit(df_train)

processed_bert_train = nlp_model_bert.transform(df_train)

processed_bert_train.count()

In [None]:
# Reuse the pipeline model already fitted on df_train instead of fitting it a second time.
processed_bert_test = nlp_model_bert.transform(df_test)

processed_bert_test.count()

In [None]:
from pyspark.sql.functions import explode

processed_bert_train= processed_bert_train.withColumn("features", explode(processed_bert_train.finished_sentence_embeddings))
processed_bert_test= processed_bert_test.withColumn("features", explode(processed_bert_test.finished_sentence_embeddings))

In [None]:
processed_bert_test.select('text','features','label').show()

In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

lrModel = lr.fit(processed_bert_train)


In [None]:
from pyspark.sql.functions import udf

@udf("long")
def num_nonzeros(column):
    return column.numNonzeros()

processed_bert_test = processed_bert_test.where(num_nonzeros("features") != 0)
processed_bert_train = processed_bert_train.where(num_nonzeros("features") != 0)

In [None]:
predictions_bert = lrModel.transform(processed_bert_test)

In [None]:
predictions_bert.select("text","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

> ### VI.  LogReg with Spark NLP Glove Word Embeddings

In [None]:
document_assembler = DocumentAssembler()\
      .setInputCol("text")\
      .setOutputCol("document")
    
tokenizer = Tokenizer()\
      .setInputCols(["document"])\
      .setOutputCol("token")
    
normalizer = Normalizer()\
      .setInputCols(["token"])\
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

glove_embeddings = WordEmbeddingsModel().pretrained()\
      .setInputCols(["document",'cleanTokens'])\
      .setOutputCol("embeddings")\
      .setCaseSensitive(False)

embeddingsSentence = SentenceEmbeddings()\
      .setInputCols(["document", "embeddings"])\
      .setOutputCol("sentence_embeddings")\
      .setPoolingStrategy("AVERAGE")
    
embeddings_finisher = EmbeddingsFinisher()\
      .setInputCols(["sentence_embeddings"])\
      .setOutputCols(["finished_sentence_embeddings"])\
      .setOutputAsVector(True)\
      .setCleanAnnotations(False)

explodeVectors = SQLTransformer(statement=
      "SELECT EXPLODE(finished_sentence_embeddings) AS features, * FROM __THIS__")

label_stringIdx = StringIndexer(inputCol = "label", outputCol = "category")


In [None]:
nlp_pipeline_w2v = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            glove_embeddings,
            embeddingsSentence,
            embeddings_finisher,
            explodeVectors])

In [None]:

nlp_model_w2v = nlp_pipeline_w2v.fit(df_train)

processed_w2v_train = nlp_model_w2v.transform(df_train)

processed_w2v_train.count()

In [None]:
# Reuse the pipeline model already fitted on df_train instead of fitting it a second time.
processed_w2v_test = nlp_model_w2v.transform(df_test)

processed_w2v_test.count()

In [None]:
processed_w2v_test.show(truncate=True, n=5)

In [None]:
processed_w2v_test.select('finished_sentence_embeddings').take(1)

In [None]:
processed_w2v_test.select('text','features','label').show()

In [None]:
from pyspark.sql.functions import explode
processed_w2v_train= processed_w2v_train.withColumn("features", explode(processed_w2v_train.finished_sentence_embeddings))
processed_w2v_test= processed_w2v_test.withColumn("features", explode(processed_w2v_test.finished_sentence_embeddings))

In [None]:
from pyspark.sql.functions import udf

@udf("long")
def num_nonzeros(column):
    """Return the value of `numNonzeros()` for the given vector cell."""
    nonzero_count = column.numNonzeros()
    return nonzero_count

processed_w2v_test = processed_w2v_test.where(num_nonzeros("features") != 0)
processed_w2v_train = processed_w2v_train.where(num_nonzeros("features") != 0)

In [None]:
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0)
lrModel_w2v = lr.fit(processed_w2v_train)
predictions_w2v = lrModel_w2v.transform(processed_w2v_test)


In [None]:
predictions_w2v.select("text","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

evaluator.evaluate(predictions_w2v)

> ### VII. Random Forest with TFIDF

In [None]:
from pyspark.ml.classification import RandomForestClassifier

In [None]:
rf = RandomForestClassifier(labelCol="label",\
                            featuresCol="features",\
                            numTrees = 100,\
                            maxDepth = 4,\
                            maxBins = 32)

In [None]:
rfModel = rf.fit(processed_tf_train)
predictions_rf = rfModel.transform(processed_tf_test)

In [None]:
predictions_rf.filter(predictions_rf['prediction'] == 0)\
    .select("text","probability","label","prediction")\
    .orderBy("probability", ascending=False)\
    .show(n = 10, truncate = 30)

In [None]:
predictions_rf.filter(predictions_rf['prediction'] == 1)\
    .select("text","probability","label","prediction")\
    .orderBy("probability", ascending=False)\
    .show(n = 10, truncate = 30)

# Data Pre-processing (Training Data)

**Tokenizer**

In [None]:
# The Spark ML tokenizer is imported under an alias so that the `Tokenizer` name used by
# the Spark NLP pipeline cells above is not rebound when this cell runs.
from pyspark.ml.feature import Tokenizer as MLTokenizer, RegexTokenizer

tokenizer = MLTokenizer(inputCol='text', outputCol='sentiment_words')
tokenized_train = tokenizer.transform(df_train)
tokenized_train.show(truncate=True, n=10)

**Removing Stop Words**

In [None]:
from pyspark.ml.feature import StopWordsRemover
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='decisive_words')
swr_free_train = swr.transform(tokenized_train)
swr_free_train.show(truncate=True, n=10)

**Hashing**

In [None]:
from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="raw_features")
numeric_train = hashingTF.transform(swr_free_train).select('decisive_words','raw_features', 'label')
numeric_train.show(truncate=True, n=10)


# Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression

logreg = LogisticRegression(labelCol='label', featuresCol='raw_features', maxIter = 5, regParam=.001)
model_lr = logreg.fit(numeric_train)
print('model_lr is trained')

In [None]:
tokenized_test = tokenizer.transform(df_test)
swr_free_test = swr.transform(tokenized_test)
numeric_test = hashingTF.transform(swr_free_test).select('decisive_words','raw_features', 'label')
numeric_test.show(truncate=True, n=10)

In [None]:
predict_logreg = model_lr.transform(numeric_test)
predicted_logreg_df = predict_logreg.select(
    "decisive_words", "prediction", "label")
predicted_logreg_df = predicted_logreg_df.withColumn('prediction', predicted_logreg_df.prediction.cast(IntegerType()))
predicted_logreg_df.show(truncate = True, n=10)


In [None]:
def confusion_matrix(df,prediction,label):
    """
    Prints binary-classification counts and metrics (accuracy, precision, recall, F1)
    for a data frame that holds predictions next to the true labels.
    The positive class is 1 and the negative class is 0. Nothing is returned.

    NOTE(review): a `confusion_matrix` is also imported from sklearn.metrics earlier in
    this notebook; this definition rebinds that name.

    df = Data Frame with prediction and label values (must support df[name], filter() and count())
    prediction = string, column name of the prediction values
    label = string, column name of the label values
    """
    pred_col = df[prediction]
    label_col = df[label]

    def _ratio(numerator, denominator):
        # Report 0.0 instead of raising ZeroDivisionError when a denominator is empty
        # (e.g. the model never predicts the positive class, or the frame has no rows).
        return numerator/denominator if denominator else 0.0

    # Each count() is a separate job on the frame, so the row total is computed once.
    total = df.count()
    correctly_predicted = df.filter(pred_col == label_col).count()
    false_positive = df.filter((pred_col == 1) & (label_col == 0)).count()
    false_negative = df.filter((pred_col == 0) & (label_col == 1)).count()
    true_positive = df.filter((pred_col == 1) & (label_col == 1)).count()

    accuracy = _ratio(correctly_predicted, total)
    precision = _ratio(true_positive, true_positive + false_positive)
    recall = _ratio(true_positive, true_positive + false_negative)
    f1_score = _ratio(2 * precision * recall, precision + recall)

    # Values are scaled by 100 so that the number after '%' is a real percentage.
    print(f'Correctly Predicted (True Positive + True Negative): {correctly_predicted} which is %{100 * _ratio(correctly_predicted, total)}')
    print(f'Type-I Error (False Positive): {false_positive} which is %{100 * _ratio(false_positive, total)}')
    print(f'Type-II Error (False Negative): {false_negative} which is %{100 * _ratio(false_negative, total)}')
    print(f'Accuracy: %{100 * accuracy}')
    print(f'Precision: %{100 * precision}')
    print(f'Sensitivity(Recall): %{100 * recall}')
    print(f'F1 Score: %{100 * f1_score}')



In [None]:
confusion_matrix(predicted_logreg_df, 'prediction','label')

# Random Forest

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Configure a RandomForest classifier (nothing is fitted in this cell).
# NOTE(review): no visible cell creates the "indexedLabel" / "indexedFeatures" columns, and
# `rf` is re-assigned with other columns before any fit() call — this configuration looks
# unused; confirm before relying on it.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

In [None]:
# Convert predicted indices back to label strings.
# No fitted StringIndexer (`labelIndexer`) exists in this notebook to read the labels from.
# The random forest fitted on `numeric_train` uses the raw 0/1 `label` column, so
# prediction index i corresponds to the label string str(i).
from pyspark.ml.feature import IndexToString

labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=["0", "1"])

In [None]:
rf = RandomForestClassifier(featuresCol = 'raw_features', labelCol = 'label')
model_rf = rf.fit(numeric_train)


In [None]:
predict_rf = model_rf.transform(numeric_test)
predicted_rf_df = predict_rf.select(
    "decisive_words", "prediction", "label")
predicted_rf_df = predicted_rf_df.withColumn('prediction', predicted_rf_df.prediction.cast(IntegerType()))
predicted_rf_df.show(truncate = True, n=10)


In [None]:
predicted_rf_df.select('prediction').distinct().count()

In [None]:
confusion_matrix(predicted_rf_df, 'prediction','label')

# Gradient Boosting

In [None]:
from pyspark.ml.classification import GBTClassifier

# Configure the gradient-boosted trees classifier on the hashed term-frequency features.
gbt = GBTClassifier(
    featuresCol='raw_features',
    labelCol='label',
    maxIter=10,
)
# Fit on the training frame, then score the test frame.
model_gbt = gbt.fit(numeric_train)
predicted_gbt = model_gbt.transform(numeric_test)