# Topic Modeling (LDA) Using PySpark libraries

Topic modeling is a technique for determining the topics in a document. It can also be used to discover patterns of words in a collection of documents. By analyzing the frequency of words and phrases in the documents, it estimates the probability that a word or phrase belongs to a certain topic and clusters documents based on their similarity.

Latent Dirichlet Allocation (LDA) is an unsupervised clustering technique that is commonly used for text analysis. Here, each topic is represented as a distribution over words, and each document is represented as a mixture of these topics.
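
To make the two layers concrete, here is a minimal sketch with made-up numbers. The topic names, words, and probabilities are illustrative assumptions, not values from this dataset. Under LDA, the probability of a word in a document is the topic-weighted sum of that word's probability in each topic.

```python
# Hypothetical toy values for illustration only.
# Each topic is a probability distribution over words.
topic_word = {
    "music": {"album": 0.5, "song": 0.4, "book": 0.1},
    "reading": {"album": 0.1, "song": 0.1, "book": 0.8},
}

# Each document is a mixture (probability distribution) over topics.
doc_topic = {"music": 0.75, "reading": 0.25}


def word_probability(word, doc_topic, topic_word):
    """P(word | doc) = sum over topics of P(topic | doc) * P(word | topic)."""
    return sum(
        weight * topic_word[topic].get(word, 0.0)
        for topic, weight in doc_topic.items()
    )


p_album = word_probability("album", doc_topic, topic_word)
p_book = word_probability("book", doc_topic, topic_word)
print(p_album, p_book)
```

A document that leans towards the "music" topic gives "album" a higher probability than "book", even though "book" dominates the other topic.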

Since the dataset contains \~3M rows, running this topic modeling on the entire data with the PySpark NLP libraries takes **~5 hours**. This single result does not allow for any filtering by category or sentiment.

If the purpose is to filter the corpus dynamically by sentiment and category, a new corpus is generated and an LDA model is run for each combination; with 3 sentiments and 4 categories, that is 12 runs. This entire process takes **~8 hours**. Although each filtered corpus is smaller than the full one, the process is repeated 12 times, which increases the total time. Therefore, fewer runs are better. Further, the more the corpus is filtered, the smaller it becomes and the less likely it is to produce distinct topics.

***Disclaimer: This time can increase depending on the size of the corpus in each of the filtered datasets.***
 
The output is saved in the table **filteredsparkLDATopics** in the following format:

Topic | Topic Terms | Term Weight | Category | Sentiment |
--- | --- | --- | --- | --- |
3 | term1 | weight1 | Category1 | Positive |
3 | term2 | weight2 | Category1 | Positive |
3 | term3 | weight3 | Category1 | Positive |
... | ... | ... | ... | ... |
1 | term1 | weight1 | Category2 | Negative |
1 | term2 | weight2 | Category2 | Negative |

## Import packages

In [0]:
%pip install --upgrade numpy

Python interpreter will be restarted.
Collecting numpy
  Downloading numpy-1.24.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (17.3 MB)
Installing collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 1.20.3
    Not uninstalling numpy at /databricks/python3/lib/python3.9/site-packages, outside environment /local_disk0/.ephemeral_nfs/envs/pythonEnv-a70be8a3-bf2b-497d-8470-4531236055ff
    Can't uninstall 'numpy'. No files were found to uninstall.
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
petastorm 0.11.4 requires pyspark>=2.1.0, which is not installed.
scipy 1.7.1 requires numpy<1.23.0,>=1.16.5, but you have numpy 1.24.2 which is incompatible.
numba 0.54.1 requires numpy<1.21,>=1.17, but you have numpy 1.24.2 which is incompatible.
mleap 0.20.0 requires scikit-learn<0.23.0,>=0.22.0, but you have scikit-

In [0]:
# spark packages
from pyspark.sql import DataFrame  # needed by union_all further down
from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer, IDF

import re
import numpy as np
from functools import reduce

# Warnings
import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)

## Load Data
Currently, the data is loaded from DBFS. With the correct permissions, this can be changed to read directly from ADLS or from hive_metastore.

In [0]:
SQL = '''select * from test_db.resultSentiment'''
df = spark.sql(SQL)

In [0]:
df.show(5)

+-----+-------------------+-------------+---------+-----+--------------------+---------+------+
|Index|               Date|  ProductName| Category|Price|             Content|Sentiment|Rating|
+-----+-------------------+-------------+---------+-----+--------------------+---------+------+
|    6|2022-12-23 03:52:04|ProductName-1|Category2|76.68|Buyer beware. Thi...| Negative|   0.6|
|   18|2023-03-14 16:40:26|ProductName-4|Category2|15.52|i liked this albu...| Positive|   4.1|
|   21|2021-02-14 15:35:50|ProductName-2|Category3|91.25|Problem with char...| Negative|   0.6|
|   25|2021-10-29 14:49:19|ProductName-1|Category1|10.86|Batteries died wi...| Positive|   4.1|
|   28|2020-08-28 19:38:12|ProductName-4|Category3|61.41|Excellent choice ...| Positive|   4.7|
+-----+-------------------+-------------+---------+-----+--------------------+---------+------+
only showing top 5 rows



## Processing text columns

In [0]:
@udf("string")
def clean_sentence(sentence):
  
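    '''function to clean up the sentence
    - remove punctuation, special characters,
    numbers, additional spaces in between words,
    and remove any words shorter than 3 characters'''

    # null input would make re.sub raise, so pass it through unchanged
    if sentence is None:
        return None
    # replace everything except letters and spaces with a space
    sentence = re.sub(r"[^a-z A-Z]", " ", sentence)
    # collapse runs of whitespace into a single space
    sentence = re.sub(r"\s+", " ", sentence)
    # keep only words with at least 3 characters
    sentence = " ".join([ele for ele in sentence.split() if len(ele) >= 3])
    return sentence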
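
Because the cleaning logic is plain Python, it can be checked on the driver without Spark. The sketch below restates the steps from the docstring as an ordinary function. The name `clean_sentence_py` and the sample sentence are assumptions for illustration. It collapses whitespace with `\s+` and keeps words of three or more characters.

```python
import re


def clean_sentence_py(sentence):
    """Plain-Python restatement of the cleaning steps, for quick local checks."""
    if sentence is None:
        return None
    # Replace everything except ASCII letters and spaces with a space.
    sentence = re.sub(r"[^a-z A-Z]", " ", sentence)
    # Collapse runs of whitespace into a single space.
    sentence = re.sub(r"\s+", " ", sentence)
    # Keep only words with at least 3 characters.
    return " ".join(word for word in sentence.split() if len(word) >= 3)


cleaned = clean_sentence_py("Buyer beware!!  It broke in 2 days, so no.")
print(cleaned)
```

Checking a handful of raw review strings this way before launching the multi-hour job can catch regex mistakes early.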

In [0]:
# replace any run of three identical consecutive characters (e.g. "ooo") with a space, then clean the text

df = df.withColumn("Text", clean_sentence(regexp_replace(col("Content"), r"(\w)\1{2}", " ")))
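
The pattern `(\w)\1{2}` matches one word character followed by two more copies of the same character. For ASCII input, Python's `re` module treats this pattern the same way, so its effect can be previewed locally. The sample strings below are assumptions for illustration.

```python
import re

# One word character followed by two more copies of the same character.
TRIPLE_CHAR = re.compile(r"(\w)\1{2}")

samples = ["sooo good", "aaaa", "book"]
previews = [TRIPLE_CHAR.sub(" ", text) for text in samples]
print(previews)
```

Note that a run of four identical characters leaves one character behind, because only the first three are consumed by the match. Such leftovers are later dropped by the minimum word length rule in `clean_sentence`.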

Define the following Spark NLP pipeline stages to tokenize, lemmatize, and remove stopwords from the sentences before topic modeling.

In [0]:
documentAssembler = DocumentAssembler() \
    .setInputCol("Text") \
    .setOutputCol("document")

sentence = SentenceDetector() \
    .setInputCols("document") \
    .setOutputCol("sentence")

tokenizer = Tokenizer() \
    .setInputCols(["sentence"]) \
    .setOutputCol("token")

POSTag = PerceptronModel.pretrained() \
    .setInputCols("document", "token") \
    .setOutputCol("pos")

chunker = Chunker() \
    .setInputCols("sentence", "pos") \
    .setOutputCol("chunk") \
    .setRegexParsers(["<NN>", "<NNS>", "<NNP>", "<JJ>", "<ADJ>"])

lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(["token"]) \
     .setOutputCol("lemmatized")

stopwordsCleaner = StopWordsCleaner() \
    .setStopWords(StopWordsRemover \
    .loadDefaultStopWords("english")) \
    .setInputCols(["lemmatized"]) \
    .setOutputCol("unigram")

pos_anc download started this may take some time.
Approximate size to download 3.9 MB
[ | ][ / ][ — ][OK!]
lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ][ / ][OK!]


In [0]:
pipeline1 = Pipeline() \
            .setStages([
                documentAssembler,
                sentence,
                tokenizer,
                POSTag,
                chunker
            ])

In [0]:
pipelinedf = pipeline1.fit(df).transform(df)

In [0]:
pipelinedf = pipelinedf.select("Index", "Category", "Sentiment", "Text", \
                               col("chunk.result").alias("chunked")) \
                .withColumn("chunked", clean_sentence(
                            regexp_replace(clean_sentence(concat_ws(", ", array_distinct(split( \
                                           regexp_replace(concat_ws(", ", col("chunked")), "[^A-Za-z0-9]", " "), " ")))), r"\s*[A-Z]\w*\s*", " ")))

In [0]:
pipelinedf = pipelinedf.filter(col("chunked").isNotNull()) \
                        .select("Index", "Category", "Sentiment", "Text", "chunked")

In [0]:
pipeline2 = Pipeline() \
            .setStages([
                documentAssembler,
                sentence,
                tokenizer,
                lemmatizer,
                stopwordsCleaner
            ])

In [0]:
pipelinedf = pipeline2.fit(pipelinedf).transform(pipelinedf)

In [0]:
pipelinedf = pipelinedf.select("Index", "Category", "Sentiment", \
                               "chunked", col("unigram.result").alias("unigrams")) \
                .withColumn("unigrams", clean_sentence(regexp_replace( \
                                          regexp_replace(clean_sentence(concat_ws(", ", \
                                                  array_distinct(split(regexp_replace(concat_ws(", ", \
                                                        col("unigrams")), "[^A-Za-z0-9]", " "), " ")))), \
                                             r"\s*[A-Z]\w*\s*", " "), r"(\w)\1{2}", " "))) \
                .withColumn("words", split(col("unigrams"), " ")) 

In [0]:
pipelinedf = pipelinedf.filter((col("words").isNotNull()) ) 

In [0]:
data = pipelinedf.select("Index", "Category", "Sentiment", "words")

In [0]:
@udf(ArrayType(StringType()))
def getTopics(token_list):
    
    '''function to get the terms
    from each of the topics from
    sparkLDA model
    '''
    
    tlist = [vocab[token_id] for token_id in token_list]
    return tlist
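
`getTopics` reads `vocab` from the notebook's global scope, so its result can depend on which vocabulary is bound when Spark serializes the function. One way to make that dependency explicit is to build the lookup from a specific vocabulary. The sketch below shows the idea in plain Python. `make_term_lookup` is a hypothetical helper, and the vocabulary is a toy example.

```python
def make_term_lookup(vocabulary):
    """Return a function that maps term indices to terms for one fixed vocabulary."""
    # Copy the list so later changes to the caller's list cannot affect lookups.
    vocab_snapshot = list(vocabulary)

    def lookup(term_indices):
        return [vocab_snapshot[i] for i in term_indices]

    return lookup


# Toy vocabulary standing in for CountVectorizerModel.vocabulary.
lookup_terms = make_term_lookup(["book", "album", "movie", "song"])
terms = lookup_terms([3, 1, 0])
print(terms)  # ['song', 'album', 'book']
```

In the notebook, the returned function could be wrapped with `udf(..., ArrayType(StringType()))` once per fitted `CountVectorizer` model, so each LDA run carries its own vocabulary.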

In [0]:
tfizer = CountVectorizer(inputCol= "words", outputCol="tf_features")
tf_model = tfizer.fit(data)
tf_result = tf_model.transform(data)

idfizer = IDF(inputCol='tf_features', outputCol='tfidf_features')
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

lda = LDA(k=10, maxIter=12, featuresCol='tfidf_features')
lda_model = lda.fit(tfidf_result)

vocab = tf_model.vocabulary

topics = lda_model.describeTopics(20).withColumn("TopicTerms", getTopics(col('termIndices'))) \
                    .withColumnRenamed("termWeights", "TermWeights").select("Topic", "TopicTerms", "TermWeights")
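
For reference, Spark ML's `IDF` documents its weight as log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term. The TF-IDF feature is the raw term count multiplied by that weight. The following plain-Python sketch reproduces the calculation on a toy corpus, which is an assumption for illustration only.

```python
import math
from collections import Counter

# Toy tokenized documents (illustrative only).
docs = [
    ["book", "read", "book"],
    ["album", "song"],
    ["book", "song"],
]

num_docs = len(docs)
# Document frequency: number of documents that contain each term.
doc_freq = Counter(term for doc in docs for term in set(doc))


def tfidf(doc):
    """Raw term count times log((m + 1) / (df + 1)) for each term in doc."""
    counts = Counter(doc)
    return {
        term: count * math.log((num_docs + 1) / (doc_freq[term] + 1))
        for term, count in counts.items()
    }


weights = tfidf(docs[0])
print(weights)
```

Terms that occur in many documents get a smaller multiplier, so very common review words contribute less to the features passed to LDA.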

In [0]:
topics.show()

+-----+--------------------+--------------------+
|Topic|          TopicTerms|         TermWeights|
+-----+--------------------+--------------------+
|    0|[book, read, writ...|[0.00771053367366...|
|    1|[good, well, use,...|[0.00334198822782...|
|    2|[book, informatio...|[0.00497730365979...|
|    3|[book, beginner, ...|[0.00327018683004...|
|    4|[album, music, go...|[0.00327174389509...|
|    5|[use, work, produ...|[0.00535918291146...|
|    6|[song, album, mus...|[0.00944547395510...|
|    7|[album, like, goo...|[0.00318423199658...|
|    8|[movie, watch, fi...|[0.00991585498429...|
|    9|[book, old, love,...|[0.00372998969436...|
+-----+--------------------+--------------------+



In [0]:
topicsdf = topics.withColumn("ID", monotonically_increasing_id()) \
                    .withColumn("new", arrays_zip("TopicTerms", "TermWeights")) \
                    .withColumn("new", explode("new")).select("ID", "Topic", \
                                    col("new.TopicTerms").alias("TopicTerms"), col("new.TermWeights").alias("TermWeights"))
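
The `arrays_zip` and `explode` step turns one row per topic into one row per (topic, term) pair. A plain-Python analogue on toy values (assumed for illustration) shows the reshaping:

```python
# One row per topic, with parallel lists of terms and weights (toy values).
topic_rows = [
    {"Topic": 0, "TopicTerms": ["book", "read"], "TermWeights": [0.008, 0.005]},
    {"Topic": 1, "TopicTerms": ["album"], "TermWeights": [0.003]},
]

# Pair each term with its weight, then emit one output row per pair.
exploded = [
    {"Topic": row["Topic"], "TopicTerms": term, "TermWeights": weight}
    for row in topic_rows
    for term, weight in zip(row["TopicTerms"], row["TermWeights"])
]

for row in exploded:
    print(row)
```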

In [0]:
topicsdf = topicsdf.filter( (col("TopicTerms").isNotNull()) & (col("TopicTerms") != "") )

## Write data to Databricks Hive Metastore

In [0]:
topicsdf.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("test_db.sparkLDATopics")

## Filtering by Category & Sentiment

In [0]:
def union_all(*dfs):
    '''function to union all the separate
    DataFrames, one for each category and
    sentiment combination'''
    return reduce(DataFrame.union, dfs)

In [0]:
dfpos1 = data.filter((col("Category") == "Category1") & (col("Sentiment") == "Positive"))
dfpos2 = data.filter((col("Category") == "Category2") & (col("Sentiment") == "Positive"))
dfpos3 = data.filter((col("Category") == "Category3") & (col("Sentiment") == "Positive"))
dfpos4 = data.filter((col("Category") == "Category4") & (col("Sentiment") == "Positive"))

dfneg1 = data.filter((col("Category") == "Category1") & (col("Sentiment") == "Negative"))
dfneg2 = data.filter((col("Category") == "Category2") & (col("Sentiment") == "Negative"))
dfneg3 = data.filter((col("Category") == "Category3") & (col("Sentiment") == "Negative"))
dfneg4 = data.filter((col("Category") == "Category4") & (col("Sentiment") == "Negative"))

dfneu1 = data.filter((col("Category") == "Category1") & (col("Sentiment") == "Neutral"))
dfneu2 = data.filter((col("Category") == "Category2") & (col("Sentiment") == "Neutral"))
dfneu3 = data.filter((col("Category") == "Category3") & (col("Sentiment") == "Neutral"))
dfneu4 = data.filter((col("Category") == "Category4") & (col("Sentiment") == "Neutral"))
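
The twelve filters above and the per-subset cells that follow all share one shape, so they could be driven by a loop. The sketch below is one possible arrangement and has not been run against this dataset. `fit_segment_topics` and `fit_all_segments` are hypothetical helpers, and `data` is assumed to be the DataFrame with `Category`, `Sentiment`, and `words` columns built earlier. The term lookup happens on the driver with each subset's own vocabulary rather than through a UDF.

```python
from itertools import product

SENTIMENTS = ["Positive", "Negative", "Neutral"]
CATEGORIES = ["Category1", "Category2", "Category3", "Category4"]

# Every (sentiment, category) pair: 3 x 4 = 12 subsets.
SEGMENTS = list(product(SENTIMENTS, CATEGORIES))


def fit_segment_topics(data, sentiment, category, k=8, max_iter=12, n_terms=20):
    """Fit CountVectorizer -> IDF -> LDA on one subset and return labeled rows."""
    # Imported here so the module-level code above runs without Spark installed.
    from pyspark.ml.clustering import LDA
    from pyspark.ml.feature import CountVectorizer, IDF
    from pyspark.sql.functions import col

    subset = data.filter(
        (col("Category") == category) & (col("Sentiment") == sentiment)
    )

    tf_model = CountVectorizer(inputCol="words", outputCol="tf_features").fit(subset)
    tf_result = tf_model.transform(subset)
    idf_model = IDF(inputCol="tf_features", outputCol="tfidf_features").fit(tf_result)
    lda_model = LDA(k=k, maxIter=max_iter, featuresCol="tfidf_features").fit(
        idf_model.transform(tf_result)
    )

    # describeTopics returns only k rows, so collecting them to the driver is
    # cheap and lets the lookup use this subset's own vocabulary.
    vocab = tf_model.vocabulary
    rows = []
    for topic in lda_model.describeTopics(n_terms).collect():
        for index, weight in zip(topic["termIndices"], topic["termWeights"]):
            rows.append(
                (category, sentiment, topic["topic"], vocab[index], float(weight))
            )
    return rows


def fit_all_segments(data):
    """Run the per-subset fit for all 12 combinations and gather the rows."""
    rows = []
    for sentiment, category in SEGMENTS:
        rows.extend(fit_segment_topics(data, sentiment, category))
    return rows
```

The collected rows could then be turned back into a DataFrame with `spark.createDataFrame(rows, ["Category", "Sentiment", "Topic", "TopicTerms", "TermWeights"])`. The number of runs, and therefore the runtime, stays the same; only the amount of repeated code changes.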

In [0]:
tfizer = CountVectorizer(inputCol= "words", outputCol="tf_features")
tfmodel = tfizer.fit(dfpos1)
tfresult = tfmodel.transform(dfpos1)

idfizer = IDF(inputCol='tf_features', outputCol='tfidf_features')
idfmodel = idfizer.fit(tfresult)
tfidfresult = idfmodel.transform(tfresult)

lda = LDA(k=8, maxIter=12, featuresCol='tfidf_features')
ldamodel = lda.fit(tfidfresult)

vocab = tfmodel.vocabulary

posdftopic1 = ldamodel.describeTopics(20).withColumn("TopicTerms", getTopics(col('termIndices'))) \
                .withColumnRenamed("termWeights", "TermWeights").select("Topic", "TopicTerms", "TermWeights")

In [0]:
tfizer = CountVectorizer(inputCol= "words", outputCol="tf_features")
tfmodel = tfizer.fit(dfpos2)
tfresult = tfmodel.transform(dfpos2)

idfizer = IDF(inputCol='tf_features', outputCol='tfidf_features')
idfmodel = idfizer.fit(tfresult)
tfidfresult = idfmodel.transform(tfresult)

lda = LDA(k=8, maxIter=12, featuresCol='tfidf_features')
ldamodel = lda.fit(tfidfresult)

vocab = tfmodel.vocabulary

posdftopic2 = ldamodel.describeTopics(20).withColumn("TopicTerms", getTopics(col('termIndices'))) \
                .withColumnRenamed("termWeights", "TermWeights").select("Topic", "TopicTerms", "TermWeights")

In [0]:
tfizer = CountVectorizer(inputCol= "words", outputCol="tf_features")
tfmodel = tfizer.fit(dfpos3)
tfresult = tfmodel.transform(dfpos3)

idfizer = IDF(inputCol='tf_features', outputCol='tfidf_features')
idfmodel = idfizer.fit(tfresult)
tfidfresult = idfmodel.transform(tfresult)

lda = LDA(k=8, maxIter=12, featuresCol='tfidf_features')
ldamodel = lda.fit(tfidfresult)

vocab = tfmodel.vocabulary

posdftopic3 = ldamodel.describeTopics(20).withColumn("TopicTerms", getTopics(col('termIndices'))) \
                .withColumnRenamed("termWeights", "TermWeights").select("Topic", "TopicTerms", "TermWeights")

In [0]:
tfizer = CountVectorizer(inputCol= "words", outputCol="tf_features")
tfmodel = tfizer.fit(dfpos4)
tfresult = tfmodel.transform(dfpos4)

idfizer = IDF(inputCol='tf_features', outputCol='tfidf_features')
idfmodel = idfizer.fit(tfresult)
tfidfresult = idfmodel.transform(tfresult)

lda = LDA(k=8, maxIter=12, featuresCol='tfidf_features')
ldamodel = lda.fit(tfidfresult)

vocab = tfmodel.vocabulary

posdftopic4 = ldamodel.describeTopics(20).withColumn("TopicTerms", getTopics(col('termIndices'))) \
                .withColumnRenamed("termWeights", "TermWeights").select("Topic", "TopicTerms", "TermWeights")

In [0]:
tfizer = CountVectorizer(inputCol= "words", outputCol="tf_features")
tfmodel = tfizer.fit(dfneg1)
tfresult = tfmodel.transform(dfneg1)

idfizer = IDF(inputCol='tf_features', outputCol='tfidf_features')
idfmodel = idfizer.fit(tfresult)
tfidfresult = idfmodel.transform(tfresult)

lda = LDA(k=8, maxIter=12, featuresCol='tfidf_features')
ldamodel = lda.fit(tfidfresult)

vocab = tfmodel.vocabulary

negdftopic1 = ldamodel.describeTopics(20).withColumn("TopicTerms", getTopics(col('termIndices'))) \
                .withColumnRenamed("termWeights", "TermWeights").select("Topic", "TopicTerms", "TermWeights")

In [0]:
tfizer = CountVectorizer(inputCol= "words", outputCol="tf_features")
tfmodel = tfizer.fit(dfneg2)
tfresult = tfmodel.transform(dfneg2)

idfizer = IDF(inputCol='tf_features', outputCol='tfidf_features')
idfmodel = idfizer.fit(tfresult)
tfidfresult = idfmodel.transform(tfresult)

lda = LDA(k=8, maxIter=12, featuresCol='tfidf_features')
ldamodel = lda.fit(tfidfresult)

vocab = tfmodel.vocabulary

negdftopic2 = ldamodel.describeTopics(20).withColumn("TopicTerms", getTopics(col('termIndices'))) \
                .withColumnRenamed("termWeights", "TermWeights").select("Topic", "TopicTerms", "TermWeights")

In [0]:
tfizer = CountVectorizer(inputCol= "words", outputCol="tf_features")
tfmodel = tfizer.fit(dfneg3)
tfresult = tfmodel.transform(dfneg3)

idfizer = IDF(inputCol='tf_features', outputCol='tfidf_features')
idfmodel = idfizer.fit(tfresult)
tfidfresult = idfmodel.transform(tfresult)

lda = LDA(k=8, maxIter=12, featuresCol='tfidf_features')
ldamodel = lda.fit(tfidfresult)

vocab = tfmodel.vocabulary

negdftopic3 = ldamodel.describeTopics(20).withColumn("TopicTerms", getTopics(col('termIndices'))) \
                .withColumnRenamed("termWeights", "TermWeights").select("Topic", "TopicTerms", "TermWeights")

In [0]:
tfizer = CountVectorizer(inputCol= "words", outputCol="tf_features")
tfmodel = tfizer.fit(dfneg4)
tfresult = tfmodel.transform(dfneg4)

idfizer = IDF(inputCol='tf_features', outputCol='tfidf_features')
idfmodel = idfizer.fit(tfresult)
tfidfresult = idfmodel.transform(tfresult)

lda = LDA(k=8, maxIter=12, featuresCol='tfidf_features')
ldamodel = lda.fit(tfidfresult)

vocab = tfmodel.vocabulary

negdftopic4 = ldamodel.describeTopics(20).withColumn("TopicTerms", getTopics(col('termIndices'))) \
                .withColumnRenamed("termWeights", "TermWeights").select("Topic", "TopicTerms", "TermWeights")

In [0]:
tfizer = CountVectorizer(inputCol= "words", outputCol="tf_features")
tfmodel = tfizer.fit(dfneu1)
tfresult = tfmodel.transform(dfneu1)

idfizer = IDF(inputCol='tf_features', outputCol='tfidf_features')
idfmodel = idfizer.fit(tfresult)
tfidfresult = idfmodel.transform(tfresult)

lda = LDA(k=8, maxIter=12, featuresCol='tfidf_features')
ldamodel = lda.fit(tfidfresult)

vocab = tfmodel.vocabulary

neudftopic1 = ldamodel.describeTopics(20).withColumn("TopicTerms", getTopics(col('termIndices'))) \
                .withColumnRenamed("termWeights", "TermWeights").select("Topic", "TopicTerms", "TermWeights")

In [0]:
tfizer = CountVectorizer(inputCol= "words", outputCol="tf_features")
tfmodel = tfizer.fit(dfneu2)
tfresult = tfmodel.transform(dfneu2)

idfizer = IDF(inputCol='tf_features', outputCol='tfidf_features')
idfmodel = idfizer.fit(tfresult)
tfidfresult = idfmodel.transform(tfresult)

lda = LDA(k=8, maxIter=12, featuresCol='tfidf_features')
ldamodel = lda.fit(tfidfresult)

vocab = tfmodel.vocabulary

neudftopic2 = ldamodel.describeTopics(20).withColumn("TopicTerms", getTopics(col('termIndices'))) \
                .withColumnRenamed("termWeights", "TermWeights").select("Topic", "TopicTerms", "TermWeights")

In [0]:
tfizer = CountVectorizer(inputCol= "words", outputCol="tf_features")
tfmodel = tfizer.fit(dfneu3)
tfresult = tfmodel.transform(dfneu3)

idfizer = IDF(inputCol='tf_features', outputCol='tfidf_features')
idfmodel = idfizer.fit(tfresult)
tfidfresult = idfmodel.transform(tfresult)

lda = LDA(k=8, maxIter=12, featuresCol='tfidf_features')
ldamodel = lda.fit(tfidfresult)

vocab = tfmodel.vocabulary

neudftopic3 = ldamodel.describeTopics(20).withColumn("TopicTerms", getTopics(col('termIndices'))) \
                .withColumnRenamed("termWeights", "TermWeights").select("Topic", "TopicTerms", "TermWeights")

In [0]:
tfizer = CountVectorizer(inputCol= "words", outputCol="tf_features")
tfmodel = tfizer.fit(dfneu4)
tfresult = tfmodel.transform(dfneu4)

idfizer = IDF(inputCol='tf_features', outputCol='tfidf_features')
idfmodel = idfizer.fit(tfresult)
tfidfresult = idfmodel.transform(tfresult)

lda = LDA(k=8, maxIter=12, featuresCol='tfidf_features')
ldamodel = lda.fit(tfidfresult)

vocab = tfmodel.vocabulary

neudftopic4 = ldamodel.describeTopics(20).withColumn("TopicTerms", getTopics(col('termIndices'))) \
                .withColumnRenamed("termWeights", "TermWeights").select("Topic", "TopicTerms", "TermWeights")

In [0]:
posdftopic1 = posdftopic1.withColumn("Category", lit("Category1")).withColumn("Sentiment", lit("Positive")) \
                            .withColumn("NewID", concat(col("Sentiment"), col("Category"), col("Topic")))
posdftopic2 = posdftopic2.withColumn("Category", lit("Category2")).withColumn("Sentiment", lit("Positive")) \
                            .withColumn("NewID", concat(col("Sentiment"), col("Category"), col("Topic")))
posdftopic3 = posdftopic3.withColumn("Category", lit("Category3")).withColumn("Sentiment", lit("Positive")) \
                            .withColumn("NewID", concat(col("Sentiment"), col("Category"), col("Topic")))
posdftopic4 = posdftopic4.withColumn("Category", lit("Category4")).withColumn("Sentiment", lit("Positive")) \
                            .withColumn("NewID", concat(col("Sentiment"), col("Category"), col("Topic")))


negdftopic1 = negdftopic1.withColumn("Category", lit("Category1")).withColumn("Sentiment", lit("Negative")) \
                            .withColumn("NewID", concat(col("Sentiment"), col("Category"), col("Topic")))
negdftopic2 = negdftopic2.withColumn("Category", lit("Category2")).withColumn("Sentiment", lit("Negative")) \
                            .withColumn("NewID", concat(col("Sentiment"), col("Category"), col("Topic")))
negdftopic3 = negdftopic3.withColumn("Category", lit("Category3")).withColumn("Sentiment", lit("Negative")) \
                            .withColumn("NewID", concat(col("Sentiment"), col("Category"), col("Topic")))
negdftopic4 = negdftopic4.withColumn("Category", lit("Category4")).withColumn("Sentiment", lit("Negative")) \
                            .withColumn("NewID", concat(col("Sentiment"), col("Category"), col("Topic")))


neudftopic1 = neudftopic1.withColumn("Category", lit("Category1")).withColumn("Sentiment", lit("Neutral")) \
                            .withColumn("NewID", concat(col("Sentiment"), col("Category"), col("Topic")))
neudftopic2 = neudftopic2.withColumn("Category", lit("Category2")).withColumn("Sentiment", lit("Neutral")) \
                            .withColumn("NewID", concat(col("Sentiment"), col("Category"), col("Topic")))
neudftopic3 = neudftopic3.withColumn("Category", lit("Category3")).withColumn("Sentiment", lit("Neutral")) \
                            .withColumn("NewID", concat(col("Sentiment"), col("Category"), col("Topic")))
neudftopic4 = neudftopic4.withColumn("Category", lit("Category4")).withColumn("Sentiment", lit("Neutral")) \
                            .withColumn("NewID", concat(col("Sentiment"), col("Category"), col("Topic")))

In [0]:
pos_dict = {}
pos_dict["df1"] = posdftopic1
pos_dict["df2"] = posdftopic2
pos_dict["df3"] = posdftopic3
pos_dict["df4"] = posdftopic4

neg_dict = {}
neg_dict["df1"] = negdftopic1
neg_dict["df2"] = negdftopic2
neg_dict["df3"] = negdftopic3
neg_dict["df4"] = negdftopic4

neu_dict = {}
neu_dict["df1"] = neudftopic1
neu_dict["df2"] = neudftopic2
neu_dict["df3"] = neudftopic3
neu_dict["df4"] = neudftopic4

In [0]:
filteredDF = union_all(*pos_dict.values()).union(union_all(*neg_dict.values())).union(union_all(*neu_dict.values()))

In [0]:
filteredDF = filteredDF.withColumn("ID", monotonically_increasing_id()) \
                    .withColumn("new", arrays_zip("TopicTerms", "TermWeights")) \
                    .withColumn("new", explode("new")).select("ID", "Category", "Sentiment", "Topic", \
                                    col("new.TopicTerms").alias("TopicTerms"), col("new.TermWeights").alias("TermWeights"))

In [0]:
filteredDF = filteredDF.filter( (col("TopicTerms").isNotNull()) & (col("TopicTerms") != "") )

## Write data to Databricks Hive Metastore

In [0]:
filteredDF.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("test_db.filteredsparkLDATopics")

In [0]:
filteredDF.count()

Out[42]: 1920
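
This count is consistent with the settings used above: 12 category/sentiment subsets, 8 topics per subset, and 20 terms per topic. A quick arithmetic check, assuming the null/empty-term filter dropped no rows:

```python
n_sentiments = 3      # Positive, Negative, Neutral
n_categories = 4      # Category1 .. Category4
topics_per_model = 8  # LDA(k=8, ...)
terms_per_topic = 20  # describeTopics(20)

expected_rows = n_sentiments * n_categories * topics_per_model * terms_per_topic
print(expected_rows)  # 1920
```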

## Validation of data/table

In [0]:
%sql
select * from test_db.filteredsparkLDATopics

ID,Category,Sentiment,Topic,TopicTerms,TermWeights
85899345920,Category3,Neutral,0,truly,0.0027495677402951
85899345920,Category3,Neutral,0,song,0.0026096946113538
85899345920,Category3,Neutral,0,toy,0.0026046225400693
85899345920,Category3,Neutral,0,stay,0.002585143476002
85899345920,Category3,Neutral,0,love,0.0024175832701524
85899345920,Category3,Neutral,0,work,0.0023252047714273
85899345920,Category3,Neutral,0,brand,0.0022816413940916
85899345920,Category3,Neutral,0,like,0.002227437637669
85899345920,Category3,Neutral,0,well,0.0022257236602736
85899345920,Category3,Neutral,0,enjoy,0.0021545922312815
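
The large `ID` values come from `monotonically_increasing_id`, which Spark documents as placing the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The sketch below decodes the ID shown in the output above. `decode_monotonic_id` is a hypothetical helper name.

```python
RECORD_BITS = 33  # lower 33 bits hold the record number within the partition


def decode_monotonic_id(value):
    """Split an ID into (partition_id, record_number)."""
    partition_id = value >> RECORD_BITS
    record_number = value & ((1 << RECORD_BITS) - 1)
    return partition_id, record_number


decoded = decode_monotonic_id(85899345920)
print(decoded)  # (10, 0)
```

Because the ID is assigned before `explode`, every term of a given topic shares the same ID, which is why the rows above all carry the same value.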
