## Create Spark session

In [1]:
from pyspark.sql import SparkSession

In [4]:
sparkSession = SparkSession.builder.appName('NLP_Tweets').getOrCreate()

## Import dataset from HDFS

In [9]:
df = sparkSession.read.csv('hdfs:///user/root/feed_tweets/CleanTweets.csv', inferSchema=True, header=True)

In [13]:
df.printSchema()

root
 |-- Text: string (nullable = true)
 |-- Sentiment: integer (nullable = true)



In [14]:
df = df.dropna(subset=('Text'))

## Format model input (tokenize, vectorize, assemble)

In [17]:
from pyspark.ml.feature import Tokenizer, CountVectorizer, VectorAssembler

In [23]:
tokenization = Tokenizer(inputCol='Text',outputCol='tokens')
tokenized_df = tokenization.transform(df)

In [24]:
tokenized_df.printSchema()

root
 |-- Text: string (nullable = true)
 |-- Sentiment: integer (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [37]:
cvectorizer = CountVectorizer(inputCol='tokens', outputCol='features')
fit_cvectorizer = cvectorizer.fit(tokenized_df)
vectorized_df = fit_cvectorizer.transform(tokenized_df)
vectorized_df = vectorized_df.select(['features', 'Sentiment'])

In [38]:
vectorized_df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Sentiment: integer (nullable = true)



In [40]:
vassembler = VectorAssembler(inputCols=['features'], outputCol='assembled_features')
assembled_df = vassembler.transform(vectorized_df)

In [41]:
assembled_df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Sentiment: integer (nullable = true)
 |-- assembled_features: vector (nullable = true)



## Modelling

In [31]:
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel

In [42]:
model = NaiveBayes(featuresCol='assembled_features',labelCol='Sentiment').fit(assembled_df)

In [43]:
predictions = model.transform(assembled_df)

## Evaluation

In [44]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [52]:
evaluator = MulticlassClassificationEvaluator(labelCol='Sentiment', 
                                              predictionCol='prediction', 
                                              metricName='accuracy')

accuracy = evaluator.evaluate(predictions)

print(f'Accuracy: {accuracy}')

Accuracy: 0.8752442791401375


In [46]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [51]:
true_postives = predictions[(predictions.Sentiment == 1) & (predictions.prediction == 1)].count()
true_negatives = predictions[(predictions.Sentiment == 0) & (predictions.prediction == 0)].count()
false_positives = predictions[(predictions.Sentiment == 0) & (predictions.prediction == 1)].count()
false_negatives = predictions[(predictions.Sentiment == 1) & (predictions.prediction == 0)].count()

recall = float(true_postives)/(true_postives+false_negatives)
precision = float(true_postives)/(true_postives+false_positives)
accuracy = float((true_postives+true_negatives)/(predictions.count()))

print(f'Recall: {recall}')
print(f'Precision: {precision}')
print(f'Accuracy: {accuracy}')

Recall: 0.9349863566259343
Precision: 0.8463273195876289
Accuracy: 0.8752442791401375
