# Spark ML sentiment analysis using Naive Bayes model. Dataset: https://github.com/fnielsen/afinn/blob/master/afinn/data/AFINN-111.txt

In [131]:
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, NGram, HashingTF, IDF
from pyspark.sql.types import ArrayType, StringType, IntegerType
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.classification import NaiveBayes
import re

In [132]:
conf = SparkConf().setAppName("Naive Bayes")
sc = SparkContext.getOrCreate()
sqlContext = SparkSession.builder.getOrCreate()

In [133]:
train = [ws.strip().split('\t') for ws in open('tweets.txt')]
training_tweets = [(train[i][0], int(train[i + 1][0])) for i in range(0, len(train) - 1, 2)]

In [134]:
dataset = sqlContext.createDataFrame(training_tweets, ["sentence", "label"])

In [135]:
dataset.show(5)

+--------------------+-----+
|            sentence|label|
+--------------------+-----+
|Gas by my house h...|    1|
|Theo Walcott is s...|   -1|
|its not that I'm ...|   -1|
|Iranian general s...|   -1|
|with J Davlar 11t...|    1|
+--------------------+-----+
only showing top 5 rows



In [137]:
# tokenize tweets
tokenizer = Tokenizer(inputCol='sentence', outputCol='words')
countTokens = udf(lambda w: len(w), IntegerType())
tokenized = tokenizer.transform(dataset)

In [138]:
# remove basic stopwords
remover = StopWordsRemover(inputCol='words', outputCol='filtered')
dataset = remover.transform(tokenized)

In [139]:
# drop columns that we don't need in the future
dataset = dataset.drop('sentence', 'words')

In [140]:
# remove URLs, emoticons, hashtags, mentions, RT

pattern = re.compile('[\d|)(:\(|:\))+$]')

def remove_punctuation(word):
    return word.translate(str.maketrans('', '', string.punctuation))

def is_unwanted_word(word):
    if '@' in word:
        return True
    elif '#' in word:
        return True
    elif 'http://' in word:
        return True
    elif 'https://' in word:
        return True
    elif word == 'RT':
        return True
    elif pattern.match(word):
        return True
    return False

def remove_unwanted_values(values):
    return [remove_punctuation(x) for x in values if not is_unwanted_word(x)]
    

map_udf = udf(remove_unwanted_values, ArrayType(StringType()))

In [141]:
# apply the UDF that replaces punctuation marks and removes mentions, hashtags, links
dataset = dataset.withColumn('filtered', map_udf(dataset.filtered))

In [143]:
# create bigrams
ngram = NGram(n=2, inputCol='filtered', outputCol='bigrams')
dataset = ngram.transform(dataset)

In [154]:
# use tf-idf
hashingTF = HashingTF(inputCol='bigrams', outputCol='TF', numFeatures=20000)
tf_df = hashingTF.transform(dataset)

idf = IDF(inputCol='TF', outputCol='TF-IDF')
idfModel = idf.fit(tf_df)
idf_df = idfModel.transform(tf_df)

In [161]:
# Convert labels to sparse vectors, that are needed by the classifer
train_dataset = tf_df.rdd.map(lambda row: LabeledPoint(float(row.label), Vectors.fromML(row.TF)))

# split data into training and test sets
train, test = train_dataset.randomSplit([0.8, 0.2])

In [162]:
# train model
model = NaiveBayes.train(train, 1.0)

In [163]:
# predict labels on test set
pred_label = test.map(lambda x: (x.label, model.predict(x.features)))
# calculate accuracy
acc = 1.0 * pred_label.filter(lambda x: x[0] == x[1]).count() / test.count()

print('Model accuracy: ', acc)

Model accuracy:  0.4731750219876869
