In [1]:
from gensim.corpora import Dictionary
import pandas as pd
from gensim.parsing.preprocessing import preprocess_string

In [2]:
# read with spark because it's a lot faster
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, udf

# One shared Spark session; worker cleanup keeps executor scratch space from growing.
spark = (
    SparkSession.builder
    .config("spark.worker.cleanup.enabled", "true")
    .config("spark.worker.cleanup.interval", 60)
    .getOrCreate()
)
df = spark.read.json("parsed_tweets3")

In [3]:
# Down-sample to 5% for faster iteration; the fixed seed keeps the sample identical
# across notebook runs (an unseeded sample changes on every Restart & Run All).
df = df.sample(fraction=0.05, seed=42)

In [4]:
import re

# Bare links still present after the known URL entities have been masked.
_HTTP_PATTERN = re.compile(r'https?://\S+')


def _mask_tokens(text, tokens, placeholder, prefix=''):
    """Replace every occurrence of ``prefix + token`` in *text* with *placeholder*.

    Empty tokens are ignored. All tokens are matched in one regex pass with the
    longest alternatives first, so a short token cannot clobber the prefix of a
    longer one (``@bob`` vs ``@bobby``) and cannot re-match inside a placeholder
    that was already inserted.
    """
    unique = sorted({t for t in tokens if t}, key=len, reverse=True)
    if not unique:
        return text
    pattern = re.compile('|'.join(re.escape(prefix + t) for t in unique))
    # A callable replacement avoids any backslash/group interpretation of placeholder.
    return pattern.sub(lambda _match: placeholder, text)


@udf("string")
def replace_entities(mentions, urls, tweet):
    """Spark UDF: mask @handles and links in a tweet with ``@MENTION`` / ``@URL``.

    Args:
        mentions: space-separated screen names (without the leading '@'), or None.
        urls: space-separated URLs, or None.
        tweet: the tweet text, or None for a Spark null.

    Returns:
        The masked text, or None when *tweet* is None.
    """
    if tweet is None:
        return None
    # Empty tokens must be skipped: '@' + '' would match every '@' in the tweet.
    tweet = _mask_tokens(tweet, (mentions or '').split(' '), '@MENTION', prefix='@')
    tweet = _mask_tokens(tweet, (urls or '').split(' '), '@URL')
    # Catch any remaining links that were not listed in the URL entities.
    return _HTTP_PATTERN.sub('@URL', tweet)

In [5]:
# Mask handles and links in the raw text; columns are referenced by name.
df = df.withColumn(
    "entity_replaced",
    replace_entities("mentions", "urls", "full_text"),
)

In [6]:
# Keep English tweets and flag retweets (text starting with "RT @").
df = df.where(df.lang == "en")
df = df.withColumn("rt_indicator", when(df.full_text.like("RT @%"), 1).otherwise(0))
# Oversample original (non-retweet) tweets for LDA training, plus a smaller sample of
# everything (retweets included); distinct() drops rows drawn by both samples.
# Distinct fixed seeds make both samples reproducible across notebook runs.
training_df = df.where(df.rt_indicator == 0).sample(fraction=0.15, seed=43)
full_df = df.sample(fraction=0.05, seed=44)
df = training_df.union(full_df).distinct()

In [9]:
from demoji import replace
import re
from gensim.parsing.preprocessing import strip_multiple_whitespaces, remove_stopwords
from gensim.utils import to_unicode
from nltk.stem.snowball import SnowballStemmer
from nltk.corpus import stopwords
from gensim.parsing.preprocessing import STOPWORDS

# Combined gensim + NLTK English stop words, plus the HTML-escaped ampersand token
# (presumably present because tweet text arrives HTML-escaped — confirm against data).
# A set literal is required: set('&amp;') would add the single characters
# '&', 'a', 'm', 'p', ';' instead of the token '&amp;'.
STOPWORDS = STOPWORDS.union(stopwords.words('english')).union({'&amp;'})

def my_remove_stopwords(s):
    """Lower-case *s* and drop every whitespace-separated token that is in STOPWORDS.

    Surviving tokens are re-joined with single spaces, so whitespace runs collapse.
    """
    lowered = to_unicode(s).lower()
    kept = [token for token in lowered.split() if token not in STOPWORDS]
    return " ".join(kept)

# Anything that is not a word character, digit, '#', '@' or whitespace
# (punctuation, symbols, most emoji) is removed.
_NON_TOKEN_CHARS = re.compile(r"[^\w\d#@\s]+")
# Retweet marker as a standalone word: not preceded by a word char, '#' or '@', so
# "START now" or a handle like "@rt" is left alone. Case-insensitive because
# process_text lower-cases the text before it reaches replacer.
_RETWEET_MARKER = re.compile(r"(?<![\w#@])RT\s+", re.IGNORECASE)


def replacer(string):
    """Strip hyphens, retweet markers, punctuation and emoji from *string*.

    Order: delete hyphens (joining hyphenated words, e.g. "covid-19" -> "covid19"),
    drop a standalone "RT" marker, delete every character other than word
    characters, '#', '@' and whitespace, then run demoji's ``replace`` on the result.
    """
    string = string.replace("-", "")
    string = _RETWEET_MARKER.sub("", string)
    string = _NON_TOKEN_CHARS.sub("", string)
    string = replace(string)
    return string

# One shared stemmer instead of constructing a new SnowballStemmer for every word.
_ENGLISH_STEMMER = SnowballStemmer("english")


def ignore_stemmer(tweet):
    """Snowball-stem each word of *tweet*, leaving '@' tokens and '#' hashtags untouched.

    Splits on single spaces and re-joins with single spaces.
    """
    stemmed_list = []
    for w in tweet.split(" "):
        if not w.startswith("@") and not w.startswith("#"):
            w = _ENGLISH_STEMMER.stem(w)
        stemmed_list.append(w)
    return " ".join(stemmed_list)

def further_replacer(tweet):
    """Normalise placeholder tokens in a lower-cased, space-separated tweet.

    - "@url" / "@mention" (lower-cased by an earlier step) become "@URL" / "@MENTION".
    - Any token that *starts* with a digit (``re.match`` semantics, e.g. "2020",
      "5g") becomes "@NUMBER".
    - Every other token, including empty ones from repeated spaces, is kept as-is.
    """
    leading_digits = re.compile(r"\d+")
    restored = {"@url": "@URL", "@mention": "@MENTION"}
    out = []
    for token in tweet.split(" "):
        if token in restored:
            out.append(restored[token])
        elif leading_digits.match(token):
            out.append("@NUMBER")
        else:
            out.append(token)
    return " ".join(out)

def whitespace_replace_udf(tweet):
    """Collapse runs of whitespace in *tweet* into a single space.

    Despite the name this is a plain Python function, not a Spark UDF; in the visible
    cells nothing calls it (process_text calls strip_multiple_whitespaces directly).
    """
    collapsed = strip_multiple_whitespaces(tweet)
    return collapsed

@udf("string")
def process_text(tweet):
    """Spark UDF: turn entity-masked tweet text into normalised space-separated tokens.

    Pipeline: lower-case + stop-word removal -> hyphen/RT/punctuation/emoji stripping
    -> stemming (except '@'/'#' tokens) -> placeholder/number normalisation ->
    whitespace collapsing. A null input returns None instead of failing the task.
    """
    if tweet is None:
        return None
    tweet = my_remove_stopwords(tweet)
    tweet = replacer(tweet)
    tweet = ignore_stemmer(tweet)
    tweet = further_replacer(tweet)
    # strip_multiple_whitespaces collapses runs but can leave one leading/trailing
    # space (e.g. when the first token was pure punctuation); strip it so the later
    # split on " " does not produce empty "" tokens.
    return strip_multiple_whitespaces(tweet).strip()

# Run the full text-normalisation UDF over the entity-masked text.
df = df.withColumn("cleaned_text", process_text(df["entity_replaced"]))

In [10]:
from pyspark.sql.functions import split    
# Tokenise the cleaned text into an array column on single spaces.
df = df.withColumn("split_text", split("cleaned_text", " "))

In [None]:
# Persist the cleaned sample as JSON, at most 50k records per part file.
# NOTE: Spark's default save mode fails if "cleaned_tweets" already exists.
df.write.options(maxRecordsPerFile=50000).json("cleaned_tweets")

In [12]:
# Collect the sampled, cleaned Spark frame onto the driver as pandas (must fit in
# driver memory). Downstream pandas/gensim cells should read from `pdf`; `df` is
# still the Spark DataFrame.
pdf = df.toPandas()

In [15]:
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer

no_features = 1000  # vocabulary cap: keep only the most frequent terms

# LDA is a probabilistic graphical model over raw term counts, so use counts (not TF-IDF).
tf_vectorizer = CountVectorizer(max_df=0.95, min_df=2, max_features=no_features)
# Null cleaned_text (from null tweets) would make fit_transform fail; treat it as empty.
tf = tf_vectorizer.fit_transform(pdf.cleaned_text.fillna("").values)
# get_feature_names() was removed in scikit-learn 1.2; get_feature_names_out() replaces it.
tf_feature_names = list(tf_vectorizer.get_feature_names_out())

In [20]:
# Per-tweet token lists as a NumPy object array (not used by the visible cells below).
texts = pdf["split_text"].to_numpy()

In [32]:
from sklearn.feature_extraction.text import CountVectorizer

In [None]:
# Vocabulary comes from every sampled tweet (retweets included); training uses only
# deduplicated non-retweets. TODO: confirm the dictionary should include retweets.
# These cells operate on the pandas copy `pdf`; `df` is still a Spark DataFrame here.
def _as_tokens(value):
    """Normalise a split_text cell to a list of tokens (Spark nulls arrive as None)."""
    return [] if value is None else list(value)


full_docs = pdf["split_text"].apply(_as_tokens).to_list()
dictionary = Dictionary(full_docs)

training_df = pdf[pdf["rt_indicator"] == 0]

# Drop exact duplicate token sequences so repeated texts don't dominate training.
training_docs = list(training_df["split_text"].apply(_as_tokens).apply(tuple).unique())
training_corpus = [dictionary.doc2bow(tweet) for tweet in training_docs]

# One bag-of-words per row of pdf, in row order, so topics can be assigned back to pdf.
full_corpus = [dictionary.doc2bow(tweet) for tweet in full_docs]

In [None]:
# Number of distinct non-retweet token sequences the LDA model is trained on.
len(training_corpus)

In [None]:
from gensim.models import ldamulticore
# The main hyperparameter is the number of topics: 10 may be too few, so try 50 or 100
# for this random-sample dataset. Coronavirus-themed tweets may need fewer topics.

# Training parameters. All of them are passed to the model below, so these values
# are what actually trains.
num_topics = 50    # downstream cells and output filenames assume 50 topics
chunksize = 2000   # number of documents per training chunk
# gensim's default of 1 pass (what this model has been trained with). Raising it
# (e.g. to 20) gives more training epochs at a roughly proportional time cost.
passes = 1
iterations = 400   # max inference iterations per document; 100 may do for coronavirus tweets
eval_every = None  # don't evaluate model perplexity; it takes too much time
random_state = 42  # seed so the trained topics are reproducible

# Make an index-to-word mapping: gensim fills dictionary.id2token lazily on item access.
temp = dictionary[0]
id2word = dictionary.id2token

model = ldamulticore.LdaMulticore(
    corpus=training_corpus,
    id2word=id2word,
    num_topics=num_topics,
    chunksize=chunksize,
    passes=passes,
    iterations=iterations,
    eta='auto',
    eval_every=eval_every,
    random_state=random_state,
)

In [None]:
# Rank the trained topics by coherence on the training corpus (gensim's default u_mass).
top_topics = model.top_topics(corpus=training_corpus)

In [None]:
# Display the coherence-ranked topics (per gensim: (topic terms, coherence score) pairs).
top_topics

In [None]:
# NOTE(review): top_topics is ordered by coherence and carries no topic ids, so these
# keys are coherence ranks, not the topic ids used by get_document_topics/top_topic.
topics_dict = {str(rank): topic for rank, topic in enumerate(top_topics)}
print(topics_dict)

In [None]:
# Infer a topic distribution for every tweet in the full sample (retweets included).
# minimum_probability=0.0 requests all topics; gensim may still apply a tiny positive floor.
topics = [
    model.get_document_topics(bow, minimum_probability=0.0)
    for bow in full_corpus
]

In [None]:
# Convert each sparse [(topic_id, prob), ...] list into a dense tuple indexed by topic
# id. Indexing by id (instead of taking probabilities in list order) keeps position i ==
# topic i even if gensim omits a low-probability topic, and an empty list no longer
# raises IndexError.
probabilities = []
for doc_topics in topics:
    dense = [0.0] * model.num_topics
    for topic_id, prob in doc_topics:
        dense[topic_id] = prob
    probabilities.append(tuple(dense))

In [None]:
# Per-tweet topic probabilities, row-aligned with pdf (full_corpus was built in pdf's
# row order). `df` is a Spark DataFrame and does not support column assignment.
pdf['topics'] = probabilities

In [None]:
from scipy.stats import entropy
import math

def calculate_perplexity(probability_list):
    """Return exp(Shannon entropy) of a topic distribution.

    ``scipy.stats.entropy`` normalises its input to sum to 1 and uses the natural
    log, so the result runs from 1 (all mass on one topic) to the number of topics
    (uniform). It is the tweet's "effective number of topics", not the LDA model's
    corpus perplexity.
    """
    nats = entropy(probability_list)
    return math.exp(nats)

In [None]:
# exp(entropy) of each tweet's topic distribution (see calculate_perplexity). Stored on
# the pandas frame `pdf`, which the perplexity-sampling cell reads.
pdf['perplexity'] = pdf['topics'].apply(calculate_perplexity)

In [None]:
import seaborn as sns
# distplot is deprecated in seaborn >= 0.11; histplot with a KDE overlay replaces it.
ax = sns.histplot(pdf['perplexity'], kde=True)
ax.set(
    title="Per-tweet topic perplexity (exp of topic-distribution entropy)",
    xlabel="perplexity",
    ylabel="number of tweets",
);

In [None]:
import numpy as np
# Index of the most likely topic per tweet, and that topic's probability.
pdf['top_topic'] = pdf['topics'].apply(np.argmax)
pdf['top_topic_prob'] = pdf['topics'].apply(max)

In [None]:
# throw away tweets with perplexity too high
# Throw away tweets whose topic mix is too diffuse: perplexity < MAX_PERPLEXITY means the
# distribution is as spread out as a uniform over fewer than MAX_PERPLEXITY topics.
MAX_PERPLEXITY = 10
unperplexed = pdf[pdf['perplexity'] < MAX_PERPLEXITY]

In [None]:
# Save the retained tweets; the tuple-valued `topics` column is written as its text repr.
unperplexed.to_csv(path_or_buf="50_topic_model_round_4.csv", index=False)

In [None]:
# For every topic, the (up to) 10 distinct masked tweets that load most on it. Iterate
# over the model's actual topic count instead of a hard-coded 50.
text_dict = {}
for topic_id in range(model.num_topics):
    topic_rows = unperplexed[unperplexed['top_topic'] == topic_id]
    # Highest probability for this topic first.
    ranked = topic_rows.loc[
        topic_rows['topics'].apply(lambda probs: probs[topic_id]).sort_values(ascending=False).index
    ]
    text_dict[topic_id] = list(ranked['entity_replaced'].unique()[:10])

In [None]:
import json
# ensure_ascii=False writes raw Unicode (emoji, non-Latin text), so the file encoding must
# be explicit; the platform default may not be UTF-8 and could raise UnicodeEncodeError.
with open("50_topics_tweets_round_4.json", "w", encoding="utf-8") as f:
    json.dump(text_dict, f, indent=2, ensure_ascii=False)

In [None]:
perplexity_dict = {}
perplexity_dict_tuples = {}
nrows_dict = {}

SAMPLE_PER_BUCKET = 100  # tweets drawn per bucket (fewer when the bucket is smaller)
SAMPLE_SEED = 42         # reproducible samples across runs


def make_float(l):
    """Convert a sequence of (possibly NumPy) numbers to plain Python floats for JSON."""
    return [float(i) for i in l]


# Bucket tweets by perplexity into [1,2), [2,3), ..., [10,11) and sample each bucket.
for i in range(1, 11):
    key = "[" + str(i) + "," + str(i + 1) + ")"
    in_bucket = (pdf.perplexity >= i) & (pdf.perplexity < i + 1)
    bucket = pdf.loc[in_bucket, ['full_text', 'topics', 'perplexity']]
    nrows = bucket.shape[0]
    # sample(n=...) without replacement raises ValueError if n exceeds the bucket size.
    sample = bucket.sample(n=min(SAMPLE_PER_BUCKET, nrows), random_state=SAMPLE_SEED)
    # assign() returns a new frame, avoiding chained-assignment warnings on the sample.
    sample = sample.assign(topics=sample['topics'].apply(make_float))

    nrows_dict[key] = nrows

    full_texts = sample.full_text.to_list()
    topic_probs = sample.topics.to_list()
    perplexities = sample.perplexity.to_list()

    perplexity_dict[key] = {
        "full_text": full_texts,
        "topic_probabilities": topic_probs,
        "perplexity": perplexities,
    }
    perplexity_dict_tuples[key] = list(zip(full_texts, topic_probs, perplexities))

In [None]:
# Column-oriented sample: {bucket: {"full_text": [...], "topic_probabilities": [...], "perplexity": [...]}}.
with open("perplexity_sample.json", "w") as f:
    json.dump(perplexity_dict, f, indent=2)

In [None]:
# Row-oriented sample: {bucket: [[full_text, topic_probabilities, perplexity], ...]}.
with open("perplexity_sample_tuple.json", "w") as f:
    json.dump(perplexity_dict_tuples, f, indent=2)