Used tutorial https://github.com/maobedkova/TopicModelling_PySpark_SparkNLP/blob/master/Topic_Modelling_with_PySpark_and_Spark_NLP.ipynb for reference

In [1]:
import sparknlp
from sparknlp.annotator import Tokenizer, PerceptronModel
from sparknlp.base import DocumentAssembler
from pyspark.ml import Pipeline
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.annotator import *
from sparknlp.base import *
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.ml.clustering import LDA
from pyspark.sql import types as T

spark = sparknlp.start()

24/11/10 16:18:08 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
og_data = spark.read.csv("../data_topicmodel.csv", header= True).select(["id", "cleaned_text"])

In [3]:
#Remove sample
og_data.show(5)
sample_data = og_data.sample(0.001)

+-----+--------------------+
|   id|        cleaned_text|
+-----+--------------------+
|hk5r2|i had an appointm...|
|iqimz|i created this si...|
|pfzt5|hello everyone  i...|
|pk714|i grew up with bo...|
|q0q8x|i have to ask whe...|
+-----+--------------------+
only showing top 5 rows



In [4]:
documentAssembler = DocumentAssembler()\
     .setInputCol("cleaned_text")\
     .setOutputCol('document')

In [5]:
tokenizer = Tokenizer() \
            .setInputCols(['document'])\
            .setOutputCol('tokenized')

In [6]:
normalizer = Normalizer() \
     .setInputCols(['tokenized']) \
     .setOutputCol('normalized') 

In [7]:
english = [
    "a", "about", "above", "after", "again", "against", "all", "am", "an", "and", "any", "are", "as", "at", "be", 
    "because", "been", "before", "being", "below", "between", "both", "but", "by", "can", "cannot", "could", "did", 
    "do", "does", "doing", "down", "during", "each", "few", "for", "from", "further", "had", "has", "have", "having", 
    "he", "her", "here", "hers", "herself", "him", "himself", "his", "how", "i", "if", "in", "into", "is", "it", 
    "its", "itself", "let", "me", "more", "most", "must", "my", "myself", "no", "nor", "not", "of", "off", "on", 
    "once", "only", "or", "other", "our", "ours", "ourselves", "out", "over", "own", "same", "she", "some", "such", 
    "than", "that", "the", "their", "theirs", "them", "themselves", "then", "there", "these", "they", "this", "those", 
    "through", "to", "too", "under", "until", "up", "very", "was", "we", "were", "what", "when", "where", "which", 
    "while", "who", "whom", "why", "with", "would", "you", "your", "yours", "yourself", "yourselves", "will", "ll", 
    "re", "ve", "d", "s", "m", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", 
    "s", "t", "u", "v", "w", "x", "y", "z", "many", "us", "ok", "hows", "ive", "ill", "im", "cant", "topics", "topic",
    "discuss", "thoughts", "yo", "thats", "whats", "lets", "nothing", "oh", "omg", 
         "things", "stuff", "yall", "haha", "yes", "no", "wo", "like", 'good', 
         'work', 'got', 'going', 'dont', 'really', 'want', 'make', 'think', 
         'know', 'feel', 'people', 'life', "getting", "lot" "great", "i", "me", 
         "my", "myself", "we", "our", "ours", "ourselves", 
        "you", "your", "yours", "yourself", "yourselves", "he", "him", "his", 
        "himself", "she", "her", "hers", "herself", "it", "its", "itself", 
        "they", "them", "their", "theirs","themselves", "what", "which", "who", 
        "whom", "this", "that", "these", "those", "am", "is", "are", "was", 
        "were", "be", "been", "being", "have", "has", "had", "having", "do", 
        "does", "did", "doing", "will", "would", "should", "can", "could", "may",
        "might", "must", "shall", "ought", "about", "above", "across", "after", 
        "against", "along", "amid", "among", "around", "as", "at", "before", "behind",
        "below", "beneath", "beside", "between", "beyond", "but", "by", 
        "concerning", "considering", "despite", "down", "during", "except", "for",
        "from", "in", "inside", "into", "like", "near", "next", "notwithstanding",
        "of", "off", "on", "onto", "opposite", "out", "outside", "over", "past",
        "regarding", "round", "since", "than", "through", "throughout", "till", 
        "to", "toward", "towards", "under", "underneath", "unlike", "until", "up",
        "upon", "versus", "via", "with", "within", "without", "cant", "cannot", 
        "couldve", "couldnt", "didnt", "doesnt", "dont", "hadnt", "hasnt", 
        "havent", "hed", "hell", "hes", "howd", "howll", "hows", "id", "ill", 
        "im", "ive", "isnt", "itd", "itll", "its", "lets", "mightve", "mustve", 
        "mustnt", "shant", "shed", "shell", "shes", "shouldve", "shouldnt", 
        "thatll", "thats", "thered", "therell", "therere", "theres", "theyd", 
        "theyll", "theyre", "theyve", "wed", "well", "were", "weve", "werent", 
        "whatd", "whatll", "whatre", "whats", "whatve", "whend", "whenll", 
        "whens", "whered", "wherell", "wheres", "whichd", "whichll", "whichre", 
        "whichs", "whod", "wholl", "whore", "whos", "whove", "whyd", "whyll", 
        "whys", "wont", "wouldve", "wouldnt", "youd", "youll", "youre", "youve",
        "f", "m", "because", "go", "lot", "get", "still", "way", "something", "much",
        "thing", "someone", "person", "anything", "goes", "ok", "so", "just", "mostly", 
        "put", "also", "lots", "yet"]

time = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", 
        "sunday", "morning", "noon", "afternoon", "evening", "night", "midnight",
        "dawn", "dusk", "week", "weekend", "weekends","weekly", "today", 
        "yesterday", "tomorrow", "yesterdays", "todays", "mondays", "tuesdays",
        "wednesdays", "thursdays", "fridays", "saturdays", "sundays", "day",
        "everyday", "daily", "workday", 'time', 'month', 'year', 'pm', 'am', "ago",
        "year"]

reddit = ["welcome", "hi", "hello", "sub", "reddit", "thanks", "thank", "maybe",
          "wo30", "mods", "mod", "moderators", "subreddit", "btw", "aw", "aww", 
          "aww", "hey", "hello", "join", "joined", "post", "rselfimprovement"]

topic_specific = ["self", "improvement", "change", "action",
    'change', 'start', 'goal', 'habit', 'new', 'old', 
    'care', 'world', 'everyone', 'love', 'u', 'right', 'mean', 'matter',
    'best', 'step', 'focus', 'hard', 'small',
    'bad', 'help', 'time', 'problem', 'issue', 'advice',
    'bit', 'experience', 'different',
    'point', 'situation', 'negative', 'control', 'positive',
    'use', 'question', 'idea', 'amp', 'medium', 'hour', 'day', 'minute',
    'aaaaloot']

stopwords = english + time + reddit + topic_specific

from sparknlp.annotator import StopWordsCleaner

stopwords_cleaner = StopWordsCleaner() \
     .setInputCols(['normalized']) \
     .setOutputCol('unigrams') \
     .setStopWords(stopwords)

In [8]:
from sparknlp.annotator import NGramGenerator

ngrammer = NGramGenerator() \
    .setInputCols(['normalized']) \
    .setOutputCol('ngrams') \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter('_')

In [9]:
pos = PerceptronModel.load("/project/macs40123/spark-jars/pos_anc_en_3.0.0_3.0_1614962126490/")\
      .setInputCols("document", "normalized")\
      .setOutputCol("pos")

                                                                                

In [10]:
from sparknlp.base import Finisher

finisher = Finisher().setInputCols(['unigrams', 'ngrams', 'pos'])

In [11]:
my_pipeline = Pipeline(
      stages = [
          documentAssembler,
          tokenizer,
          normalizer,
          stopwords_cleaner,
          ngrammer,
          pos,
          finisher
      ])

In [12]:
pipelineModel = my_pipeline.fit(sample_data)
processed_data = pipelineModel.transform(sample_data)
processed_data.show(1)



+------+--------------------+--------------------+--------------------+--------------------+
|    id|        cleaned_text|   finished_unigrams|     finished_ngrams|        finished_pos|
+------+--------------------+--------------------+--------------------+--------------------+
|19iv7g|i feel great when...|[great, office, r...|[i, feel, great, ...|[NNP, VBP, JJ, WR...|
+------+--------------------+--------------------+--------------------+--------------------+
only showing top 1 row



In [13]:
processed_data.columns

['id', 'cleaned_text', 'finished_unigrams', 'finished_ngrams', 'finished_pos']

In [14]:
#data.select('document').show(5, truncate = 100)

In [15]:
#data.select('tokenized').show(1, truncate = 100)

In [16]:
#data.select('normalized').show(1, truncate = 100)

In [17]:
#data.select('lemmatized').show(1, truncate = 1000)

In [18]:
processed_data.select('finished_unigrams').show(10, truncate = 100)

+----------------------------------------------------------------------------------------------------+
|                                                                                   finished_unigrams|
+----------------------------------------------------------------------------------------------------+
|[great, office, rarely, distracted, productive, useful, company, usually, hours, working, spent, ...|
|[dive, straight, need, learn, moderation, usually, tend, completely, overdo, always, quite, usefu...|
|[years, male, liked, girl, now, started, interested, impression, coming, recently, felt, complete...|
|[room, home, cause, chronical, pain, find, depressed, felling, terribly, lonely, made, wonder, ar...|
|[actually, due, current, circumstances, trying, draw, though, seem, skill, give, satisfaction, fo...|
|[log, th, september, current, oi, boys, back, another, log, completely, biked, now, decided, rewa...|
|[long, lurker, seeking, understand, feedback, repeatedly, given, several

Create pos-tags n-grams that correspond to words n-grams

In [19]:
#Merge POS tags as just one string to be able to take it as a document in the Spark NLP Pipeline
from pyspark.sql import functions as F
pos_as_string = F.udf(lambda x: ' '.join(x), T.StringType())
processed_data = processed_data.withColumn('finished_pos', pos_as_string(F.col('finished_pos')))

In [20]:
processed_data.show(3)

[Stage 8:>                                                          (0 + 1) / 1]

+------+--------------------+--------------------+--------------------+--------------------+
|    id|        cleaned_text|   finished_unigrams|     finished_ngrams|        finished_pos|
+------+--------------------+--------------------+--------------------+--------------------+
|19iv7g|i feel great when...|[great, office, r...|[i, feel, great, ...|NNP VBP JJ WRB NN...|
|38n8kl|hello all  ill ju...|[dive, straight, ...|[hello, all, ill,...|UH DT JJ RB JJ NN...|
|4ueopy|hey im 19 years o...|[years, male, lik...|[hey, im, years, ...|UH NN NNS JJ CC D...|
+------+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows



                                                                                

New pipeline for pos-tags

In [21]:
pos_documentAssembler = DocumentAssembler() \
     .setInputCol('finished_pos') \
     .setOutputCol('pos_document')

pos_tokenizer = Tokenizer() \
     .setInputCols(['pos_document']) \
     .setOutputCol('pos')
     
    
pos_ngrammer = NGramGenerator() \
    .setInputCols(['pos']) \
    .setOutputCol('pos_ngrams') \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter('_')

pos_finisher = Finisher() \
     .setInputCols(['pos', 'pos_ngrams']) \

pos_pipeline = Pipeline() \
     .setStages([pos_documentAssembler,                  
                 pos_tokenizer,
                 pos_ngrammer,  
                 pos_finisher])

processed_data = pos_pipeline.fit(processed_data).transform(processed_data)

In [22]:
processed_data.columns

['id',
 'cleaned_text',
 'finished_unigrams',
 'finished_ngrams',
 'finished_pos',
 'finished_pos_ngrams']

In [23]:
processed_data.show(1)

[Stage 9:>                                                          (0 + 1) / 1]

+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    id|        cleaned_text|   finished_unigrams|     finished_ngrams|        finished_pos| finished_pos_ngrams|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|19iv7g|i feel great when...|[great, office, r...|[i, feel, great, ...|[NNP, VBP, JJ, WR...|[NNP, VBP, JJ, WR...|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 1 row



                                                                                

In [24]:
def filter_unigrams(finished_unigrams, finished_pos):
    '''Filters individual words based on their POS tag'''
    return [word for word, pos in zip(finished_unigrams, finished_pos)
            if pos in ['JJ', 'NN', 'NNS', 'NNPS', 'VB']]

udf_filter_unigrams = F.udf(filter_unigrams, T.ArrayType(T.StringType()))

In [25]:
processed_data = processed_data.withColumn('filtered_unigrams_by_pos', udf_filter_unigrams(
                                                   F.col('finished_unigrams'),
                                                   F.col('finished_pos')))

processed_data.show(5)

[Stage 10:>                                                         (0 + 1) / 1]

+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+
|    id|        cleaned_text|   finished_unigrams|     finished_ngrams|        finished_pos| finished_pos_ngrams|filtered_unigrams_by_pos|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+
|19iv7g|i feel great when...|[great, office, r...|[i, feel, great, ...|[NNP, VBP, JJ, WR...|[NNP, VBP, JJ, WR...|    [rarely, producti...|
|38n8kl|hello all  ill ju...|[dive, straight, ...|[hello, all, ill,...|[UH, DT, JJ, RB, ...|[UH, DT, JJ, RB, ...|    [need, moderation...|
|4ueopy|hey im 19 years o...|[years, male, lik...|[hey, im, years, ...|[UH, NN, NNS, JJ,...|[UH, NN, NNS, JJ,...|    [male, liked, gir...|
|9a0lsx|most of my time i...|[room, home, caus...|[most, of, my, ti...|[JJS, IN, PRP$, N...|[JJS, IN, PRP$, N...|    [chronical, pain,...|
|9cehdm|i cant do things ..

                                                                                

In [26]:
def filter_pos_ngrams(finished_ngrams, finished_pos_tags):
    return [word for word, pos in zip(finished_ngrams, finished_pos_tags) 
            if (len(pos.split('_')) == 2 and \
                pos.split('_')[0] in ['JJ', 'NN', 'NNS', 'VB', 'VBP'] and \
                 pos.split('_')[1] in ['JJ', 'NN', 'NNS']) \
            or (len(pos.split('_')) == 3 and \
                pos.split('_')[0] in ['JJ', 'NN', 'NNS', 'VB', 'VBP'] and \
                 pos.split('_')[1] in ['JJ', 'NN', 'NNS', 'VB', 'VBP'] and \
                  pos.split('_')[2] in ['NN', 'NNS'])]
    
udf_filter_pos_ngrams = F.udf(filter_pos_ngrams, T.ArrayType(T.StringType()))

In [27]:
processed_data = processed_data.withColumn('filtered_ngrams_by_pos',
                       udf_filter_pos_ngrams(F.col('finished_ngrams'),
                                             F.col('finished_pos_ngrams')))

In [28]:
processed_data.show(5)

[Stage 11:>                                                         (0 + 1) / 1]

+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+----------------------+
|    id|        cleaned_text|   finished_unigrams|     finished_ngrams|        finished_pos| finished_pos_ngrams|filtered_unigrams_by_pos|filtered_ngrams_by_pos|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+----------------------+
|19iv7g|i feel great when...|[great, office, r...|[i, feel, great, ...|[NNP, VBP, JJ, WR...|[NNP, VBP, JJ, WR...|    [rarely, producti...|  [feel_great, do_t...|
|38n8kl|hello all  ill ju...|[dive, straight, ...|[hello, all, ill,...|[UH, DT, JJ, RB, ...|[UH, DT, JJ, RB, ...|    [need, moderation...|  [dive_straight, d...|
|4ueopy|hey im 19 years o...|[years, male, lik...|[hey, im, years, ...|[UH, NN, NNS, JJ,...|[UH, NN, NNS, JJ,...|    [male, liked, gir...|  [im_years, years_...|
|9a0lsx|most of my time i...

                                                                                

In [29]:
processed_data.select('id','filtered_ngrams_by_pos').show(10, truncate=100)

[Stage 12:>                                                         (0 + 1) / 1]

+------+----------------------------------------------------------------------------------------------------+
|    id|                                                                              filtered_ngrams_by_pos|
+------+----------------------------------------------------------------------------------------------------+
|19iv7g|[feel_great, do_today, hours_im, moocs_im, watch_movies, free_time, avoid_distractions, reddit_lt...|
|38n8kl|[dive_straight, do_things, overdo_things, bad_thing, other_aspects, drunk_coffee, first_americano...|
|4ueopy|[im_years, years_old, male_ive, other_way, meet_sometime, hours_albeit, cuff_quick, quick_quip, d...|
|9a0lsx|[time_im, home_cause, chronical_pain, wonder_arnt, same_situation, learn_feeling, feeling_good, b...|
|9cehdm|[do_things, current_circumstances, circumstances_im, get_angry, cant_do_things, current_circumsta...|
|9ert24|[current_time, am_oi, oi_boys, boys_im, past_week, elon_musk, rogan_podcast, bet_elon, elon_musk,...|
|9vyu3n|[r

                                                                                

In [30]:
processed_data.select('id','filtered_unigrams_by_pos').show(10, truncate=100)

[Stage 13:>                                                         (0 + 1) / 1]

+------+----------------------------------------------------------------------------------------------------+
|    id|                                                                            filtered_unigrams_by_pos|
+------+----------------------------------------------------------------------------------------------------+
|19iv7g|      [rarely, productive, usually, basically, movie, hours, enrolled, watch, dissolving, fix, free]|
|38n8kl|[need, moderation, usually, useful, example, aspects, type, coffee, half, now, never, chewed, mon...|
|4ueopy|[male, liked, girl, interested, impression, felt, finally, wanted, one, hours, talk, least, quip,...|
|9a0lsx|                                    [chronical, pain, felling, lonely, made, actually, course, even]|
|9cehdm|                                      [due, circumstances, skill, satisfaction, force, angry, worth]|
|9ert24|[log, september, boys, back, log, completely, biked, working, chilling, smoking, rogan, barely, r...|
|9vyu3n|[l

                                                                                

In [31]:
processed_data.columns

['id',
 'cleaned_text',
 'finished_unigrams',
 'finished_ngrams',
 'finished_pos',
 'finished_pos_ngrams',
 'filtered_unigrams_by_pos',
 'filtered_ngrams_by_pos']

In [50]:
#Now that POS was done, lemmatization makes more sense at this point

#Merge tokens as just one string to be able to take it as a document in the new Pipeline
from pyspark.sql import functions as F
tokens_as_string = F.udf(lambda x: ' '.join(x), T.StringType())
processed_data = processed_data.withColumn('joined_tokens', tokens_as_string(F.col('filtered_unigrams_by_pos')))

last_documentAssembler = DocumentAssembler() \
     .setInputCol('joined_tokens') \
     .setOutputCol('joined_document')

last_tokenizer = Tokenizer() \
     .setInputCols(['joined_document']) \
     .setOutputCol('tokenized')
     
lemmatizer = LemmatizerModel.load("../models/lemma_ewt_en_3.4.3_3.0_1651416655397/")\
      .setInputCols("tokenized")\
      .setOutputCol("lemmatized")

#Delete these tokens that remained from the lemmatizer model
space = ["_"]

last_stopwords_cleaner = StopWordsCleaner() \
     .setInputCols(['lemmatized']) \
     .setOutputCol('cleaned_unigrams') \
     .setStopWords(space)

last_finisher = Finisher() \
     .setInputCols(['cleaned_unigrams']) \

last_pipeline = Pipeline() \
     .setStages([last_documentAssembler,                  
                 last_tokenizer,
                 lemmatizer,
                 last_stopwords_cleaner,
                 last_finisher])

final_data = last_pipeline.fit(processed_data).transform(processed_data)


Create one column merging unigrams and ngrams

In [52]:
from pyspark.sql.functions import concat
final_data = final_data.withColumn('final', concat(F.col('finished_cleaned_unigrams'), \
                                                   F.col('filtered_ngrams_by_pos')))\
                                                   .select('id','cleaned_text','final')
                                                                                                                          

In [53]:
final_data.select('final').show(50, truncate=100)

[Stage 37:>                                                         (0 + 2) / 2]

+----------------------------------------------------------------------------------------------------+
|                                                                                               final|
+----------------------------------------------------------------------------------------------------+
|[rarely, productive, usually, basically, movie, hour, enrol, watch, dissolving, fix, free, feel_g...|
|[need, moderation, usually, useful, example, aspect, type, coffee, half, now, never, chew, month,...|
|[male, like, girl, interested, impression, feel, finally, want, one, hour, talk, least, quip, sna...|
|[chronical, pain, felling, lonely, made, actually, course, even, time_im, home_cause, chronical_p...|
|[due, circumstance, skill, satisfaction, force, angry, worth, do_things, current_circumstances, c...|
|[log, september, back, log, completely, biked, working, chill, smoke, rogan, barely, reward, achi...|
|[lurker, seek, understand, feedback, several, stop, several, year, run, 

                                                                                