#  Appendix B) WCD BIG DATA PROJECT - TWITTER - INFLATION - CLUSTERING / TOPIC MODELLING

**WeCloudData Bootcamp 2022 (Part-time Cohort)**<br> </font>
By: Kevin Jeswani & Junaid Zafar <br>
The set of notebooks are segmented for the purpose of clarity & convenience <br>
The following is the suggested order for running the scripts:
- '1_WCD_Twitter_Inflation_Classification' - Mounted S3 bucket for inflation tweets, copied over twitter data, tweet cleaning. VADER & Spark-NLP pre-trained model is used to apply labels to the inflation tweets. The data is then transformed with spark-ml. Logistic regression & random forest are built and trained with gridsearchCV on the label and transformed token features.
- '2_WCD_Twitter_AllTopics_Clustering'  - All topics in the WCD twitter bucket are filtered, custom transformers are built and inserted into an extensive pipeline to load raw data from Kinesis firehose. Clustering uses Latent Dirichlet Allocation is conducted using a custom gridsearch to perform topic modelling.<br>

**Appendices** - Please note these notebooks are included simply as supporting information and to show that other experiments and exercises were conduct. Less time and effort was spent formatting on these notebooks, whereas Notebook 1) and 2) are the main submission documents.
- 'AppA_WCD_Twitter_Inflation_Classification_MLPOnly' - Experimentation for classification with multi-layer perceptron models - originally at the end of Notebook 1)
- 'AppB_WCD_Twitter_Inflation_Clustering' **This Notebook**  -Inflation tweet data with Spark-NLP labels imported, custom transformer for data cleaning built and combined with standard nlp transformers in a pipeline. LDA clustering implemented to model topics in the inflation dataset. An attempt was made with a GMM clustering model.
- 'AppC_WCD_Twitter_AllTopics_52mil_Clustering' - ALL streamed tweets (55mil+) are loaded from the WCD bucket, a transformation pipeline is built and all the data is transformed. A LDM clustering is built to cluster all the topics. 
- 'AppD_WCD_Twitter_AllTopics_Clustering_Evaluation' - An attempt was made to visualize the clustering using principal component analysis and t-SNE, but the data transformation required was too heavy to process and other issues occured.

## 1.0 Import Data & Setup

In [0]:
# Section Imports
from pyspark.sql import SparkSession #create a spark session
from pyspark import SparkContext, SparkConf #for Spark NLP
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, IntegerType #for schema & for data processing
import pyspark.sql.functions as spsql #spark sql functions with alias

In [0]:
# Initialize Spark Session
spark = SparkSession \
        .builder \
        .appName('Twitter Clustering') \
        .getOrCreate()
print('Session created')
# Setup for SparkNLP in case used
# Follow instructions for spark config: https://nlp.johnsnowlabs.com/docs/en/install
sparknlp_config = SparkConf().set('spark.kryoserializer.buffer.max', 2000).set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# Standard Spark Context
sc = SparkContext.getOrCreate(conf=sparknlp_config) #reset the spark context with the updated settings

Session created


In [0]:
%fs ls /mnt/my_bucket/

path,name,size,modificationTime
dbfs:/mnt/my_bucket/twitterInflation_ML_in.parquet/,twitterInflation_ML_in.parquet/,0,1673332733228
dbfs:/mnt/my_bucket/twitterInflation_clean.parquet/,twitterInflation_clean.parquet/,0,1673332733228
dbfs:/mnt/my_bucket/twitterInflation_features.parquet/,twitterInflation_features.parquet/,0,1673332733228
dbfs:/mnt/my_bucket/twitterInflation_idf.parquet/,twitterInflation_idf.parquet/,0,1673332733228
dbfs:/mnt/my_bucket/twitterInflation_raw.csv/,twitterInflation_raw.csv/,0,1673332733228
dbfs:/mnt/my_bucket/twitterInflation_sparknlp.parquet/,twitterInflation_sparknlp.parquet/,0,1673332733228
dbfs:/mnt/my_bucket/twitterInflation_sparknlp_in.parquet/,twitterInflation_sparknlp_in.parquet/,0,1673332733228
dbfs:/mnt/my_bucket/twitterInflation_vader.parquet/,twitterInflation_vader.parquet/,0,1673332733228


In [0]:
# Read for cleaned tweet data - without transformers
tweets_clean_in = spark.read.parquet('/mnt/my_bucket/twitterInflation_clean.parquet') 

In [0]:
# Read for tweet data with sentiment labelled with spark-nlp (for comparison with clustering results)
tweets_sparknlp_in = spark.read.parquet('/mnt/my_bucket/twitterInflation_sparknlp.parquet')

In [0]:
display(tweets_sparknlp_in)

id,name,username,tweet,followers_count,location,created_at,text,F_retweet,F_reply,F_mentions,F_hastag,mentions,hastags,vader_score,vader_label,snlp_sentiment,label
1602659953285951493,STOCK TRAIN,stocktrain2,10-year Treasury yield drops below 3.5% after inflation reading @CNBC https://t.co/qwJgRZngMw,34024,,Tue Dec 13 13:41:23 +0000 2022,10-year Treasury yield drops below 3.5% after inflation reading,False,False,True,False,List(CNBC),List(),0.2023,2,negative,0.0
1602659956016418816,Crutcial News of Crypto’s,CrusNewsCrypto,BREAKING: U.S. inflation slowed again last month in the latest sign that price increases are slowly cooling despite… https://t.co/GsyGAFPHof,51446,,Tue Dec 13 13:41:24 +0000 2022,BREAKING: U.S. inflation slowed again last month in the latest sign that price increases are slowly cooling despite…,False,False,False,False,List(),List(),0.0,2,negative,0.0
1602659957240868867,Fred Randall,FredRandall15,@StonedSportDude @disclosetv Ummm dude they just sent more money to Ukraine. Watch after Christmas. 2023 inflation will be worse.,23,,Tue Dec 13 13:41:24 +0000 2022,Ummm dude they just sent more money to Ukraine. Watch after Christmas. 2023 inflation will be worse.,False,False,True,False,"List(StonedSportDude, disclosetv)",List(),-0.4767,0,negative,0.0
1602659958243434500,Bloomberg Opinion,opinion,The one huge question mark is 🇨🇳 What is the role of China in the global inflation debate? China is in a weak pos… https://t.co/kFBtRClobX,495471,Worldwide,Tue Dec 13 13:41:24 +0000 2022,The one huge question mark is 🇨🇳 What is the role of China in the global inflation debate? China is in a weak pos…,False,False,False,False,List(),List(),-0.1531,2,negative,0.0
1602659958813761536,Stephanie Link,Stephanie_Link,Welcome to the Santa Claus rally post CPI at less than expected .1% m/m and 7.1% y/y - thanks to goods inflation lo… https://t.co/CTNJ8cJApA,144280,NJ/NY,Tue Dec 13 13:41:24 +0000 2022,Welcome to the Santa Claus rally post CPI at less than expected .1% m/m and 7.1% y/y - thanks to goods inflation lo…,False,False,False,False,List(),List(),0.7096,1,positive,1.0
1602659958897709057,XULQI MOON,XULQIMOON,I don't know what real CPI in Pakistan is to be honest because lots of products have had about 50-100% increase sin… https://t.co/gnTkrDaehr,3781,International Space Station,Tue Dec 13 13:41:24 +0000 2022,I don't know what real CPI in Pakistan is to be honest because lots of products have had about 50-100% increase sin…,False,False,False,False,List(),List(),0.6808,2,negative,0.0
1602659959765876736,J0x3y.eth 🚀,Godskid_CFC,"Ladies and gentlemen, we are back up",2437,Data,Tue Dec 13 13:41:25 +0000 2022,"Ladies and gentlemen, we are back up",False,False,False,False,List(),List(),0.0,2,positive,1.0
1602659959984238592,"Stoy T Hall, CFP®",Stoy_Hall,"You will see the #markets go crazy today, all because #inflation year over year is 7.1%... Reminder that is still… https://t.co/VNzVfFecbV",790,,Tue Dec 13 13:41:25 +0000 2022,"You will see the go crazy today, all because year over year is 7.1%... Reminder that is still…",False,False,False,True,List(),"List(markets, inflation)",-0.34,2,positive,1.0
1602659960780906510,CRYPTO-MARS,CryptoMars24,"@ElonMartians BREAKING: 🇺🇸 US inflation falls to 7.1%, lower than expectations https://t.co/rsOzXAWD7T",1268,,Tue Dec 13 13:41:25 +0000 2022,"BREAKING: 🇺🇸 US inflation falls to 7.1%, lower than expectations",False,False,True,False,List(ElonMartians),List(),-0.296,2,negative,0.0
1602659962370805760,Matt R,MattR3232,@TallmanTrades 7.1% CPI. Expected was 7.3%. Perhaps inflation is coming down which is bullish for stocks as more mo… https://t.co/3ZnMWVjz2K,490,,Tue Dec 13 13:41:25 +0000 2022,7.1% CPI. Expected was 7.3%. Perhaps inflation is coming down which is bullish for stocks as more mo…,False,False,True,False,List(TallmanTrades),List(),0.0,2,negative,0.0


## 2.0 Transformation Pipeline

In [0]:
# Section Imports
from pyspark.ml.feature import Tokenizer #tokenization - break strings into words
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF #remove stopwords (words integral to sentence structure, but no value w.r.t sentiment); Get term-frequency (count vectorization)
                                                                      #Inverse-Document Frequency
from pyspark.ml.feature import   NGram, HashingTF #Vector Assembler to process IDF struct; NGram, Token-Frequency
from pyspark.ml.feature import  VectorAssembler, ChiSqSelector #feature assembly inal stage to process 1gram_idf and 2gram_tf


#Custom Transformer Building
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark import keyword_only
import re

### 2.1 Develop a custom transformer

In [0]:
# Develop a custom transformer class to peform series of regex replace functions on an input streaming df
# Source with modifications: https://csyhuang.github.io/2020/08/01/custom-transformer/ & https://www.crowdstrike.com/blog/deep-dive-into-custom-spark-transformers-for-machine-learning-pipelines/
class RegexTransformer(Transformer, #Main class
                        HasInputCol, #Setup for output_col parameter
                        HasOutputCol, #Setup for output_col parameter
                        DefaultParamsReadable, 
                        DefaultParamsWritable):
    '''
    Custom Transformer for Spark >3.0; wrapper for a regex replace function
    Input: input_col = column name to be transformed, output_col = column_name resulting from transformations, regex_rules = tuple list for regex substitute procedure rules
    Output Transform a df when RegexTransformer().transform(input_col,output_col,regex_rules) is called or put into pyspark ml pipeline
    '''
    input_col = Param(Params._dummy(), "input_col", "input column name.", typeConverter=TypeConverters.toString) #input_col parameter
    output_col = Param(Params._dummy(), "output_col", "output column name.", typeConverter=TypeConverters.toString) #output_col parameter
    regex_rules = Param(Params._dummy(), "regex_rules","list of tuples for regex replace procedures; first el is regex str, 2nd el is str to be replaced with") 
    f_trim = Param(Params._dummy(), "f_trim","flag to apply .trim() to input text")
    f_lower = Param(Params._dummy(), "f_lower","flag to apply .lower() to input text")
    
    @keyword_only
    def __init__(self, input_col: str = "input", output_col: str = "output", regex_rules=None,f_trim=True,f_lower=True):
        super(RegexTransformer, self).__init__()
        self._setDefault(input_col=None, output_col=None, regex_rules=None,f_trim=True,f_lower=True) #default settings to None - always requires input
        kwargs = self._input_kwargs
        self.set_params(**kwargs)

    @keyword_only
    def set_params(self, input_col: str = "input", output_col: str = "output", regex_rules=None,f_trim=True,f_lower=True): #initialize the parameters
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def get_input_col(self):
        return self.getOrDefault(self.input_col)

    def get_output_col(self):
        return self.getOrDefault(self.output_col)       
      
    def get_regex_rules(self):
        return self.getOrDefault(self.regex_rules) 
    
    def get_f_trim(self):
        return self.getOrDefault(self.f_trim)
    
    def get_f_lower(self):
        return self.getOrDefault(self.f_lower)
                      
    # Required if you use Spark >= 3.0
    #def setInputCol(self, new_inputCol):
    #    return self.setParams(inputCol=new_inputCol)
  
    # Required if you use Spark >= 3.0
    #def setOutputCol(self, new_outputCol):
     #   return self.setParams(outputCol=new_outputCol

    # Main Transformer Function
    def _transform(self, df: DataFrame): 
        input_col = self.get_input_col()
        output_col = self.get_output_col()
        regex_rules = self.get_regex_rules() #custom parameter input, list of tuples
        f_trim = self.get_f_trim()
        f_lower = self.get_f_lower()
        #for every tuple in the regex_rules, use the regex pattern in the 1st el. and replace with 2nd el.
        #udf applying regex substitute to each row
        for i,(regex,repl) in enumerate(regex_rules):
            transform_udf = spsql.udf(lambda x: re.sub(regex,repl,x), StringType()) 
            if i==0:
                target_col = input_col
            else:
                target_col = output_col
            df = df.withColumn(output_col, transform_udf(target_col))     #apply udf to the input_col
        # Trim & Lower-case if flags are true
        if f_lower==True:
            df = df.withColumn(output_col, spsql.lower(output_col))  
        if f_trim==True:
            df = df.withColumn(output_col, spsql.trim(output_col))  
        return df

1) Use regular expressions `reg_exp_replace`  to remove urls (can use this for another analysis later on) <br>
2) Remove "RT @___:" for retweets <br>
3) remove mentions "@___ " & replies "@___: " <br>
4) replace "& amp;" (without the space) and replace with "&" only <br>
5) Remove all special characters except '&' as its used for 's&p500' for example <br>
6) Substitute multiple spaces with single space <Br>
7) Lowercase all text (can examine this later on fo RTs and also for emotions with all capital words) <br>
8) remove 'inflation' every line is related to inflation, it's polluting the topic modelling <br>
8) Trim leading/trailing whitespaces

In [0]:
regex_list = [(r"http\S+", " "), #urls
              (r"^(RT @)([a-zA-Z0-9_]{1,50}):", " "), #retweets
              (r"@([a-zA-Z0-9_]{1,50}) ", " "), #mentions
              (r"@([a-zA-Z0-9_]{1,50}): ", " "), #replies
              (r"amp;", ""), #change &amp; to & only
              (r"[^A-Za-z&]", " "), #all special characters except &
              (r"\s+", " "), #replace multi space with single       
              (r"(\W|^)inflation(\W|$)"," "),
              (r"(\W|^)Inflation(\W|$)"," ")
             ]

regex_replacer = RegexTransformer(input_col='tweet',output_col='tweet_clean',regex_rules=regex_list,f_trim=True,f_lower=True) #pass regex_list, apply lowercase and trim
# Test it
df_test = tweets_sparknlp_in.sample(0.001,123)
df_clean = regex_replacer.transform(df_test)

In [0]:
display(df_clean)

id,name,username,tweet,followers_count,location,created_at,text,F_retweet,F_reply,F_mentions,F_hastag,mentions,hastags,vader_score,vader_label,snlp_sentiment,label,tweet_clean
1601739936889438208,augusto lori,augustolori,"@highbrow_nobrow When inflation goes up, the deficit goes down. So what Biden and the Democratic Party did in this… https://t.co/8vBYIH0uwm",6,,Sun Dec 11 00:45:34 +0000 2022,"When inflation goes up, the deficit goes down. So what Biden and the Democratic Party did in this…",False,False,True,False,List(highbrow_nobrow),List(),0.0,2,negative,0.0,when goes up the deficit goes down so what biden and the democratic party did in this
1602569369917263873,Javier Santillán,Javier_SaVel,Not my fault not even @Claudiashein @samuel_garcias. And definitely not @lopezobrador_ . Is is the fault of currenc… https://t.co/k56SljzD4B,40,,Tue Dec 13 07:41:26 +0000 2022,Not my fault not even @samuel_garcias. And definitely not . Is is the fault of currenc…,False,False,True,False,"List(Claudiashein, samuel_garcias, lopezobrador_)",List(),0.3089,2,negative,0.0,not my fault not even samuel garcias and definitely not is is the fault of currenc
1602569445691609088,FCI_Futures,Fci_Futures,#gold / #XAUUSD ANALYSIS BIG DAYS AHEAD - EYE ON US #CPI AND #FOMC + CPI = HAWKISH FED = - GOLD - CPI =… https://t.co/nDV0PzMuto,86,India,Tue Dec 13 07:41:44 +0000 2022,/ ANALYSIS BIG DAYS AHEAD - EYE ON US AND + CPI = HAWKISH FED = - GOLD - CPI =…,False,False,False,True,List(),"List(gold, XAUUSD, CPI, FOMC)",0.0,2,positive,1.0,gold xauusd analysis big days ahead eye on us cpi and fomc cpi hawkish fed gold cpi
1601923655969435648,Souvik,_SouvikSaha,10 marks for what could go wrong?,65,timbuktoo,Sun Dec 11 12:55:36 +0000 2022,10 marks for what could go wrong?,False,False,False,False,List(),List(),-0.4767,0,positive,1.0,marks for what could go wrong
1602243609562152962,Maximilian,MAXlMlIlAN,@stu10100 @helenpidd @YorkshireWater Tldr: 1973 Water Act made it work well for a while. Then during inflation gov… https://t.co/xy85smiQZX,30,,Mon Dec 12 10:06:59 +0000 2022,Tldr: 1973 Water Act made it work well for a while. Then during inflation gov…,False,False,True,False,"List(stu10100, helenpidd, YorkshireWater)",List(),0.2732,2,negative,0.0,tldr water act made it work well for a while then during gov
1601741134690078720,Juan Cardenes,juan_cardenes2,"@POTUS What that heck you talking about?? Inflation is killing people, Rent are at a all time high, houses are unaf… https://t.co/YLgkx8m79V",2,,Sun Dec 11 00:50:20 +0000 2022,"What that heck you talking about?? Inflation is killing people, Rent are at a all time high, houses are unaf…",False,False,True,False,List(POTUS),List(),-0.6966,0,negative,0.0,what that heck you talking about is killing people rent are at a all time high houses are unaf
1601748053253566464,Santa Rally-cession Citrini,Citrini7,@John_J_Ahearn Depends on the method & modality of stimulus. Many have made the very strong case that the only way… https://t.co/nhpJKoSQAo,2424,"Greenwich, CT",Sun Dec 11 01:17:49 +0000 2022,Depends on the method & modality of stimulus. Many have made the very strong case that the only way…,False,False,True,False,List(John_J_Ahearn),List(),0.5563,2,positive,1.0,depends on the method & modality of stimulus many have made the very strong case that the only way
1601735999939178496,STEELCITYNATION,SCityNATION412,@POTUS And it resulted in the highest level of inflation in a half century and many economists are still expecting… https://t.co/wJndDVIiFC,17471,"Pittsburgh, PA",Sun Dec 11 00:29:55 +0000 2022,And it resulted in the highest level of inflation in a half century and many economists are still expecting…,False,False,True,False,List(POTUS),List(),0.0,2,positive,1.0,and it resulted in the highest level of in a half century and many economists are still expecting
1601927990782398465,Glenn Morris,GlennPaul,"@GavinWax Inflation, economy , energy, defense, war, employment, American traditional values President… https://t.co/lbN9P5UVlv",101,Emerald Green Gulf Coast,Sun Dec 11 13:12:50 +0000 2022,"Inflation, economy , energy, defense, war, employment, American traditional values President…",False,False,True,False,List(GavinWax),List(),0.1027,2,positive,1.0,economy energy defense war employment american traditional values president
1601921951831707649,enjoying beach to mountain peak 🌊🌊🌊🌊🌊,Coastrunner5555,"@SenatorHagerty January 1st the Inflation reduction act starts, you voted against it. Getting ready to take credit… https://t.co/o46yQ0qsjA",1999,Thailand & Oregon,Sun Dec 11 12:48:50 +0000 2022,"January 1st the Inflation reduction act starts, you voted against it. Getting ready to take credit…",False,False,True,False,List(SenatorHagerty),List(),0.3612,2,neutral,2.0,january st the reduction act starts you voted against it getting ready to take credit


### 2.2 Base Transformer Pipeline

In [0]:
# Feature Transformers
regex_replacer = RegexTransformer(input_col='tweet',output_col='tweet_clean',regex_rules=regex_list,f_trim=True,f_lower=True) #pass regex_list, apply lowercase and trim
tokenizer = Tokenizer(inputCol="tweet_clean", outputCol="tokens") #Create tokenizer - breakdown text into "building blocks" to be usedin NLP algos - separate by words
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="tokens_noStopWords") # Remove stopwords from the token list,= no significance in meaning added to sentence
countvec = CountVectorizer(vocabSize=2**16, inputCol="tokens_noStopWords", outputCol='countvec') #term-frequency: 2^16 is arbritary large number = how many unique words should be considered; Create count vectorizer function
idf = IDF(inputCol='countvec', outputCol="1gram_idf", minDocFreq=5) #Inverse-Document Frequency - minDocFreq: remove sparse terms appearing less than 5 times troughout all the tweets
ngram = NGram(n=2, inputCol="tokens_noStopWords", outputCol="2gram") #n-gram two word combinations with more meaning than single words
ngram_hashingtf = HashingTF(inputCol="2gram", outputCol="2gram_tf", numFeatures=20000) #Hasing Token-Frequency
ngram_idf = IDF(inputCol='2gram_tf', outputCol="2gram_idf", minDocFreq=5) #Inverse-Document Frequency for 2-gram

#Feature Assembly
assembler = VectorAssembler(inputCols=["1gram_idf", "2gram_idf"], outputCol="rawFeatures") #Vector Assembler (1gram + 2gram inverse doc freq.)

# Feature selection from 1-gram and 2-gram token frequencies
selector = ChiSqSelector(numTopFeatures=2**14,featuresCol='rawFeatures', outputCol="features")

# Transformation Pipeline: staged process for feature processing
t_pipeline = Pipeline(stages=[regex_replacer,tokenizer, stopword_remover, countvec, idf, ngram, ngram_hashingtf, ngram_idf, assembler, selector])

# Warning: chi2selector is depracated - should update to univariate selecton:
    # https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.UnivariateFeatureSelector.html
    # Additional information: https://medium.com/insiderengineering/a-deep-dive-into-sparks-univariate-feature-selector-3f8b726d7d32

In [0]:
# Fit the transformation pipeline on full input dataset
t_pipeline_model = t_pipeline.fit(tweets_sparknlp_in) 

**Note**: In this case the df which contains cleaned text from the first notebook is passed to the pipeline, as it contains spark-nlp sentiment. However, with the custom transformer a raw file directly from kinesis firehose could be passed. Another similar custom transformer could be built to read the csv's with the given schema as necessary.

In [0]:
# Transform full input dataset
tweets_transformed = t_pipeline_model.transform(tweets_sparknlp_in)

In [0]:
# Cache for proper model processing - otherwise may encouter errors due to partitions not being cached properly
tweets_transformed.cache()

Out[231]: DataFrame[id: string, name: string, username: string, tweet: string, followers_count: string, location: string, created_at: string, text: string, F_retweet: boolean, F_reply: boolean, F_mentions: boolean, F_hastag: boolean, mentions: array<string>, hastags: array<string>, vader_score: float, vader_label: int, snlp_sentiment: string, label: double, tweet_clean: string, tokens: array<string>, tokens_noStopWords: array<string>, countvec: vector, 1gram_idf: vector, 2gram: array<string>, 2gram_tf: vector, 2gram_idf: vector, rawFeatures: vector, features: vector]

## 3.0 Clustering on Inflation: Latent Dirichlet Allocation (LDA)

In [0]:
# Section Imports
from pyspark.ml.clustering import LDA #Latent Dirichlet allocation (LDA)

### 3.1 Assemble Model

NLP text clustering using LDA = topic modelling. <br>
Terms:
- Word = basic unit of text data index from a vocabulary set with V elements: {1,...,V}
- Document = sequence of N words, w = (w_1,w_2,…w_N) ,w_n is the nth word in document
- Corpus = sequence of M documents, D = (w_1, w_2,…w_M)
- Topic = distribution of words, with LDA treating each document in the corpus as mixture of topics
How LDA works: 3-level heirarchical bayesian model
- Estimates two distribution sets in the corpus: 
    - word distribution of each topic(θ); which follows Dirichlet distribution
    - topic distribution over corpus (Z)
- Latent variables (θ & Z) are defined in the text generation process
- Probability distribution of P(θ_d), P(Z_dn|θ_d), and P(w_dn|z_dn) are defined and the joint distribution, P(w,z,θ), is calculated <br>
Advantages:
- Can find latent topics in the document; which can be interpretable
- Unsupervised = no label required
- Easy to train, can retain word representation <br>
Disadvantages:
- Document considered as bag of words; word order and semantic information (multiple meanings) ignored
- Topic # = fixed; can be uncorrelated and the topic distribution cannot capture correlations
- Static (no topic evolution over time)
<br>
Source & More Info: <br>
https://towardsdatascience.com/nlp-with-lda-latent-dirichlet-allocation-and-text-clustering-to-improve-classification-97688c23d98 <Br>
https://spark.apache.org/docs/latest/ml-clustering.html#latent-dirichlet-allocation-lda <Br>
https://towardsdatascience.com/the-ultimate-guide-to-clustering-algorithms-and-topic-modeling-3a65129df324 <Br>
https://algotech.netlify.app/blog/topic-modeling-lda/

In [0]:
num_topics = 3 #clusters/topics to find
max_iter = 100 #max iterations when solving
# Initialize model
lda = LDA(k=num_topics, 
          maxIter=max_iter, 
          featuresCol='features')

In [0]:
# LDA Clustering Pipeline
# Transformation and LDA pipelines can be serialized as shown below, however in the interest of trying out different clustering methods while saving on computational resource
            # ...the transformation will be conducted first for use with individual models
#lda_pipeline = Pipeline(stages = [t_pipeline,lda])

In [0]:
# Use full (no retweet) dataset 
lda_model = lda.fit(tweets_transformed) #Fit LDA
lda_predictions = lda_model.transform(tweets_transformed) #Transform dataset

### 3.2 Examine Clusters

In [0]:
topics = lda_model.describeTopics(20)

In [0]:
print("The topics described by their top-weighted terms:")
display(topics)

The topics described by their top-weighted terms:


topic,termIndices,termWeights
0,"List(4, 2, 18, 3, 26, 32, 6, 49, 50, 19, 36, 24, 83, 15, 139, 1, 31, 117, 89, 132)","List(0.005846886899727253, 0.005096082800551022, 0.004465117519054199, 0.004380880214778705, 0.004068506499630339, 0.003844323395323409, 0.0036341756932993575, 0.00342718293593999, 0.0029339125850387578, 0.0027865838982767087, 0.002697377844338694, 0.002600879761320283, 0.0025529439530112374, 0.0025364193304770587, 0.0024851924815063008, 0.0023402488684397364, 0.0022373770505605094, 0.002224548014304238, 0.0022096026675938966, 0.0021426316860981335)"
1,"List(11, 7, 8, 61, 87, 35, 12, 33, 20, 15610, 5, 59, 3, 17, 77, 2, 15, 22, 1, 25)","List(0.004170247413011157, 0.003917039723699574, 0.0028936540486130743, 0.0026920242721077247, 0.002482179088532996, 0.0023015622039913738, 0.0022499278260956286, 0.0021433294684464962, 0.0021227559945068385, 0.0021038763049586275, 0.002063595732091153, 0.001989414542991744, 0.0019697387886922577, 0.0018698032678819156, 0.0018246649393008159, 0.0018128700980733546, 0.0016760996988064516, 0.0016701252076600371, 0.001669600847372976, 0.0016505074735931019)"
2,"List(0, 10, 5, 13, 1, 9020, 9, 28, 21, 54, 12, 6, 46, 41, 14, 22, 56, 23, 25, 16)","List(0.005221296930167718, 0.004648823655731621, 0.004043792312040511, 0.00391809979113012, 0.003425698436400405, 0.003308358470452767, 0.0032868988032956794, 0.002667748544974632, 0.0024664740566832798, 0.002373710447215099, 0.0023693601651697776, 0.0022077551828351485, 0.0021917972203519855, 0.002187266708097168, 0.0021760241895008524, 0.002163048999203492, 0.0020700339354297837, 0.002031630282487057, 0.0020211088721167367, 0.0020006851766095264)"


In [0]:
# Check Pipeline
t_pipeline.getStages()

Out[235]: [RegexTransformer_7404c06b5351,
 Tokenizer_ad5bf333b61e,
 StopWordsRemover_da4ec91e5150,
 CountVectorizer_8d59930611ca,
 IDF_9d03a6495ea5,
 NGram_41216a41377c,
 HashingTF_dcf805755c66,
 IDF_fcc5da8e5405,
 VectorAssembler_7a2d79c0596e,
 ChiSqSelector_1072864799f2]

In [0]:
# Connect the topics from the LDA model with the vocabulary from the count vectorization model 
vocab = t_pipeline_model.stages[3].vocabulary #access a speific stage of a pipeline - access fitted model's contents
# Convert word indices to word in the vocabulary
def tokens_vocab(token_list,vocab):
    '''
    Given a token_list (str list of tokenized words from text input) and vocabulary (str list) from count vectorized model
    Will link vocabulary to tokens to produce words associated with topics of a clustering model
    '''
    return [vocab[token_id] for token_id in token_list]
# Create SQL sql udf to attach function
udf_tokens_vocab = spsql.udf(lambda row: tokens_vocab(row,vocab), ArrayType(StringType()))

In [0]:
num_top_words = 20
topics = lda_model.describeTopics(num_top_words).withColumn('topicWords', udf_tokens_vocab(spsql.col('termIndices')))
display(topics.select('topic', 'topicWords'))

topic,topicWords
0,"List(rate, fed, november, us, cpi, data, year, even, energy, price, go, market, really, interest, work, high, lower, every, joe, consumer)"
1,"List(money, people, rates, wages, bank, good, like, know, get, trav, biden, many, us, re, global, fed, interest, government, high, m)"
2,"List(prices, gas, biden, pay, high, bitcoins, still, years, going, new, like, year, world, much, &, government, need, one, m, economy)"


### 3.3 Topic Assignment & Evaluation

In [0]:
# Function to assign topic to each tweet from it's 'topicDistribution' column prediction output:
def assign_topic(prediction):
    """
    Takes clustering topic distribution structure an assigns prevalent topic to the text row
    Input: df of prediction from clustering model
    Output: same df with additional column for prevalent topic
    """
    # Exract values of the topic distribution array, sort ascending, get topic with highest probability assignment
    udf_assign_topic = udf(lambda x: int(x.values.argsort()[::-1][0]), IntegerType()) 
    return prediction.select('*', udf_assign_topic('topicDistribution').alias("topic")) #UDF to apply to df
# Call function
lda_predictions = assign_topic(lda_predictions)

In [0]:
# Evaluation
# Compute Silhouette measurement using squared euclidean distance
evaluator = ClusteringEvaluator(predictionCol='topic',featuresCol='features',distanceMeasure='squaredEuclidean')

In [0]:
# Evaluation
# Compute Silhouette measurement using squared euclidean distance
evaluator.evaluate(lda_predictions)

Out[240]: -0.01603141631116885

The silhouette measurement with the squared euclidean distance for evaluating clustering = measure of consistency within clusters = [-1,1] <br>
A value clsoe to 1 means that the points in a cluster are close to other points in the same cluster = far from points of other clusters <br>
The value is close to 0, meaning there isn't great distinction between the clusters but there it could also be substanially worse.
More info: https://towardsdatascience.com/silhouette-coefficient-validating-clustering-techniques-e976bb81d10c

## 4.0 Clustering: Gaussian Mixture Model (GMM)

In [0]:
# Section Imports
from pyspark.ml.clustering import GaussianMixture #Latent Dirichlet allocation (LDA)

**Note**: An attempt was made to use a GMM model for topic modelling but there appears to be a memory error, which is strange considering this notebook was ran on a cluster with 1 leader and 2 workers, each with 4 cores and 32gb memory.

### 4.1 Assemble Model

more info: <Br>
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.clustering.GaussianMixture.html <br>
https://towardsdatascience.com/gaussian-mixture-models-d13a5e915c8e

In [0]:
num_topics = 3 #number of clusters/topics
max_iter = 50 #max iterations when solving
# Initialize model
gmm = GaussianMixture(k=num_topics, 
          maxIter=max_iter, 
          featuresCol='features')

In [0]:
tweets_transformed.repartition(1).cache()

Out[243]: DataFrame[id: string, name: string, username: string, tweet: string, followers_count: string, location: string, created_at: string, text: string, F_retweet: boolean, F_reply: boolean, F_mentions: boolean, F_hastag: boolean, mentions: array<string>, hastags: array<string>, vader_score: float, vader_label: int, snlp_sentiment: string, label: double, tweet_clean: string, tokens: array<string>, tokens_noStopWords: array<string>, countvec: vector, 1gram_idf: vector, 2gram: array<string>, 2gram_tf: vector, 2gram_idf: vector, rawFeatures: vector, features: vector]

In [0]:
# Use full (no retweet) dataset 
gmm_model = gmm.fit(tweets_transformed) #Fit GMM
#gmm_predictions = gmm_model.transform(tweets_transformed) #Transform dataset

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-4071721119883758>[0m in [0;36m<cell line: 2>[0;34m()[0m
[1;32m      1[0m [0;31m# Use full (no retweet) dataset[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mgmm_model[0m [0;34m=[0m [0mgmm[0m[0;34m.[0m[0mfit[0m[0;34m([0m[0mtweets_transformed[0m[0;34m)[0m [0;31m#Fit GMM[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      3[0m [0;31m#gmm_predictions = gmm_model.transform(tweets_transformed) #Transform dataset[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py[0m in [0;36mpatched_method[0;34m(self, *args, **kwargs)[0m
[1;32m     28[0m             [0mcall_succeeded[0m [0;34m=[0m [0;32mFalse[0m[0;34m[0m[0;34m[0m[0m
[1;32m     29[0m             [0;32mtry[0m[0;34m:[0m[0;34

-----------------------------------------------------------

## Appendix

### A.1 Mount S3 Bucket to DataBricks

In [0]:
# Function to mount S3 buckets (function provided by WCD with additional markups)
def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):
    '''
    mounts a public S3 bucket to specified mount folder on databricks root; provided with AWS access key and secret access key
    Input: 
        access_key & secret_key = AWS keys
        bucket_name = S3 bucket name (public if not on account associated with access & secret key)
        mount_folder = mount destination on databricks root
    Result: mounted S3 bucket
    '''
    ACCESS_KEY_ID = access_key
    SECRET_ACCESS_KEY = secret_key
    ENCODED_SECRET_KEY = SECRET_ACCESS_KEY.replace("/", "%2F") #need to replace / in secret access key
  
    print ("Mounting", bucket_name) 

    try:
        # Unmount the data in case it was already mounted
        dbutils.fs.unmount("/mnt/%s" % mount_folder)

    except:
        # If it fails to unmount it most likely wasn't mounted in the first place
        print ("Directory not unmounted: ", mount_folder)

    finally:
        # Mount the bucket
        dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY_ID, ENCODED_SECRET_KEY, bucket_name), "/mnt/%s" % mount_folder) # specify access key and encoded secret key appended & bucket name
        print ("The bucket", bucket_name, "was mounted to", mount_folder, "\n")

In [0]:
# Set AWS programmatic access credentials
ACCESS_KEY = "YOUR-KEY-HERE"
SECRET_ACCESS_KEY = "YOUR-KEY-HERE"

In [0]:
# Generic file path for the inflation tweets
filePath = '/mnt/data/Inflation/*/*/*/*/*'

In [0]:
#mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, 'weclouddata/twitter', 'data')

Mounting weclouddata/twitter
Directory not unmounted:  data
The bucket weclouddata/twitter was mounted to data 



Use shell command to view the folder contents: fs <br>
ls to list the contents <br>
the mounting procedure above does not actually tell whether the files successfully mounted  <br>

In [0]:
%fs ls /mnt/data/

path,name,size,modificationTime
dbfs:/mnt/my_bucket/twitterInflation_ML_in.parquet/,twitterInflation_ML_in.parquet/,0,0
dbfs:/mnt/my_bucket/twitterInflation_clean.parquet/,twitterInflation_clean.parquet/,0,0
dbfs:/mnt/my_bucket/twitterInflation_features.parquet/,twitterInflation_features.parquet/,0,0
dbfs:/mnt/my_bucket/twitterInflation_idf.parquet/,twitterInflation_idf.parquet/,0,0
dbfs:/mnt/my_bucket/twitterInflation_raw.csv/,twitterInflation_raw.csv/,0,0
dbfs:/mnt/my_bucket/twitterInflation_sparknlp.parquet/,twitterInflation_sparknlp.parquet/,0,0
dbfs:/mnt/my_bucket/twitterInflation_sparknlp_in.parquet/,twitterInflation_sparknlp_in.parquet/,0,0
dbfs:/mnt/my_bucket/twitterInflation_vader.parquet/,twitterInflation_vader.parquet/,0,0


In [0]:
%fs ls /mnt/data/Inflation/2022/12/13/04

path,name,size,modificationTime
dbfs:/mnt/data/Inflation/2022/12/13/04/topic4-4-2022-12-13-04-01-48-4036b9c8-e5d1-347d-b308-d6aa94266c33,topic4-4-2022-12-13-04-01-48-4036b9c8-e5d1-347d-b308-d6aa94266c33,55130,1670904411000
dbfs:/mnt/data/Inflation/2022/12/13/04/topic4-4-2022-12-13-04-06-44-1aa2293d-5d96-3827-9a43-0db7cd5e241c,topic4-4-2022-12-13-04-06-44-1aa2293d-5d96-3827-9a43-0db7cd5e241c,59822,1670904707000
dbfs:/mnt/data/Inflation/2022/12/13/04/topic4-4-2022-12-13-04-11-35-34c365b2-ae8f-3984-a857-04e9b0232f4d,topic4-4-2022-12-13-04-11-35-34c365b2-ae8f-3984-a857-04e9b0232f4d,56508,1670904997000
dbfs:/mnt/data/Inflation/2022/12/13/04/topic4-4-2022-12-13-04-16-26-80afb111-b06d-3a67-9ec9-8f9e5e155e61,topic4-4-2022-12-13-04-16-26-80afb111-b06d-3a67-9ec9-8f9e5e155e61,54113,1670905288000
dbfs:/mnt/data/Inflation/2022/12/13/04/topic4-4-2022-12-13-04-21-18-1c4ee9c6-c435-3b5a-a575-bd8bf8a6ef6f,topic4-4-2022-12-13-04-21-18-1c4ee9c6-c435-3b5a-a575-bd8bf8a6ef6f,60136,1670905580000
dbfs:/mnt/data/Inflation/2022/12/13/04/topic4-4-2022-12-13-04-26-10-916868a7-4a51-37c2-b541-f96bc7f1deab,topic4-4-2022-12-13-04-26-10-916868a7-4a51-37c2-b541-f96bc7f1deab,63126,1670905873000
dbfs:/mnt/data/Inflation/2022/12/13/04/topic4-4-2022-12-13-04-31-05-29707c29-a9f6-34ff-bc0b-2cf6da3a7658,topic4-4-2022-12-13-04-31-05-29707c29-a9f6-34ff-bc0b-2cf6da3a7658,65068,1670906168000
dbfs:/mnt/data/Inflation/2022/12/13/04/topic4-4-2022-12-13-04-36-04-2d5f83b2-263d-3f44-9d05-379c34e829c0,topic4-4-2022-12-13-04-36-04-2d5f83b2-263d-3f44-9d05-379c34e829c0,55612,1670906468000
dbfs:/mnt/data/Inflation/2022/12/13/04/topic4-4-2022-12-13-04-40-53-ae061a20-10b8-3d6d-bb41-cc7e1d712770,topic4-4-2022-12-13-04-40-53-ae061a20-10b8-3d6d-bb41-cc7e1d712770,58877,1670906755000
dbfs:/mnt/data/Inflation/2022/12/13/04/topic4-4-2022-12-13-04-45-43-7037df98-8f65-39d8-83c6-31859d5720bd,topic4-4-2022-12-13-04-45-43-7037df98-8f65-39d8-83c6-31859d5720bd,57634,1670907046000
