<a href="https://colab.research.google.com/github/ozkanyildirim/2020-CS109B/blob/master/ESPRIT_sentiment_analysis_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash

# Sentiment Analysis ESPRIT

Project Goal: Our goal is to build high-accuracy models that correctly predict whether a given customer comment is positive or negative.

Data: **train data product reviews.csv** and **test data product reviews.csv**, text data consisting of customer comments and binary labels.

Table of contents

1. Exploratory Data Analysis (Preparing Train and Test Data)
> I. Initials <br> 
> II. Data Preparation <br> 
> III. Data Visualisation <br> 
2. Sentiment Analysis
> I. Logistic Regression and Naive Bayes with CountVectorizer <br> 
> II. Logistic Regression and Naive Bayes with TFIDF<br> 
> III. Universal Sentence Encoder <br> 

3. Conclusion

## 1. Exploratory Data Analysis (Preparing Train and Test Data)

> ### I.  Initials

>> a. Importing Initial Modules

In [None]:
# general purpose modules
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import os
import sys

# pyspark modules
import pyspark
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.sql.types import IntegerType
# explicit imports avoid shadowing Python built-ins such as sum(), min() and max()
from pyspark.sql.functions import col, regexp_extract
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, OneHotEncoder, StringIndexer, VectorAssembler, SQLTransformer
from pyspark.ml.classification import LogisticRegression, NaiveBayes

# spark nlp modules
import sparknlp
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

# evaluation with sklearn
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score



>> b.  Starting a PySpark Session

In [None]:
spark = sparknlp.start()

print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

> ### II.  Data Preparation

>> a.  Retrieving Train Data

In [None]:
comments_train = spark.read.options(delimiter=';').csv('train data product reviews.csv', inferSchema=True, header=True)
comments_train.show(truncate=True, n=5)
comments_train.count(), comments_train.select('label').distinct().count()

The 'label' column contains only 0s and 1s. Let's keep just the text and label columns in a new data frame, *df_train*.

In [None]:
df_train = comments_train.select('text', 'label')
df_train.show(truncate=True, n=5)
df_train.groupBy('label').count().show()

In [None]:
df_train.describe().show()

In [None]:
def balance_check(df, label_col='label'):
  """
  Prints the number and percentage of positive (1) and negative (0) comments.
  df: Spark data frame
  label_col: name of the label column
  """
  positive = df.where(df[label_col] == '1').count()
  negative = df.where(df[label_col] == '0').count()
  total = positive + negative
  print(f'Positive Comments: {positive} ({100 * positive / total:.1f}%)')
  print(f'Negative Comments: {negative} ({100 * negative / total:.1f}%)')

balance_check(df_train)

Given the distribution of the comments, the training data is relatively imbalanced (~0.28 vs ~0.72). Before deciding whether to apply a downsampling or upsampling technique, let's first check whether the training data contains duplicates.

In [None]:
import pyspark.sql.functions as funcs
# total number of rows whose text occurs more than once
df_train.groupBy(df_train.text)\
    .count()\
    .where(funcs.col('count') > 1)\
    .select(funcs.sum('count'))\
    .show()

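Let's drop the duplicated rows and keep a single occurrence of each comment. Note that Spark's `dropDuplicates` keeps an arbitrary row from each group of duplicates, not necessarily the first one.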
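Because `dropDuplicates` keeps an arbitrary row per duplicated text, a comment that appears with both labels would silently end up with one of them. A quick check before deduplicating is to count the distinct labels per text; in Spark this could look like `df_train.groupBy('text').agg(countDistinct('label').alias('n_labels')).where('n_labels > 1')`. The plain-Python sketch below shows the same logic on a few hypothetical rows (the sample data and the helper name `conflicting_texts` are illustrative, not part of the original notebook).

```python
from collections import defaultdict

def conflicting_texts(rows):
    """Return the texts that appear with more than one distinct label.

    rows: iterable of (text, label) pairs, e.g. collected from df_train.
    """
    labels_per_text = defaultdict(set)
    for text, label in rows:
        labels_per_text[text].add(label)
    # keep only texts whose duplicates disagree on the label
    return {text: labels for text, labels in labels_per_text.items() if len(labels) > 1}

# Hypothetical sample rows, not taken from the real data set
sample = [
    ("great value for money", 1),
    ("great value for money", 1),
    ("stopped working after a week", 0),
    ("it is ok", 1),
    ("it is ok", 0),
]
print(conflicting_texts(sample))  # {'it is ok': {0, 1}}
```

Exact duplicates with the same label (like the first two rows) are harmless; only the conflicting ones need a decision before dropping duplicates.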

In [None]:
df_train = df_train.dropDuplicates(['text'])
balance_check(df_train)

After removing the duplicates, the class distribution of the training data shifted slightly and became more balanced (~0.33 vs ~0.67). For now, we keep this distribution and do not apply any downsampling, upsampling or synthetic data generation; instead, we use the F1 score as a performance metric so that the evaluation is less biased by the class distribution.
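To see why accuracy alone can mislead on skewed labels, consider a hypothetical "classifier" that always predicts the majority class on a 72/28 split (numbers chosen only to mirror the imbalance noted above; which class is the majority is an assumption here). The sketch computes accuracy and per-class F1 from scratch: accuracy looks respectable while the minority class gets an F1 of 0, which drags the macro-averaged F1 down.

```python
def f1_per_class(y_true, y_pred, cls):
    """F1 score for one class, treating `cls` as the positive label."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p == cls)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t != cls and p == cls)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p != cls)
    if tp == 0:
        return 0.0  # precision or recall is 0 (or undefined), so F1 is 0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)

# Hypothetical labels: 72 of class 1, 28 of class 0
y_true = [1] * 72 + [0] * 28
y_pred = [1] * 100  # always predicts the majority class

accuracy = sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)
f1_majority = f1_per_class(y_true, y_pred, 1)
f1_minority = f1_per_class(y_true, y_pred, 0)
macro_f1 = (f1_majority + f1_minority) / 2

print(f"accuracy={accuracy:.2f}, F1(1)={f1_majority:.3f}, "
      f"F1(0)={f1_minority:.3f}, macro F1={macro_f1:.3f}")
# accuracy=0.72, F1(1)=0.837, F1(0)=0.000, macro F1=0.419
```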

Next, we build a *df_test* with the same structure as *df_train*.

>> b.  Retrieving Test Data

In [None]:
comments_test = spark.read.options(delimiter=';').csv('test data product reviews.csv', inferSchema=True, header=True)
comments_test.show(truncate=True, n=5)
comments_test.count()

The test file is read into a single column named 'label,text'. We use a *regex* pattern to split it into a clean data frame with separate text and label columns.

In [None]:
# group 1: the 0/1 label; group 2: the comment text (",?" drops the comma separating label and text)
regex_pattern = r'"*([01]),?(.+)'
comments_test = comments_test.withColumn('text', regexp_extract(col('label,text'), regex_pattern, 2))\
                 .withColumn('label', regexp_extract(col('label,text'), regex_pattern, 1))
df_test = comments_test.select('text', 'label')
df_test.show(truncate=True, n=5)
df_test.count(), df_test.select('label').distinct().count()
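Spark's `regexp_extract` uses Java regular expressions and returns the first match found anywhere in the string (or an empty string when nothing matches), much like Python's `re.search`. For a simple pattern like this the two engines behave the same, so a pattern can be sanity-checked locally on a hypothetical row. The check below compares the pattern without and with the optional comma: with `(.+)` directly after the label group, the separating comma stays in the extracted text.

```python
import re

# Label in group 1, everything after it in group 2 (comma kept)
pattern = re.compile(r'"*([01])(.+)')
# Variant that also consumes the comma separating label and text
# (an assumption about the file layout, based on the 'label,text' header)
pattern_no_comma = re.compile(r'"*([01]),?(.+)')

row = '1,Great product, works as described'  # hypothetical row
print(pattern.search(row).groups())           # ('1', ',Great product, works as described')
print(pattern_no_comma.search(row).groups())  # ('1', 'Great product, works as described')
```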

In [None]:
import pyspark.sql.functions as funcs
# total number of rows whose text occurs more than once
df_test.groupBy(df_test.text)\
    .count()\
    .where(funcs.col('count') > 1)\
    .select(funcs.sum('count'))\
    .show()

In [None]:
balance_check(df_test)

The test data contains no duplicates, and its labels are balanced.

Now that we have both *df_train* and *df_test* in our targeted composition, we can move on to data visualisation and finally the **Sentiment Analysis**.

> ### III. Data Visualisation

In [None]:
import plotly.express as px

In [None]:
df_viz_train = df_train.toPandas()
df_viz_train.to_csv('train_viz', sep='\t', encoding='utf-8', index=False)
df_viz_test = df_test.toPandas()
df_viz_test.to_csv('test_viz', sep='\t', encoding='utf-8', index=False)

In [None]:
# length of each comment in characters
df_viz_train['length'] = df_viz_train['text'].str.len()
df_viz_test['length'] = df_viz_test['text'].str.len()
df_viz_train.head()

In [None]:
sns.displot(data=df_viz_train, x='label', y='length')
plt.xticks([0,1], ['negative', 'positive'], rotation='vertical')
plt.show()

In [None]:
def clean_token_extractor(df):

    '''
    Returns three pandas Data Frames (positive comments, negative comments
    and all comments) with columns ['label', 'result'],
    where 'result' contains the cleaned word tokens.
    '''
    document_assembler = DocumentAssembler()\
          .setInputCol("text")\
          .setOutputCol("document")

    tokenizer = Tokenizer()\
          .setInputCols(["document"])\
          .setOutputCol("token")

    normalizer = Normalizer()\
          .setInputCols(["token"])\
          .setOutputCol("normalized")

    stopwords_cleaner = StopWordsCleaner()\
          .setInputCols("normalized")\
          .setOutputCol("cleanTokens")\
          .setCaseSensitive(False)

    pipe_viz = Pipeline(
        stages=[document_assembler,
                tokenizer,
                normalizer,
                stopwords_cleaner
                ])

    model_viz = pipe_viz.fit(df)
    df_viz = model_viz.transform(df)

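    # cast label to int so the filters below also work when labels are strings (as in df_test)
    df_viz_all = df_viz.select(col('label').cast('int').alias('label'), 'cleanTokens.result').toPandas()
    df_viz_pos = df_viz_all[df_viz_all['label'] == 1]
    df_viz_neg = df_viz_all[df_viz_all['label'] == 0]
    return df_viz_pos, df_viz_neg, df_viz_all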
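As a rough mental model of what the Tokenizer, Normalizer and StopWordsCleaner stages do to a single comment, here is a simplified plain-Python analogue. It is only an approximation under stated assumptions: whitespace tokenization, ASCII letters only, and a tiny hypothetical stop-word list; Spark NLP's own tokenization rules and its default English stop-word list are considerably richer.

```python
import re

# Tiny, hypothetical stop-word list (Spark NLP's StopWordsCleaner ships a much larger English list)
STOP_WORDS = {"the", "is", "a", "it", "and", "this"}

def clean_tokens(text, stop_words=STOP_WORDS):
    """Rough analogue of Tokenizer -> Normalizer -> StopWordsCleaner."""
    tokens = text.split()                                        # tokenize on whitespace
    normalized = [re.sub(r"[^A-Za-z]", "", t) for t in tokens]   # keep letters only
    normalized = [t for t in normalized if t]                     # drop tokens that became empty
    return [t for t in normalized if t.lower() not in stop_words]  # case-insensitive removal

print(clean_tokens("This product is GREAT, and it works!"))  # ['product', 'GREAT', 'works']
```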

In [None]:
df_viz_train_pos, df_viz_train_neg, df_viz_train = clean_token_extractor(df_train)
df_viz_test_pos, df_viz_test_neg, df_viz_test = clean_token_extractor(df_test)

In [None]:
def word_bag(df, col = 'result'):
  
    """
    Counts each word in a data frame and returns counts of each word in a 
    Pandas Data Frame
    col: feature (each row list of strings)
    """ 
    full_list = []  
    for elmnt in df[col]:  
        full_list += elmnt  

    val_counts = pd.Series(full_list).value_counts()
    df_words = pd.DataFrame(val_counts).reset_index().rename(columns={'index':'word', 0:'word_count'}).sort_values(by='word_count', ascending=False)
    return df_words
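`word_bag` is essentially a frequency count over the flattened token lists. The same idea can be sketched with `collections.Counter`, which makes the logic explicit and sidesteps the column naming of `value_counts`. The helper name `word_counts` and the token lists below are hypothetical.

```python
from collections import Counter
from itertools import chain

def word_counts(token_lists, n=None):
    """Count words across a sequence of token lists and return the n most common.

    token_lists: e.g. the 'result' column produced by clean_token_extractor.
    """
    counts = Counter(chain.from_iterable(token_lists))
    return counts.most_common(n)  # list of (word, count), most frequent first

# Hypothetical cleaned tokens for three comments
tokens = [["great", "product"], ["product", "broke"], ["great", "price", "great"]]
print(word_counts(tokens, n=2))  # [('great', 3), ('product', 2)]
```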

In [None]:
df_words_all = word_bag(df_viz_train)
df_words_pos = word_bag(df_viz_train_pos)
df_words_neg = word_bag(df_viz_train_neg)

In [None]:
def word_displayer(df, n=20, stat='positive'):
    '''
    Visualise most frequent words in a Pandas Data Frame.
    stat: 'positive', 'negative', 'all'
    n: most frequent n words
    '''
    f, ax = plt.subplots(figsize=(10, 10))
    sns.set_color_codes("pastel")
    sns.barplot(x="word_count", y="word", data=df.head(n),
                label="Total", color="b", ax=ax)
    ax.set(ylabel="", xlabel="Number of occurrences")
    ax.set_title(f'{stat.upper()} COMMENTS')

In [None]:
word_displayer(df=df_words_all, n=20, stat='all')

In [None]:
word_displayer(df=df_words_neg, n=20, stat='negative')

In [None]:
word_displayer(df=df_words_pos, n=20, stat='positive')

## 2. Sentiment Analysis

In [None]:
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score

> ### I.  Logistic Regression and Naive Bayes with **CountVectorizer**

>> i. Building Pipeline



In [None]:
!wget -q https://raw.githubusercontent.com/mahavivo/vocabulary/master/lemmas/AntBNC_lemmas_ver_001.txt
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, OneHotEncoder, StringIndexer, VectorAssembler, SQLTransformer
from pyspark.ml.classification import LogisticRegression, NaiveBayes

In [None]:
%%time

document_assembler = DocumentAssembler()\
      .setInputCol("text")\
      .setOutputCol("document")

sentence = SentenceDetector()\
      .setInputCols("document")\
      .setOutputCol("sentence")

tokenizer = Tokenizer()\
      .setInputCols(["sentence"])\
      .setOutputCol("token")

normalizer = Normalizer()\
      .setInputCols(["token"])\
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)


stemmer = Stemmer()\
      .setInputCols(["cleanTokens"])\
      .setOutputCol("stem")

finisher = Finisher()\
      .setInputCols(["stem"])\
      .setOutputCols(["token_features"])\
      .setOutputAsArray(True)\
      .setCleanAnnotations(False)

label_strIdx = StringIndexer(inputCol='label', outputCol='target')
logReg = LogisticRegression(maxIter=5, regParam=0.01)
naiveBayes = NaiveBayes(smoothing=5)
countVectors = CountVectorizer(inputCol="token_features", outputCol="features", vocabSize=10000, minDF=5)
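`CountVectorizer` builds its vocabulary from the training documents: a term is kept only if it occurs in at least `minDF` documents, and among those the `vocabSize` terms with the highest total count across the corpus are retained. Each document is then encoded as term counts over that vocabulary. The plain-Python sketch below mirrors that behaviour on toy data; it is not Spark's implementation, Spark returns a sparse vector rather than the dense list used here, and breaking ties alphabetically is an assumption made only for determinism.

```python
from collections import Counter

def fit_vocabulary(docs, vocab_size, min_df):
    """Keep terms seen in >= min_df documents, then the vocab_size most frequent ones."""
    term_freq = Counter()  # total occurrences across the corpus
    doc_freq = Counter()   # number of documents containing the term
    for doc in docs:
        term_freq.update(doc)
        doc_freq.update(set(doc))
    candidates = [t for t in term_freq if doc_freq[t] >= min_df]
    # sort by total count (descending); ties broken alphabetically (an assumption)
    candidates.sort(key=lambda t: (-term_freq[t], t))
    return candidates[:vocab_size]

def transform(doc, vocabulary):
    """Encode one document as term counts over the fitted vocabulary (dense list)."""
    counts = Counter(doc)
    return [counts[t] for t in vocabulary]

# Hypothetical stemmed token lists
docs = [["good", "good", "price"], ["good", "bad"], ["bad", "price"], ["nice"]]
vocab = fit_vocabulary(docs, vocab_size=10, min_df=2)
print(vocab)                                       # ['good', 'bad', 'price']
print(transform(["good", "nice", "good"], vocab))  # [2, 0, 0]
```

Here "nice" is dropped from the vocabulary because it appears in only one document, which is the effect `minDF=5` has on rare words in the real pipeline.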


>> ii. Forming Pipelines

In [None]:
# Pipeline for Logistic Regression with CountVectorizer
nlp_pipeline_cv_lr = Pipeline(
    stages=[document_assembler,
            sentence,
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher,
            countVectors,
            logReg
            ])

# Pipeline for Naive Bayes with CountVectorizer
nlp_pipeline_cv_nb = Pipeline(
    stages=[document_assembler,
            sentence,
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher,
            countVectors,
            naiveBayes
            ])


>> iii. Logistic Regression with CountVectorizer

>>> a. Applying LogReg

In [None]:
modelLR = nlp_pipeline_cv_lr.fit(df_train)
pred_lr = modelLR.transform(df_test)
pred_lr = pred_lr.withColumn('label', pred_lr.label.cast(IntegerType()))
pred_lr.filter(pred_lr['prediction'] == 0)\
    .select("text","probability","label","prediction")\
    .orderBy("probability", ascending=False)\
    .show(n = 10, truncate = 30)
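For binary logistic regression, Spark's `probability` column holds `[P(label=0), P(label=1)]`, and with the default `threshold` of 0.5 the `prediction` is 1.0 when P(label=1) exceeds the threshold. The sketch below reproduces that mapping from a linear score; the weights, intercept and feature counts are made up for illustration.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def predict(weights, intercept, features, threshold=0.5):
    """Return ([P(0), P(1)], prediction) for a binary logistic regression model."""
    z = intercept + sum(w * x for w, x in zip(weights, features))  # raw margin
    p1 = sigmoid(z)
    probability = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return probability, prediction

# Hypothetical model weights and count features
prob, pred = predict(weights=[0.8, -1.5], intercept=0.1, features=[1, 2])
print([round(p, 3) for p in prob], pred)  # [0.891, 0.109] 0.0
```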

>>> b. Model Performance

In [None]:
# Convert pred_lr to a pandas data frame so we can use the sklearn metrics library
df_lr = pred_lr.select('text','label','prediction').toPandas()
print(classification_report(df_lr.label, df_lr.prediction))
print(accuracy_score(df_lr.label, df_lr.prediction))

# Evaluation can also stay inside Spark, which scales better to large data (default metric: weighted F1)
evaluator = MulticlassClassificationEvaluator(labelCol='label',predictionCol="prediction")
evaluator.evaluate(pred_lr)
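`MulticlassClassificationEvaluator` uses `metricName='f1'` by default, which is the per-class F1 averaged with weights proportional to each class's support, the same quantity as the "weighted avg" F1 in sklearn's `classification_report`; passing `metricName='accuracy'` gives accuracy instead. The sketch computes the weighted F1 from scratch on hypothetical labels to make the number returned by `evaluator.evaluate(...)` easier to interpret.

```python
from collections import Counter

def weighted_f1(y_true, y_pred):
    """Support-weighted average of per-class F1 scores."""
    support = Counter(y_true)
    total = len(y_true)
    score = 0.0
    for cls, n in support.items():
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p == cls)
        predicted = sum(1 for p in y_pred if p == cls)
        precision = tp / predicted if predicted else 0.0
        recall = tp / n
        f1 = 2 * precision * recall / (precision + recall) if tp else 0.0
        score += f1 * n / total  # weight each class by its share of the labels
    return score

# Hypothetical test labels and predictions
y_true = [1, 1, 1, 1, 0, 0]
y_pred = [1, 1, 1, 0, 0, 1]
print(round(weighted_f1(y_true, y_pred), 4))  # 0.6667
```

Because the weights follow the class support, weighted F1 still leans toward the majority class; a macro average would treat both classes equally.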


>> iv. Naive Bayes with CountVectorizer

>>> a. Applying Naive Bayes

In [None]:
modelNB = nlp_pipeline_cv_nb.fit(df_train)
pred_nb = modelNB.transform(df_test)
pred_nb = pred_nb.withColumn('label', pred_nb.label.cast(IntegerType()))
pred_nb.filter(pred_nb['prediction'] == 0)\
    .select("text","probability","label","prediction")\
    .orderBy("probability", ascending=False)\
    .show(n = 10, truncate = 30)
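Spark's `NaiveBayes` defaults to the multinomial model, where `smoothing` is an additive (Laplace/Lidstone) constant: P(term | class) = (count(term, class) + smoothing) / (total count in class + smoothing × vocabulary size). With `smoothing=5`, as in this notebook, the estimates for rare terms are pulled noticeably toward uniform. As far as we can tell from Spark's source, the class priors are smoothed with the same constant, and the sketch does the same. It is a simplified illustration on toy count vectors, not Spark's code; the helper names are hypothetical.

```python
import math

def train_multinomial_nb(X, y, smoothing=1.0):
    """Fit log priors and log term probabilities with additive smoothing.

    X: list of count vectors (all the same length); y: list of class labels.
    Every class is assumed to have at least one row.
    """
    classes = sorted(set(y))
    n_features = len(X[0])
    log_prior, log_theta = {}, {}
    for c in classes:
        rows = [x for x, label in zip(X, y) if label == c]
        log_prior[c] = math.log((len(rows) + smoothing) / (len(X) + len(classes) * smoothing))
        term_counts = [sum(column) for column in zip(*rows)]  # per-term totals for class c
        denom = sum(term_counts) + n_features * smoothing
        log_theta[c] = [math.log((tc + smoothing) / denom) for tc in term_counts]
    return log_prior, log_theta

def predict_nb(x, log_prior, log_theta):
    """Pick the class with the highest log-posterior for count vector x."""
    scores = {c: log_prior[c] + sum(xi * lt for xi, lt in zip(x, log_theta[c]))
              for c in log_prior}
    return max(scores, key=scores.get)

# Hypothetical counts over the vocabulary ['good', 'bad', 'price']
X = [[2, 0, 1], [1, 0, 0], [0, 2, 1], [0, 1, 0]]
y = [1, 1, 0, 0]
prior, theta = train_multinomial_nb(X, y, smoothing=5.0)
print(predict_nb([1, 0, 0], prior, theta))  # 1
print(predict_nb([0, 1, 0], prior, theta))  # 0
```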

>>> b. Model Performance

In [None]:
# Convert pred_nb to a pandas data frame so we can use the sklearn metrics library
df_nb = pred_nb.select('text','label','prediction').toPandas()
print(classification_report(df_nb.label, df_nb.prediction))
print(accuracy_score(df_nb.label, df_nb.prediction))

# Evaluation can also stay inside Spark, which scales better to large data (default metric: weighted F1)
evaluator = MulticlassClassificationEvaluator(labelCol='label',predictionCol="prediction")
evaluator.evaluate(pred_nb)

> ### II.  Logistic Regression and Naive Bayes with **TFIDF**

>> i. Building Pipeline



In [None]:
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, OneHotEncoder, StringIndexer, VectorAssembler, SQLTransformer
from pyspark.ml.classification import LogisticRegression, NaiveBayes

In [None]:
%%time
document_assembler = DocumentAssembler()\
      .setInputCol("text")\
      .setOutputCol("document")

sentence = SentenceDetector()\
      .setInputCols("document")\
      .setOutputCol("sentence")

tokenizer = Tokenizer()\
      .setInputCols(["sentence"])\
      .setOutputCol("token")

normalizer = Normalizer()\
      .setInputCols(["token"])\
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)


stemmer = Stemmer()\
      .setInputCols(["cleanTokens"])\
      .setOutputCol("stem")

finisher = Finisher()\
      .setInputCols(["stem"])\
      .setOutputCols(["token_features"])\
      .setOutputAsArray(True)\
      .setCleanAnnotations(False)

hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
label_strIdx = StringIndexer(inputCol='label', outputCol='target')
logReg = LogisticRegression(maxIter=5, regParam=0.01)
naiveBayes = NaiveBayes(smoothing=5)
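In the TF-IDF pipeline, `HashingTF` maps each token to one of `numFeatures` buckets with a hash function (so distinct words can collide in the same bucket) and counts occurrences; `IDF` then rescales each count. Spark's documentation gives the smoothed formula IDF(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents, and terms that appear in fewer than `minDocFreq` documents get an IDF of 0. The sketch below applies that formula to plain term counts; it skips the hashing step and uses words directly as keys, a simplification, and the helper names are hypothetical.

```python
import math
from collections import Counter

def fit_idf(docs, min_doc_freq=0):
    """Smoothed IDF per term: log((m + 1) / (df + 1)); 0 if df < min_doc_freq."""
    m = len(docs)
    doc_freq = Counter(term for doc in docs for term in set(doc))
    return {term: (math.log((m + 1) / (df + 1)) if df >= min_doc_freq else 0.0)
            for term, df in doc_freq.items()}

def tf_idf(doc, idf):
    """Raw term counts rescaled by IDF; terms unseen during fitting get weight 0 in this sketch."""
    return {term: tf * idf.get(term, 0.0) for term, tf in Counter(doc).items()}

# Hypothetical stemmed token lists
docs = [["good", "price"], ["good", "bad"], ["bad", "bad", "quality"]]
idf = fit_idf(docs, min_doc_freq=2)
weights = tf_idf(["bad", "bad", "good"], idf)
print({t: round(w, 3) for t, w in weights.items()})  # {'bad': 0.575, 'good': 0.288}
```

"price" and "quality" occur in only one document each, so with `min_doc_freq=2` their IDF (and hence their weight) is 0.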

>> ii. Forming Pipelines

In [None]:
# Pipeline for Logistic Regression with TFIDF
nlp_pipeline_tf_lr = Pipeline(
    stages=[document_assembler,
            sentence,
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher,
            hashingTF,
            idf,
            logReg
            ])

# Pipeline for Naive Bayes with TFIDF
nlp_pipeline_tf_nb = Pipeline(
    stages=[document_assembler,
            sentence,
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher,
            hashingTF,
            idf,
            naiveBayes
            ])


>> iii. Logistic Regression with TFIDF

>>> a. Applying LogReg

In [None]:
modelLR = nlp_pipeline_tf_lr.fit(df_train)
pred_tf_lr = modelLR.transform(df_test)
pred_tf_lr = pred_tf_lr.withColumn('label', pred_tf_lr.label.cast(IntegerType()))
pred_tf_lr.filter(pred_tf_lr['prediction'] == 0)\
    .select("text","probability","label","prediction")\
    .orderBy("probability", ascending=False)\
    .show(n = 10, truncate = 30)

>>> b. Model Performance

In [None]:
# Convert pred_tf_lr to a pandas data frame so we can use the sklearn metrics library
df_tf_lr = pred_tf_lr.select('text','label','prediction').toPandas()
print(classification_report(df_tf_lr.label, df_tf_lr.prediction))
print(accuracy_score(df_tf_lr.label, df_tf_lr.prediction))

# Evaluation can also stay inside Spark, which scales better to large data (default metric: weighted F1)
evaluator = MulticlassClassificationEvaluator(labelCol='label',predictionCol="prediction")
evaluator.evaluate(pred_tf_lr)


>> iv. Naive Bayes with TFIDF

>>> a. Applying Naive Bayes

In [None]:
modelNB = nlp_pipeline_tf_nb.fit(df_train)
pred_tf_nb = modelNB.transform(df_test)
pred_tf_nb = pred_tf_nb.withColumn('label', pred_tf_nb.label.cast(IntegerType()))
pred_tf_nb.filter(pred_tf_nb['prediction'] == 0)\
    .select("text","probability","label","prediction")\
    .orderBy("probability", ascending=False)\
    .show(n = 10, truncate = 30)

>>> b. Model Performance

In [None]:
# Convert pred_tf_nb to a pandas data frame so we can use the sklearn metrics library
df_tf_nb = pred_tf_nb.select('text','label','prediction').toPandas()
print(classification_report(df_tf_nb.label, df_tf_nb.prediction))
print(accuracy_score(df_tf_nb.label, df_tf_nb.prediction))

# Evaluation can also stay inside Spark, which scales better to large data (default metric: weighted F1)
evaluator = MulticlassClassificationEvaluator(labelCol='label',predictionCol="prediction")
evaluator.evaluate(pred_tf_nb)

> ### III.  Universal Sentence Encoder

>> i. Building Pipeline



In [None]:
%%time

document = DocumentAssembler()\
  .setInputCol("text")\
  .setOutputCol("document")
    
use = UniversalSentenceEncoder.pretrained()\
 .setInputCols(["document"])\
 .setOutputCol("sentence_embeddings")

classifierdl = ClassifierDLApproach()\
  .setInputCols(["sentence_embeddings"])\
  .setOutputCol("class")\
  .setLabelColumn("label")\
  .setMaxEpochs(11)\
  .setEnableOutputLogs(True)


>> ii. Forming Pipeline

In [None]:
use_clf_pipeline = Pipeline(
    stages = [
        document,
        use,
        classifierdl
    ])

In [None]:
# training logs are written here only after the model has been fit (setEnableOutputLogs(True))
!ls -l ~/annotator_logs


>> iii. Universal Sentence Encoder with Deep Learning Approach

>>> a. Applying Universal Sentence Encoder with DL

In [None]:
useModel = use_clf_pipeline.fit(df_train)
pred_use = useModel.transform(df_test)
df_use = pred_use.select('text','label', 'class.result').toPandas()
df_use['result'] = df_use['result'].apply(lambda x: x[0])
df_use.head()

>>> b. Model Performance

In [None]:
print(classification_report(df_use.label, df_use.result))
print(accuracy_score(df_use.label, df_use.result))

## 3. Conclusion

Considering the wide range of NLP applications, building solid algorithms at scale is a powerful asset that strengthens companies' competence in almost every business area. From this perspective, I approached this simulation of ESPRIT's use case guided by three pillars.

The first pillar could be described as "keeping up with the pace of the NLP inferno". In other words, I applied modern embedding techniques such as the Universal Sentence Encoder, which relies on a pre-trained deep learning model under the hood. Unsurprisingly, I got the best accuracy results with the Universal Sentence Encoder. At the same time, I needed to use resources efficiently, so I did not use heavier alternatives such as BERT sentence embeddings (note that, for readability, no fine-tuning steps are included in this notebook).

The second pillar is "simplicity", and this is where pipelines come into play. I built pipelines that made my code modular, digestible and stable, so my team partners could easily apply or improve it without any additional support; the value of that cannot be overstated.

The third pillar is "extending the horizon": looking out for wider possibilities and sharing these new horizons with my colleagues. To that end, I attended the John Snow Labs NLP for Data Science workshop (live, online) and tried to apply what I learned in this case study.

There is no single best model, and no single possible solution, for every business problem. That's why I look forward to your insights. I appreciate having had this opportunity.

Thank you for your time and see you all tomorrow!

