In [2]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('CoronavirusNLP').getOrCreate()

In [3]:
df=spark.read.csv('Corona_NLP_train.csv',header=True,inferSchema=True)

In [4]:
df.show(5)

+--------+------------+--------------------+----------+--------------------+---------+
|UserName|  ScreenName|            Location|   TweetAt|       OriginalTweet|Sentiment|
+--------+------------+--------------------+----------+--------------------+---------+
|    3799|       48751|              London|16-03-2020|@MeNyrbie @Phil_G...|  Neutral|
|    3800|       48752|                  UK|16-03-2020|advice Talk to yo...| Positive|
|    3801|       48753|           Vagabonds|16-03-2020|Coronavirus Austr...| Positive|
|    3802|       48754|                null|16-03-2020|My food stock is ...|     null|
|  PLEASE| don't panic| THERE WILL BE EN...|      null|                null|     null|
+--------+------------+--------------------+----------+--------------------+---------+
only showing top 5 rows



In [5]:
df.columns

['UserName', 'ScreenName', 'Location', 'TweetAt', 'OriginalTweet', 'Sentiment']

In [6]:
print((df.count(),len(df.columns)))

(68046, 6)


In [7]:
#df.shape.show()

# Data Preparation

In [8]:
from pyspark.sql.functions import length

In [9]:
df=df.withColumn('Tweet_length', length(df['OriginalTweet']))

In [10]:
df.show(5)

+--------+------------+--------------------+----------+--------------------+---------+------------+
|UserName|  ScreenName|            Location|   TweetAt|       OriginalTweet|Sentiment|Tweet_length|
+--------+------------+--------------------+----------+--------------------+---------+------------+
|    3799|       48751|              London|16-03-2020|@MeNyrbie @Phil_G...|  Neutral|         111|
|    3800|       48752|                  UK|16-03-2020|advice Talk to yo...| Positive|         237|
|    3801|       48753|           Vagabonds|16-03-2020|Coronavirus Austr...| Positive|         131|
|    3802|       48754|                null|16-03-2020|My food stock is ...|     null|          51|
|  PLEASE| don't panic| THERE WILL BE EN...|      null|                null|     null|        null|
+--------+------------+--------------------+----------+--------------------+---------+------------+
only showing top 5 rows



In [11]:
sentiments=['Positive','Negative','Neutral','Extremely Positive','Extremely Negative']

In [12]:
data=df.filter(df.Sentiment.isin(sentiments))

In [13]:
data.select('Sentiment').distinct().show()

+------------------+
|         Sentiment|
+------------------+
|Extremely Negative|
|           Neutral|
|          Positive|
|          Negative|
|Extremely Positive|
+------------------+



In [14]:
data.select('Sentiment').distinct().count()

5

In [15]:
data.groupby('Sentiment').count().show()

+------------------+-----+
|         Sentiment|count|
+------------------+-----+
|Extremely Negative| 3751|
|           Neutral| 5224|
|          Positive| 7718|
|          Negative| 6857|
|Extremely Positive| 4412|
+------------------+-----+



In [16]:
data.show(5)

+--------+----------+--------------------+----------+--------------------+---------+------------+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|Sentiment|Tweet_length|
+--------+----------+--------------------+----------+--------------------+---------+------------+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|  Neutral|         111|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...| Positive|         237|
|    3801|     48753|           Vagabonds|16-03-2020|Coronavirus Austr...| Positive|         131|
|    3804|     48756|ÜT: 36.319708,-82...|16-03-2020|As news of the re...| Positive|         249|
|    3805|     48757|35.926541,-78.753267|16-03-2020|"Cashier at groce...| Positive|         184|
+--------+----------+--------------------+----------+--------------------+---------+------------+
only showing top 5 rows



In [17]:
print((data.count(),len(data.columns)))

(27962, 7)


In [18]:
from pyspark.sql.functions import isnan,when,count,col
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]
   ).show()

+--------+----------+--------+-------+-------------+---------+------------+
|UserName|ScreenName|Location|TweetAt|OriginalTweet|Sentiment|Tweet_length|
+--------+----------+--------+-------+-------------+---------+------------+
|       0|         0|    6152|      0|            0|        0|           0|
+--------+----------+--------+-------+-------------+---------+------------+



In [20]:
#df.groupby('Sentiment').count().show()

In [21]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer,RegexTokenizer

In [27]:
from pyspark.sql.functions import col,udf
from pyspark.sql.types import IntegerType
#regextokenizer=RegexTokenizer(inputCol="OriginalTweet", outputCol="words",pattern="\\W")
tokenizer=Tokenizer(inputCol="OriginalTweet", outputCol="token_text")
stopremove=StopWordsRemover(inputCol="token_text", outputCol="stop_tokens")
count_vec=CountVectorizer(inputCol="stop_tokens", outputCol="c_vec")
idf=IDF(inputCol="c_vec", outputCol="tf_idf")
#counttokens=udf(lambda words:len(words),IntegerType())

#Convert labels to numeric
labeltonum=StringIndexer(inputCol="Sentiment", outputCol="label")

In [28]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

In [29]:
clean_up=VectorAssembler(inputCols=["tf_idf","Tweet_length"], outputCol="features")


In [30]:
#Model

In [31]:
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, DecisionTreeClassifier

nb=NaiveBayes()
rf=RandomForestClassifier(numTrees=100)
dtc=DecisionTreeClassifier(maxDepth=15)

In [33]:
#Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline= Pipeline(stages=[labeltonum, tokenizer, stopremove,count_vec, idf,clean_up])

In [34]:
cleaner=data_prep_pipeline.fit(data)

In [35]:
clean_data=cleaner.transform(data)

In [36]:
clean_data.show()

+--------+----------+--------------------+----------+--------------------+------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|         Sentiment|Tweet_length|label|          token_text|         stop_tokens|               c_vec|              tf_idf|            features|
+--------+----------+--------------------+----------+--------------------+------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|           Neutral|         111|  2.0|[@menyrbie, @phil...|[@menyrbie, @phil...|(78305,[12124,289...|(78305,[12124,289...|(78306,[12124,289...|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...|          Positive|         237|  0.0|[advice, talk, t

In [37]:
clean_data=clean_data.select(['label', 'features'])

In [38]:
clean_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  2.0|(78306,[12124,289...|
|  0.0|(78306,[13,14,133...|
|  0.0|(78306,[8,14,37,7...|
|  0.0|(78306,[7,8,31,47...|
|  0.0|(78306,[3,6,18,60...|
|  0.0|(78306,[1,6,8,13,...|
|  1.0|(78306,[11,13,14,...|
|  2.0|(78306,[48,70,149...|
|  3.0|(78306,[13,14,23,...|
|  0.0|(78306,[8,10,23,5...|
|  0.0|(78306,[4,8,24,38...|
|  4.0|(78306,[1,4,9,11,...|
|  1.0|(78306,[4,21,44,7...|
|  3.0|(78306,[10,37,54,...|
|  1.0|(78306,[4,8,24,33...|
|  4.0|(78306,[1,7,11,36...|
|  1.0|(78306,[1,4,7,34,...|
|  2.0|(78306,[5,47,48,6...|
|  0.0|(78306,[8,12,23,2...|
|  1.0|(78306,[6,28,33,9...|
+-----+--------------------+
only showing top 20 rows



In [39]:
#Split
(training, testing)=clean_data.randomSplit([0.7,0.3])

In [40]:
sentiment_type=dtc.fit(training)

Py4JJavaError: An error occurred while calling o148.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 57.0 failed 1 times, most recent failure: Lost task 2.0 in stage 57.0 (TID 689, master, executor driver): java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ml.tree.impl.DTStatsAggregator.<init>(DTStatsAggregator.scala:77)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22(RandomForest.scala:651)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22$adapted(RandomForest.scala:647)
	at org.apache.spark.ml.tree.impl.RandomForest$$$Lambda$3682/1391255814.apply(Unknown Source)
	at scala.Array$.tabulate(Array.scala:334)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$21(RandomForest.scala:647)
	at org.apache.spark.ml.tree.impl.RandomForest$$$Lambda$3663/1325188422.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.RDD$$Lambda$2260/254281789.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2042/261322767.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:737)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:736)
	at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:663)
	at org.apache.spark.ml.tree.impl.RandomForest$.runBagged(RandomForest.scala:208)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:302)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.$anonfun$train$1(DecisionTreeClassifier.scala:135)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:114)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:46)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:150)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ml.tree.impl.DTStatsAggregator.<init>(DTStatsAggregator.scala:77)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22(RandomForest.scala:651)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22$adapted(RandomForest.scala:647)
	at org.apache.spark.ml.tree.impl.RandomForest$$$Lambda$3682/1391255814.apply(Unknown Source)
	at scala.Array$.tabulate(Array.scala:334)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$21(RandomForest.scala:647)
	at org.apache.spark.ml.tree.impl.RandomForest$$$Lambda$3663/1325188422.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.RDD$$Lambda$2260/254281789.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2042/261322767.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
test_results=spam_predictor.transform(testing)

In [None]:
test_results.show()

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
acc_eval=MulticlassClassificationEvaluator()
acc=acc_eval.evaluate(test_results)

In [None]:
print ("Accuracy of the model is::", acc)

In [28]:
#tokenizer=Tokenizer(inputCol="OriginalTweet", outputCol="tokenized_text")
#counttokens=udf(lambda tokenized_text:len(tokenized_text),IntegerType())

In [66]:
#tokenized.select("OriginalTweet","words").withColumn("NoOfWords",counttokens(col("words"))).show(10)

+--------------------+--------------------+---------+
|       OriginalTweet|               words|NoOfWords|
+--------------------+--------------------+---------+
|@MeNyrbie @Phil_G...|[menyrbie, phil_g...|       17|
|advice Talk to yo...|[advice, talk, to...|       38|
|Coronavirus Austr...|[coronavirus, aus...|       18|
|As news of the re...|[as, news, of, th...|       41|
|"Cashier at groce...|[cashier, at, gro...|       33|
|Due to COVID-19 o...|[due, to, covid, ...|       50|
|For corona preven...|[for, corona, pre...|       44|
|All month there h...|[all, month, ther...|       43|
|#horningsea is a ...|[horningsea, is, ...|       48|
|ADARA Releases CO...|[adara, releases,...|       30|
+--------------------+--------------------+---------+
only showing top 10 rows



In [69]:
#data.show(5)

+--------+----------+--------------------+----------+--------------------+---------+------------+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|Sentiment|Tweet_length|
+--------+----------+--------------------+----------+--------------------+---------+------------+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|  Neutral|         111|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...| Positive|         237|
|    3801|     48753|           Vagabonds|16-03-2020|Coronavirus Austr...| Positive|         131|
|    3804|     48756|ÜT: 36.319708,-82...|16-03-2020|As news of the re...| Positive|         249|
|    3805|     48757|35.926541,-78.753267|16-03-2020|"Cashier at groce...| Positive|         184|
+--------+----------+--------------------+----------+--------------------+---------+------------+
only showing top 5 rows



In [64]:
#tokenized.select("OriginalTweet","words").withColumn("NoOfWords",counttokens(col("words"))).show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|OriginalTweet                                                                                                                                                                                                                                                                                   |words                                                                        