In [1]:
from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, min, max
import matplotlib.pyplot as plt
import numpy as np
import nltk
from nltk.corpus import stopwords
import seaborn as sns
sns.set(font_scale=1.5)
sns.set_style("whitegrid")


PATH = r"C:\Users\cuong\2021 Machine Learning\data"
conf = SparkConf().setAppName("CIS5367 Midterm App").setMaster("local")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
sql_context = SQLContext(sc)

In [2]:
def get_data():
    twitter_data = sql_context.read.load("%s/twitter_10k_data.csv" % PATH,    
                      format='com.databricks.spark.csv', 
                      header='true', 
                      inferSchema='true')
    #Delete all duplicated rows
    #twitter_data = twitter_data.distinct()
    
    #Return a dataframe
    return twitter_data

## Getting the Dataset and divide it into training and test data 

In [3]:
twitter_data = get_data()

In [4]:
from pyspark.sql.functions import when 
twitter_data = twitter_data.withColumn('polarity_type', when(twitter_data.polarity > 0, 'positive')\
                                         .when(twitter_data.polarity < 0, 'negative').otherwise("neutral"))

In [5]:
import pandas as pd
import re
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import StringType, ArrayType


(df_train_raw, df_test_raw) = twitter_data.randomSplit([0.8, 0.2], seed = 1234)
print (df_train_raw.first())

Row(polarity=-1.0, tweet='AskNationwide Hermesparcels Hermes out of all the delivery companies in UK u are by the worst hence why we b', brand_name='Hermes', polarity_type='negative')


In [6]:
print ("\nTest Count::::: " , df_test_raw.count())
print ("\nTrain Count::::: " , df_train_raw.count())


Test Count:::::  1836

Train Count:::::  7433


In [7]:
df_train_raw = df_train_raw.withColumn("part", lit(1))
df_test_raw = df_test_raw.withColumn("part", lit(0))
df_all_raw = df_train_raw.union(df_test_raw)
df_all_raw.show()

+-------------------+--------------------+----------+-------------+----+
|           polarity|               tweet|brand_name|polarity_type|part|
+-------------------+--------------------+----------+-------------+----+
|               -1.0|AskNationwide Her...|    Hermes|     negative|   1|
|              -0.85|Horrible And Ugly...|     Gucci|     negative|   1|
|               -0.8|Hate when this ha...|     Gucci|     negative|   1|
|-0.6999999999999998|Gary Effin Oldman...|     Prada|     negative|   1|
|-0.6999999999999998|NewNew vaporwave ...|     Fendi|     negative|   1|
|               -0.6|Altri 10 minuti A...|     Gucci|     negative|   1|
|               -0.6|Body bagprada are...|     Prada|     negative|   1|
|               -0.6|Guccis armed and ...|     Gucci|     negative|   1|
|               -0.6|Hermesparcels is ...|    Hermes|     negative|   1|
|               -0.6|Something in betw...|  Burberry|     negative|   1|
|               -0.6|Versace Stone col...|   Versac

## Preprocess the data - eliminate unneccessary words, create word counts by converts texts into vectors

In [8]:
#Preprocessing Data
tokenizer = Tokenizer(inputCol='tweet', outputCol='words')
wordsData = tokenizer.transform(df_all_raw). \
    select('polarity_type', 'brand_name', 'words', 'part')

In [9]:
# Remove number
filter = re.compile(r"^[a-zA-Z]+$")
match_udf = udf(lambda tokens: [token for token in tokens if filter.match(token)], ArrayType(StringType()))
df_matched = wordsData.withColumn("words_matched", match_udf("words")). \
    select('words_matched', 'polarity_type', 'brand_name','part')
# Remove stop words
remover = StopWordsRemover(inputCol='words_matched', outputCol='words_clean')
df_words_clean = remover.transform(df_matched). \
    select('words_clean', 'polarity_type', 'part','brand_name')
#print(df_words_clean.show(5))
# Stem text
stemmer = SnowballStemmer(language='english')
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
df_stemmed = df_words_clean.withColumn("words_stemmed", stemmer_udf("words_clean")). \
    select('words_stemmed', 'polarity_type', 'part','brand_name')
    
#df_stemmed.show(2, truncate=False)

In [10]:
from pyspark.ml.feature import StringIndexer
# tf-idf
hashingTF = HashingTF(inputCol="words_stemmed", outputCol="rawFeatures") #generate vectors
featurizedData = hashingTF.transform(df_stemmed)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
#rescaledData.show()

#Add Brand_name -- group_index to the dataset
brandname_indexer = StringIndexer(inputCol="brand_name", outputCol="brandname_index")
df_brandname_indexed = brandname_indexer.fit(rescaledData).transform(rescaledData)

#Add polarity_type --polarity_index to the dataset
polarity_indexer = StringIndexer(inputCol="polarity_type", outputCol="polarity_index")
df_two_indexed = polarity_indexer.fit(df_brandname_indexed).transform(df_brandname_indexed) 
df_two_indexed.show()

+--------------------+-------------+----+----------+--------------------+--------------------+---------------+--------------+
|       words_stemmed|polarity_type|part|brand_name|         rawFeatures|            features|brandname_index|polarity_index|
+--------------------+-------------+----+----------+--------------------+--------------------+---------------+--------------+
|[asknationwid, he...|     negative|   1|    Hermes|(262144,[47552,51...|(262144,[47552,51...|            5.0|           2.0|
|[horribl, ugli, g...|     negative|   1|     Gucci|(262144,[22675,34...|(262144,[22675,34...|            0.0|           2.0|
|[hate, happen, gu...|     negative|   1|     Gucci|(262144,[72709,12...|(262144,[72709,12...|            0.0|           2.0|
|[gari, effin, old...|     negative|   1|     Prada|(262144,[3924,175...|(262144,[3924,175...|            3.0|           2.0|
|[newnew, vaporwav...|     negative|   1|     Fendi|(262144,[17734,20...|(262144,[17734,20...|            6.0|        

In [11]:
#Input features for Baynesian Methods is the combination of word counts and sentiments
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["rawFeatures","polarity_index"],outputCol="final_features")
assemblerData = assembler.transform(df_two_indexed)
assemblerData.show()

+--------------------+-------------+----+----------+--------------------+--------------------+---------------+--------------+--------------------+
|       words_stemmed|polarity_type|part|brand_name|         rawFeatures|            features|brandname_index|polarity_index|      final_features|
+--------------------+-------------+----+----------+--------------------+--------------------+---------------+--------------+--------------------+
|[asknationwid, he...|     negative|   1|    Hermes|(262144,[47552,51...|(262144,[47552,51...|            5.0|           2.0|(262145,[47552,51...|
|[horribl, ugli, g...|     negative|   1|     Gucci|(262144,[22675,34...|(262144,[22675,34...|            0.0|           2.0|(262145,[22675,34...|
|[hate, happen, gu...|     negative|   1|     Gucci|(262144,[72709,12...|(262144,[72709,12...|            0.0|           2.0|(262145,[72709,12...|
|[gari, effin, old...|     negative|   1|     Prada|(262144,[3924,175...|(262144,[3924,175...|            3.0|        

In [12]:
df_train = assemblerData.where("part = 1")
df_test = assemblerData.where("part = 0")
df_test.show()

+--------------------+-------------+----+----------+--------------------+--------------------+---------------+--------------+--------------------+
|       words_stemmed|polarity_type|part|brand_name|         rawFeatures|            features|brandname_index|polarity_index|      final_features|
+--------------------+-------------+----+----------+--------------------+--------------------+---------------+--------------+--------------------+
|[polo, time, brut...|     negative|   0|      Polo|(262144,[18536,43...|(262144,[18536,43...|            4.0|           2.0|(262145,[18536,43...|
|[herm, worst, del...|     negative|   0|    Hermes|(262144,[21823,11...|(262144,[21823,11...|            5.0|           2.0|(262145,[21823,11...|
|[check, polo, ral...|     negative|   0|      Polo|(262144,[14273,18...|(262144,[14273,18...|            4.0|           2.0|(262145,[14273,18...|
|[rt, streetstylep...|     negative|   0|    Chanel|(262144,[2317,162...|(262144,[2317,162...|            2.0|        

## Running Naive Bayes Models

### First Model: labelCol is brandname_index and input is the combination of word counts + polarity_type

In [13]:
# Naive Bayes classifier- first model
from pyspark.ml.classification import NaiveBayes
   
nb = NaiveBayes(labelCol="brandname_index",\
    featuresCol="final_features", smoothing=1.0,\
    modelType="multinomial")
model = nb.fit(df_train)

In [14]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
predictions = model.transform(df_test)
predictions.select("polarity_type","brand_name", "brandname_index", 
    "probability", "prediction").show()
evaluator =\
    MulticlassClassificationEvaluator(labelCol="brandname_index",\
    predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+-------------+----------+---------------+--------------------+----------+
|polarity_type|brand_name|brandname_index|         probability|prediction|
+-------------+----------+---------------+--------------------+----------+
|     negative|      Polo|            4.0|[0.06423671181093...|       4.0|
|     negative|    Hermes|            5.0|[1.78528608844415...|       5.0|
|     negative|      Polo|            4.0|[7.05890219120392...|       4.0|
|     negative|    Chanel|            2.0|[0.76436301294699...|       0.0|
|     negative|     Gucci|            0.0|[0.99847874282522...|       0.0|
|     negative|     Fendi|            6.0|[0.67166515625544...|       0.0|
|     negative|   Versace|            7.0|[0.67166515625544...|       0.0|
|     negative|     Prada|            3.0|[0.41127766385035...|       3.0|
|     negative|    Chanel|            2.0|[0.81652580577398...|       0.0|
|     negative|   Versace|            7.0|[0.99667385868837...|       0.0|
|     negative|    Chanel

### Second Model: labelCol is polarity_index and input is the combination of word counts + polarity_type

In [15]:
#Input features for Baynesian Methods is the combination of word counts and sentiments
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["rawFeatures","polarity_index"],outputCol="final_features")
assemblerData = assembler.transform(df_two_indexed)
#assemblerData.show()

In [16]:
df_train = assemblerData.where("part = 1")
df_test = assemblerData.where("part = 0")
df_test.show()

+--------------------+-------------+----+----------+--------------------+--------------------+---------------+--------------+--------------------+
|       words_stemmed|polarity_type|part|brand_name|         rawFeatures|            features|brandname_index|polarity_index|      final_features|
+--------------------+-------------+----+----------+--------------------+--------------------+---------------+--------------+--------------------+
|[polo, time, brut...|     negative|   0|      Polo|(262144,[18536,43...|(262144,[18536,43...|            4.0|           2.0|(262145,[18536,43...|
|[herm, worst, del...|     negative|   0|    Hermes|(262144,[21823,11...|(262144,[21823,11...|            5.0|           2.0|(262145,[21823,11...|
|[check, polo, ral...|     negative|   0|      Polo|(262144,[14273,18...|(262144,[14273,18...|            4.0|           2.0|(262145,[14273,18...|
|[rt, streetstylep...|     negative|   0|    Chanel|(262144,[2317,162...|(262144,[2317,162...|            2.0|        

In [17]:
# Naive Bayes classifier- first model
from pyspark.ml.classification import NaiveBayes
   
nb = NaiveBayes(labelCol="polarity_index",\
    featuresCol="final_features", smoothing=1.0,\
    modelType="multinomial")
model = nb.fit(df_train)

In [18]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
predictions = model.transform(df_test)
predictions.select("polarity_type","brand_name", "polarity_index", 
    "probability", "prediction").show()
evaluator =\
    MulticlassClassificationEvaluator(labelCol="polarity_index",\
    predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+-------------+----------+--------------+--------------------+----------+
|polarity_type|brand_name|polarity_index|         probability|prediction|
+-------------+----------+--------------+--------------------+----------+
|     negative|      Polo|           2.0|[6.98134381372224...|       1.0|
|     negative|    Hermes|           2.0|[7.40915045713100...|       1.0|
|     negative|      Polo|           2.0|[5.77339057567219...|       1.0|
|     negative|    Chanel|           2.0|[1.95452890835155...|       1.0|
|     negative|     Gucci|           2.0|[9.71870774473052...|       1.0|
|     negative|     Fendi|           2.0|[2.30365159947401...|       1.0|
|     negative|   Versace|           2.0|[2.30365159947401...|       1.0|
|     negative|     Prada|           2.0|[8.33852723625867...|       1.0|
|     negative|    Chanel|           2.0|[1.50780810357774...|       1.0|
|     negative|   Versace|           2.0|[2.39853841554240...|       1.0|
|     negative|    Chanel|           2

### Third Model: labelCol is polarity_index and input is word counts

In [19]:
# Naive Bayes classifier- first model
from pyspark.ml.classification import NaiveBayes
   
nb = NaiveBayes(labelCol="polarity_index",\
    featuresCol="features", smoothing=1.0,\
    modelType="multinomial")
model = nb.fit(df_train)

In [20]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
predictions = model.transform(df_test)
predictions.select("polarity_type","brand_name", "polarity_index", 
    "probability", "prediction").show()
evaluator =\
    MulticlassClassificationEvaluator(labelCol="polarity_index",\
    predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+-------------+----------+--------------+--------------------+----------+
|polarity_type|brand_name|polarity_index|         probability|prediction|
+-------------+----------+--------------+--------------------+----------+
|     negative|      Polo|           2.0|[3.49094951721355...|       2.0|
|     negative|    Hermes|           2.0|[2.74372952323274...|       2.0|
|     negative|      Polo|           2.0|[0.99933799049672...|       0.0|
|     negative|    Chanel|           2.0|[2.12425146849372...|       2.0|
|     negative|     Gucci|           2.0|[2.55025346740856...|       2.0|
|     negative|     Fendi|           2.0|[8.32191895103603...|       2.0|
|     negative|   Versace|           2.0|[8.32191895103603...|       2.0|
|     negative|     Prada|           2.0|[0.99999996684710...|       0.0|
|     negative|    Chanel|           2.0|[2.44948510931897...|       2.0|
|     negative|   Versace|           2.0|[7.89191800398136...|       2.0|
|     negative|    Chanel|           2

### Fourth Model: labelCol is brandname_index and input is word count 

In [21]:
# Naive Bayes classifier- first model
from pyspark.ml.classification import NaiveBayes
   
nb = NaiveBayes(labelCol="brandname_index",\
    featuresCol="features", smoothing=1.0,\
    modelType="multinomial")
model = nb.fit(df_train)

In [22]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
predictions = model.transform(df_test)
predictions.select("polarity_type","brand_name", 
    "probability", "prediction").show()
evaluator =\
    MulticlassClassificationEvaluator(labelCol="brandname_index",\
    predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+-------------+----------+--------------------+----------+
|polarity_type|brand_name|         probability|prediction|
+-------------+----------+--------------------+----------+
|     negative|      Polo|[5.13502975449590...|       7.0|
|     negative|    Hermes|[1.34003442010529...|       5.0|
|     negative|      Polo|[9.51750306823672...|       4.0|
|     negative|    Chanel|[1.39967076474221...|       2.0|
|     negative|     Gucci|[7.95698730216830...|       6.0|
|     negative|     Fendi|[0.73065398400100...|       0.0|
|     negative|   Versace|[0.73065398400100...|       0.0|
|     negative|     Prada|[0.99999486808278...|       0.0|
|     negative|    Chanel|[0.99999999954300...|       0.0|
|     negative|   Versace|[2.90388333498720...|       7.0|
|     negative|    Chanel|[1.80308467906069...|       2.0|
|     negative|     Prada|[0.99999999293150...|       0.0|
|     negative|    Chanel|[9.02171884292929...|       2.0|
|     negative|    Chanel|[1.71682763907247...|       2.

In [None]:
sc.stop()