### Dataset
https://www.kaggle.com/datasets/praveengovi/emotions-dataset-for-nlp

In [None]:
# # install dependencies
# !pip install pyspark
# !pip install pandas

In [50]:
import pandas as pd

# set panas to print full text
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
pd.set_option("display.max_colwidth", None)

import jupyter_black

jupyter_black.load()

# hide pyspark warnings
import warnings

warnings.filterwarnings("ignore")

In [51]:
# read the data which is in txt format
df = pd.read_csv("train.txt", names=["text", "emotion"], delimiter=";")

In [52]:
# let's save the data in csv format with header
df.to_csv("train.csv", index=False, header=True)

In [53]:
# print the first 2 rows
df.head(2)

Unnamed: 0,text,emotion
0,i didnt feel humiliated,sadness
1,i can go from feeling so hopeless to so damned hopeful just from being around someone who cares and is awake,sadness


In [54]:
# import all the required libraries for creating spark session
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

import warnings

warnings.filterwarnings("ignore")

In [56]:
# see the existing spark session
spark = SparkSession.builder.getOrCreate()
spark

In [57]:
# stop the existing spark session
spark.stop()

In [58]:
# Create a SparkContext instance
spark_context = SparkContext(master="local")

# Creating a spark session.
spark = SparkSession.builder.appName("Emotion Detection").getOrCreate()
print(spark_context)

<SparkContext master=local appName=pyspark-shell>


In [59]:
spark_context

In [60]:
# read the train.csv file
df = spark.read.csv("train.csv", header=True, inferSchema=True)

# print the schema
df.printSchema()

root
 |-- text: string (nullable = true)
 |-- emotion: string (nullable = true)



In [61]:
# display the first 2 rows
df.show(2)

+--------------------+-------+
|                text|emotion|
+--------------------+-------+
|i didnt feel humi...|sadness|
|i can go from fee...|sadness|
+--------------------+-------+
only showing top 2 rows



In [62]:
# show how many null values are there in each column using pandas
df.toPandas().isnull().sum()

text       0
emotion    0
dtype: int64

In [63]:
# display the data distribution of each emotion
df.groupBy("emotion").count().show()

+--------+-----+
| emotion|count|
+--------+-----+
|     joy| 5362|
|    love| 1304|
|   anger| 2159|
|    fear| 1937|
|surprise|  572|
| sadness| 4666|
+--------+-----+



In [65]:
from pyspark.ml.feature import (
    Tokenizer,
    StopWordsRemover,
    CountVectorizer,
    IDF,
    StringIndexer,
)

In [67]:
# tokenize the text column
tokenizer = Tokenizer(inputCol="text", outputCol="words")

In [68]:
# remove the stop words
stopwords_remover = StopWordsRemover(
    inputCol=tokenizer.getOutputCol(), outputCol="filtered"
)

# convert the words to vectors
count_vectorizer = CountVectorizer(
    inputCol=stopwords_remover.getOutputCol(), outputCol="raw_features"
)

# calculate the idf
idf = IDF(inputCol=count_vectorizer.getOutputCol(), outputCol="features")


# convert the emotion column to label
label_stringIdx = StringIndexer(inputCol="emotion", outputCol="label")

# import the required libraries for creating the pipeline
from pyspark.ml import Pipeline

# create the pipeline
pipeline = Pipeline(
    stages=[tokenizer, stopwords_remover, count_vectorizer, idf, label_stringIdx]
)

# fit the pipeline to the data
pipeline_fit = pipeline.fit(df)

# transform the data
dataset = pipeline_fit.transform(df)

# display the first 2 rows
dataset.show(2)

+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----+
|                text|emotion|               words|            filtered|        raw_features|            features|label|
+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----+
|i didnt feel humi...|sadness|[i, didnt, feel, ...|[didnt, feel, hum...|(15082,[0,48,567]...|(15082,[0,48,567]...|  1.0|
|i can go from fee...|sadness|[i, can, go, from...|[go, feeling, hop...|(15082,[1,29,42,5...|(15082,[1,29,42,5...|  1.0|
+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 2 rows



In [69]:
# show which joy belongs to which encoded label
dataset.select("emotion", "label").distinct().show()

+--------+-----+
| emotion|label|
+--------+-----+
|    love|  4.0|
|     joy|  0.0|
|surprise|  5.0|
|   anger|  2.0|
| sadness|  1.0|
|    fear|  3.0|
+--------+-----+



In [70]:
# split the data into train and test
train, test = dataset.randomSplit([0.7, 0.3], seed=100)

# print the number of rows in train and test
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 11246
Test Dataset Count: 4754


In [72]:
# build the model
from pyspark.ml.classification import NaiveBayes

# create the model
nb = NaiveBayes(featuresCol="features", labelCol="label", predictionCol="prediction")

# fit the model to the train data
model = nb.fit(train)

# predict the test data
predictions = model.transform(test)

In [73]:
# find the test accuracy and f1 score
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# create the evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)

# calculate the accuracy
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

# create the evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)

# calculate the f1 score
f1_score = evaluator.evaluate(predictions)
print("Test set f1 score = " + str(f1_score))

22/12/09 17:06:23 WARN DAGScheduler: Broadcasting large task binary with size 1199.2 KiB
Test set accuracy = 0.6903660075725705
22/12/09 17:06:24 WARN DAGScheduler: Broadcasting large task binary with size 1199.2 KiB
Test set f1 score = 0.703776482543677


In [76]:
print(predictions.columns)

['text', 'emotion', 'words', 'filtered', 'raw_features', 'features', 'label', 'rawPrediction', 'probability', 'prediction']


In [77]:
# print the emotion, label, prediction
predictions.select("emotion", "label", "prediction").show(5)

22/12/09 17:09:53 WARN DAGScheduler: Broadcasting large task binary with size 1183.7 KiB
+-------+-----+----------+
|emotion|label|prediction|
+-------+-----+----------+
|  anger|  2.0|       3.0|
|  anger|  2.0|       0.0|
|  anger|  2.0|       5.0|
|    joy|  0.0|       3.0|
|   fear|  3.0|       5.0|
+-------+-----+----------+
only showing top 5 rows



In [80]:
# show some predictions that are correct display only the emotion, label, prediction
predictions.filter(predictions.label == predictions.prediction).select(
    "emotion", "label", "prediction"
).show(5)

22/12/09 17:11:01 WARN DAGScheduler: Broadcasting large task binary with size 1192.8 KiB
+-------+-----+----------+
|emotion|label|prediction|
+-------+-----+----------+
|   fear|  3.0|       3.0|
|   fear|  3.0|       3.0|
|sadness|  1.0|       1.0|
|    joy|  0.0|       0.0|
|  anger|  2.0|       2.0|
+-------+-----+----------+
only showing top 5 rows



In [81]:
# how many predictions are correct and how many are wrong
predictions.filter(predictions.label == predictions.prediction).count()

22/12/09 17:11:36 WARN DAGScheduler: Broadcasting large task binary with size 1194.8 KiB


3282

In [82]:
# how many predictions are correct and how many are wrong
predictions.filter(predictions.label != predictions.prediction).count()

22/12/09 17:11:40 WARN DAGScheduler: Broadcasting large task binary with size 1195.0 KiB


1472

In [83]:
# what's the test set size
test.count()

4754

In [84]:
# Let's try Logistic Regression
from pyspark.ml.classification import LogisticRegression

# create the model
lr = LogisticRegression(
    featuresCol="features", labelCol="label", predictionCol="prediction"
)

# fit the model to the train data
model = lr.fit(train)

# predict the test data
predictions = model.transform(test)

# create the evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)

# calculate the accuracy
accuracy = evaluator.evaluate(predictions)

# print the accuracy
print("Test set accuracy = " + str(accuracy))

# create the evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)

# calculate the f1 score
f1_score = evaluator.evaluate(predictions)

# print the f1 score
print("Test set f1 score = " + str(f1_score))

22/12/09 17:14:00 WARN DAGScheduler: Broadcasting large task binary with size 1206.4 KiB
Test set accuracy = 0.8300378628523348
22/12/09 17:14:00 WARN DAGScheduler: Broadcasting large task binary with size 1206.4 KiB
Test set f1 score = 0.8317629919611031
