<a href="https://colab.research.google.com/github/kartikarizqin/mtsamples-analysis/blob/main/lda_sparknlp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Topic Modeling (LDA) with SparkNLP**

Reference: https://medium.com/trustyou-engineering/topic-modelling-with-pyspark-and-spark-nlp-a99d063f1a6e (with adjustment in some lines)

In [1]:
import os

# Install java
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
! pip install --ignore-installed pyspark==2.4.5

# Install Spark NLP
! pip install --ignore-installed spark-nlp==2.4.5

# Install nltk
! pip install nltk

openjdk version "1.8.0_312"
OpenJDK Runtime Environment (build 1.8.0_312-8u312-b07-0ubuntu1~18.04-b07)
OpenJDK 64-Bit Server VM (build 25.312-b07, mixed mode)
Collecting pyspark==2.4.5
  Using cached pyspark-2.4.5-py2.py3-none-any.whl
Collecting py4j==0.10.7
  Using cached py4j-0.10.7-py2.py3-none-any.whl (197 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5
Collecting spark-nlp==2.4.5
  Using cached spark_nlp-2.4.5-py2.py3-none-any.whl (110 kB)
Installing collected packages: spark-nlp
Successfully installed spark-nlp-2.4.5


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
import sparknlp

spark = sparknlp.start()

In [4]:
from pyspark.sql import functions as F

In [5]:
data_path = '/content/drive/MyDrive/psy_cleaned.csv'
data = spark.read.csv(data_path, header=True)

In [6]:
data.columns

['text']

In [7]:
text_col = 'text'
review_text = data.select(text_col).filter(F.col(text_col).isNotNull())

In [8]:
review_text.limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                                      text|
+------------------------------------------------------------------------------------------+
|adjustment disorder encopresis patient referred due concerns regarding behavioral actin...|
|agitation er visit acute episode agitation complaining felt might poisoned care facilit...|
|asperger disorder school reports continuing difficulties repetitive questioning obsessi...|
|attempted suicide consult patient year old caucasian male attempted suicide trying jump...|
|bipolar affective disorder consult patient manic disorder presently psychotic flight id...|
+------------------------------------------------------------------------------------------+



In [9]:
from sparknlp.base import DocumentAssembler

documentAssembler = DocumentAssembler() \
     .setInputCol(text_col) \
     .setOutputCol('document')

In [10]:
from sparknlp.annotator import Tokenizer

tokenizer = Tokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('tokenized')

In [11]:
from sparknlp.annotator import Normalizer

normalizer = Normalizer() \
     .setInputCols(['tokenized']) \
     .setOutputCol('normalized') \
     .setLowercase(True)

In [12]:
from sparknlp.annotator import LemmatizerModel

lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(['normalized']) \
     .setOutputCol('lemmatized')

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [13]:
import nltk
nltk.download('stopwords')

from nltk.corpus import stopwords

eng_stopwords = stopwords.words('english')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [14]:
from sparknlp.annotator import StopWordsCleaner

stopwords_cleaner = StopWordsCleaner() \
     .setInputCols(['lemmatized']) \
     .setOutputCol('unigrams') \
     .setStopWords(eng_stopwords)

In [15]:
from sparknlp.annotator import NGramGenerator

ngrammer = NGramGenerator() \
    .setInputCols(['lemmatized']) \
    .setOutputCol('ngrams') \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter('_')

In [16]:
from sparknlp.annotator import PerceptronModel

pos_tagger = PerceptronModel.pretrained('pos_anc') \
    .setInputCols(['document', 'lemmatized']) \
    .setOutputCol('pos')

pos_anc download started this may take some time.
Approximate size to download 4.3 MB
[OK!]


In [17]:
from sparknlp.base import Finisher

finisher = Finisher() \
     .setInputCols(['unigrams', 'ngrams', 'pos']) \

In [18]:
from pyspark.ml import Pipeline

pipeline = Pipeline() \
     .setStages([documentAssembler,                  
                 tokenizer,
                 normalizer,                  
                 lemmatizer,                  
                 stopwords_cleaner, 
                 pos_tagger,
                 ngrammer,  
                 finisher])

In [19]:
processed_review = pipeline.fit(review_text).transform(review_text)

In [20]:
processed_review.limit(5).show()

+--------------------+--------------------+--------------------+--------------------+
|                text|   finished_unigrams|     finished_ngrams|        finished_pos|
+--------------------+--------------------+--------------------+--------------------+
|adjustment disord...|[adjustment, diso...|[adjustment, diso...|[NN, NN, NN, NN, ...|
|agitation er visi...|[agitation, er, v...|[agitation, er, v...|[NN, UH, NN, JJ, ...|
|asperger disorder...|[asperger, disord...|[asperger, disord...|[NN, NN, NN, NN, ...|
|attempted suicide...|[attempt, suicide...|[attempt, suicide...|[NN, NN, NN, NN, ...|
|bipolar affective...|[bipolar, affecti...|[bipolar, affecti...|[NN, JJ, NN, NN, ...|
+--------------------+--------------------+--------------------+--------------------+



In [21]:
from pyspark.sql import types as T

udf_join_arr = F.udf(lambda x: ' '.join(x), T.StringType())
processed_review  = processed_review.withColumn('finished_pos', udf_join_arr(F.col('finished_pos')))

In [22]:
pos_documentAssembler = DocumentAssembler() \
     .setInputCol('finished_pos') \
     .setOutputCol('pos_document')

In [23]:
pos_tokenizer = Tokenizer() \
     .setInputCols(['pos_document']) \
     .setOutputCol('pos')

In [24]:
pos_ngrammer = NGramGenerator() \
    .setInputCols(['pos']) \
    .setOutputCol('pos_ngrams') \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter('_')

In [25]:
pos_finisher = Finisher() \
     .setInputCols(['pos', 'pos_ngrams']) \

In [26]:
pos_pipeline = Pipeline().setStages([pos_documentAssembler, pos_tokenizer, pos_ngrammer, pos_finisher])

In [27]:
processed_review = pos_pipeline.fit(processed_review).transform(processed_review)

In [28]:
processed_review.columns

['text',
 'finished_unigrams',
 'finished_ngrams',
 'finished_pos',
 'finished_pos_ngrams']

In [29]:
processed_review.select('finished_ngrams', 'finished_pos_ngrams').limit(5).show()

+--------------------+--------------------+
|     finished_ngrams| finished_pos_ngrams|
+--------------------+--------------------+
|[adjustment, diso...|[NN, NN, NN, NN, ...|
|[agitation, er, v...|[NN, UH, NN, JJ, ...|
|[asperger, disord...|[NN, NN, NN, NN, ...|
|[attempt, suicide...|[NN, NN, NN, NN, ...|
|[bipolar, affecti...|[NN, JJ, NN, NN, ...|
+--------------------+--------------------+



In [30]:
def filter_pos(words, pos_tags):
    return [word for word, pos in zip(words, pos_tags) 
            if pos in ['JJ', 'NN', 'NNS', 'VB', 'VBP']]

udf_filter_pos = F.udf(filter_pos, T.ArrayType(T.StringType()))

In [31]:
processed_review = processed_review.withColumn('filtered_unigrams',
                                               udf_filter_pos(F.col('finished_unigrams'), 
                                                              F.col('finished_pos')))

In [32]:
processed_review.select('filtered_unigrams').limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                         filtered_unigrams|
+------------------------------------------------------------------------------------------+
|[adjustment, disorder, encopresis, patient, refer, due, concern, regard, behavioral, ac...|
|[agitation, visit, acute, episode, agitation, complain, feel, poison, care, facility, c...|
|[asperger, disorder, school, report, continue, difficulty, repetitive, question, obsess...|
|[attempt, suicide, consult, patient, year, old, caucasian, male, attempt, suicide, try,...|
|[bipolar, affective, disorder, consult, patient, manic, disorder, psychotic, flight, id...|
+------------------------------------------------------------------------------------------+



In [33]:
def filter_pos_combs(words, pos_tags):
    return [word for word, pos in zip(words, pos_tags) 
            if (len(pos.split('_')) == 2 and \
                pos.split('_')[0] in ['JJ', 'NN', 'NNS', 'VB', 'VBP'] and \
                 pos.split('_')[1] in ['JJ', 'NN', 'NNS']) \
            or (len(pos.split('_')) == 3 and \
                pos.split('_')[0] in ['JJ', 'NN', 'NNS', 'VB', 'VBP'] and \
                 pos.split('_')[1] in ['JJ', 'NN', 'NNS', 'VB', 'VBP'] and \
                  pos.split('_')[2] in ['NN', 'NNS'])]
    
udf_filter_pos_combs = F.udf(filter_pos_combs, T.ArrayType(T.StringType()))

In [34]:
processed_review = processed_review.withColumn('filtered_ngrams',
                                               udf_filter_pos_combs(F.col('finished_ngrams'),
                                                                    F.col('finished_pos_ngrams')))

In [35]:
processed_review.select('filtered_ngrams').limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                           filtered_ngrams|
+------------------------------------------------------------------------------------------+
|[adjustment_disorder, disorder_encopresis, encopresis_patient, refer_due, due_concern, ...|
|[visit_acute, acute_episode, episode_agitation, agitation_complain, poison_care, care_f...|
|[asperger_disorder, disorder_school, school_report, continue_difficulty, difficulty_rep...|
|[attempt_suicide, suicide_consult, consult_patient, patient_year, year_old, old_caucasi...|
|[bipolar_affective, affective_disorder, disorder_consult, consult_patient, patient_mani...|
+------------------------------------------------------------------------------------------+



In [36]:
from pyspark.sql.functions import concat

processed_review = processed_review.withColumn('final', 
                                               concat(F.col('filtered_unigrams'), 
                                                      F.col('filtered_ngrams')))

In [37]:
processed_review.select('final').limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                                     final|
+------------------------------------------------------------------------------------------+
|[adjustment, disorder, encopresis, patient, refer, due, concern, regard, behavioral, ac...|
|[agitation, visit, acute, episode, agitation, complain, feel, poison, care, facility, c...|
|[asperger, disorder, school, report, continue, difficulty, repetitive, question, obsess...|
|[attempt, suicide, consult, patient, year, old, caucasian, male, attempt, suicide, try,...|
|[bipolar, affective, disorder, consult, patient, manic, disorder, psychotic, flight, id...|
+------------------------------------------------------------------------------------------+



In [38]:
from pyspark.ml.feature import CountVectorizer

tfizer = CountVectorizer(inputCol='final', outputCol='tf_features')
tf_model = tfizer.fit(processed_review)
tf_result = tf_model.transform(processed_review)

In [39]:
from pyspark.ml.feature import IDF

idfizer = IDF(inputCol='tf_features', outputCol='tf_idf_features')
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

In [40]:
from pyspark.ml.clustering import LDA

num_topics = 6
max_iter = 10

lda = LDA(k=num_topics, maxIter=max_iter, featuresCol='tf_idf_features')
lda_model = lda.fit(tfidf_result)

In [41]:
vocab = tf_model.vocabulary

def get_words(token_list):
     return [vocab[token_id] for token_id in token_list]
       
udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

In [42]:
num_top_words = 10

topics = lda_model.describeTopics(num_top_words).withColumn('topicWords', udf_to_words(F.col('termIndices')))
topics.select('topic', 'topicWords').show(truncate=90)

+-----+------------------------------------------------------------------------------------------+
|topic|                                                                                topicWords|
+-----+------------------------------------------------------------------------------------------+
|    0|[mg_p, schizoaffective, schizoaffective_disorder, mg, p, pain, huntington, unable, with...|
|    1|[posttraumatic, posttraumatic_stress, performance, stress, stress_disorder, test, postt...|
|    2|[evidence, state, excessive, poor, suicide, report, wife, employee, disease, major_depr...|
|    3|[p, craniotomy, mg_p, status_change, unresponsive, q, mental_status_change, q_h, h, inf...|
|    4|[patient_state, therapist, state, husband, anger, borderline_personality, personality_d...|
|    5|[migraine, dizziness, headache, syncope, neuropsychological, migraine_symptom, neuropat...|
+-----+------------------------------------------------------------------------------------------+

