## BD10 - D03 - sander: A Predictive Classification Model for Affective Texts

### Introduction
- **Running the whole notebook may take several hours.**
- **Group name**: Pesticide
- **Group members**: 
  - Can Zhou (20031818)
  - Xuting Wu (20028395)
  - Jizhou Che (20032291)
  - Yingrui Ma (20032288)

- **Dataset: sander**
- **How to run**:
  - **Download the folder and put it in the root of your Google Drive.**
  - Root Path: /content/drive/My Drive/
  - Folder structure:
    - Big_Data_CW_BD10
      - README.txt
      - Big_Data_D01.ipynb
      - Big_Data_D02.ipynb
      - Big_Data_D03.ipynb
      - Dataset_Verification_Label_Accuracy.ipynb
      - Dataset_Verification_Spelling_Check.ipynb
      - Chart.ipynb
      - Datasets
        - STS.csv
        - tweet_emotions.csv
        - sander.csv
      - HelperFile
        - abbr.txt
    - Please contact us if there is any confusion.


- **Code structure**:
  - Install and Import
  - Start & Read Data & Select Classes
  - Stage01: Text Preprocessing
    - Step01: Spelling Correction; Abbreviation & Emoji Recovery
    - Step02: Tokenisation
    - Step03: Stopwords & Noise Removal
    - Step04: Word Lemmatisation & Normalisation
    - Pipeline Construction
  - Stage02: Feature Extraction
    - TF-IDF
    - word2vec
    - Bert
  - Divide Data
  - Stage03: Model Training
    - Logistic Regression
    - Linear SVM
    - Naive Bayes
    - MLP
  - Experiments
    - Experiments Setup
    - Proposed Framework Experiments
    - Baseline Framework Experiments



### Install and import

In [None]:
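# Install Java 8
!apt-get update -qq
!apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
# Prints the system's default JDK; JAVA_HOME is pointed at JDK 8 in the import cell below
!java -version

# Install PySpark
!pip install --ignore-installed -q pyspark

# Install Spark NLP
!pip install --ignore-installed spark-nlp

# Install the Enchant spell-checking library and its Python bindings (pyenchant)
!apt-get install -y enchant
!pip install pyenchant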

openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment (build 11.0.10+9-Ubuntu-0ubuntu1.18.04)
OpenJDK 64-Bit Server VM (build 11.0.10+9-Ubuntu-0ubuntu1.18.04, mixed mode, sharing)
     |████████████████████████████████| 212.3MB 72kB/s 
     |████████████████████████████████| 204kB 48.6MB/s 
  Building wheel for pyspark (setup.py) ... done
Collecting spark-nlp
  Downloading https://files.pythonhosted.org/packages/6c/35/3d06b93fefdeab0f6f544b1fc48e5e49c049697c38611ef870383031380b/spark_nlp-3.0.3-py2.py3-none-any.whl (43kB)
     |████████████████████████████████| 51kB 4.0MB/s 
Installing collected packages: spark-nlp
Successfully installed spark-nlp-3.0.3
Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
The following additional packages will be installed:
  asp

In [None]:
# Import
import enchant
from enchant.checker import SpellChecker
from enchant.tokenize import EmailFilter, URLFilter

# Point Spark at the JDK 8 installed above; this must happen before Spark starts
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

from pyspark import SparkContext
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col, lit, udf, split, explode
from pyspark.sql.types import StringType
import pyspark.sql.functions as F
# Tokenizer is not imported from pyspark.ml.feature: the pipeline uses Spark NLP's Tokenizer
from pyspark.ml.feature import SQLTransformer, StopWordsRemover, Word2Vec, HashingTF, IDF, MinMaxScaler
from pyspark.ml.classification import LinearSVC, LogisticRegression, NaiveBayes, MultilayerPerceptronClassifier

import sparknlp
from sparknlp.annotator import *
from sparknlp.base import *
from sparknlp.common import *
from sparknlp.pretrained import PretrainedPipeline
from sparknlp import Finisher

**Colab only: the cell below mounts Google Drive, so check that the paths match where you put the folder.**  
**If you run the code on cslinux instead, do not import Colab; set the paths relative to the Big_Data_CW_BD10 folder:**  
```
path01 = "Datasets/"  
path02 = "HelperFile/"
```


In [None]:
# Only for colab
from google.colab import drive
drive.mount('/content/drive')
path01 = "/content/drive/My Drive/Big_Data_CW_BD10/Datasets/"
path02 = "/content/drive/My Drive/Big_Data_CW_BD10/HelperFile/"

Mounted at /content/drive


### Start & Read data & Select Classes

In [None]:
# Start
spark = sparknlp.start()

In [None]:
# Read data
data = spark.read.csv(path01 + 'sander.csv', header=True)
data = data.withColumnRenamed("TweetText", "original_text")
data.show()

+-----+---------+------------------+--------------------+----------------------+
|Topic|Sentiment|           TweetId|           TweetDate|         original_text|
+-----+---------+------------------+--------------------+----------------------+
|apple| positive|126415614616154112|Tue Oct 18 21:53:...|  Now all @Apple ha...|
|apple| positive|126404574230740992|Tue Oct 18 21:09:...|  @Apple will be ad...|
|apple| positive|126402758403305474|Tue Oct 18 21:02:...|  Hilarious @youtub...|
|apple| positive|126397179614068736|Tue Oct 18 20:40:...|  @RIM you made it ...|
|apple| positive|126395626979196928|Tue Oct 18 20:34:...|  I just realized t...|
|apple| positive|126394830791254016|Tue Oct 18 20:30:...|  I'm a current @Bl...|
|apple| positive|126379685453119488|Tue Oct 18 19:30:...|  The 16 strangest ...|
|apple| positive|126377656416612353|Tue Oct 18 19:22:...|  Great up close & ...|
|apple| positive|126373779483004928|Tue Oct 18 19:07:...|  From which compan...|
|apple| positive|12636635375

#### Select Classes

In [None]:
positive_df = data.filter(data.Sentiment == "positive").select("original_text")
negative_df = data.filter(data.Sentiment == "negative").select("original_text")
positive_df = positive_df.limit(250)
negative_df = negative_df.limit(250)

data = positive_df.withColumn("sentiment", lit(1))
tmp = negative_df.withColumn("sentiment", lit(0))
data = data.union(tmp)
data.show()

+----------------------+---------+
|         original_text|sentiment|
+----------------------+---------+
|  Now all @Apple ha...|        1|
|  @Apple will be ad...|        1|
|  Hilarious @youtub...|        1|
|  @RIM you made it ...|        1|
|  I just realized t...|        1|
|  I'm a current @Bl...|        1|
|  The 16 strangest ...|        1|
|  Great up close & ...|        1|
|  From which compan...|        1|
|  Just apply for a ...|        1|
|  RT @JamaicanIdler...|        1|
|  Lmao I think @app...|        1|
|  RT @PhillipRowntr...|        1|
|  Wow. Great deals ...|        1|
|  Just registered a...|        1|
|你好 ! Currently le...|        1|
|  Come to the dark ...|        1|
|  Hey @apple, if yo...|        1|
|  Thank you @apple ...|        1|
|  Thanks to @Apple ...|        1|
+----------------------+---------+
only showing top 20 rows



### Stage01: Text Preprocessing

#### Step 01: Spelling Correction, Abbreviation & Emoji recovery.

In [None]:
#################################################################
# STEP 01:                                                      #
# Spelling correction, abbreviation recovery and emoji recovery.#
#################################################################
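# Load and construct the abbreviation dictionary once, rather than on every call of the UDF.
# Non-comment lines of abbr.txt have the form "<abbreviation>??<expansion>".
src = []
dst = []
with open(path02 + "abbr.txt") as abbrFile:
  for line in abbrFile:
    line = line.rstrip("\n")
    if len(line) > 0 and not line.startswith('#') and "??" in line:
      abbr, expansion = line.split("??", 1)
      src.append(abbr)
      dst.append(expansion)

def spell_fix(s):
  if s is None:
    return None

  # Replace abbreviations and emoticons.
  # NOTE: str.replace matches substrings, so an entry can also fire inside a longer word.
  for abbr, expansion in zip(src, dst):
    s = s.replace(abbr, expansion)

  # Correct misspelled words, ignoring e-mail addresses and URLs.
  checker = SpellChecker("en_US", filters=[EmailFilter, URLFilter])
  checker.set_text(s.lower())
  for err in checker:
    suggestions = err.suggest()
    if suggestions:
      # Use the top suggestion; weighting suggestions by word frequency might work better.
      err.replace(suggestions[0])
  return checker.get_text().lower()

spark.udf.register("spell_fix_udf", spell_fix, StringType())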
spell_fix_sql = SQLTransformer(statement = "SELECT *, spell_fix_udf(original_text) as text FROM __THIS__")
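`spell_fix` expands abbreviations with `str.replace`, which matches substrings, so a short entry in `abbr.txt` can also fire inside a longer word. The pure-Python sketch below shows a token-level alternative, assuming that `abbr.txt` keeps the `<abbreviation>??<expansion>` format parsed above and that tokens are separated by whitespace. The helpers `parse_abbr_lines` and `expand_abbreviations` and the sample entries are hypothetical, not taken from the project's `abbr.txt`.

```python
import re

def parse_abbr_lines(lines):
    """Build {abbreviation: expansion} from lines of the form '<abbr>??<expansion>'.
    Blank lines and lines starting with '#' are skipped, as in spell_fix."""
    table = {}
    for line in lines:
        line = line.rstrip("\n")
        if not line or line.startswith("#") or "??" not in line:
            continue
        abbr, expansion = line.split("??", 1)
        if abbr:
            table[abbr] = expansion
    return table

def expand_abbreviations(text, table):
    """Replace whole whitespace-delimited tokens only, so entries never fire inside longer words."""
    if not table:
        return text
    # (?<!\S) and (?!\S) require whitespace or a string boundary on both sides;
    # unlike \b, this also works for emoticons such as ':)'.
    pattern = re.compile(r"(?<!\S)(" + "|".join(map(re.escape, table)) + r")(?!\S)")
    return pattern.sub(lambda m: table[m.group(1)], text)

# Hypothetical entries, not taken from the project's abbr.txt.
table = parse_abbr_lines(["# comment\n", "lol??laughing out loud\n", ":)??smile\n", "\n"])
print(expand_abbreviations("lol that was fun :) lollipop", table))
# laughing out loud that was fun smile lollipop
```

"lollipop" is left untouched because the entry `lol` only matches as a complete token.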

#### Step02: Tokenisation

In [None]:
#################################################################
# STEP 02:                                                      #
# Tokenisation                                                  #
#################################################################
document_assembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")
    
sentence_detector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence") \
    .setUseAbbreviations(True)
    
tokenizer = Tokenizer() \
  .setInputCols(["sentence"]) \
  .setOutputCol("token")

#### Step 03: Stopwords & Noise Removal

In [None]:
#################################################################
# STEP 03:                                                      #
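# Remove stopwords and noise                                    #
#################################################################
# Custom Transformer for noise removal: strips @mentions, links, '#', 'RT' and ':' from each token.
# NOTE: RegexTransformer is not a stage of nlp_pipeline below; there, characters such as '@' are
# removed by the Normalizer in Step 04. To use it, place it after the Finisher (it reads the
# 'clean_token' column), and note that the groupBy keeps only the 'text' and 'regex_array' columns.
class RegexTransformer(Transformer):
  def __init__(self):
    super(RegexTransformer, self).__init__()

  def _transform(self, df):
    # Explode to one row per token, drop empty tokens, apply the patterns, then regroup per text.
    df = df.withColumn('text_explode', F.explode(F.col('clean_token')))\
           .na.replace('', None)\
           .na.drop()\
           .withColumn('reg_ex', F.regexp_replace(F.col('text_explode'), r'@\w+', ''))\
           .withColumn('reg_ex', F.regexp_replace(F.col('reg_ex'), r'http\S+', ''))\
           .withColumn('reg_ex', F.regexp_replace(F.col('reg_ex'), '#', ''))\
           .withColumn('reg_ex', F.regexp_replace(F.col('reg_ex'), 'RT', ''))\
           .withColumn('reg_ex', F.regexp_replace(F.col('reg_ex'), ':', ''))\
           .groupBy('text').agg(F.collect_list(F.col('reg_ex')).alias('regex_array'))
    return df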

stopwords_cleaner = StopWordsCleaner()\
    .setInputCols("token")\
    .setOutputCol("stopTokens")\
    .setCaseSensitive(False)
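The substitutions `RegexTransformer` applies to each token can be checked without starting Spark. The pure-Python sketch below assumes Python's `re` behaves like Java's regex engine for these simple patterns; the name `clean_tokens` is illustrative. Unlike `RegexTransformer`, which drops empty tokens before the substitutions, the sketch drops them afterwards, so removed mentions and links leave no empty strings. It also shows a side effect of the unanchored `RT` pattern: the letters `RT` are removed from inside words such as `ART`.

```python
import re

# The substitutions RegexTransformer applies to each exploded token, in the same order.
NOISE_PATTERNS = [
    (r"@\w+", ""),     # @mentions
    (r"http\S+", ""),  # links
    (r"#", ""),        # hashtag symbol (the word itself is kept)
    (r"RT", ""),       # retweet marker, unanchored, so it also matches inside words
    (r":", ""),        # colons
]

def clean_tokens(tokens):
    """Apply the noise patterns to each token, then drop tokens that end up empty."""
    cleaned = []
    for tok in tokens:
        for pattern, repl in NOISE_PATTERNS:
            tok = re.sub(pattern, repl, tok)
        if tok:
            cleaned.append(tok)
    return cleaned

print(clean_tokens(["RT", "@apple", "#iphone", "http://t.co/x", "great:", "ART"]))
# ['iphone', 'great', 'A']
```

Anchoring the pattern as `^RT$` would remove only tokens that are exactly `RT`.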

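#### Step 04: Word Lemmatisation & Normalisation

Despite the step name, words are reduced with Spark NLP's `Stemmer`, which truncates them to stems rather than dictionary lemmas (for example, "realized" becomes "realiz" in the proposed-framework output below). The `Normalizer` then strips remaining non-letter characters such as `@`.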

In [None]:
#################################################################
# STEP 04:                                                      #
# Word stemming & normalisation                                 #
#################################################################
stemmer = Stemmer() \
    .setInputCols(["stopTokens"]) \
    .setOutputCol("stem")
    
normalizer = Normalizer() \
    .setInputCols(["stem"]) \
    .setOutputCol("normalized")
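What Steps 03 and 04 do to a single tweet can be approximated in plain Python. This is a simplified sketch, not Spark NLP's implementation: the stopword list is a tiny hypothetical sample (`StopWordsCleaner` ships its own English list), the `Stemmer` is left out, and the `Normalizer`'s default cleanup is assumed to remove every non-letter character.

```python
import re

# Tiny illustrative stopword list (hypothetical sample, not Spark NLP's list).
STOPWORDS = {"a", "the", "i", "you", "it", "all", "will", "be"}

def remove_stopwords(tokens):
    # Case-insensitive comparison, mirroring setCaseSensitive(False).
    return [t for t in tokens if t.lower() not in STOPWORDS]

def normalize(tokens):
    # Approximates the Normalizer's cleanup: [\W\d_] matches non-word characters,
    # digits and underscores, i.e. everything except letters; empty tokens are dropped.
    cleaned = (re.sub(r"[\W\d_]+", "", t) for t in tokens)
    return [t for t in cleaned if t]

tokens = ["@rim", "you", "made", "it", "easy", "!", "16", "the"]
print(normalize(remove_stopwords(tokens)))
# ['rim', 'made', 'easy']
```

With the `Stemmer` in place, `easy` would additionally become `easi`, as in the `clean_token` output of the proposed framework.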

#### Pipeline Construction

In [None]:
#################################################################
# Text mining pipeline construction.                            #
#################################################################

# Convert the normalised annotations into an array-of-strings column (clean_token)
finisher = Finisher() \
    .setInputCols(["normalized"]) \
    .setOutputCols(["clean_token"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(True)

# Our NLP pipeline
nlp_pipeline = Pipeline(stages=[spell_fix_sql, 
                                document_assembler, 
                                sentence_detector, 
                                tokenizer, 
                                stopwords_cleaner, 
                                stemmer, 
                                normalizer, 
                                finisher])

# Fit and transform data
def textMining(data):
  model = nlp_pipeline.fit(data)
  transformed_data = model.transform(data)

  transformed_data.printSchema()
  transformed_data.show()
  return transformed_data

### Stage02: Feature Extraction
- TF-IDF
- Word2Vec
- Bert

#### TF-IDF

In [None]:
def tfIdf(transformed_data):
  # TF: hash each token into one of 200 buckets and count occurrences
  hashingTF = HashingTF(inputCol="clean_token", outputCol="rawFeatures", numFeatures=200)
  featurizedData = hashingTF.transform(transformed_data).persist()

  # IDF: down-weight buckets that appear in many documents
  idf = IDF(inputCol="rawFeatures", outputCol="features")
  idfModel = idf.fit(featurizedData)
  tmpData = idfModel.transform(featurizedData).persist()
  
  # Scale features to [0, 1] (fitted on the full dataset, before the train/test split)
  featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(tmpData)
  processed = featureScaler.transform(tmpData)

  # Rename and select the columns used by the classifiers
  df = processed.select(col("sentiment").alias("labels"), col("clean_token").alias("MeaningfulWords"), col("scaledFeatures").alias("features"))
  return df
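The TF-IDF features can be reproduced in plain Python on a toy corpus. The sketch follows the formula documented for Spark ML's `IDF`, idf = log((m + 1) / (df + 1)) for m documents, and uses raw term counts as `HashingTF` does by default. `zlib.crc32` is a stand-in for Spark's MurmurHash3, so bucket indices will not match Spark's; the function names and the corpus are illustrative.

```python
import math
import zlib

def hashing_tf(tokens, num_features=200):
    """Raw term counts, each term mapped to a bucket by hashing (the hashing trick).
    zlib.crc32 stands in for Spark's MurmurHash3."""
    vec = [0.0] * num_features
    for tok in tokens:
        vec[zlib.crc32(tok.encode("utf-8")) % num_features] += 1.0
    return vec

def fit_idf(tf_vectors):
    """Per-bucket IDF with Spark ML's documented formula log((m + 1) / (df + 1))."""
    m = len(tf_vectors)
    doc_freq = [sum(1 for v in tf_vectors if v[j] > 0) for j in range(len(tf_vectors[0]))]
    return [math.log((m + 1) / (df + 1)) for df in doc_freq]

def tf_idf(tf_vectors, idf):
    """Element-wise product of each TF vector with the IDF weights."""
    return [[tf * w for tf, w in zip(v, idf)] for v in tf_vectors]

# Toy corpus of already-cleaned token lists (made up for illustration).
docs = [["great", "phone"], ["bad", "phone"], ["great", "great", "day"]]
tf = [hashing_tf(d) for d in docs]
idf = fit_idf(tf)
features = tf_idf(tf, idf)
```

A bucket that occurs in every document gets weight log(1) = 0. With only 200 buckets, distinct words in the tweets are likely to share buckets; a larger `numFeatures` reduces collisions, but the MLP input size passed in `mlAlgorithms` (200) would then have to change too.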

#### word2vec

In [None]:
def myWord2Vec(transformed_data):
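  # Learn 200-dimensional word vectors; each document becomes the average of its word vectors
  word2vec = Word2Vec(vectorSize=200, seed=42, inputCol="clean_token", outputCol="features")
  model = word2vec.fit(transformed_data)
  tmpData = model.transform(transformed_data)

  # Scale features to [0, 1]: word vectors contain negative values, which multinomial Naive Bayes rejects
  featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(tmpData)
  processed = featureScaler.transform(tmpData)

  # Rename and select the columns used by the classifiers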
  df = processed.select(col("sentiment").alias("labels"), col("clean_token").alias("MeaningfulWords"), col("scaledFeatures").alias("features"))
  return df
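Spark's `Word2VecModel` turns each token array into one document vector by averaging word vectors. The sketch below mirrors that behaviour as we understand it: the sum of the in-vocabulary word vectors is divided by the total number of tokens, so out-of-vocabulary words pull the average towards zero. The toy vectors and the name `average_word_vectors` are made up for illustration.

```python
def average_word_vectors(tokens, vectors, dim):
    """Average of word vectors over all tokens; out-of-vocabulary tokens add zeros
    but still count in the denominator (our reading of Spark's Word2VecModel)."""
    if not tokens:
        return [0.0] * dim
    total = [0.0] * dim
    for tok in tokens:
        for i, x in enumerate(vectors.get(tok, [0.0] * dim)):
            total[i] += x
    return [x / len(tokens) for x in total]

# Made-up 2-dimensional vectors; the notebook learns 200-dimensional ones.
vectors = {"great": [1.0, 0.0], "phone": [0.0, 1.0]}
print(average_word_vectors(["great", "phone"], vectors, 2))    # [0.5, 0.5]
print(average_word_vectors(["great", "unknown"], vectors, 2))  # [0.5, 0.0]
```

Spark's `Word2Vec` also has a `minCount` parameter (default 5), so on a corpus of 500 tweets many rare words may never enter the vocabulary.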

#### Bert

In [None]:
# Load the pre-trained BERT model; it embeds the normalised (stemmed, stopword-free) tokens
bert_embeddings = BertEmbeddings\
      .pretrained('bert_base_cased', 'en') \
      .setInputCols(["document", 'normalized'])\
      .setOutputCol("bert")\
      .setCaseSensitive(False)

# Average the token embeddings into one embedding per document
embeddingsSentence = SentenceEmbeddings() \
      .setInputCols(["document", "bert"]) \
      .setOutputCol("sentence_embeddings") \
      .setPoolingStrategy("AVERAGE")

# Convert the sentence-embedding annotations into Spark ML vectors
embeddings_finisher = EmbeddingsFinisher() \
      .setInputCols(["sentence_embeddings"]) \
      .setOutputCols(["finished_sentence_embeddings"]) \
      .setOutputAsVector(True)\
      .setCleanAnnotations(False)


def myBert(data):
  # Pipeline for bert
  nlp_pipeline_bert = Pipeline(
      stages=[spell_fix_sql,
              document_assembler,
              sentence_detector, 
              tokenizer, 
              stopwords_cleaner,
              stemmer, 
              normalizer, 
              bert_embeddings,
              embeddingsSentence,
              embeddings_finisher])

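  # Fit and apply the pipeline
  nlp_model_bert = nlp_pipeline_bert.fit(data)
  processed_bert = nlp_model_bert.transform(data)
  # One averaged embedding per document: explode turns the one-element array into a vector column
  processed_bert = processed_bert.withColumn("features", explode(processed_bert.finished_sentence_embeddings))

  # Scale features to [0, 1]: BERT embeddings contain negative values, which multinomial Naive Bayes rejects
  featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(processed_bert)
  processed = featureScaler.transform(processed_bert)

  # Rename and select the columns used by the classifiers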
  df = processed.select(col("sentiment").alias("labels"), col("normalized").alias("MeaningfulWords"), col("scaledFeatures").alias("features"))

  return df

bert_base_cased download started this may take some time.
Approximate size to download 389.1 MB
[OK!]
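All three feature extractors end with a `MinMaxScaler`. Rescaling matters for multinomial Naive Bayes, which Spark only accepts with non-negative features, while word2vec and BERT vectors contain negative values. The sketch reproduces the rescaling formula given in the Spark ML documentation, (x - min) / (max - min) mapped onto [0, 1], with constant columns set to the midpoint; `fit_min_max` and `min_max_transform` are illustrative names.

```python
def fit_min_max(rows):
    """Per-column minimum and maximum over the given rows (what MinMaxScaler.fit learns)."""
    columns = list(zip(*rows))
    return [min(c) for c in columns], [max(c) for c in columns]

def min_max_transform(rows, mins, maxs, lo=0.0, hi=1.0):
    """Rescale each value into [lo, hi]; a constant column maps to the midpoint 0.5 * (lo + hi)."""
    scaled_rows = []
    for row in rows:
        scaled = []
        for x, mn, mx in zip(row, mins, maxs):
            if mx == mn:
                scaled.append(0.5 * (hi + lo))
            else:
                scaled.append((x - mn) / (mx - mn) * (hi - lo) + lo)
        scaled_rows.append(scaled)
    return scaled_rows

# Made-up embedding-like rows with negative values; the second column is constant.
rows = [[-0.4, 2.0], [0.6, 2.0], [0.1, 2.0]]
mins, maxs = fit_min_max(rows)
print(min_max_transform(rows, mins, maxs))
```

In the notebook the scaler (like the IDF and word2vec models) is fitted on the whole dataset before `myDivideData` splits it. Fitting `mins` and `maxs` on the training rows only would keep test statistics out of training, but unseen test values could then fall outside [0, 1] and would need clipping before Naive Bayes.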


### Stage03: Model Training
- Logistic Regression
- Linear SVM
- Naive Bayes
- MLP

### Divide data
- 70% for training, 30% for testing

In [None]:
def myDivideData(df):
  # Rows are assigned at random, so the split is roughly (not exactly) 70/30; the seed makes it repeatable
  trainingData, testingData = df.randomSplit([0.7, 0.3], seed=123)

  return trainingData, testingData
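`randomSplit` does not cut the data at exactly 70%: each row is assigned to a split at random with probability proportional to its weight. The pure-Python sketch below illustrates that idea; `random_split` is a hypothetical helper, and Python's `random.Random` stands in for Spark's sampler, so the same seed will not select the same rows as Spark does.

```python
import random

def random_split(rows, weights=(0.7, 0.3), seed=123):
    """Assign each row independently to a split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. (0.7, 1.0) for weights (0.7, 0.3).
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)  # stand-in for Spark's per-partition sampler
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches u values left over by floating-point rounding.
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(500)))
print(len(train), len(test))  # close to, but usually not exactly, 350 and 150
```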

#### Logistic Regression

In [None]:
def myLogReg(trainingData, testingData):
  # Training
  lr = LogisticRegression(labelCol="labels", featuresCol="features", 
                        maxIter=10, regParam=0.01)
  model = lr.fit(trainingData)

  # Testing
  prediction = model.transform(testingData)
  predictionFinal = prediction.select(
      "MeaningfulWords", "prediction", "labels")

  # Analyse
  correctPrediction = predictionFinal.filter(
      predictionFinal['prediction'] == predictionFinal['labels']).count()
  totalData = predictionFinal.count()
  print("Logistic Regression Accuracy:", correctPrediction/totalData)
  
  return correctPrediction/totalData
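Spark's `LogisticRegression` with `regParam=0.01` (and the default `elasticNetParam=0`) minimises the mean log-loss plus an L2 penalty (regParam / 2)·||w||². The sketch below fits that objective on toy data with plain batch gradient descent; this optimiser is a swapped-in simplification, since Spark uses L-BFGS and by default also standardises features. The `accuracy` helper computes the same quantity as `correctPrediction/totalData` above.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def train_logreg(X, y, reg_param=0.01, lr=0.5, iters=500):
    """Minimise mean log-loss + (reg_param / 2) * ||w||^2 with batch gradient descent.
    The intercept b is not penalised. (Spark itself uses L-BFGS, not gradient descent.)"""
    n, d = len(X), len(X[0])
    w, b = [0.0] * d, 0.0
    for _ in range(iters):
        grad_w, grad_b = [0.0] * d, 0.0
        for xi, yi in zip(X, y):
            # Gradient of the log-loss for one example: (p - y) * x
            err = sigmoid(sum(wj * xj for wj, xj in zip(w, xi)) + b) - yi
            for j in range(d):
                grad_w[j] += err * xi[j] / n
            grad_b += err / n
        # Add the L2 term's gradient (reg_param * w) and step downhill.
        w = [wj - lr * (gj + reg_param * wj) for wj, gj in zip(w, grad_w)]
        b -= lr * grad_b
    return w, b

def predict(w, b, x, threshold=0.5):
    # Class 1 when the predicted probability exceeds the threshold (Spark's default is 0.5).
    return 1 if sigmoid(sum(wj * xj for wj, xj in zip(w, x)) + b) > threshold else 0

def accuracy(preds, labels):
    # Same measure as correctPrediction / totalData in the notebook.
    return sum(p == t for p, t in zip(preds, labels)) / len(labels)

# Toy, linearly separable data (made up for illustration).
X = [[0.9, 0.1], [0.8, 0.3], [0.2, 0.9], [0.1, 0.7]]
y = [1, 1, 0, 0]
w, b = train_logreg(X, y)
print(accuracy([predict(w, b, x) for x in X], y))
```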

#### Linear SVM

In [None]:
def myLSVM(trainingData, testingData):
  # Training
  lsvm = LinearSVC(labelCol="labels", featuresCol="features", 
                          maxIter=10, regParam=0.01)
  model = lsvm.fit(trainingData)

  # Testing
  prediction = model.transform(testingData)
  predictionFinal = prediction.select(
      "MeaningfulWords", "prediction", "labels")

  # Analyse
  correctPrediction = predictionFinal.filter(
      predictionFinal['prediction'] == predictionFinal['labels']).count()
  totalData = predictionFinal.count()
  print("Linear SVM Accuracy:", correctPrediction/totalData)

  return correctPrediction/totalData
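`LinearSVC` swaps the log-loss for the hinge loss. The sketch evaluates that objective as we understand Spark's formulation (labels 0/1 mapped to -1/+1, mean hinge loss plus (regParam / 2)·||w||²) and the decision rule that predicts class 1 when w·x + b exceeds the default threshold of 0. The weight vector is hand-picked for illustration, not trained, and the function names are hypothetical.

```python
def hinge_objective(w, b, X, y, reg_param=0.01):
    """Mean hinge loss + (reg_param / 2) * ||w||^2, with 0/1 labels mapped to -1/+1."""
    total = 0.0
    for xi, yi in zip(X, y):
        s = 2 * yi - 1  # 0 -> -1, 1 -> +1
        margin = s * (sum(wj * xj for wj, xj in zip(w, xi)) + b)
        total += max(0.0, 1.0 - margin)  # zero once a point lies beyond the margin
    return total / len(X) + 0.5 * reg_param * sum(wj * wj for wj in w)

def svm_predict(w, b, x, threshold=0.0):
    # rawPrediction = w.x + b; class 1 when it exceeds the threshold.
    return 1 if sum(wj * xj for wj, xj in zip(w, x)) + b > threshold else 0

# Toy data and a hand-picked (not trained) weight vector.
X = [[0.9, 0.1], [0.8, 0.3], [0.2, 0.9], [0.1, 0.7]]
y = [1, 1, 0, 0]
w, b = [4.0, -4.0], 0.0
print(hinge_objective(w, b, X, y))        # only the L2 term remains: 0.5 * 0.01 * 32
print([svm_predict(w, b, x) for x in X])  # [1, 1, 0, 0]
```

Unlike the log-loss, the hinge loss is exactly zero for points classified with a margin of at least 1, so only points near the boundary influence the solution.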

#### Naive Bayes

In [None]:
def myNBayes(trainingData, testingData):
  # Training
  nb = NaiveBayes(labelCol="labels", featuresCol="features",
                  smoothing=1.0, modelType="multinomial")
  model = nb.fit(trainingData)

  # Testing
  prediction = model.transform(testingData)
  predictionFinal = prediction.select(
      "MeaningfulWords", "prediction", "labels")

  # Analyse
  correctPrediction = predictionFinal.filter(
      predictionFinal['prediction'] == predictionFinal['labels']).count()
  totalData = predictionFinal.count()
  print("Naive Bayes Accuracy:", correctPrediction/totalData)
  
  return correctPrediction/totalData
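Multinomial Naive Bayes treats each feature value as a (possibly fractional) count. The sketch below follows Spark's smoothed estimates as we understand them: class priors log((n_c + λ) / (n + kλ)) for k classes, and per-feature log-probabilities log((sum_cj + λ) / (sum_c + dλ)) for d features, with λ = `smoothing` = 1.0. Negative inputs are rejected, which is why the features are min-max scaled first. The function names and data are illustrative.

```python
import math

def train_multinomial_nb(X, y, smoothing=1.0):
    """Smoothed log-priors and per-class log feature probabilities:
    prior: log((n_c + smoothing) / (n + k * smoothing)), k = number of classes
    theta: log((sum of feature j in class c + smoothing) / (total of class c + d * smoothing))"""
    n, d = len(X), len(X[0])
    classes = sorted(set(y))
    log_prior, log_theta = {}, {}
    for c in classes:
        rows = [x for x, label in zip(X, y) if label == c]
        log_prior[c] = math.log((len(rows) + smoothing) / (n + len(classes) * smoothing))
        feature_sums = [sum(r[j] for r in rows) for j in range(d)]
        total = sum(feature_sums)
        log_theta[c] = [math.log((s + smoothing) / (total + d * smoothing)) for s in feature_sums]
    return log_prior, log_theta

def nb_predict(log_prior, log_theta, x):
    # Multinomial NB needs non-negative feature values (Spark rejects negative ones too).
    if any(v < 0 for v in x):
        raise ValueError("multinomial Naive Bayes requires non-negative features")
    scores = {c: log_prior[c] + sum(v * t for v, t in zip(x, log_theta[c])) for c in log_prior}
    return max(scores, key=scores.get)

# Toy count-like features (made up): columns could be counts of "great", "bad", "phone".
X = [[2, 0, 1], [3, 0, 0], [0, 2, 1], [0, 3, 0]]
y = [1, 1, 0, 0]
log_prior, log_theta = train_multinomial_nb(X, y)
print(nb_predict(log_prior, log_theta, [1, 0, 0]))  # 1
```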

#### MLP


In [None]:
def myMLP(trainingData, testingData, inputNumber):
  # Layers: input = feature dimension, two hidden layers of 100 units, output = 2 classes
  layers = [inputNumber, 100, 100, 2]

  # Training
  mlp = MultilayerPerceptronClassifier(labelCol="labels", featuresCol="features", 
                                       maxIter=100, layers=layers, blockSize=128, seed=1234)
  model = mlp.fit(trainingData)

  # Testing
  prediction = model.transform(testingData)
  predictionFinal = prediction.select(
      "MeaningfulWords", "prediction", "labels")

  # Analyse
  correctPrediction = predictionFinal.filter(
      predictionFinal['prediction'] == predictionFinal['labels']).count()
  totalData = predictionFinal.count()
  print("MLP Accuracy:", correctPrediction/totalData)
  
  return correctPrediction/totalData
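The `layers` list fixes the network's shape: the first entry must equal the feature dimension (200 for TF-IDF and word2vec, 768 for BERT base) and the last the number of classes. Counting the weights and biases between consecutive fully connected layers shows how large the two networks are; `mlp_param_count` is an illustrative helper, not part of Spark.

```python
def mlp_param_count(layers):
    """Weights plus biases between consecutive fully connected layers: sum of (n_in + 1) * n_out."""
    return sum((n_in + 1) * n_out for n_in, n_out in zip(layers, layers[1:]))

print(mlp_param_count([200, 100, 100, 2]))  # 30402
print(mlp_param_count([768, 100, 100, 2]))  # 87202
```

Both counts are large compared with the roughly 350 training tweets produced by the 70/30 split, so the MLP may be prone to overfitting.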

### Experiments

#### Experiments Setup

**General experiments setup functions**

In [None]:
def mlAlgorithms(df, isBert):
  # Divide data
  trainingData, testingData = myDivideData(df)

  # Logistic Regression
  myLogReg(trainingData, testingData)

  # Linear SVM
  myLSVM(trainingData, testingData)

  # Naive Bayes
  myNBayes(trainingData, testingData)

  # MLP: the input layer must match the feature dimension (200 for TF-IDF and word2vec, 768 for BERT base)
  if not isBert:
    myMLP(trainingData, testingData, 200)
  else:
    myMLP(trainingData, testingData, 768)

In [None]:
def experiments(isControl, data):
  # Pre-processing
  if isControl:
    transformed_data = textMiningControl(data)
  else:
    transformed_data = textMining(data)

  # TF-IDF
  print("#######################################################")
  print("TF-IDF:")
  df = tfIdf(transformed_data)
  mlAlgorithms(df, False)

  # word2vec
  print("\n#######################################################")
  print("word2vec:")
  df = myWord2Vec(transformed_data)
  mlAlgorithms(df, False)

  # Bert
  print("\n#######################################################")
  print("Bert:")
  if isControl:
    df = myBert_control(data)
  else:
    df = myBert(data)
  mlAlgorithms(df, True)

**Baseline framework helper functions**

In [None]:
################################################################################
# Setup Control pipeline                                                       #
################################################################################

# Data transformers
document_assembler_control = DocumentAssembler() \
    .setInputCol("original_text") \
    .setOutputCol("document")
    
sentence_detector_control = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence") \
    .setUseAbbreviations(True)
    
tokenizer_control = Tokenizer() \
  .setInputCols(["sentence"]) \
  .setOutputCol("token")

finisher_control = Finisher() \
    .setInputCols(["token"]) \
    .setOutputCols(["clean_token"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(True)

# Pipeline for control group
control_pipeline = Pipeline(stages=[document_assembler_control,
                                    sentence_detector_control,
                                    tokenizer_control,
                                    finisher_control])
# Fit and transform data
def textMiningControl(data):
  model = control_pipeline.fit(data)
  transformed_data = model.transform(data)

  transformed_data.printSchema()
  transformed_data.show()
  return transformed_data

################################################################################
# Bert for control group                                                       #
################################################################################

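# Load the BERT model for the control group; it embeds the raw tokens.
# Separate names stop this cell from overwriting bert_embeddings, embeddingsSentence and
# embeddings_finisher, which myBert() looks up when the proposed experiments run after this cell.
bert_embeddings_control = BertEmbeddings\
      .pretrained('bert_base_cased', 'en') \
      .setInputCols(["document", 'token'])\
      .setOutputCol("bert")\
      .setCaseSensitive(False)

# Average the token embeddings into one embedding per document
embeddingsSentence_control = SentenceEmbeddings() \
      .setInputCols(["document", "bert"]) \
      .setOutputCol("sentence_embeddings") \
      .setPoolingStrategy("AVERAGE")

# Convert the sentence-embedding annotations into Spark ML vectors
embeddings_finisher_control = EmbeddingsFinisher() \
      .setInputCols(["sentence_embeddings"]) \
      .setOutputCols(["finished_sentence_embeddings"]) \
      .setOutputAsVector(True)\
      .setCleanAnnotations(False)


def myBert_control(data):
  # Pipeline for bert
  nlp_pipeline_bert = Pipeline(
      stages=[document_assembler_control,
              sentence_detector_control,
              tokenizer_control, 
              bert_embeddings_control,
              embeddingsSentence_control,
              embeddings_finisher_control])

  # Fit and apply the pipeline
  nlp_model_bert = nlp_pipeline_bert.fit(data)
  processed_bert = nlp_model_bert.transform(data)
  # One averaged embedding per document: explode turns the one-element array into a vector column
  processed_bert = processed_bert.withColumn("features", explode(processed_bert.finished_sentence_embeddings))

  # Scale features to [0, 1]: BERT embeddings contain negative values, which multinomial Naive Bayes rejects
  featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(processed_bert)
  processed = featureScaler.transform(processed_bert)

  # Rename and select the columns used by the classifiers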
  df = processed.select(col("sentiment").alias("labels"), col("token").alias("MeaningfulWords"), col("scaledFeatures").alias("features"))

  return df

bert_base_cased download started this may take some time.
Approximate size to download 389.1 MB
[OK!]


#### Proposed Framework Experiments

In [None]:
isControl = False
experiments(isControl, data)

root
 |-- original_text: string (nullable = true)
 |-- sentiment: integer (nullable = false)
 |-- text: string (nullable = true)
 |-- clean_token: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------------+---------+----------------------+----------------------+
|         original_text|sentiment|                  text|           clean_token|
+----------------------+---------+----------------------+----------------------+
|  Now all @Apple ha...|        1|  now all @laypeopl...|  [laypeopl, get, s...|
|  @Apple will be ad...|        1|  @laypeople will b...|  [laypeopl, ad, ca...|
|  Hilarious @youtub...|        1|  hilarious @you tu...|  [hilari, you, tub...|
|  @RIM you made it ...|        1|  @rim you made it ...|  [rim, made, easi,...|
|  I just realized t...|        1|  i just realized t...|  [realiz, reason, ...|
|  I'm a current @Bl...|        1|  i'm a current @bl...|  [current, blackbe...|
|  The 16 strangest ...|        1|  the 16 stranges

#### Baseline Framework Experiments

In [None]:
isControl = True
experiments(isControl, data)

root
 |-- original_text: string (nullable = true)
 |-- sentiment: integer (nullable = false)
 |-- clean_token: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------------+---------+----------------------+
|         original_text|sentiment|           clean_token|
+----------------------+---------+----------------------+
|  Now all @Apple ha...|        1|  [Now, all, @Apple...|
|  @Apple will be ad...|        1|  [@Apple, will, be...|
|  Hilarious @youtub...|        1|  [Hilarious, @yout...|
|  @RIM you made it ...|        1|  [@RIM, you, made,...|
|  I just realized t...|        1|  [I, just, realize...|
|  I'm a current @Bl...|        1|  [I'm, a, current,...|
|  The 16 strangest ...|        1|  [The, 16, strange...|
|  Great up close & ...|        1|  [Great, up, close...|
|  From which compan...|        1|  [From, which, com...|
|  Just apply for a ...|        1|  [Just, apply, for...|
|  RT @JamaicanIdler...|        1|  [RT, @JamaicanIdl...|
|  L