In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
!pip install ipython-autotime
%load_ext autotime

time: 0 ns (started: 2022-05-03 10:36:00 -04:00)


In [3]:
# sc.stop()

time: 0 ns (started: 2022-05-03 10:36:00 -04:00)


In [4]:
import re
import numpy as np
import pandas as pd
import nltk
# nltk.download('stopwords')
from nltk.corpus import stopwords
# nltk.download('words')
from nltk.corpus import wordnet
from nltk.stem import WordNetLemmatizer
# nltk.download('wordnet')

time: 1.19 s (started: 2022-05-03 10:36:00 -04:00)


## Data Preparation

In [5]:
txt_df = spark.read.csv(
    "data_test.csv", 
    header=True,
    inferSchema = True
)
txt_df.show(3)

+-------------------+-------------------+--------------------+----------+--------+--------+---------+--------------+-----------------+-----+--------------------+--------+--------+----+------+-------------+--------------+-----------+--------+--------+--------------------+-------+---------+-----+---------+----+----+------+----------+-------+----------+--------+------------+---------+---------+----------+
|                 id|    conversation_id|          created_at|      date|    time|timezone|  user_id|      username|             name|place|               tweet|language|mentions|urls|photos|replies_count|retweets_count|likes_count|hashtags|cashtags|                link|retweet|quote_url|video|thumbnail|near| geo|source|user_rt_id|user_rt|retweet_id|reply_to|retweet_date|translate|trans_src|trans_dest|
+-------------------+-------------------+--------------------+----------+--------+--------+---------+--------------+-----------------+-----+--------------------+--------+--------+----+----

In [6]:
temp = txt_df.select('user_id', 'tweet')
txt_row = temp.rdd
txt = txt_row.map(lambda x: [str(x[0]), x[1]])
txt.take(2)

[['215231197', 'Stay gold, Pony Boy.'],
 ['215231197', 'your tone gives you away.']]

time: 687 ms (started: 2022-05-03 10:36:06 -04:00)


## Data Pre-processing

In [7]:
def easy_clean(txt):
    
    # transform to lowercase
    txt_low = txt.lower()
    
    # remove punctuations and numbers except for #
    txt_clean = re.sub("[^#_a-zA-Z]", " ",txt_low)
    
    return txt_clean


def extract_hash_words(txt_clean):
    
    # find all hash words (with hash symbol)
    txt_hash_words = re.findall(r'#\w+',txt_clean)
    
    # remove the hash symbol
    txt_nohash_words_txt = re.sub("#", " ", " ".join([x for x in txt_hash_words]))
    
    # tokenize
    txt_nohash_words = txt_nohash_words_txt.split()
    
    return txt_nohash_words

time: 0 ns (started: 2022-05-03 10:36:07 -04:00)


In [8]:
# clean each tweet in a simple way
txt_clean_tweet = txt.map(lambda x: (x[0],easy_clean(x[1]),x[1]))
txt_clean_tweet.take(50)

[('215231197', 'stay gold  pony boy ', 'Stay gold, Pony Boy.'),
 ('215231197', 'your tone gives you away ', 'your tone gives you away.'),
 ('215231197',
  'this unbelieving world has nowhere to rest its weary head ',
  'This unbelieving world has nowhere to rest its weary head.'),
 ('215231197',
  'jealousy hides in pride and calls itself discernment ',
  'Jealousy hides in pride and calls itself discernment.'),
 ('215231197',
  'refining my focus on things that really matter  like listening to silly requests from my kids and actually doing them  the dishes will be clean at the end of the day anyway ',
  'Refining my focus on things that really matter. Like listening to silly requests from my kids and actually doing them. The dishes will be clean at the end of the day anyway.'),
 ('215231197',
  'this year  i stopped being hard on myself  i m ok with just taking the fresh laundry out of the machine and letting it sit on a chair  untouched for days ',
  "This year, I stopped being hard 

time: 484 ms (started: 2022-05-03 10:36:07 -04:00)


In [9]:
# extract the hash words and keep the cleaned tweets
hash_tweet = txt_clean_tweet.map(lambda x:(x[0],extract_hash_words(x[1]),x[2]))
hash_tweet.take(5)

[('215231197', [], 'Stay gold, Pony Boy.'),
 ('215231197', [], 'your tone gives you away.'),
 ('215231197',
  [],
  'This unbelieving world has nowhere to rest its weary head.'),
 ('215231197', [], 'Jealousy hides in pride and calls itself discernment.'),
 ('215231197',
  [],
  'Refining my focus on things that really matter. Like listening to silly requests from my kids and actually doing them. The dishes will be clean at the end of the day anyway.')]

time: 516 ms (started: 2022-05-03 10:36:07 -04:00)


In [10]:
# only keep the hash words
hash_words = hash_tweet.map(lambda x:(x[0],x[1]))
hash_words.take(5)

[('215231197', []),
 ('215231197', []),
 ('215231197', []),
 ('215231197', []),
 ('215231197', [])]

time: 500 ms (started: 2022-05-03 10:36:08 -04:00)


In [11]:
# change the form of the hash words for LDA
## filter out the empty lists
hash_words = hash_words.filter(lambda x: x[1]!=[])
## group by userid
user_hash_words = hash_words.groupByKey().mapValues(list).collect()
user_hash_words[0:5]

[('257832450',
  [['ps', 'vgfgamers', 'godofwar'],
   ['philsprovisions'],
   ['philsprovisions'],
   ['flyeaglesfly'],
   ['philsprovisions'],
   ['philsprovisions'],
   ['philsprovisions'],
   ['philsprovision'],
   ['philsprovisions'],
   ['philsprovisions'],
   ['roadtorecovery'],
   ['epiccollapse', 'fantasyfootball'],
   ['philsprovisions'],
   ['philsprovisions']]),
 ('2484021120',
  [['greek', 'newyear'],
   ['greek', 'christosstaikouras'],
   ['life'],
   ['saudi', 'jamalkhashoggi', 'security'],
   ['exodus', 'lebanon', 'home', 'cyprus'],
   ['greece', 'cave', 'kapsia'],
   ['mars', 'perseverance', 'alien'],
   ['omicron', 'delta', 'lab'],
   ['invested', 'amazon'],
   ['projecttasmania', 'mysterious', 'blackbox', 'aliens'],
   ['sunken', 'warship', 'riverthames'],
   ['beautiful', 'greece', 'dragon', 'cave'],
   ['frigid', 'outdoors'],
   ['ghislainemaxwell', 'jeffreyepstein', 'sextrafficking', 'princeandrew'],
   ['turkey', 'secret', 'missiles', 'nuclear', 'nordicmonitor'],


time: 5.44 s (started: 2022-05-03 10:36:08 -04:00)


In [12]:
len(user_hash_words)

511

time: 0 ns (started: 2022-05-03 10:36:14 -04:00)


## Self-define LDA for Hash Words

In [13]:
import logging
import numpy as np
import random


class user_tweet:
    def __init__(self,tweets,word2id):
        self.t_words = [words(tweet,word2id) for tweet in tweets]
        self.size = len(tweets)

class user:
    def __init__(self,tmp):
        self.id = [user[0] for user in tmp]
        
        self.size = len(self.id)
        self.word2id = {}
        self.words = []
        self.wordsize = 0
        ind = 0
        for i in range(len(tmp)):
            for j in range(len(tmp[i][1])):
                for k in range(len(tmp[i][1][j])):
                    if tmp[i][1][j][k] in self.word2id:
                        self.words.append(self.word2id[tmp[i][1][j][k]])
                    else:
                        self.word2id[tmp[i][1][j][k]] = ind
                        self.words.append(ind)
                        ind += 1
        self.wordsize = len(self.words)
        self.tweets = dict({zzz[0]:user_tweet(zzz[1],self.word2id) for zzz in tmp})

class words:
    def __init__(self,tweet,word2id):
        self.words = []
        self.size = 0
        self.word2id = word2id
        for i in range(len(tweet)):
            self.words.append(self.word2id[tweet[i]])
        self.size = len(self.words)


class twitter_lda:
    def __init__(self,K,iter,alpha,beta,beta_b,gamma,data,top_max_num):
        self.K = K
        self.users = user(data)
        self.user_count = self.users.size
        self.word_count = self.users.wordsize
        self.iter = iter

        self.top_max_num = top_max_num

        self.alpha_g = alpha
        self.beta = beta
        self.beta_b = np.array([beta_b for i in range(self.word_count)])
        self.beta_b_sum = beta_b * self.word_count
        self.beta_word = np.array([beta for i in range(self.word_count)])
        self.beta_word_sum = beta * self.word_count
        self.gamma = np.array([gamma,gamma])

        self.alpha_sum = 0
        self.alpha_general = np.array([self.alpha_g for i in range(self.K)])
        self.alpha_sum = self.alpha_g*self.K

        self.c_ua = np.array([[0 for i in range(self.K)] for i in range(self.user_count)])
        self.theta_general = np.array([[0.0 for i in range(self.K)] for i in range(self.user_count)])

        self.c_lv=np.array([0,0])

        self.rho = np.array([0,0])

        #self.c_word = [[0 for i in range(self.word_count)] for i in range(self.K)]
        self.c_word = np.zeros((self.K,self.word_count))
        #self.phi_word = [[0.0 for i in range(self.word_count)] for i in range(self.K)]
        self.phi_word = np.zeros((self.K,self.word_count))
        
        self.c_b = np.array([0 for i in range(self.word_count)])
        self.phi_background = np.array([0.0 for i in range(self.word_count)])

        self.countAllWord = np.array([0 for i in range(self.K)])

    def ini(self):
        u = 0
        d = 0
        w = 0

        self.z = [[] for id in self.users.id]
        self.x = [[] for id in self.users.id]

        for u in range(self.users.size):
            id = self.users.id[u]
            self.z[u] = [0 for i in range(self.users.tweets[id].size)]
            self.x[u] = [[] for i in range(self.users.tweets[id].size)]
            #print(u)
            for d in range(self.users.tweets[id].size):
                twitter_word = self.users.tweets[id].t_words[d]
                self.x[u][d] = [False for i in range(twitter_word.size)]

                a_general = random.randint(0,self.K-1)

                self.z[u][d] = a_general
                self.c_ua[u][a_general] += 1

                for w in range(twitter_word.size):
                    word = twitter_word.words[w]
                    randback = random.randint(0,1)
                    if randback == 0:
                        self.c_lv[1] += 1
                        self.c_word[a_general][word] += 1
                        self.countAllWord[a_general] += 1
                        self.x[u][d][w] = True
                    else:
                        self.c_lv[0] += 1
                        self.c_b[word] += 1
                        self.x[u][d][w] = False

    def est(self):
        for i in range(self.iter):
            print("iteration: " + str(i))
            self.sweep()
        self.update_distribution()



    def sweep(self):
        for cntuser in range(self.users.size):
            id = self.users.id[cntuser]
            for cnttweet in range(self.users.tweets[id].size):
                twitter_word = self.users.tweets[id].t_words[cnttweet]
                self.sample_z(cntuser,cnttweet,id,twitter_word)
                for cntword in range(twitter_word.size):
                    word = twitter_word.words[cntword]
                    self.sample_x(cntuser,cnttweet,cntword,word)

    def update_distribution(self):

        for u in range(self.user_count):
            c_u_a = 0
            #for a in range(self.K):
                #c_u_a += self.c_ua[u][a]
            c_u_a = self.c_ua[u].sum()
            #for a in range(self.K):
                #self.theta_general[u][a] = (self.c_ua[u][a] + self.alpha_general[a])/(c_u_a + self.alpha_sum)
            self.theta_general[u] = (self.c_ua[u] + self.alpha_general[0])/(c_u_a + self.alpha_sum)
            for a in range(self.K):
                c_v = self.c_word[a].sum()
                #for v in range(self.word_count):
                 #   c_v += self.c_word[a][v]

                self.phi_word = (self.c_word + self.beta_word[0]) / (c_v + self.beta_word_sum)
            
            c_b_v = self.c_b.sum()
            #for v in range(self.word_count):
             #   c_b_v += self.c_b[v]
            
            self.phi_background = (self.c_b + self.beta_b[0]) / (c_b_v + self.beta_b_sum)
            #for v in range(self.word_count):
                #self.phi_background[v] = (self.c_b[v] + self.beta_b[v]) / (c_b_v + self.beta_b_sum)

            for l in range(2):
                self.rho[0] = (self.c_lv[0] + self.gamma[0]) / (self.c_lv[0] + self.c_lv[1] + self.gamma[0] + self.gamma[1])
                self.rho[1] = (self.c_lv[1] + self.gamma[1]) / (self.c_lv[0] + self.c_lv[1] + self.gamma[0] + self.gamma[1])

            print("finish:"+str(u)+"/"+str(self.user_count))




    def sample_x(self,u,d,n,word):
        binarylabel = self.x[u][d][n]
        binary = 0
        if binarylabel:
            binary = 1
        else:
            binary = 0
        self.c_lv[binary] -= 1
        if binary == 0:
            self.c_b[word] -= 1
        else:
            self.c_word[self.z[u][d]][word] -= 1
            self.countAllWord[self.z[u][d]] -= 1

        binarylabel = self.draw_x(u,d,n,word)

        self.x[u][d][n] = binarylabel

        if binarylabel:
            binary = 1
        else:
            binary = 0

        self.c_lv[binary] += 1

        if binary == 0:
            self.c_b[word] += 1
        else:
            self.c_word[self.z[u][d]][word] += 1
            self.countAllWord[self.z[u][d]] += 1


    def draw_x(self,u,d,n,word):
        p_lv = [0.0,0.0]
        pb = 1
        ptopic = 1
        p_lv[0] = (self.c_lv[0] + self.gamma[0])/(self.c_lv[0] + self.c_lv[1] + self.gamma[0] + self.gamma[1])
        p_lv[1] = (self.c_lv[1] + self.gamma[1]) / (self.c_lv[0] + self.c_lv[1] + self.gamma[0] + self.gamma[1])

        pb = (self.c_b[word] + self.beta_b[word])/(self.c_lv[0]+self.beta_b_sum)
        ptopic = (self.c_word[self.z[u][d]][word] + self.beta_word[word])/(self.countAllWord[self.z[u][d]] + self.beta_word_sum)

        p0 = pb * p_lv[0]
        p1 = pb * p_lv[1]

        sum = p0 + p1
        randpick = random.random()
        if randpick <= p0/sum:
            return False
        else:
            return True

    def sample_z(self,u,d,buffer_user,tw):
        tweet_topic = self.z[u][d]
        #w = 0
        self.c_ua[u][tweet_topic] -= 1
        for w in range(tw.size):
            word = tw.words[w]
            if self.x[u][d][w] == True:
                self.c_word[tweet_topic][word] -= 1
                self.countAllWord[tweet_topic] -= 1

        tweet_topic = self.draw_z(u,d,buffer_user,tw)

        self.z[u][d] = tweet_topic

        self.c_ua[u][tweet_topic] += 1
        for w in range(tw.size):
            word = tw.words[w]
            if self.x[u][d][w] == True:
                self.c_word[tweet_topic][word] += 1
                self.countAllWord[tweet_topic] += 1


    def draw_z(self,u,d,buffer_user,tw):

        p_topic = [0.0 for i in range(self.K)]
        self.pcount = [0 for i in range(self.K)]

        wordcnt = {}
        totalwords = 0

        for w in range(tw.size):
            if self.x[u][d][w]:
                totalwords += 1
                word = tw.words[w]
                if word not in wordcnt:
                    wordcnt[word] = 1
                else:
                    wordcnt[word] += 1

        for a in range(self.K):
            p_topic[a] = (self.c_ua[u][a] + self.alpha_general[a]) / (self.users.tweets[buffer_user].size - 1 + self.alpha_sum)
            buffer_p = 1
            i = 0
            for word,buffer_cnt in wordcnt.items():
                for j in range(buffer_cnt):
                    value = (self.c_word[a][word] + self.beta_word[word] + j) / (self.countAllWord[a] + self.beta_word_sum + i)
                    i += 1
                    buffer_p *= value
                    buffer_p = self.isoverflow(buffer_p,a)

            p_topic[a] *= pow(buffer_p,1.0)

        p_topic = self.recompute(p_topic,self.pcount)

        randz = random.random()

        sum = 0

        for a in range(self.K):
            sum += p_topic[a]

        thred = 0.0
        chosena = -1

        for a in range(self.K):
            thred += p_topic[a] / sum
            if thred >= randz:
                chosena = a
                break

        return chosena


    def recompute(self,p_topic,pcount):
        max = pcount[0]

        for i in range(1,len(pcount)):
            if pcount[i] > max:
                max = pcount[i]

        for i in range(len(pcount)):
            p_topic[i] = p_topic[i] * pow(1e150,pcount[i]-max)

        return p_topic

    def isoverflow(self,buffer_p,a2):
        if buffer_p>1e150:
            self.pcount[a2] += 1
            return buffer_p/1e150
        if buffer_p<1e-150:
            self.pcount[a2] -= 1
            return buffer_p * 1e150
        return buffer_p

    def getTop(self,phi):
        ind = 0
        rank = []
        tmp = []
        num = min(self.top_max_num,len(phi))
        for i in range(num):
            max = -100000
            for j in range(len(phi)):
                if j in tmp:
                    continue
                if phi[j] >max:
                    ind = j
                    max = phi[j]
            rank.append([ind,max])
            tmp.append(ind)
        return rank


    def outputWordsInTopics(self):

        topic_word = []
        va = dict({value:key for key,value in self.users.word2id.items()})
        for a in range(self.K):
            tmp = self.getTop(self.phi_word[a])
            topic_word.append([])
            for i in range(len(tmp)):
                topic_word[a].append([va[tmp[i][0]],tmp[i][1]])

        return topic_word


time: 0 ns (started: 2022-05-03 10:36:14 -04:00)


In [14]:
def outputWordsInTopics(phi_word, word2id, num_topic, num_top_words): ##################### add num_topic and num_top_words

        topic_word = []
        va = dict({value:key for key,value in word2id.items()})
        for a in range(num_topic):
            tmp = getTop(phi_word[a], num_top_words) ############################### add num_top_words
            topic_word.append([])
            for i in range(len(tmp)):
                #print(tmp[i])
                topic_word[a].append([va[tmp[i][0]],tmp[i][1]])

        return topic_word
    

def getTop(phi, num_top_words):############################### add num_top_words
    ind = 0
    rank = []
    tmp = []
    #num = min(self.top_max_num,len(phi))
    for i in range(num_top_words): ########################### I CHANGED 30 TO num_top_words##################
        max = -100000
        for j in range(len(phi)):
            if j in tmp:
                continue
            if phi[j] >max:
                ind = j
                max = phi[j]
        rank.append([ind,max])
        tmp.append(ind)
    return rank

time: 16 ms (started: 2022-05-03 10:36:14 -04:00)


In [15]:
# conduct LDA
num_topic = 30
num_iteration = 20
lda = twitter_lda(num_topic,num_iteration,0.01,0.01,0.01,0.01,user_hash_words,50)

time: 31 ms (started: 2022-05-03 10:36:14 -04:00)


In [16]:
lda.users.wordsize

20206

time: 0 ns (started: 2022-05-03 10:36:14 -04:00)


In [17]:
lda.ini()

time: 94 ms (started: 2022-05-03 10:36:14 -04:00)


In [18]:
lda.est()

iteration: 0
iteration: 1
iteration: 2
iteration: 3
iteration: 4
iteration: 5
iteration: 6
iteration: 7
iteration: 8
iteration: 9
iteration: 10
iteration: 11
iteration: 12
iteration: 13
iteration: 14
iteration: 15
iteration: 16
iteration: 17
iteration: 18
iteration: 19
finish:0/511
finish:1/511
finish:2/511
finish:3/511
finish:4/511
finish:5/511
finish:6/511
finish:7/511
finish:8/511
finish:9/511
finish:10/511
finish:11/511
finish:12/511
finish:13/511
finish:14/511
finish:15/511
finish:16/511
finish:17/511
finish:18/511
finish:19/511
finish:20/511
finish:21/511
finish:22/511
finish:23/511
finish:24/511
finish:25/511
finish:26/511
finish:27/511
finish:28/511
finish:29/511
finish:30/511
finish:31/511
finish:32/511
finish:33/511
finish:34/511
finish:35/511
finish:36/511
finish:37/511
finish:38/511
finish:39/511
finish:40/511
finish:41/511
finish:42/511
finish:43/511
finish:44/511
finish:45/511
finish:46/511
finish:47/511
finish:48/511
finish:49/511
finish:50/511
finish:51/511
finish:52/51

In [19]:
lda.outputWordsInTopics()

[[['mdundovibes', 0.045382376290023826],
  ['ghanamusic', 0.020810494083846824],
  ['mdundonews', 0.009469625373303595],
  ['happybirthday', 0.009469625373303595],
  ['dontcryprguy', 0.009469625373303595],
  ['killthebill', 0.007579480588213058],
  ['ghanaartiste', 0.007579480588213058],
  ['mdundogh', 0.007579480588213058],
  ['newmusicghana', 0.007579480588213058],
  ['listenonmdundo', 0.005689335803122519],
  ['mondaymotivation', 0.005689335803122519],
  ['ghmusic', 0.005689335803122519],
  ['tupperware', 0.005689335803122519],
  ['holdtheline', 0.005689335803122519],
  ['newyear', 0.0037991910180319812],
  ['tiktok', 0.0037991910180319812],
  ['respect', 0.0037991910180319812],
  ['f', 0.0037991910180319812],
  ['rahuldravid', 0.0037991910180319812],
  ['epl', 0.0037991910180319812],
  ['ktbffh', 0.0037991910180319812],
  ['mytwitteranniversary', 0.0037991910180319812],
  ['mdundobirthdays', 0.0037991910180319812],
  ['mdundomusic', 0.0037991910180319812],
  ['tupperwareid', 0.0037

time: 12.8 s (started: 2022-05-03 10:37:40 -04:00)


In [20]:
num_top_words = 15
topic_words = outputWordsInTopics(lda.phi_word,lda.users.word2id,num_topic,num_top_words)
topic_words_array = np.array(topic_words)[:,:,0]
topic_words_array

array([['mdundovibes', 'ghanamusic', 'mdundonews', 'happybirthday',
        'dontcryprguy', 'killthebill', 'ghanaartiste', 'mdundogh',
        'newmusicghana', 'listenonmdundo', 'mondaymotivation', 'ghmusic',
        'tupperware', 'holdtheline', 'newyear'],
       ['yahoo', 'nfl', 'jesussaves', 'lifestyle', 'mothersday',
        'altmillars', 'veracruz', 'jags', 'jax', 'covid', 'smartnews',
        'moderndeluxeseries', 'gopackgo', 'miraclesacross', 'nofearhere'],
       ['poshmark', 'shopmycloset', 'style', 'gmixmag', 'fashion',
        'marketing', 'businesstransformation', 'leadership',
        'changemanagement', 'leadershiptips', 'employeeexperience',
        'bitcoin', 'uhc', 'ghfutures', 'sustainability'],
       ['flyeaglesfly', 'eagles', 'nfl', 'nfltwitter', 'gobirds', 'jets',
        'takeflight', 'phi', 'phivsnyj', 'giants', 'nygvsphi',
        'goldcoast', 'iubb', 'kalundetalks', 'washingtonfootball'],
       ['dontspyonus', 'cleanupthessa', 'callofdutymodernwarfare',
     

time: 2.24 s (started: 2022-05-03 10:37:52 -04:00)


In [21]:
np.shape(topic_words_array)

(30, 15)

time: 0 ns (started: 2022-05-03 10:37:55 -04:00)


In [22]:
# preset column name
column_name = []
for i in range(num_topic):
    column_name.append('Topic'+str(i))
    
topic_words_pd = pd.DataFrame(topic_words_array.T, columns = column_name)
topic_words_pd

Unnamed: 0,Topic0,Topic1,Topic2,Topic3,Topic4,Topic5,Topic6,Topic7,Topic8,Topic9,...,Topic20,Topic21,Topic22,Topic23,Topic24,Topic25,Topic26,Topic27,Topic28,Topic29
0,mdundovibes,yahoo,poshmark,flyeaglesfly,dontspyonus,sccl,holidayherethisyear,dartmouth,biggbosstamil,nigeria,...,vimeo,startupindia,hepsat,ibs,kengenagm,travel,beeronthecarpet,playstore,bahrain,calibri
1,ghanamusic,nfl,shopmycloset,eagles,cleanupthessa,sclife,seeaustralia,inbusinessforabetterworld,biggboss,israel,...,whatshappeninginmyanmar,startupstory,sunburngoa,ibsat,football,ozmusic,beerwithaview,france,covid,microsoft
2,mdundonews,jesussaves,style,nfl,callofdutymodernwarfare,prolife,followsuit,q,sweepstakes,empathy,...,junta,startups,eagles,mba,energychampion,australasia,dailybrew,french,tf,healthtips
3,happybirthday,lifestyle,gmixmag,nfltwitter,research,cashbox,feelnsw,law,priyanka,lng,...,mar,startup,cashflow,mbacampus,le,aussie,mokapot,greece,weatherbahrain,doctorshospital
4,dontcryprguy,mothersday,fashion,gobirds,spain,death,thisiswa,newprofilepic,ccp,evidence,...,myanmar,enterpreneurs,africa,bestcollege,kengen,happynewyear,chiefskingdom,islamicheritagemonth,meteorology,bestpic
5,killthebill,altmillars,marketing,jets,rica,abortion,thisisqueensland,kuwait,amir,hydrogen,...,juntapolicebrutality,entrepreneurship,aviation,bestbschools,collegefootball,ozbands,impeachtrumpnow,australia,ccot,happydiwali
6,ghanaartiste,veracruz,businesstransformation,takeflight,austax,life,australiasnorthwest,criminalcourt,venmome,equality,...,juntaarmybrutality,business,staysafe,internship,sustainablekengen,aussiemusic,commerce,germany,omicron,gettysport
7,mdundogh,jags,leadership,phi,ps,top,ilovesydney,independenceday,pavni,empowerment,...,yangon,businessstory,lifeiscalling,career,sports,australasianmusic,removetrumpnow,cyprus,weather,pictureoftheday
8,newmusicghana,jax,changemanagement,phivsnyj,kerajaangagal,midlandsgives,chiefskingdom,cybercrimes,thamarai,visayas,...,againstmyanmarmilitarycoup,startupstories,liveinyourlivingroom,icfai,syracuse,chanyeol,communbr,uk,coronavirus,saltillo
9,listenonmdundo,covid,leadershiptips,giants,malaysia,nrlc,dodgers,jailsentence,raju,gas,...,juntacrimes,australia,jeno,admissions,nbatopshotthis,baekhyun,vocation,ahmadiyya,wtf,deepavali


time: 32 ms (started: 2022-05-03 10:37:55 -04:00)


In [53]:
topic_words_pd.to_csv('./result_topic_words.csv', index=True, header=True)

time: 31 ms (started: 2022-05-05 16:51:25 -04:00)


## Extract More Words for Each Topic

In [23]:
def find_pos(word):
    # Part of Speech constants
    # ADJ, ADJ_SAT, ADV, NOUN, VERB = 'a', 's', 'r', 'n', 'v'

    pos = nltk.pos_tag(nltk.word_tokenize(word))[0][1]
    
    # Adjective tags - 'JJ', 'JJR', 'JJS'
    if pos.lower()[0] == 'j':
        return 'a'
    # Adverb tags - 'RB', 'RBR', 'RBS'
    elif pos.lower()[0] == 'r':
        return 'r'
    # Verb tags - 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ'
    elif pos.lower()[0] == 'v':
        return 'v'

    # Noun tags - 'NN', 'NNS', 'NNP', 'NNPS'
    else:
        return 'n'


def advanced_clean(txt):
        
    # remove url
    txt_nourl = re.sub("http\S+","",str(txt))
    
    # remove at
    txt_noat =  re.sub("@\S+","",txt_nourl)
    
    # remove punctuations and numbers
    txt_clean = re.sub('[^_a-zA-Z]',' ',txt_noat)
        
    # lowercase
    txt_lower = txt_clean.lower()
    
    # tokenize
    token = txt_lower.split()
    
    # check whether it is an English word
    english_words = []
    nltk_words = set(nltk.corpus.words.words())
    for word in token:
        if word in nltk_words and len(word)>2:
            english_words.append(word)
            
    # remove stopwords
    from nltk.corpus import stopwords
    lang_stopwords = stopwords.words('english')
    stopwords_removed = [w for w in english_words if w not in lang_stopwords]

    # lemmatize
    lemma_words = []
    wl = WordNetLemmatizer()
    for word in stopwords_removed:
        pos = find_pos(word)
        lemma_words.append(wl.lemmatize(word, pos))
    
    return lemma_words

time: 0 ns (started: 2022-05-03 10:37:55 -04:00)


In [24]:
# clean the tweets in an advanced way
hash_tweet_words = hash_tweet.map(lambda x: (x[0], x[1], advanced_clean(x[2])))
hash_tweet_words.take(5)

[('215231197', [], ['stay', 'gold', 'pony', 'boy']),
 ('215231197', [], ['tone', 'away']),
 ('215231197',
  [],
  ['unbelieving', 'world', 'nowhere', 'rest', 'weary', 'head']),
 ('215231197', [], ['jealousy', 'pride', 'discernment']),
 ('215231197',
  [],
  ['refining',
   'focus',
   'really',
   'matter',
   'like',
   'listen',
   'silly',
   'actually',
   'clean',
   'end',
   'day',
   'anyway'])]

time: 3.17 s (started: 2022-05-03 10:37:55 -04:00)


In [25]:
# remove empty hash_word list 
hash_tweet_words_noempty = hash_tweet_words.filter(lambda x: x[1]!=[] and x[2]!=[])
hash_tweet_words_noempty.take(5)

[('215231197', ['homeschooling'], ['god', 'need', 'patience']),
 ('215231197', ['munchies'], ['rocky', 'road', 'whisky']),
 ('198158303', ['bhooma_labs'], ['clock', 'link']),
 ('198158303',
  ['samskritam'],
  ['please',
   'join',
   'listen',
   'special',
   'lecture',
   'series',
   'scientific',
   'heritage',
   'part',
   'night',
   'club',
   'house',
   'room']),
 ('198158303',
  ['hosadiganta', 'chemistry'],
  ['ancient', 'work', 'alchemy', 'chemistry', 'chemistry'])]

time: 12.4 s (started: 2022-05-03 10:37:58 -04:00)


In [26]:
# temp = hash_tweet_words_noempty.map(lambda x: x[1]).collect()
# print(temp)

time: 0 ns (started: 2022-05-03 10:38:10 -04:00)


In [27]:
# change the form of hash_tweet 
hash_tweet_words_flat = hash_tweet_words_noempty.flatMap(lambda x: [(i, x[2]) for i in x[1]]).reduceByKey(lambda x, y: x+y).cache()
hash_tweet_words_flat.take(10)

[('bhooma_labs', ['clock', 'link']),
 ('religion',
  ['word',
   'religion',
   'represent',
   'dharma',
   'read',
   'article',
   'love',
   'religion',
   'sure',
   'must',
   'read',
   'essay',
   'matter',
   'please',
   'read',
   'comment',
   'circulate',
   'activist',
   'inclusion',
   'south',
   'constitution',
   'consider',
   'progressive',
   'manifest',
   'religion',
   'ministry',
   'gay',
   'bisexual',
   'two',
   'different',
   'find',
   'common',
   'ground',
   'monk',
   'pastor',
   'share',
   'belief',
   'organize',
   'religion',
   'coexist',
   'even',
   'society',
   'religious',
   'platform',
   'promise',
   'certainty',
   'simply',
   'offer',
   'faith',
   'existence',
   'god',
   'faith',
   'religion',
   'religious',
   'platform',
   'promise',
   'certainty',
   'simply',
   'offer',
   'faith',
   'existence',
   'god',
   'faith',
   'religion',
   'religious',
   'platform',
   'promise',
   'certainty',
   'simply',
   'offer

time: 21min 39s (started: 2022-05-03 10:38:10 -04:00)


In [28]:
# hash_tweet_words_noempty.count()

time: 0 ns (started: 2022-05-03 10:59:50 -04:00)


In [29]:
# topic = 0
# topic_hash_words_list = topic_words_pd.iloc[:, topic].values.tolist()
# topic_hash_words_list

time: 15 ms (started: 2022-05-03 10:59:50 -04:00)


In [30]:
# single_word = 'travel'
# temp = hash_tweet_words_noempty.filter(lambda x: single_word in x[1])#singleword in x
# temp.take(5)

time: 0 ns (started: 2022-05-03 10:59:50 -04:00)


In [31]:
# # group tweet words topics
# topic_words = hash_tweet_words_flat.join(all_topic_hash_words)\
#                     .map(lambda x: (x[1][1], [x[0]]+x[1][0]))
# print(topic_words.take(5))

time: 0 ns (started: 2022-05-03 10:59:50 -04:00)


In [32]:
# # produce all_topic_hash_words
# all_topic_hash_words = sc.parallelize([])
# for topic in range(num_topic):
#     topic_hash_words_list = topic_words_pd.iloc[:, topic].values.tolist()
#     topic_hash_words_rdd = sc.parallelize(topic_hash_words_list).map(lambda x: (x, 'Topic'+str(topic)))
#     all_topic_hash_words = all_topic_hash_words.union(topic_hash_words_rdd)

# all_topic_hash_words = all_topic_hash_words.cache()
# all_topic_hash_words.take(20)

time: 16 ms (started: 2022-05-03 10:59:50 -04:00)


In [33]:
# # join topic and words
# topic_words = hash_tweet_words_flat.join(all_topic_hash_words)
# print(topic_words.take(5))

time: 0 ns (started: 2022-05-03 10:59:50 -04:00)


In [34]:
# # change the form
# topic_words = topic_words.map(lambda x: (x[1][1], [x[0]]+x[1][0]))
# print(topic_words.take(5))

time: 0 ns (started: 2022-05-03 10:59:50 -04:00)


In [35]:
# # count the frequency
# topic_words_freq = topic_words.flatMap(lambda x: [((x[0],w),1) for w in x[1]]) \
#                 .reduceBy(lambda x1, x2: x1+x2) \
#                 .map(lambda x: (x[0][0], x[0][1], x[1]))
# print(topic_words_freq.take(5))

time: 0 ns (started: 2022-05-03 10:59:50 -04:00)


In [36]:
# # output in a .csv file
# df_ans = spark.createDataFrame(topic_words_freq, ['topicNum', 'word', 'frequency']).toPandas().to_csv('./result_topic_words_freq.csv', mode='a', index=False, header=False)

time: 0 ns (started: 2022-05-03 10:59:50 -04:00)


In [38]:
# group tweet words by topics
for topic in range(num_topic):
    topic_hash_words_list = topic_words_pd.iloc[:, topic].values.tolist()
    topic_words = hash_tweet_words_flat.filter(lambda x: x[0] in topic_hash_words_list)\
                    .map(lambda x: (topic, [x[0]]+x[1]))
    
    topic_words_freq = topic_words.flatMap(lambda x: [((x[0],w),1) for w in x[1]]) \
                    .reduceByKey(lambda x1, x2: x1+x2) \
                    .map(lambda x: (x[0][0], x[0][1], x[1]))
#     print(topic_words_freq.take(5))
    # output in a .csv file
    df_ans = spark.createDataFrame(topic_words_freq, ['topicNum', 'word', 'frequency']).toPandas().to_csv('./result_topic_words_freq.csv', mode='a', index=False, header=False)

[(0, 'storm', 1), (0, 'corona', 1), (0, 'eight', 1), (0, 'dramatic', 1), (0, 'take', 7)]
[(1, 'prayer', 2), (1, 'one', 9), (1, 'unwavering', 1), (1, 'seek', 1), (1, 'product', 1)]
[(2, 'sometimes', 2), (2, 'must', 2), (2, 'run', 2), (2, 'though', 11), (2, 'work', 7)]
[(3, 'doggone', 1), (3, 'one', 10), (3, 'back', 8), (3, 'list', 17), (3, 'hand', 5)]
[(4, 'let', 3), (4, 'right', 9), (4, 'identify', 6), (4, 'stop', 5), (4, 'impunity', 6)]
[(5, 'steady', 5), (5, 'press', 1), (5, 'release', 1), (5, 'bill', 6), (5, 'carol', 9)]
[(6, 'supportlarb', 1), (6, 'two', 8), (6, 'form', 1), (6, 'three', 4), (6, 'new', 3)]
[(7, 'strategy', 1), (7, 'coal', 1), (7, 'sell', 2), (7, 'time', 7), (7, 'add', 1)]
[(8, 'team', 9), (8, 'ulla', 5), (8, 'open', 6), (8, 'madam', 6), (8, 'water', 2)]
[(9, 'session', 2), (9, 'follow', 12), (9, 'strategy', 1), (9, 'first', 1), (9, 'technological', 1)]
[(10, 'yellow', 5), (10, 'beef', 20), (10, 'delivery', 151), (10, 'incredible', 5), (10, 'doorstep', 4)]
[(11, 'nee

## Find the Topic for a Specific User

In [47]:
# input the topic words data
topic_words_freq_df = spark.read.csv(
    "result_topic_words_freq.csv", 
    header=False,
    inferSchema = True
)
temp = topic_words_freq_df.select('_c0', '_c1', '_c2')
topic_words_freq = temp.rdd.map(lambda x: [x[1], ('Topic'+str(x[0]), x[2])])
topic_words_freq.take(5)

[['storm', ('Topic0', 1)],
 ['corona', ('Topic0', 1)],
 ['eight', ('Topic0', 1)],
 ['dramatic', ('Topic0', 1)],
 ['take', ('Topic0', 7)]]

time: 656 ms (started: 2022-05-03 11:46:29 -04:00)


In [48]:
# input the history tweets of the specific user
input_user_df = spark.read.csv(
    "input_user_tweets.csv", 
    header=True,
    inferSchema = True
)
temp = input_user_df.select('user_id', 'tweet')
txt_row = temp.rdd
input_user = txt_row.map(lambda x: [str(x[0]), x[1]])
input_user.take(2)

[['215231197', 'Stay gold, Pony Boy.'],
 ['215231197', 'your tone gives you away.']]

time: 594 ms (started: 2022-05-03 11:46:30 -04:00)


In [49]:
# record the userid 
userid = input_user.take(1)[0][0]
print('The user ID is:', userid)

# produce the tweet words list
input_user_clean = input_user.map(lambda x: (x[0], extract_hash_words(easy_clean(x[1]))+advanced_clean(x[1])))\
                                .reduceByKey(lambda x1, x2: x1+x2)
input_words_freq = input_user_clean.flatMap(lambda x: [(w, 1) for w in x[1]])\
                                .reduceByKey(lambda x1, x2: x1+x2) # remove duplicate
input_words_freq.collect()

The user ID is: 215231197


[('stay', 2),
 ('gold', 1),
 ('pony', 1),
 ('boy', 1),
 ('tone', 1),
 ('away', 1),
 ('unbelieving', 2),
 ('world', 6),
 ('nowhere', 1),
 ('rest', 2),
 ('weary', 1),
 ('head', 1),
 ('jealousy', 1),
 ('pride', 1),
 ('discernment', 1),
 ('refining', 1),
 ('focus', 1),
 ('really', 3),
 ('matter', 1),
 ('like', 5),
 ('listen', 1),
 ('silly', 1),
 ('actually', 1),
 ('clean', 3),
 ('end', 2),
 ('day', 4),
 ('anyway', 1),
 ('year', 1),
 ('stop', 3),
 ('hard', 1),
 ('take', 1),
 ('fresh', 2),
 ('laundry', 1),
 ('machine', 1),
 ('sit', 1),
 ('chair', 1),
 ('untouched', 1),
 ('marriage', 1),
 ('paper', 1),
 ('sign', 2),
 ('afraid', 2),
 ('good', 5),
 ('conscience', 1),
 ('idolatry', 1),
 ('spiritual', 1),
 ('adultery', 1),
 ('inside', 1),
 ('job', 1),
 ('fear', 4),
 ('wisdom', 2),
 ('wish', 1),
 ('see', 4),
 ('reveal', 2),
 ('finish', 1),
 ('want', 10),
 ('peace', 2),
 ('understand', 4),
 ('try', 1),
 ('humility', 1),
 ('never', 3),
 ('meant', 1),
 ('understood', 1),
 ('admire', 1),
 ('wrong', 1)

time: 18.6 s (started: 2022-05-03 11:46:30 -04:00)


In [50]:
# find the topic of each words
input_word_topics = input_words_freq.join(topic_words_freq)\
                                .map(lambda x: (x[0], x[1][1][0], x[1][0]*x[1][1][1]))
input_word_topics.collect()

[('gold', 'Topic14', 3),
 ('gold', 'Topic26', 2),
 ('pony', 'Topic8', 1),
 ('boy', 'Topic0', 2),
 ('boy', 'Topic6', 3),
 ('boy', 'Topic12', 1),
 ('boy', 'Topic13', 2),
 ('boy', 'Topic15', 1),
 ('boy', 'Topic18', 2),
 ('tone', 'Topic2', 2),
 ('tone', 'Topic6', 1),
 ('away', 'Topic0', 3),
 ('away', 'Topic1', 1),
 ('away', 'Topic3', 2),
 ('away', 'Topic5', 3),
 ('away', 'Topic6', 9),
 ('away', 'Topic11', 1),
 ('away', 'Topic12', 1),
 ('away', 'Topic15', 4),
 ('away', 'Topic16', 2),
 ('away', 'Topic18', 2),
 ('away', 'Topic20', 15),
 ('away', 'Topic22', 2),
 ('away', 'Topic24', 2),
 ('away', 'Topic28', 1),
 ('world', 'Topic0', 36),
 ('world', 'Topic1', 6),
 ('world', 'Topic2', 12),
 ('world', 'Topic4', 6),
 ('world', 'Topic6', 96),
 ('world', 'Topic7', 12),
 ('world', 'Topic8', 54),
 ('world', 'Topic9', 12),
 ('world', 'Topic10', 30),
 ('world', 'Topic11', 24),
 ('world', 'Topic12', 18),
 ('world', 'Topic13', 18),
 ('world', 'Topic14', 12),
 ('world', 'Topic15', 30),
 ('world', 'Topic16', 

time: 2.84 s (started: 2022-05-03 11:46:49 -04:00)


In [51]:
# count and sort the number of related topics
input_related_topics = input_word_topics.map(lambda x: (x[1], x[2]))\
                                        .reduceByKey(lambda x1, x2: x1+x2)\
                                        .sortBy(lambda x: x[1], ascending=False)
input_related_topics.collect()

[('Topic23', 4501),
 ('Topic2', 3601),
 ('Topic21', 2636),
 ('Topic27', 2340),
 ('Topic22', 2035),
 ('Topic6', 1970),
 ('Topic5', 1795),
 ('Topic29', 1787),
 ('Topic19', 1762),
 ('Topic20', 1693),
 ('Topic17', 1604),
 ('Topic18', 1557),
 ('Topic3', 1475),
 ('Topic10', 1461),
 ('Topic12', 1160),
 ('Topic15', 1091),
 ('Topic0', 1034),
 ('Topic11', 1028),
 ('Topic1', 1024),
 ('Topic25', 978),
 ('Topic28', 977),
 ('Topic13', 853),
 ('Topic9', 847),
 ('Topic14', 793),
 ('Topic8', 779),
 ('Topic4', 760),
 ('Topic16', 707),
 ('Topic24', 559),
 ('Topic7', 329),
 ('Topic26', 188)]

time: 4.67 s (started: 2022-05-03 11:46:52 -04:00)


In [52]:
print('The most relevant topic is:', input_related_topics.take(1)[0][0])

The most relevant topic is: Topic23
time: 453 ms (started: 2022-05-03 11:46:56 -04:00)


In [56]:
topic_word_top5 = pd.read_csv('result_topic_words.csv')

time: 16 ms (started: 2022-05-05 16:53:51 -04:00)


In [61]:
topic_word_top5_list = topic_word_top5[input_related_topics.take(1)[0][0]][:5].values.tolist()

time: 453 ms (started: 2022-05-05 16:56:26 -04:00)


In [62]:
topic_word_top5_list

['ibs', 'ibsat', 'mba', 'mbacampus', 'bestcollege']

time: 0 ns (started: 2022-05-05 16:56:34 -04:00)
