In [1]:
from __future__ import print_function
import json
import re
import string
import numpy as np
import nltk, re
import pandas as pd
import sparknlp
import os
import seaborn as sns
from pyspark.sql import SparkSession
from nltk.corpus import stopwords
from nltk.corpus import wordnet
from pyspark import SparkContext, SparkConf
from pyspark import SQLContext
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.feature import Normalizer
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql import functions as fn
import time
from pyspark.sql.functions import udf, to_date, date_format
from pyspark.sql.types import *
from nltk.stem import WordNetLemmatizer
wordnet_lemmatizer = WordNetLemmatizer()
#nltk.download('wordnet')

# Trump

In [2]:
conf = SparkConf().setAppName("sentiment_analysis")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
sqlContext = SQLContext(sc)

In [3]:
from pyspark.sql.functions import col
def clean(x):
#      remove the retweeted tweet
     x=str(x)
     x = re.sub(r'^RT[\s]+','',x)
#      Replace #word with word
     x = re.sub(r'#', '', x)
#      Convert @username to empty strings
     x= re.sub('@[^\s]+', '', x) 
#       Convert www.* or https?://* to empty strings
     x= re.sub('((www\.[^\s]+)|(https?://[^\s]+))', '',x)
#     Remove all characters which are not alphabets, numbers or whitespaces.
     x = re.sub('[^A-Za-z0-9 ]+','', x)
     return x
udfclean=udf(clean,StringType())

def lemmatizer(tweet):
    word_list = []
    for word in tweet.split():
        word_list.append(wordnet_lemmatizer.lemmatize(word))
    return (" ".join(word_list))    
udflemmatizer=udf(lemmatizer,StringType())
def sample(x):
    return 5000/x
udfsample=udf(sample,FloatType())

In [4]:
trump_tweet=sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('trump.csv')
trump_tweet=trump_tweet.dropna()
trump_tweet=trump_tweet.distinct()
trump=trump_tweet.filter(fn.col("time").isin(['2020-10-15','2020-10-16','2020-10-17','2020-10-18','2020-10-19','2020-10-20','2020-10-21','2020-10-22','2020-10-23',
                                             '2020-10-24','2020-10-25','2020-10-26','2020-10-27','2020-10-28','2020-10-29','2020-10-30','2020-10-31','2020-11-01','2020-11-02','2020-11-03']))
trump.show(10)

+----------+---------------+--------------------+
|      time|           user|                text|
+----------+---------------+--------------------+
|2020-10-16|        sjrlady|@abc are you goin...|
|2020-10-16|       jayjer24|what's up @realdo...|
|2020-10-16|    SheilaYehle|last night in eas...|
|2020-10-16|    geurtje1914|@realdonaldtrump ...|
|2020-10-16|KyleCoo32421721|#crookedjoebiden ...|
|2020-10-16|DIDNOTVOTEFOR44|president trump w...|
|2020-10-16|    lamantide71|@realdonaldtrump ...|
|2020-10-16|  ElianaBenador|is it #sleepyjoe ...|
|2020-10-16| alexander12ray|@realdonaldtrump ...|
|2020-10-16|  RealCaryPrice|https://t.co/iybd...|
+----------+---------------+--------------------+
only showing top 10 rows



In [5]:
count=trump.groupBy('time').agg(fn.count('*').alias('count')).orderBy(trump.time.asc())
count=count.withColumn('fract',udfsample('count'))
count.show()

+----------+-----+----------+
|      time|count|     fract|
+----------+-----+----------+
|2020-10-15| 8851| 0.5649079|
|2020-10-16| 6995| 0.7147963|
|2020-10-17| 9476| 0.5276488|
|2020-10-18| 9686|0.51620895|
|2020-10-19|12542| 0.3986605|
|2020-10-20|14592| 0.3426535|
|2020-10-21|10420|0.47984645|
|2020-10-22|35188| 0.1420939|
|2020-10-23|13313|0.37557274|
|2020-10-24|17364| 0.2879521|
|2020-10-25| 9782| 0.5111429|
|2020-10-26|31814|0.15716352|
|2020-10-27|15701|0.31845105|
|2020-10-28|11031|0.45326805|
|2020-10-29| 8254| 0.6057669|
|2020-10-30|12390|0.40355125|
|2020-10-31|12567|0.39786744|
|2020-11-01| 7925| 0.6309148|
|2020-11-02|13036|0.38355324|
|2020-11-03|13115|0.38124284|
+----------+-----+----------+



In [6]:
fractions = count.select("time",'fract').rdd.collectAsMap()
print(fractions)

{'2020-10-15': 0.5649079084396362, '2020-10-16': 0.7147963047027588, '2020-10-17': 0.5276488065719604, '2020-10-18': 0.5162089467048645, '2020-10-19': 0.3986605107784271, '2020-10-20': 0.3426535129547119, '2020-10-21': 0.47984644770622253, '2020-10-22': 0.14209389686584473, '2020-10-23': 0.37557274103164673, '2020-10-24': 0.28795209527015686, '2020-10-25': 0.511142909526825, '2020-10-26': 0.15716351568698883, '2020-10-27': 0.31845104694366455, '2020-10-28': 0.45326805114746094, '2020-10-29': 0.6057668924331665, '2020-10-30': 0.40355125069618225, '2020-10-31': 0.39786744117736816, '2020-11-01': 0.6309148073196411, '2020-11-02': 0.38355323672294617, '2020-11-03': 0.3812428414821625}


In [7]:
trump=trump.sampleBy('time',fractions)

In [8]:
print(trump.count())
trump.show()

100159
+----------+---------------+--------------------+
|      time|           user|                text|
+----------+---------------+--------------------+
|2020-10-16|        sjrlady|@abc are you goin...|
|2020-10-16|       jayjer24|what's up @realdo...|
|2020-10-16|    SheilaYehle|last night in eas...|
|2020-10-16|    geurtje1914|@realdonaldtrump ...|
|2020-10-16|DIDNOTVOTEFOR44|president trump w...|
|2020-10-16|    lamantide71|@realdonaldtrump ...|
|2020-10-16|  ElianaBenador|is it #sleepyjoe ...|
|2020-10-16| alexander12ray|@realdonaldtrump ...|
|2020-10-16|  RealCaryPrice|https://t.co/iybd...|
|2020-10-16|       jim62192|when have you eve...|
|2020-10-16|DaleFer99092112|@realdonaldtrump ...|
|2020-10-16|   jmmtcarvalho|looking good. #st...|
|2020-10-16|        jos1963|@dickberlijn dick...|
|2020-10-16|     BrownFayeM|kinda what hillar...|
|2020-10-16|EmmanuelKatoto1|@realdonaldtrump ...|
|2020-10-16|  LuanneAWilson|@realdonaldtrump ...|
|2020-10-16|   TheChosenOn8|although the ti

In [9]:
start_time=time.time()
trump=trump.withColumn('clean_words',udfclean('text'))
trump=trump.withColumn('newText',udflemmatizer('clean_words'))
#trump=trump.withColumn('clean_words',udfclean('text'))
trump=trump.select('newText')

In [10]:
stopwords = [u'rt',u're', u'i', u'me', u'my', u'myself', u'we', u'our', u'ours', u'ourselves', u'you', u'your',
             u'yours', u'yourself', u'yourselves', u'he', u'him', u'his', u'himself', u'she', u'her', u'hers',
             u'herself', u'it', u'its', u'itself', u'they', u'them', u'their', u'theirs', u'themselves', u'what',
             u'which', u'who', u'whom', u'this', u'that', u'these', u'those', u'am', u'is', u'are', u'was', u'were',
             u'be', u'been', u'being', u'have', u'has', u'had', u'having', u'do', u'does', u'did', u'doing', u'a',
             u'an', u'the', u'and', u'but', u'if', u'or', u'because', u'as', u'until', u'while', u'of', u'at', u'by',
             u'for', u'with', u'about', u'against', u'between', u'into', u'through', u'during', u'before', u'after',
             u'above', u'below', u'to', u'from', u'up', u'down', u'in', u'out', u'on', u'off', u'over', u'under',
             u'again', u'further', u'then', u'once', u'here', u'there', u'when', u'where', u'why', u'how', u'all',
             u'any', u'both', u'each', u'few', u'more', u'most', u'other', u'some', u'such', u'no', u'nor', u'not',
             u'only', u'own', u'same', u'so', u'than', u'too', u'very', u's', u't', u'can', u'will', u'just', u'don',
             u'should', u'now']

In [11]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer

tokenizer = Tokenizer(inputCol="newText",outputCol='words')
sw_filter = StopWordsRemover()\
  .setStopWords(stopwords)\
  .setCaseSensitive(False)\
  .setInputCol("words")\
  .setOutputCol("filtered")
#trump=trump.select('filtered')
end_time=time.time()
print('The preopocessing time :',end_time-start_time)
#countVectors = CountVectorizer(inputCol="filtered", outputCol="features",
#vocabSize=10000, minDF=5)
start_time=time.time()
hashtf = HashingTF(numFeatures=2**16, inputCol="filtered", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
end_time=time.time()
print('The tf-idf time :',end_time-start_time)
#label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer,sw_filter, hashtf,idf])
pipelineFit = pipeline.fit(trump)
trump = pipelineFit.transform(trump)
trump.show(5)

The preopocessing time : 0.16986775398254395
The tf-idf time : 0.03187274932861328
+--------------------+--------------------+--------------------+--------------------+--------------------+
|             newText|               words|            filtered|                  tf|            features|
+--------------------+--------------------+--------------------+--------------------+--------------------+
| are you going to...|[, are, you, goin...|[, going, factche...|(65536,[9191,1548...|(65536,[9191,1548...|
|           whats up |         [whats, up]|             [whats]|(65536,[48531],[1...|(65536,[48531],[5...|
|last night in eas...|[last, night, in,...|[last, night, eas...|(65536,[4166,5381...|(65536,[4166,5381...|
| theres only one ...|[, theres, only, ...|[, theres, one, a...|(65536,[16499,218...|(65536,[16499,218...|
|president trump w...|[president, trump...|[president, trump...|(65536,[14524,221...|(65536,[14524,221...|
+--------------------+--------------------+------------------

#  Load Sentiment Data

In [12]:
sentiment=sqlContext.read.format('com.databricks.spark.csv').options(header='True', inferschema='true',sep='\t').load('AFINN.txt')

In [13]:
sentiment.show(2)

+---------+---------+
|     word|sentiment|
+---------+---------+
|abandoned|       -2|
| abandons|       -2|
+---------+---------+
only showing top 2 rows



In [14]:
sentiment.printSchema()

root
 |-- word: string (nullable = true)
 |-- sentiment: integer (nullable = true)



In [15]:
sentiment.where(fn.col('sentiment') == 1).show(5)

+---------+---------+
|     word|sentiment|
+---------+---------+
|   aboard|        1|
| absorbed|        1|
|   accept|        1|
| accepted|        1|
|accepting|        1|
+---------+---------+
only showing top 5 rows



In [16]:
sentiment.where(fn.col('sentiment') == -1).show(5)

+---------+---------+
|     word|sentiment|
+---------+---------+
| absentee|       -1|
|absentees|       -1|
|    admit|       -1|
|   admits|       -1|
| admitted|       -1|
+---------+---------+
only showing top 5 rows



In [17]:
sentiment.groupBy('sentiment').agg(fn.count('*').alias('count')).orderBy(sentiment.sentiment.asc()).show()

+---------+-----+
|sentiment|count|
+---------+-----+
|       -5|   16|
|       -4|   43|
|       -3|  264|
|       -2|  965|
|       -1|  309|
|        0|    1|
|        1|  208|
|        2|  448|
|        3|  172|
|        4|   45|
|        5|    5|
+---------+-----+



In [18]:
trump.select('filtered', fn.explode('filtered').alias('word')).show(10)

+--------------------+--------------------+
|            filtered|                word|
+--------------------+--------------------+
|[, going, factche...|                    |
|[, going, factche...|               going|
|[, going, factche...|           factcheck|
|[, going, factche...|               biden|
|[, going, factche...|                    |
|[, going, factche...|                 lie|
|[, going, factche...|georgestephanopoulos|
|[, going, factche...|           townhalls|
|             [whats]|               whats|
|[last, night, eas...|                last|
+--------------------+--------------------+
only showing top 10 rows



In [19]:
start_time=time.time()
review_word_sentiment_df = trump.\
    select('filtered', fn.explode('filtered').alias('word')).\
    join(sentiment, 'word')
review_word_sentiment_df.show(10)

+----------+--------------------+---------+
|      word|            filtered|sentiment|
+----------+--------------------+---------+
|     great|[last, night, eas...|        3|
|       yes|[last, night, eas...|        1|
|      hell|[last, night, eas...|       -4|
|   loyalty|         [, loyalty]|        3|
|disrespect|[ever, witnessed,...|       -2|
|      good|[looking, good, s...|        3|
|      dick|[, dick, berlijn,...|       -4|
|      like|[, like, governan...|        2|
| important|[, like, governan...|        2|
|    matter|[, like, governan...|        1|
+----------+--------------------+---------+
only showing top 10 rows



In [20]:
from pyspark.sql.types import IntegerType
simple_sentiment_prediction_df = review_word_sentiment_df.\
    groupBy('filtered').\
    agg(fn.avg('sentiment').alias('avg_sentiment'))
end_time=time.time()
print('sentiment analysis time :', end_time-start_time)
simple_sentiment_prediction_df.show(20)
#simple_sentiment_prediction_df= simple_sentiment_prediction_df.withColumn("avg_sentiment",simple_sentiment_prediction_df["avg_sentiment"].cast(IntegerType()))

sentiment analysis time : 1.834254503250122
+--------------------+-------------+
|            filtered|avg_sentiment|
+--------------------+-------------+
|           [, thank]|          2.0|
|[yes, believe, je...|          1.0|
|[idiot, youre, pr...|         -3.0|
|[sad, happen, nei...|          0.0|
|[, introduction, ...|          3.0|
|    [, talking, ass]|         -4.0|
|[, broken, record...|          0.0|
|[, pcr, tests, de...|         -3.0|
|[, justice, amy, ...|          2.0|
|[, youve, lost, b...|         -3.0|
|[, date, us, high...|         -2.0|
|[, alzheimers, ki...|          2.0|
|[, another, 100, ...|         -3.0|
|       [, get, lost]|         -3.0|
|[waitso, , likes,...|          2.0|
|[, list, top, int...|          0.0|
|[, cant, air, 60,...|         -2.5|
|[yo, prez, , fit,...|         -1.0|
|[, hat, made, goo...|          3.0|
|   [charged, crimes]|         -3.0|
+--------------------+-------------+
only showing top 20 rows



In [21]:
positive=simple_sentiment_prediction_df.filter('avg_sentiment>0').count()
negative=simple_sentiment_prediction_df.filter('avg_sentiment<0').count()
neutral=simple_sentiment_prediction_df.filter('avg_sentiment==0').count()
total=simple_sentiment_prediction_df.count()
p=positive/total
n=negative/total
neutral=neutral/total
print('positive: ',p)
print('negative: ', n)
print('neutral: ',neutral)

positive:  0.44480273551626376
negative:  0.5087540848789145
neutral:  0.04644317960482178


# Biden

In [22]:
biden_tweet=sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('biden.csv')
biden_tweet=biden_tweet.dropna()
biden_tweet=biden_tweet.distinct()
biden=biden_tweet.filter(fn.col("time").isin(['2020-10-15','2020-10-16','2020-10-17','2020-10-18','2020-10-19','2020-10-20','2020-10-21','2020-10-22','2020-10-23',
                                             '2020-10-24','2020-10-25','2020-10-26','2020-10-27','2020-10-28','2020-10-29','2020-10-30','2020-10-31','2020-11-01','2020-11-02','2020-11-03']))
biden.show(10)
count=biden.groupBy('time').agg(fn.count('*').alias('count')).orderBy(biden.time.asc())
count=count.withColumn('fract',udfsample('count'))
fractions = count.select("time",'fract').rdd.collectAsMap()
print(fractions)
biden=biden.sampleBy('time',fractions)

+----------+---------------+---------------------+
|      time|           user|                 text|
+----------+---------------+---------------------+
|2020-10-16|        sjrlady| @abc are you goin...|
|2020-10-16| SuzieQ74731364| #crookedjoebiden ...|
|2020-10-16|    BobGeloneck| @joebiden these w...|
|2020-10-16|      bezivonne|@joebiden 鈼?bengh...|
|2020-10-16|BabaGan15001591| @joebiden did hun...|
|2020-10-16|        4Shaner| this was supposed...|
|2020-10-16|  ElianaBenador| is it #sleepyjoe ...|
|2020-10-16|  NancyLChapman| @joebiden you ain...|
|2020-10-16|   MiguelAmorim| "@joebiden ""my d...|
|2020-10-16|   BellaRisttaa| @joebiden https:/...|
+----------+---------------+---------------------+
only showing top 10 rows

{'2020-10-15': 0.48680752515792847, '2020-10-16': 0.5899705290794373, '2020-10-17': 0.6924248933792114, '2020-10-18': 0.7588405013084412, '2020-10-19': 0.6368615627288818, '2020-10-20': 0.37605294585227966, '2020-10-21': 0.6747638583183289, '2020-10-22': 0.4411

In [23]:
print(biden.count())
biden.show()

99861
+----------+---------------+---------------------+
|      time|           user|                 text|
+----------+---------------+---------------------+
|2020-10-16|        sjrlady| @abc are you goin...|
|2020-10-16| SuzieQ74731364| #crookedjoebiden ...|
|2020-10-16|      bezivonne|@joebiden 鈼?bengh...|
|2020-10-16|BabaGan15001591| @joebiden did hun...|
|2020-10-16|  ElianaBenador| is it #sleepyjoe ...|
|2020-10-16|  NancyLChapman| @joebiden you ain...|
|2020-10-16|   BellaRisttaa| @joebiden https:/...|
|2020-10-16| Forest_StarIII| @joebiden vote #c...|
|2020-10-16|   JupiterWalls| nothing about thi...|
|2020-10-17|DanielBolling14| @joebiden #crooke...|
|2020-10-17|     GoonaJames| he's taking to li...|
|2020-10-17|1perplepassion8|     #bidenharris2020|
|2020-10-17|       ForgeRat|    #chickentrump馃悢|
|2020-10-17|       SMohitR1| aisa hai tu india...|
|2020-10-17|   3toedgiraffe| @joebiden still i...|
|2020-10-17|       Werdgerl|     @realdonaldtrump|
|2020-10-17|       JSolomos|

In [24]:
start_time=time.time()
biden=biden.withColumn('clean_words',udfclean('text'))
biden=biden.withColumn('newText',udflemmatizer('clean_words'))
biden=biden.select('newText')

In [25]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer
tokenizer = Tokenizer(inputCol="newText",outputCol='words')
sw_filter = StopWordsRemover()\
  .setStopWords(stopwords)\
  .setCaseSensitive(False)\
  .setInputCol("words")\
  .setOutputCol("filtered")
end_time=time.time()
print('The preopocessing time :',end_time-start_time)
#countVectors = CountVectorizer(inputCol="filtered", outputCol="features",
#vocabSize=10000, minDF=5)
start_time=time.time()
hashtf = HashingTF(numFeatures=2**16, inputCol="filtered", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="idf", minDocFreq=5)
end_time=time.time()
print('tf-idf time : ',end_time-start_time)
#minDocFreq: remove sparse terms
#label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer,sw_filter, hashtf,idf])
pipelineFit = pipeline.fit(biden)
biden = pipelineFit.transform(biden)
biden.show(5)

The preopocessing time : 0.0638282299041748
tf-idf time :  0.005985260009765625
+--------------------+--------------------+--------------------+--------------------+--------------------+
|             newText|               words|            filtered|                  tf|                 idf|
+--------------------+--------------------+--------------------+--------------------+--------------------+
| are you going to...|[, are, you, goin...|[, going, factche...|(65536,[9191,1548...|(65536,[9191,1548...|
|    crookedjoebiden |   [crookedjoebiden]|   [crookedjoebiden]|(65536,[57900],[1...|(65536,[57900],[5...|
| benghazi whistle...|[, benghazi, whis...|[, benghazi, whis...|(65536,[3498,1630...|(65536,[3498,1630...|
| did hunter have ...|[, did, hunter, h...|[, hunter, pay, w...|(65536,[43105,472...|(65536,[43105,472...|
|is it sleepyjoe o...|[is, it, sleepyjo...|[sleepyjoe, lousy...|(65536,[8605,2617...|(65536,[8605,2617...|
+--------------------+--------------------+--------------------+

In [26]:
start_time=time.time()
review_word_sentiment_df2 = biden.\
    select('filtered', fn.explode('filtered').alias('word')).\
    join(sentiment, 'word')
review_word_sentiment_df2.show(10)

+---------+--------------------+---------+
|     word|            filtered|sentiment|
+---------+--------------------+---------+
|      pay|[, hunter, pay, w...|       -1|
|incapable|[aisa, hai, tu, i...|       -2|
|    shock|[, still, shock, ...|       -2|
|  mistake|[, make, mistake,...|       -2|
|     care|[, make, mistake,...|        2|
|     fake|[corrupt, joe, bi...|       -3|
|  fucking|[, ain, fucking, ...|       -4|
|     fuck|[, ain, fucking, ...|       -4|
|     want|[, wanted, shut, ...|        1|
|     want|[nobody, ever, sa...|        1|
+---------+--------------------+---------+
only showing top 10 rows



In [27]:
simple_sentiment_prediction_df2 = review_word_sentiment_df2.\
    groupBy('filtered').\
    agg(fn.avg('sentiment').alias('avg_sentiment'))
end_time=time.time()
print('sentiment analysis time :', end_time-start_time)
simple_sentiment_prediction_df2.show(20)

sentiment analysis time : 1.5774781703948975
+--------------------+------------------+
|            filtered|     avg_sentiment|
+--------------------+------------------+
|           [, thank]|               2.0|
|[, kammy, joke, t...|               2.0|
|[, thomas, engage...|              -1.5|
|[, blah, blah, bl...|              -2.0|
|[corruptjoebiden,...|              -0.5|
|[, dont, forget, ...|              -1.0|
|[, virus, came, c...|0.3333333333333333|
|[, cool, demagogu...|               1.0|
|       [, get, lost]|              -3.0|
|[, thanks, lying,...|               2.0|
|[, unfortunately,...|               0.0|
|[, theyre, going,...|               2.5|
|[ugly, people, tr...|              -3.0|
|[, say, 100, year...|               2.0|
|[biden, says, cat...|               1.5|
|[, going, shut, v...|               1.5|
|[, ridiculous, st...|              -1.0|
|[, danny, glover,...|              -2.0|
|     [, love, trump]|               3.0|
|[, communist, par...|         

In [28]:
positive=simple_sentiment_prediction_df2.filter('avg_sentiment>0').count()
negative=simple_sentiment_prediction_df2.filter('avg_sentiment<0').count()
neutral=simple_sentiment_prediction_df2.filter('avg_sentiment==0').count()
total=simple_sentiment_prediction_df2.count()
p=positive/total
n=negative/total
neutral=neutral/total
print('positive: ',p)
print('negative: ', n)
print('neutral: ',neutral)

positive:  0.5226106911447084
negative:  0.43115550755939525
neutral:  0.046233801295896326
