# LDA Topic Modelling of r/wallstreetbets Posts

The code below is adapted from https://github.com/maobedkova/TopicModelling_PySpark_SparkNLP/blob/master/Topic_Modelling_with_PySpark_and_Spark_NLP.ipynb

## Step 1: Install and Import Packages

In [2]:
sc.install_pypi_package("boto3==1.23.10")
sc.install_pypi_package("pandas==1.0.3")
sc.install_pypi_package("nltk")

Collecting boto3==1.23.10
  Downloading https://files.pythonhosted.org/packages/75/ca/d917b244919f1ebf96f7bbd5a00e4641f7e9191b0d070258f5dc10f5eaad/boto3-1.23.10-py3-none-any.whl (132kB)
Collecting s3transfer<0.6.0,>=0.5.0 (from boto3==1.23.10)
  Downloading https://files.pythonhosted.org/packages/7b/9c/f51775ebe7df5a7aa4e7c79ed671bde94e154bd968aca8d65bb24aba0c8c/s3transfer-0.5.2-py3-none-any.whl (79kB)
Collecting botocore<1.27.0,>=1.26.10 (from boto3==1.23.10)
  Downloading https://files.pythonhosted.org/packages/09/b8/794e0bd260198538ded90c26b353ddb632eab01950d4e7e2e2b8ee510d12/botocore-1.26.10-py3-none-any.whl (8.8MB)
Installing collected packages: botocore, s3transfer, boto3
  Found existing installation: botocore 1.27.2
    Not uninstalling botocore at /usr/local/lib/python3.7/site-packages, outside environment /tmp/1654223709646-0
  Found existing installation: s3transfer 0.6.0
    Not uninstalling s3transfer at /usr/local/lib/python3.7/site-packages, outside environment /tmp/1654

In [3]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.annotator import *
from sparknlp.common import RegexRule
from sparknlp.base import *
import boto3
import io
import pandas as pd
import pyspark.sql.functions as f
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import Tokenizer
from sparknlp.annotator import Normalizer
from sparknlp.annotator import LemmatizerModel
from sparknlp.annotator import StopWordsCleaner
from sparknlp.annotator import NGramGenerator
from sparknlp.annotator import PerceptronModel
from sparknlp.base import Finisher
import nltk
from nltk.corpus import stopwords
from pyspark.sql import types as t
from pyspark.sql.functions import concat
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.ml.clustering import LDA

## Step 2: Read Data and Perform Regex Operations

In [22]:
data = spark.read.csv("s3://thiyaghessan-wsb/wsb_batch_2.csv",
                      header = True,
                      multiLine=True)
data = data[["text"]]

In [23]:
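# Remove unnecessary symbols and characters, in this order:
#   1. URLs
#   2. @-mentions (must run before step 3, which strips the "@")
#   3. any run of non-alphanumeric characters -> single space
#   4. standalone numbers
#   5. standalone single letters (replaced with '', which also joins the neighbouring words)
#   6. repeated whitespace -> single space
data = data.withColumn('text', f.regexp_replace('text', r"http\S+", '')) \
           .withColumn('text', f.regexp_replace('text', r"@[^\s]+", '')) \
           .withColumn('text', f.regexp_replace('text', r"[^a-zA-Z0-9]+", ' ')) \
           .withColumn('text', f.regexp_replace('text', r"^\d+\s|\s\d+\s|\s\d+$", ' ')) \
           .withColumn('text', f.regexp_replace('text', r"\s+[a-zA-Z]\s+", '')) \
           .withColumn('text', f.regexp_replace('text', r"\s+", ' '))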

In [24]:
data.show()

+--------------------+
|                text|
+--------------------+
|I donusually shar...|
| Historical Post ...|
|Been buying this ...|
|What do you guys ...|
|Elon Musk played ...|
|I ll keep this br...|
|Icurrently using ...|
|Get em all boys a...|
|I can probably on...|
|Not invested befo...|
|Taylor Morrison H...|
| If you agree pos...|
|My boyfriend has ...|
|          amp x200B |
|Ascent Solar Tech...|
|Helloknow that so...|
|I got this becaus...|
|Material needed b...|
| Never thoughtd s...|
|Why risking with ...|
+--------------------+
only showing top 20 rows
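
Several rows above contain fused tokens such as `donusually` and `Icurrently`. A likely cause is the single-letter rule, which replaces a stray letter and its surrounding whitespace with an empty string. The following is a minimal sketch in plain Python `re` rather than Spark; `drop_single_letters` and the sample string are hypothetical names introduced only for illustration. It reproduces the effect and shows a variant that keeps a word boundary.

```python
import re

# Same pattern as the single-letter rule in the Spark cell.
SINGLE_LETTER = re.compile(r"\s+[a-zA-Z]\s+")

def drop_single_letters(text, replacement):
    """Replace isolated single letters (and the whitespace around them)."""
    return SINGLE_LETTER.sub(replacement, text)

# "don't" has already become "don t" once non-alphanumerics are replaced by spaces.
sample = "I don t usually share"

fused = drop_single_letters(sample, "")    # same replacement as the Spark cell
spaced = drop_single_letters(sample, " ")  # keeps a space between the neighbours

print(fused)
print(spaced)
```

Replacing with a single space instead of an empty string would likely avoid the fused tokens, at the cost of leaving fragments such as `don` in the text.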

## Step 3: Pre-processing Pipeline

In [25]:
eng_stopwords = ["amp", "xb", "yesfiller", "text", "i", "me", "my", "myself", "we", "our", "ours", "ourselves", "you", "your", "yours", "yourself", "yourselves", "he", "him", "his", "himself", "she", "her", "hers", "herself", "it", "its", "itself", "they", "them", "their", "theirs", "themselves", "what", "which", "who", "whom", "this", "that", "these", "those", "am", "is", "are", "was", "were", "be", "been", "being", "have", "has", "had", "having", "do", "does", "did", "doing", "a", "an", "the", "and", "but", "if", "or", "because", "as", "until", "while", "of", "at", "by", "for", "with", "about", "against", "between", "into", "through", "during", "before", "after", "above", "below", "to", "from", "up", "down", "in", "out", "on", "off", "over", "under", "again", "further", "then", "once", "here", "there", "when", "where", "why", "how", "all", "any", "both", "each", "few", "more", "most", "other", "some", "such", "no", "nor", "not", "only", "own", "same", "so", "than", "too", "very", "s", "t", "can", "will", "just", "don", "should", "now"]

In this pipeline I tokenize, lowercase, and lemmatize the text, remove stopwords, generate unigrams, bigrams, and trigrams, and perform POS tagging. The POS tags come from a pre-trained perceptron tagger (`pos_anc`).

In [45]:
# First round of pre-processing
documentAssembler = DocumentAssembler() \
     .setInputCol("text") \
     .setOutputCol('document')
tokenizer = Tokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('tokenized')
normalizer = Normalizer() \
     .setInputCols(['tokenized']) \
     .setOutputCol('normalized') \
     .setLowercase(True)
lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(['normalized']) \
     .setOutputCol('lemmatized')
stopwords_cleaner = StopWordsCleaner() \
     .setInputCols(['lemmatized']) \
     .setOutputCol('unigrams') \
     .setStopWords(eng_stopwords)
ngrammer = NGramGenerator() \
    .setInputCols(['unigrams']) \
    .setOutputCol('ngrams') \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter('_')
pos_tagger = PerceptronModel.pretrained('pos_anc') \
    .setInputCols(['document', 'unigrams']) \
    .setOutputCol('pos')
finisher = Finisher() \
     .setInputCols(['unigrams', 'ngrams', 'pos'])
pipeline = Pipeline() \
     .setStages([documentAssembler,                  
                 tokenizer,
                 normalizer,                  
                 lemmatizer,                  
                 stopwords_cleaner, 
                 pos_tagger,
                 ngrammer,  
                 finisher])

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
pos_anc download started this may take some time.
Approximate size to download 3.9 MB
[OK!]

In [46]:
processed_data = pipeline.fit(data).transform(data)

In [47]:
processed_data.show(5)

+--------------------+--------------------+--------------------+--------------------+
|                text|   finished_unigrams|     finished_ngrams|        finished_pos|
+--------------------+--------------------+--------------------+--------------------+
|I donusually shar...|[donusually, shar...|[donusually, shar...|[RB, NN, JJ, NN, ...|
| Historical Post ...|[historical, post...|[historical, post...|[JJ, NN, NNS, VBP...|
|Been buying this ...|[buy, thing, ever...|[buy, thing, ever...|[VB, NN, RB, IN, ...|
|What do you guys ...|[guy, think, ptn,...|[guy, think, ptn,...|[NN, VBP, NN, NN,...|
|Elon Musk played ...|[elon, musk, play...|[elon, musk, play...|[NN, NN, NN, NN, ...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows
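
The `finished_ngrams` column holds cumulative n-grams: with `setN(3)` and `setEnableCumulative(True)`, every 1-, 2-, and 3-gram is emitted, joined by `_`. The sketch below mimics that in plain Python. The function name `cumulative_ngrams` is hypothetical, and the ordering (all unigrams first, then bigrams, then trigrams) is an assumption that matches the output shown above rather than a statement about Spark NLP internals.

```python
def cumulative_ngrams(tokens, n=3, delimiter="_"):
    """Return all 1..n-grams: every unigram first, then bigrams, and so on."""
    ngrams = []
    for size in range(1, n + 1):
        # Slide a window of `size` tokens across the sequence.
        for start in range(len(tokens) - size + 1):
            ngrams.append(delimiter.join(tokens[start:start + size]))
    return ngrams

tokens = ["buy", "thing", "ever"]
print(cumulative_ngrams(tokens))
```

A document shorter than `n` tokens simply yields no n-grams of the larger sizes.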

I then join all POS tags into a single space-separated string so that a second pipeline can build POS n-grams from them.

In [48]:
pos_joiner = f.udf(lambda x: ' '.join(x), t.StringType())
processed_data  = processed_data.withColumn('finished_pos', pos_joiner(f.col('finished_pos')))

## Step 4: Pre-Processing Pipeline 2

In this pipeline, I build POS n-grams (up to trigrams) to get a better sense of sentence structure.

In [49]:
pos_documentAssembler = DocumentAssembler() \
     .setInputCol('finished_pos') \
     .setOutputCol('pos_document')
pos_tokenizer = Tokenizer() \
     .setInputCols(['pos_document']) \
     .setOutputCol('pos')
pos_ngrammer = NGramGenerator() \
    .setInputCols(['pos']) \
    .setOutputCol('pos_ngrams') \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter('_')
pos_finisher = Finisher() \
     .setInputCols(['pos', 'pos_ngrams'])
pos_pipeline = Pipeline() \
     .setStages([pos_documentAssembler,                  
                 pos_tokenizer,
                 pos_ngrammer,  
                 pos_finisher])

In [50]:
pos_processed_data = pos_pipeline.fit(processed_data).transform(processed_data)

In [51]:
pos_processed_data.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|   finished_unigrams|     finished_ngrams|        finished_pos| finished_pos_ngrams|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|I donusually shar...|[donusually, shar...|[donusually, shar...|[RB, NN, JJ, NN, ...|[RB, NN, JJ, NN, ...|
| Historical Post ...|[historical, post...|[historical, post...|[JJ, NN, NNS, VBP...|[JJ, NN, NNS, VBP...|
|Been buying this ...|[buy, thing, ever...|[buy, thing, ever...|[VB, NN, RB, IN, ...|[VB, NN, RB, IN, ...|
|What do you guys ...|[guy, think, ptn,...|[guy, think, ptn,...|[NN, VBP, NN, NN,...|[NN, VBP, NN, NN,...|
|Elon Musk played ...|[elon, musk, play...|[elon, musk, play...|[NN, NN, NN, NN, ...|[NN, NN, NN, NN, ...|
+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows

I then filter out uninformative POS tags and POS combinations.

In [52]:
# only select relevant individual POS tags
def filter_pos(words, pos_tags):
    return [word for word, pos in zip(words, pos_tags) 
            if pos in ['JJ', 'NN', 'NNS', 'VB', 'VBP']]

remove_pos = f.udf(filter_pos, t.ArrayType(t.StringType()))

In [53]:
pos_processed_data = pos_processed_data.withColumn('filtered_unigrams',
                                                    remove_pos(f.col('finished_unigrams'), 
                                                               f.col('finished_pos')))
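
To see what the unigram filter keeps, the same logic can be run on plain Python lists outside Spark. This is only an illustrative sketch: the tokens and tags are loosely modelled on the third row shown earlier, and the fourth token is invented.

```python
# Tags kept by the filter: adjectives, nouns, and base/present-tense verbs.
KEEP_TAGS = {"JJ", "NN", "NNS", "VB", "VBP"}

def filter_pos(words, pos_tags):
    """Keep only the words whose aligned POS tag is in KEEP_TAGS."""
    return [word for word, pos in zip(words, pos_tags) if pos in KEEP_TAGS]

words = ["buy", "thing", "ever", "since"]  # "since" is a made-up token
tags = ["VB", "NN", "RB", "IN"]
print(filter_pos(words, tags))
```

The filter depends on `words` and `pos_tags` having the same length and order, because `zip` pairs them by position.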

In [54]:
# only select informative POS tag combinations
CONTENT_TAGS = {'JJ', 'NN', 'NNS', 'VB', 'VBP'}
NOMINAL_TAGS = {'JJ', 'NN', 'NNS'}
NOUN_TAGS = {'NN', 'NNS'}

def is_informative(pos_ngram):
    # bigrams must end in an adjective or noun; trigrams must end in a noun
    tags = pos_ngram.split('_')
    if len(tags) == 2:
        return tags[0] in CONTENT_TAGS and tags[1] in NOMINAL_TAGS
    if len(tags) == 3:
        return (tags[0] in CONTENT_TAGS and tags[1] in CONTENT_TAGS
                and tags[2] in NOUN_TAGS)
    return False

def filter_pos_combs(words, pos_tags):
    return [word for word, pos in zip(words, pos_tags) if is_informative(pos)]

remove_pos_combs = f.udf(filter_pos_combs, t.ArrayType(t.StringType()))

In [55]:
pos_processed_data = pos_processed_data.withColumn('filtered_ngrams',
                                                   remove_pos_combs(f.col('finished_ngrams'),
                                                                    f.col('finished_pos_ngrams')))
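
The n-gram filter only works if the word n-grams and the POS n-grams line up position by position. The sketch below illustrates that alignment in plain Python. It assumes both lists are generated the same way from sequences of equal length; `cumulative_ngrams` and `keep_combination` are hypothetical helpers, and the tags are made up.

```python
def cumulative_ngrams(tokens, n=3, delimiter="_"):
    """All 1..n-grams, ordered by size and then by position."""
    return [delimiter.join(tokens[i:i + size])
            for size in range(1, n + 1)
            for i in range(len(tokens) - size + 1)]

HEAD = {"JJ", "NN", "NNS", "VB", "VBP"}

def keep_combination(pos_ngram):
    """Accept bigrams ending in JJ/NN/NNS and trigrams ending in NN/NNS."""
    tags = pos_ngram.split("_")
    if len(tags) == 2:
        return tags[0] in HEAD and tags[1] in {"JJ", "NN", "NNS"}
    if len(tags) == 3:
        return tags[0] in HEAD and tags[1] in HEAD and tags[2] in {"NN", "NNS"}
    return False  # unigrams are handled by the separate unigram filter

words = ["buy", "thing", "ever"]
tags = ["VB", "NN", "RB"]  # hypothetical tags

word_ngrams = cumulative_ngrams(words)
pos_ngrams = cumulative_ngrams(tags)

# Position i in both lists covers the same span of the document.
kept = [w for w, p in zip(word_ngrams, pos_ngrams) if keep_combination(p)]
print(kept)
```

If the POS tokenizer ever splits a tag into more than one token, the two lists would drift out of alignment, so comparing their lengths is a cheap sanity check.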

In [56]:
pos_processed_data = pos_processed_data.withColumn('final', 
                                                   concat(f.col('filtered_unigrams'), 
                                                          f.col('filtered_ngrams')))

## Step 5: TF-IDF Vectorization and Topic Modelling

In [57]:
tfizer = CountVectorizer(inputCol='final',
                         outputCol='tf_features')
idfizer = IDF(inputCol='tf_features',
              outputCol='tf_idf_features')
tfidf_pipeline = Pipeline() \
     .setStages([tfizer,
                 idfizer])

In [58]:
tfidf_processed_data = tfidf_pipeline.fit(pos_processed_data).transform(pos_processed_data)
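
As a rough picture of what the two stages compute, the sketch below builds TF-IDF weights in plain Python. It uses raw term counts for TF and the smoothed IDF `log((m + 1) / (df + 1))` described in the Spark ML documentation, where `m` is the number of documents and `df` the number of documents containing the term. The function `tf_idf` and the toy documents are hypothetical, and the sketch ignores vocabulary-size limits.

```python
import math
from collections import Counter

def tf_idf(docs):
    """Return one {term: weight} dict per document (raw count * smoothed IDF)."""
    m = len(docs)
    # Document frequency: number of documents in which each term appears.
    doc_freq = Counter(term for doc in docs for term in set(doc))
    idf = {term: math.log((m + 1) / (df + 1)) for term, df in doc_freq.items()}
    return [{term: count * idf[term] for term, count in Counter(doc).items()}
            for doc in docs]

docs = [["stock", "buy", "buy_stock"],
        ["stock", "oil"],
        ["stock", "buy"]]
weights = tf_idf(docs)
print(weights[1])
```

A term that appears in every document, like `stock` here, gets a weight of zero, which is why very common words contribute little to the topics.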

In [59]:
tfidf_processed_data.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|   finished_unigrams|     finished_ngrams|        finished_pos| finished_pos_ngrams|   filtered_unigrams|     filtered_ngrams|               final|         tf_features|     tf_idf_features|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|I donusually shar...|[donusually, shar...|[donusually, shar...|[RB, NN, JJ, NN, ...|[RB, NN, JJ, NN, ...|[share, secret, w...|[share_secret, se...|[share, secret, w...|(262144,[0,4,8,10...|(262144,[0,4,8,10...|
| Historical Post ...|[historical, post...|[historical, post...|[JJ, NN, NNS, VBP...|[JJ, NN, NNS, VBP...|[historical, post...|[historical_post,...|[his

In [60]:
# Fit topic model for 10 topics
lda = LDA(k = 10,
          maxIter = 10,
          featuresCol='tf_idf_features')
lda_model = lda.fit(tfidf_processed_data)

## Step 6: View Words Most Strongly Associated with each Topic

In [61]:
# Refit the CountVectorizer on the same data to recover its vocabulary (term index -> term)
tf_model = tfizer.fit(pos_processed_data)
vocab = tf_model.vocabulary

In [62]:
def get_words(token_list):
     return [vocab[token_id] for token_id in token_list]
       
get_words = f.udf(get_words, t.ArrayType(t.StringType()))
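
The lookup relies on each term index from the topic model pointing at the matching position in the vocabulary list. A minimal plain-Python sketch of ranking terms by weight and mapping the indices back to words is given below; `top_words`, the toy vocabulary, and the weights are all hypothetical.

```python
def top_words(topic_term_weights, vocab, num_top_words=10):
    """Return the highest-weighted vocabulary terms for a single topic."""
    # Rank term indices by weight, largest first.
    ranked = sorted(range(len(topic_term_weights)),
                    key=lambda idx: topic_term_weights[idx],
                    reverse=True)
    return [vocab[idx] for idx in ranked[:num_top_words]]

vocab = ["stock", "buy", "oil", "vaccine"]  # hypothetical vocabulary
topic = [0.10, 0.05, 0.60, 0.25]            # hypothetical term weights for one topic
print(top_words(topic, vocab, num_top_words=2))
```

If the vocabulary used for the lookup came from a different fit than the features given to the topic model, the indices could point at the wrong words, so both should come from the same fitted vectorizer.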

In [63]:
num_top_words = 10

topics = lda_model.describeTopics(num_top_words).withColumn('topicWords', get_words(f.col('termIndices')))
topics.select('topic', 'topicWords').show(truncate=90)

+-----+------------------------------------------------------------------------------------------+
|topic|                                                                                topicWords|
+-----+------------------------------------------------------------------------------------------+
|    0|                        [remove, share, market, stock, go, company, price, short, m, year]|
|    1|                          [buy, price, stock, money, company, make, get, go, year, option]|
|    2|                    [price, oil, gt_click, week, resistance, trade, ema, stock, spy, prev]|
|    3|[discussion, notify, fund_gt, expect_ipo, share_nasdaq, go_public_expect, expect_ipo_da...|
|    4|                                   [zim, east, apt, coast, salt, bark, meme, eur, gt, teu]|
|    5|                     [vaccine, clinical, y, option, market, stock, look, health, k, think]|
|    6|                       [elon, stem, anybody, buy, uranium, shib, inc, stock, buy_buy, get]|
|    7|   