In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('corona').getOrCreate()

In [3]:
df=spark.read.csv('NLP_Corona_train.csv', header = True, inferSchema=True,sep= ',')

In [4]:
df.show()

+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|            UserName|          ScreenName|            Location|             TweetAt|         Sentiment|       OriginalTweet|
+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|                3799|               48751|              London|          16-03-2020|           Neutral|@MeNyrbie @Phil_G...|
|                3800|               48752|                  UK|          16-03-2020|          Positive|advice Talk to yo...|
|                3801|               48753|           Vagabonds|          16-03-2020|          Positive|Coronavirus Austr...|
|                3802|               48754|                null|          16-03-2020|          Positive|My food stock is ...|
|              PLEASE|         don't panic| THERE WILL BE EN...|                null|              null|              

In [5]:
df = df.dropDuplicates()
print(df.count(),",",len(df.columns))

65074 , 6


In [6]:
df = df.na.drop()
print(df.count(),",",len(df.columns))

32621 , 6


In [7]:
df.printSchema()

root
 |-- UserName: string (nullable = true)
 |-- ScreenName: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- TweetAt: string (nullable = true)
 |-- Sentiment: string (nullable = true)
 |-- OriginalTweet: string (nullable = true)



In [8]:
df.columns

['UserName', 'ScreenName', 'Location', 'TweetAt', 'Sentiment', 'OriginalTweet']

In [9]:
sentiments1 = ['Positive','Negative','Neutral','Extremely Positive','Extremely Negative']

In [10]:
df = df.filter(df.Sentiment.isin(sentiments1))

In [11]:
df.select('Sentiment').distinct().count()

5

In [12]:
df.select('Sentiment').distinct().show()

+------------------+
|         Sentiment|
+------------------+
|Extremely Negative|
|           Neutral|
|          Positive|
|          Negative|
|Extremely Positive|
+------------------+



# Preparation

In [13]:
from pyspark.sql.functions import length

In [14]:
df=df.withColumn('length', length(df['OriginalTweet']))

In [15]:
df.show()

+--------+----------+--------------------+----------+------------------+--------------------+------+
|UserName|ScreenName|            Location|   TweetAt|         Sentiment|       OriginalTweet|length|
+--------+----------+--------------------+----------+------------------+--------------------+------+
|    3863|     48815|              Boston|16-03-2020|           Neutral|What 2K Consumers...|   110|
|    4088|     49040|     California, USA|16-03-2020|          Positive|Apple has closed ...|   136|
|    4321|     49273|       Lake Mary, FL|16-03-2020|           Neutral|M-C: Political Tr...|   111|
|    4411|     49363| South Carolina, USA|16-03-2020|          Negative|Coronavirus Live ...|    67|
|    4459|     49411|      United Kingdom|17-03-2020|          Negative|Queues to buy gun...|   186|
|    4789|     49741|That's me in the ...|17-03-2020|          Positive|Boris Johnson the...|   212|
|    5004|     49956|        Upstate, NY |17-03-2020|           Neutral|This country need..

In [16]:
df=df.withColumnRenamed("Sentiment","sentiment1")

In [17]:
df.groupby('Sentiment1').mean().show()

+------------------+------------------+
|        Sentiment1|       avg(length)|
+------------------+------------------+
|Extremely Negative|179.08476571697668|
|           Neutral|134.06076810889644|
|          Positive| 167.5731693929081|
|          Negative|165.74478227261014|
|Extremely Positive|183.49146433990896|
+------------------+------------------+



In [18]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer

tokenizer=Tokenizer(inputCol="OriginalTweet", outputCol="token_text")
stopremove=StopWordsRemover(inputCol="token_text", outputCol="stop_tokens")
count_vec=CountVectorizer(inputCol="stop_tokens", outputCol="c_vec")
idf=IDF(inputCol="c_vec", outputCol="tf_idf")

# we also need to convert our labels in numbers
ham_samp_to_num = StringIndexer(inputCol="sentiment1", outputCol='label')

In [19]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

In [20]:
clean_up1 = VectorAssembler(inputCols=['tf_idf','length'], outputCol='features')

# Model

In [21]:
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, DecisionTreeClassifier

nb=NaiveBayes()
rf=RandomForestClassifier(numTrees=200)
dtc=DecisionTreeClassifier(maxDepth=15)

# Pipeline

In [22]:
from pyspark.ml import Pipeline
df_prep_pipeline= Pipeline(stages=[ham_samp_to_num, tokenizer, stopremove,count_vec, idf,clean_up1])

In [23]:
cleaner1=df_prep_pipeline.fit(df)

In [24]:
clean_df=cleaner1.transform(df)

In [25]:
clean_df.show()

+--------+----------+--------------------+----------+------------------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|UserName|ScreenName|            Location|   TweetAt|        sentiment1|       OriginalTweet|length|label|          token_text|         stop_tokens|               c_vec|              tf_idf|            features|
+--------+----------+--------------------+----------+------------------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|    3863|     48815|              Boston|16-03-2020|           Neutral|What 2K Consumers...|   110|  2.0|[what, 2k, consum...|[2k, consumers, t...|(80619,[6,48,135,...|(80619,[6,48,135,...|(80620,[6,48,135,...|
|    4088|     49040|     California, USA|16-03-2020|          Positive|Apple has closed ...|   136|  0.0|[apple, has, clos...|[apple, closed, r...|(806

In [27]:
clean_df=clean_df.select(['label', 'features'])

In [28]:
clean_df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  2.0|(80620,[6,48,135,...|
|  0.0|(80620,[7,59,165,...|
|  2.0|(80620,[6,9,774,8...|
|  1.0|(80620,[6,32,73,2...|
|  1.0|(80620,[1,4,8,17,...|
|  0.0|(80620,[8,18,56,1...|
|  2.0|(80620,[0,3,7,12,...|
|  4.0|(80620,[4,23,2743...|
|  0.0|(80620,[0,11,13,3...|
|  1.0|(80620,[17,18,20,...|
|  1.0|(80620,[1,33,60,6...|
|  0.0|(80620,[21,43,53,...|
|  2.0|(80620,[21,297,34...|
|  4.0|(80620,[1,2,4,27,...|
|  3.0|(80620,[0,28,32,3...|
|  2.0|(80620,[9,108,270...|
|  2.0|(80620,[36,95,97,...|
|  3.0|(80620,[6,7,32,67...|
|  2.0|(80620,[3,6,7,228...|
|  4.0|(80620,[3,4,7,61,...|
+-----+--------------------+
only showing top 20 rows



# Training

In [29]:
(training, testing)=clean_df.randomSplit([0.75,0.25])

In [30]:
spam_predictor1=dtc.fit(training)

In [31]:
test_results1=spam_predictor1.transform(testing)

In [32]:
test_results1.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(80620,[0,2,11,25...|[2355.0,2300.0,13...|[0.28740541859897...|       0.0|
|  0.0|(80620,[5,18,55,1...|[151.0,31.0,5.0,4...|[0.61632653061224...|       0.0|
|  0.0|(80620,[5,92,477,...|[2239.0,1875.0,27...|[0.26046998604001...|       2.0|
|  0.0|(80620,[7,59,165,...|[2239.0,1875.0,27...|[0.26046998604001...|       2.0|
|  0.0|(80620,[8,18,56,1...|[229.0,103.0,21.0...|[0.43207547169811...|       0.0|
|  0.0|(80620,[9,12,17,5...|[2355.0,2300.0,13...|[0.28740541859897...|       0.0|
|  0.0|(80620,[16,63,124...|[2239.0,1875.0,27...|[0.26046998604001...|       2.0|
|  0.0|(80620,[24,37,54,...|[0.0,0.0,0.0,7.0,...|[0.0,0.0,0.0,1.0,...|       3.0|
|  0.0|(80620,[33,2064,2...|[2239.0,1875.0,27...|[0.26046998604001...|       2.0|
|  0.0|(80620,[9

In [33]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [34]:
acc_eval=MulticlassClassificationEvaluator()
acc=acc_eval.evaluate(test_results1)

In [35]:
print ("Accuracy of the model is::", acc)

Accuracy of the model is:: 0.3381217557842695
