In [24]:
raw = spark.read.json('s3://macs-30123-final-sentiment-temp/final/tweets_data_*.json', multiLine=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [47]:
# Drop unnecessary columns
data = raw.drop('text', 'description')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [48]:
# Clean up inconsistencies
data = data.withColumnRenamed('sentiment\r', 'sentiment')
data = data.withColumnRenamed('clean_text', 'text')
data = data.withColumnRenamed('clean_description', 'description')
data = data.withColumn('followers', data['followers'].cast('integer'))
data.dropna()
data = data.filter(data.text != '')
data = data.filter(data.description != '')
data = data.replace(float('nan'), 0)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [49]:
# Create numeric index for sentiment score
from pyspark.ml.feature import StringIndexer

indexer_sent = StringIndexer(inputCol='sentiment', outputCol='sent_code').setHandleInvalid('skip')
data = indexer_sent.fit(data).transform(data)
data = data.withColumn('sent_code', data['sent_code'].cast('integer'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [50]:
# Create numeric index of language

indexer_lang = StringIndexer(inputCol='language', outputCol='lang_code').setHandleInvalid('skip')
data = indexer_lang.fit(data).transform(data)
data = data.withColumn('lang_code', data['lang_code'].cast('integer'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [51]:
# We don't really care about "mixed" sentiment results - only positive/neutral/negative
# POSITIVE = 3, MIXED = 2, NEGATIVE = 1, NEUTRAL = 0

data = data.filter(data.sent_code != 2)
data = data.filter(data.language == 'en') # keeping the other languages introduced weird errors

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [52]:
by_sent = data.groupby('sent_code')
by_sent.count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+------+
|sent_code| count|
+---------+------+
|        1| 85672|
|        3| 17693|
|        0|126439|
+---------+------+

In [53]:
# Resample data given the count results shown above
data = data.sampleBy('sent_code', {0: .14, 1: .21, 3: .99}, seed=17)

by_sent = data.groupby('sent_code')
by_sent.count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+-----+
|sent_code|count|
+---------+-----+
|        1|17874|
|        3|17520|
|        0|17581|
+---------+-----+

In [94]:
# Model 1: text of user descriptions for each tweet
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

tokened = Tokenizer(inputCol='description', outputCol='words').transform(data)
stopwords = StopWordsRemover(inputCol='words', outputCol='terms').transform(tokened)
hashed = HashingTF(inputCol='terms', outputCol="hash").transform(stopwords)
tf_idf = IDF(inputCol='hash', outputCol="features").fit(hashed).transform(hashed)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [95]:
tf_idf.select('terms', 'features').show(4, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|terms                                        |features                                                                                                                                                            |
+---------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[union, millwright, vested, status, quo]     |(262144,[4842,91039,129199,181609,197130],[6.159095388491933,7.786551806428712,10.184447079227082,7.658718434918827,9.085834790558973])                             |
|[digital, soldier, , support, president]     |(262144,[83887,124361,201383,230616,249180],[5.045711782503511,3.375407773184103,4.265553224953936,5.

In [101]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

train, test = tf_idf.randomSplit([0.7, 0.3])

logistic = LogisticRegression(labelCol='sent_code', regParam=0.2).fit(train)

prediction = logistic.transform(test)
prediction.groupBy('sent_code', 'prediction').count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+----------+-----+
|sent_code|prediction|count|
+---------+----------+-----+
|        1|       3.0| 1573|
|        3|       1.0| 1992|
|        3|       3.0| 1882|
|        1|       1.0| 2268|
|        0|       3.0| 1396|
|        0|       0.0| 1914|
|        0|       1.0| 1916|
|        1|       0.0| 1496|
|        3|       0.0| 1388|
+---------+----------+-----+

In [79]:
# Model 2: follower count of each tweet's user
by_sent.mean('followers').show()

features = ['followers']
assembler = VectorAssembler(inputCols = features, outputCol = 'features')

follower_data = assembler.transform(data)
follower_data[['text', 'sent_code', 'followers', 'features']].show(5)

lr = LogisticRegression(featuresCol='features', labelCol='sent_code')
train, test = follower_data.randomSplit([0.7, 0.3])
model2 = lr.fit(train)

trainingSummary = model2.summary
evaluationSummary = model2.evaluate(test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---------+---------+---------+
|                text|sent_code|followers| features|
+--------------------+---------+---------+---------+
|you seem very emo...|        3|     4141| [4141.0]|
|wheres the boot  ...|        0|      553|  [553.0]|
|from the founding...|        0|     1003| [1003.0]|
|hopefully biden c...|        0|      277|  [277.0]|
|i know it can run...|        0|     1774| [1774.0]|
|this has def beco...|        1|     4209| [4209.0]|
|not enough time f...|        1|       21|   [21.0]|
|theory     dadona...|        0|       18|   [18.0]|
|p s  im glad to h...|        3|      153|  [153.0]|
|ap sources  biden...|        0|    34459|[34459.0]|
|         and confess|        0|    15290|[15290.0]|
|joe biden has sai...|        1|      169|  [169.0]|
|xi jinping must b...|        3|       69|   [69.0]|
|are you smoking c...|        1|        1|    [1.0]|
| trump can't believe|        1|      233|  [233.0]|
|you just might ge...|        3|     7574| [75

In [87]:
print("\nFalse positive rate by label (Training):")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("\nTrue positive rate by label (Training):")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))
    
print("\nTraining Accuracy: " + str(trainingSummary.accuracy))
print("Test Accuracy: ", str(evaluationSummary.accuracy))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


False positive rate by label (Training):
label 0: 0.0035117461855170744
label 1: 0.9907779809059517
label 2: 0.0

True positive rate by label (Training):
label 0: 0.014564686737184702
label 1: 0.9968672182504619
label 2: 0.0

Training Accuracy: 0.3396557306281027
Test Accuracy:  0.3441644145559676