In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark
!pip install gdown
#drive_url='https://drive.google.com/uc?export=download&confirm=_eEI&id=18ePMXLEOvC0kIiSnsFdvEa2iEwLj592Y'
!gdown --id 18ePMXLEOvC0kIiSnsFdvEa2iEwLj592Y
#stopwords_url='https://drive.google.com/file/d/1yReenWk77Y95esGurD0Tzj3zADqKr5B4/view?usp=sharing'
!gdown --id 1yReenWk77Y95esGurD0Tzj3zADqKr5B4
!unzip ./tweets.zip -d .
!rm -rf tweets.zip

Spark Session

In [2]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Tweets').getOrCreate()

In [104]:
frame1=spark.read.option("inferSchema",True).option("header",True).option("multiLine",True).option("escape",'\"').csv('./hashtag_donaldtrump.csv').select('tweet')
frame2=spark.read.option("inferSchema",True).option("header",True).option("multiLine",True).option("escape",'\"').csv('./hashtag_joebiden.csv').select('tweet')

Preprocessing

In [105]:
from pyspark.sql.functions import regexp_replace, col
frame1=frame1.withColumn("clean",regexp_replace(regexp_replace(regexp_replace('tweet',r'http\S+',' '),r'\n',' '),r'\\n',' '))
frame2=frame2.withColumn("clean",regexp_replace(regexp_replace(regexp_replace('tweet',r'http\S+',' '),r'\n',' '),r'\\n',' '))

In [106]:
!pip install emoji
import emoji
em=list(emoji.UNICODE_EMOJI['en'].keys())
elim=[str(x) for x in range(10)]+['.',',', '!','?',';',':','-','—','_','+','=','%','&','*','(',')','[',']','{','}','\"','|','\\',"'",'`','$','€','@','^','<','>','/','►'] # retained for twitter hashtags



In [107]:
from pyspark.ml.feature import StopWordsRemover
remover=StopWordsRemover(stopWords=em+elim).setCaseSensitive(False)

In [108]:
from pyspark.sql.functions import split,concat_ws
frame1=frame1.withColumn("letters",split('clean',''))
frame2=frame2.withColumn("letters",split('clean',''))

In [109]:
remover.setInputCol("letters")
remover.setOutputCol("filtered")
frame1=remover.transform(frame1).withColumn('words',split(concat_ws('',"filtered"),' '))
frame2=remover.transform(frame2).withColumn('words',split(concat_ws('',"filtered"),' '))

In [110]:
#import nltk
#nltk.download('stopwords')
#from nltk.corpus import stopwords
#stopwords=stopwords.words('english')
stopwords=open('stop_words_english.txt','r').read().split('\n')
remover2=StopWordsRemover(stopWords=stopwords+['']).setCaseSensitive(False)
remover2.setInputCol("words")
remover2.setOutputCol("tweet_words")
frame1=remover2.transform(frame1).drop('clean','letters','filtered','words')
frame2=remover2.transform(frame2).drop('clean','letters','filtered','words')

In [111]:
frame1=frame1.withColumn('clean',concat_ws(' ',"tweet_words"))
frame2=frame2.withColumn('clean',concat_ws(' ',"tweet_words"))

Sentiment

In [112]:
from textblob import TextBlob
def sentiment_analysis(sentence):
    pol=TextBlob(sentence).sentiment.polarity
    if pol>0: return 1
    elif pol==0: return 0
    else: return -1

In [113]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, FloatType, StringType
sentiment_udf=udf(sentiment_analysis,IntegerType())
frame1=frame1.withColumn("Sentiment",sentiment_udf(frame1.clean))
frame2=frame2.withColumn("Sentiment",sentiment_udf(frame2.clean))

TF-IDF

In [13]:
from pyspark.ml.feature import CountVectorizer, IDF
cv=CountVectorizer(inputCol="tweet_words",outputCol="tf",vocabSize=5000,minDF=0.001)
cv_model1=cv.fit(frame1)
cv_model2=cv.fit(frame2)

In [14]:
vocab1=cv_model1.vocabulary
vocab2=cv_model2.vocabulary

In [15]:
frame1=cv_model1.transform(frame1)
frame2=cv_model2.transform(frame2)
idf=IDF(inputCol="tf",outputCol="tfidf")
idf_model1=idf.fit(frame1)
idf_model2=idf.fit(frame2)
frame1=idf_model1.transform(frame1)
frame2=idf_model2.transform(frame2)

In [16]:
from pyspark.sql.functions import monotonically_increasing_id
frame1=frame1.withColumn("id",monotonically_increasing_id())
frame2=frame2.withColumn("id",monotonically_increasing_id())

In [None]:
#For testing, delete on main run
frame1=frame1.limit(200)
frame2=frame2.limit(200)

LDA

In [18]:
from pyspark.ml.clustering import LDA
topics=10
lda=LDA(k=topics,seed=1,optimizer="em",maxIter=10,featuresCol='tf')
lda_model1=lda.fit(frame1)
lda_model2=lda.fit(frame2)

In [None]:
topic1=lda_model1.describeTopics(topics)
topic2=lda_model2.describeTopics(topics)
#from pyspark.sql.functions import explode, arrays_zip
#top1=topics_1.withColumn("new",arrays_zip("termIndices","termWeights")).withColumn("new",explode("new")).select("topic",col("new.termIndices").alias("termIndices"),col("new.termWeights").alias("termWeights"))
#top2=topics_2.withColumn("new",arrays_zip("termIndices","termWeights")).withColumn("new",explode("new")).select("topic",col("new.termIndices").alias("termIndices"),col("new.termWeights").alias("termWeights"))

In [20]:
frame1=lda_model1.transform(frame1)
frame2=lda_model2.transform(frame2)

In [21]:
topic_lookup_udf1=udf(lambda x: ' '.join([vocab1[i] for i in x]),StringType())
topic_lookup_udf2=udf(lambda x: ' '.join([vocab2[i] for i in x]),StringType())
topic1=topic1.withColumn('terms',topic_lookup_udf1(col('termIndices'))).drop('termIndices','termWeights')
topic2=topic2.withColumn('terms',topic_lookup_udf2(col('termIndices'))).drop('termIndices','termWeights')

In [22]:
getMaxudf=udf(lambda x: float(max(x)),FloatType())
getMaxindexudf=udf(lambda x: x.tolist().index(max(x)),IntegerType())
frame1=frame1.withColumn('probDist',getMaxudf(col('topicDistribution'))).withColumn('topic_index',getMaxindexudf(col('topicDistribution')))
frame2=frame2.withColumn('probDist',getMaxudf(col('topicDistribution'))).withColumn('topic_index',getMaxindexudf(col('topicDistribution')))

In [23]:
frame1=frame1.join(topic1,frame1.topic_index==topic1.topic,'inner').drop('topic')
frame2=frame2.join(topic2,frame2.topic_index==topic2.topic,'inner').drop('topic')

In [None]:
#high Probability associated tweet with each topic
from pyspark.sql.functions import max
frame1.join(frame1.groupby(frame1.topic_index).agg(max('probDist').alias('probDist')),on=['topic_index','probDist'],how='inner').select('terms','tweet').show(truncate=False)
frame2.join(frame2.groupby(frame2.topic_index).agg(max('probDist').alias('probDist')),on=['topic_index','probDist'],how='inner').select('terms','tweet').show(truncate=False)

Train Test Split, Cross Validation and Classification

In [24]:
#1 vs all classification
df1=frame1.withColumn("Sentiment",(col("Sentiment")+1)%2) #Neutrals are 1, Positive and Negative are 0
df2=frame2.withColumn("Sentiment",(col("Sentiment")+1)%2)
df3=frame1.filter(col("Sentiment")!=0).withColumn("Sentiment",(col("Sentiment")+1)/2) #Positives are 1, Negatives are 0
df4=frame2.filter(col("Sentiment")!=0).withColumn("Sentiment",(col("Sentiment")+1)/2)

In [25]:
train1,test1=df1.randomSplit([0.8,0.2])
train2,test2=df2.randomSplit([0.8,0.2])
train3,test3=df3.randomSplit([0.8,0.2])
train4,test4=df4.randomSplit([0.8,0.2])

In [26]:
from pyspark.ml.classification import LogisticRegression
lr=LogisticRegression().setLabelCol('Sentiment').setFeaturesCol('tfidf').setRegParam(0.1).setMaxIter(10).setElasticNetParam(0.1)

In [27]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator=BinaryClassificationEvaluator(labelCol='Sentiment',metricName='areaUnderROC')

In [28]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid=(ParamGridBuilder().addGrid(lr.regParam,[0.1, 0.3, 0.5]).addGrid(lr.elasticNetParam,[0.0, 0.1, 0.2]).build())
cv=CrossValidator(estimator=lr,estimatorParamMaps=paramGrid,evaluator=evaluator,numFolds=10)
model1_1=cv.fit(train1)
model1_2=cv.fit(train3)
model2_1=cv.fit(train2)
model2_2=cv.fit(train4)
pred1_1=model1_1.transform(test1)
pred1_2=model1_2.transform(test3)
pred2_1=model2_1.transform(test2)
pred2_2=model2_2.transform(test4)
print(evaluator.evaluate(pred1_1))
print(evaluator.evaluate(pred1_2))
print(evaluator.evaluate(pred2_1))
print(evaluator.evaluate(pred2_2))

0.7796875000000001
0.7777777777777778
0.6180555555555555
0.5636363636363637


Combined TFIDF and TopicDistribution

In [59]:
from pyspark.ml.feature import VectorAssembler
assembler=VectorAssembler(inputCols=['tfidf','topicDistribution'], outputCol='features')
vecframe1=assembler.transform(frame1)
vecframe2=assembler.transform(frame2)

In [60]:
#1 vs all classification
df1=vecframe1.withColumn("Sentiment",(col("Sentiment")+1)%2) #Neutrals are 1, Positive and Negative are 0
df2=vecframe2.withColumn("Sentiment",(col("Sentiment")+1)%2)
df3=vecframe1.filter(col("Sentiment")!=0).withColumn("Sentiment",(col("Sentiment")+1)/2) #Positives are 1, Negatives are 0
df4=vecframe2.filter(col("Sentiment")!=0).withColumn("Sentiment",(col("Sentiment")+1)/2)

In [61]:
train1,test1=df1.randomSplit([0.8,0.2])
train2,test2=df2.randomSplit([0.8,0.2])
train3,test3=df3.randomSplit([0.8,0.2])
train4,test4=df4.randomSplit([0.8,0.2])

In [62]:
from pyspark.ml.classification import LogisticRegression
lr=LogisticRegression().setLabelCol('Sentiment').setFeaturesCol('features').setRegParam(0.1).setMaxIter(10).setElasticNetParam(0.1)

In [63]:
cv=CrossValidator(estimator=lr,estimatorParamMaps=paramGrid,evaluator=evaluator,numFolds=10)
model1_1=cv.fit(train1)
model1_2=cv.fit(train3)
model2_1=cv.fit(train2)
model2_2=cv.fit(train4)
pred1_1=model1_1.transform(test1)
pred1_2=model1_2.transform(test3)
pred2_1=model2_1.transform(test2)
pred2_2=model2_2.transform(test4)
print(evaluator.evaluate(pred1_1))
print(evaluator.evaluate(pred1_2))
print(evaluator.evaluate(pred2_1))
print(evaluator.evaluate(pred2_2))

0.8011695906432749
0.6416666666666666
0.6100217864923746
0.738095238095238


In [None]:
import matplotlib.pyplot as plt
fig,ax=plt.subplots()
for x in frame1.groupby(frame1.topic_index).count().collect():
  ax.scatter(x.asDict()['topic_index'],x.asDict()['count'],label='topic'+str(x.asDict()['topic_index']))
  ax.legend()

In [None]:
from wordcloud import WordCloud
word_cloud=WordCloud(background_color='white',max_words=100)
word_cloud.generate(' '.join(vocab1[:50]+vocab2[:50]).upper())
plt.imshow(word_cloud, interpolation='bilinear')
plt.axis('off')

In [None]:
import seaborn as sns
#for x in frame1.groupby(frame1.Sentiment).count().collect():
fig=sns.barplot(x='Sentiment', y='count',data=frame1.groupby(frame1.Sentiment).count().toPandas())