In [1]:
import findspark
findspark.init('/opt/spark')

import os
from dotenv import load_dotenv
load_dotenv('../.env')
access = os.environ.get('AWS_ACCESS')
secret = os.environ.get('AWS_SECRET')

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import Row
from pyspark.sql.types import ArrayType, StringType, TimestampType, DateType, StructType, DoubleType, IntegerType
from pyspark.sql.functions import *

from pyspark.ml.functions import vector_to_array
from pyspark.ml.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.ml.feature import CountVectorizer, IDF, StopWordsRemover
from pyspark.ml import Pipeline, PipelineModel

In [3]:
conf = SparkConf() \
    .set("fs.s3a.awsAccessKeyId", access) \
    .set("fs.s3a.awsSecretAccessKey", secret) \
    .set("fs.s3a.endpoint", "s3.us-east-1.amazonaws.com") \
    .set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .set("fs.s3a.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem") \
    .set("com.amazonaws.services.s3.enableV4", "true")

spark = SparkSession.builder.master('local').appName('cool').config(conf=conf).getOrCreate()

In [5]:
# filename = 's3a://patellism/processed_data/2020-08-cleaned.parquet.snappy/part-00004-d2d9c5cf-46de-47d9-86df-28fefd1709e5-c000.snappy.parquet'
filename = '2020-08-clean-small.parquet.snappy'
df = spark.read.parquet(filename).drop('geo', 'coordinates', 'place', 'retweet_count', 'favorite_count')
df.printSchema()
# df = df.withColumn('datetime', df['created_at'].cast(TimestampType())).drop('created_at')
# df.show(5)

root
 |-- Unnamed: 0: long (nullable = true)
 |-- id: double (nullable = true)
 |-- full_text: string (nullable = true)
 |-- clean_text: string (nullable = true)



In [34]:
temp = df.drop('id','full_text','hashtags','datetime') \
         .withColumn('word', explode('clean_text')) \
         .groupBy('word') \
         .count() \
         .sort('count', ascending=False)

# Vocab size will 15,000 so we will remove 5% or 750 words for """production"""
to_remove = temp.drop('count').limit(750).collect()

['say',
 'trump',
 'make',
 'people',
 'work',
 'enough',
 'im',
 'get',
 'biden',
 'american',
 'one',
 'good',
 'need',
 'time',
 'amp',
 'president',
 'state',
 'we',
 'racist',
 'use',
 'cant',
 'democrat',
 'even',
 'think',
 'go',
 'u',
 'see',
 'right',
 'vote',
 'many',
 'america',
 'thing',
 'week',
 'read']

In [35]:
rm_freq_words = StopWordsRemover(inputCol='clean_text', outputCol='clean_rm_frequent', stopWords=to_remove)
cv_tf = CountVectorizer(inputCol='clean_rm_frequent', outputCol='features', vocabSize=5000, minDF=1)
lda_tf = LDA(k=3, maxIter=20)

tf_pipeline = Pipeline(stages=[rm_freq_words, cv_tf, lda_tf])

In [36]:
# Three groupings
#   Ungrouped
#   Time
#   Hashtags
#       Need to determine similarity metrics and method for tweets with no hashtags

In [37]:
model_tf = tf_pipeline.fit(df)

topics_tf = model_tf.stages[-1].describeTopics(maxTermsPerTopic=5)
vocabArray_tf = model_tf.stages[1].vocabulary

def covertToWord(indices):
    result = []
    for i in indices:
        result.append(vocabArray_tf[i])
    return result

udf_convertToWord = udf(covertToWord, ArrayType(StringType()))
topics_tf = topics_tf.withColumn('word', udf_convertToWord('termIndices'))
topics_tf.select('word').show(truncate=False)

+---------------------------------------+
|word                                   |
+---------------------------------------+
|[puppet, you, everyone, could, gotta]  |
|[kid, believe, country, teacher, give] |
|[faith, respect, lose, truth, election]|
+---------------------------------------+



In [38]:
transformed_tf = model_tf.transform(df)
transformed_tf.show(5)

+-------------------+--------------------+--------+--------------------+-------------------+--------------------+--------------------+--------------------+
|                 id|           full_text|hashtags|          clean_text|           datetime|   clean_rm_frequent|            features|   topicDistribution|
+-------------------+--------------------+--------+--------------------+-------------------+--------------------+--------------------+--------------------+
|1291569732789981185|@JRILLaw @Lrihend...|      []|[litigator, abili...|2020-08-06 22:59:55|[litigator, abili...|(655,[39,48,154,1...|[0.02138893474753...|
|1291569732924010496|@Jehi001 @Bibathe...|      []|[get, lose, vile,...|2020-08-06 22:59:55|[lose, vile, unca...|(655,[12,139,254,...|[0.04251318820887...|
|1291569734735917057|@granada761 @Real...|      []|[house, discrimin...|2020-08-06 22:59:55|[house, discrimin...|(655,[2,4,19,31,3...|[0.01425062821245...|
|1291569735994183685|@metawoke @Halani...|      []|[know, one, r

In [39]:
ll = model_tf.stages[-1].logLikelihood(transformed_tf)
lp = model_tf.stages[-1].logPerplexity(transformed_tf)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -6719.647641050866
The upper bound on perplexity: 7.558658763836744


In [72]:
# Group by time
time_df = df.drop('full_text', 'hashtags')

time_df = time_df.withColumn('day_mo_yr', date_format('datetime', 'yyyy-MM-dd'))
time_df = time_df.withColumn('hour', hour('datetime'))

def amOrPm(hour):
    res = hour - 12
    if res < 0:
        return 0
    else: 
        return 1

udf_amOrPm = udf(amOrPm, IntegerType())
time_df = time_df.withColumn('am_or_pm', udf_amOrPm('hour'))

time_df = \
    time_df.withColumn('clean_text', explode('clean_text')) \
    .groupBy('day_mo_yr', 'am_or_pm') \
    .agg(collect_list('clean_text'), collect_list('id')) \
    .withColumnRenamed('collect_list(clean_text)', 'clean_text') \
    .withColumnRenamed('collect_list(id)', 'id_list')

time_df.show()

+----------+--------+--------------------+--------------------+
| day_mo_yr|am_or_pm|          clean_text|             id_list|
+----------+--------+--------------------+--------------------+
|2020-08-06|       1|[litigator, abili...|[1291569732789981...|
+----------+--------+--------------------+--------------------+



In [73]:
model_time = tf_pipeline.fit(time_df)
topics_time = model_time.stages[-1].describeTopics(maxTermsPerTopic=5)
vocabArray_tf = model_time.stages[1].vocabulary

def covertToWord(indices):
    result = []
    for i in indices:
        result.append(vocabArray_tf[i])
    return result

udf_convertToWord = udf(covertToWord, ArrayType(StringType()))
topics_time = topics_time.withColumn('word', udf_convertToWord('termIndices'))
topics_time.select('word').show(truncate=False)

Py4JJavaError: An error occurred while calling o743.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 280.0 failed 1 times, most recent failure: Lost task 0.0 in stage 280.0 (TID 2928) (192.168.1.126 executor driver): java.io.FileNotFoundException: File file:/home/bettik/Documents/CAP6640-Course-Project/topic-modeling/2020-08-cleaned-small.parquet.snappy does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1253)
	at org.apache.spark.ml.feature.CountVectorizer.fit(CountVectorizer.scala:233)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File file:/home/bettik/Documents/CAP6640-Course-Project/topic-modeling/2020-08-cleaned-small.parquet.snappy does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [41]:
# cv_idf = CountVectorizer(inputCol='clean_text', outputCol='raw_features', vocabSize=5000, minDF=3)
# idf = IDF(inputCol="raw_features", outputCol="features")
# lda_tfidf = LDA(k=3, maxIter=20)

# tfidf_pipeline = Pipeline(stages=[cv_idf, idf, lda_tfidf])

In [42]:
# model_idf = tfidf_pipeline.fit(df)

# topics_idf = model_idf.stages[-1].describeTopics(maxTermsPerTopic=5)
# vocabArray_idf = model_idf.stages[0].vocabulary

# def covertToWord(indices):
#     result = []
#     for i in indices:
#         result.append(vocabArray_idf[i])
#     return result

# udf_convertToWord = udf(covertToWord, ArrayType(StringType()))
# topics_idf = topics_idf.withColumn('word', udf_convertToWord('termIndices'))
# topics_idf.select('word').show(truncate=False)

In [43]:
# transformed_idf = model_idf.transform(df)
# transformed_idf.show()

In [44]:
# ll = model_idf.stages[-1].logLikelihood(transformed_idf)
# lp = model_idf.stages[-1].logPerplexity(transformed_idf)
# print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
# print("The upper bound on perplexity: " + str(lp))