## Topic modeling with Apache Spark and Spark NLP: LDA model on ABC News data

Install Java, PySpark, and Spark NLP

In [None]:
import os
# Install java
! apt-get update -qq
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version
# Install pyspark
! pip install --ignore-installed pyspark==3.5.3 #Updated
# Install Spark NLP
! pip install --ignore-installed spark-nlp==5.3.2 #Updated


W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
openjdk version "1.8.0_432"
OpenJDK Runtime Environment (build 1.8.0_432-8u432-ga~us1-0ubuntu2~22.04-ga)
OpenJDK 64-Bit Server VM (build 25.432-bga, mixed mode)
Collecting pyspark==3.5.3
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.7 (from pyspark==3.5.3)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l

Collecting spark-nlp==5.3.2
  Downloading spark_nlp-5.3.2-py2.py3-none-any.whl.metadata (57 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/57.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.1/57.1 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading spark_nlp-5.3.2-py2.py3-none-any.whl (564 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/565.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━[0m [32m542.7/565.0 kB[0m [31m16.1 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m565.0/565.0 kB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: spark-nlp
Successfully installed spark-nlp-5.3.2


Import the relevant packages and start a Spark session with Spark NLP

In [None]:
# Imports and Spark NLP session start-up.
import platform

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
# Explicit imports instead of `import *`: these are exactly the Spark NLP
# classes used by the pre-processing pipeline in this notebook.
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import Normalizer, Stemmer, StopWordsCleaner, Tokenizer
from sparknlp.pretrained import PretrainedPipeline
import sparknlp

spark = sparknlp.start()
# Report the kernel's Python version (a shell `python -V` may report a
# different interpreter than the one running this notebook).
print("Python version: ", platform.python_version())
print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)


Python 3.10.12
Spark NLP version:  5.3.2
Apache Spark version:  3.5.3


# LDA model on ABC News data

Download the news data

In [None]:
from pathlib import Path
import urllib.request

# Fetch the ABC News headlines CSV once; later runs reuse the local copy.
download_path = "./abcnews-date-text.csv"
if not Path(download_path).is_file():
  print("File not found, downloading it...")
  url = "https://github.com/ravishchawla/topic_modeling/raw/master/data/abcnews-date-text.csv"
  # Download under a temporary name and rename only after success, so an
  # interrupted download never leaves a truncated CSV that the is_file()
  # check above would accept on the next run.
  partial_path = download_path + ".part"
  urllib.request.urlretrieve(url, partial_path)
  Path(partial_path).replace(download_path)
else:
  print("File already present.")


File Not found will downloading it!


Read the Data

In [None]:
# if you are reading file from local storage
file_location = r'./abcnews-date-text.csv'
# if you are reading file from hdfs
# file_location = r'hdfs:\\\user\path\to\abcnews_date_txt.csv'
file_type = "csv"
# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)
# Verify the count
df.count()


1041793

In [None]:
# Peek at the first ten raw headlines.
df.show(n=10)

+------------+--------------------+
|publish_date|       headline_text|
+------------+--------------------+
|    20030219|aba decides again...|
|    20030219|act fire witnesse...|
|    20030219|a g calls for inf...|
|    20030219|air nz staff in a...|
|    20030219|air nz strike to ...|
|    20030219|ambitious olsson ...|
|    20030219|antic delighted w...|
|    20030219|aussie qualifier ...|
|    20030219|aust addresses un...|
|    20030219|australia is lock...|
+------------+--------------------+
only showing top 10 rows



## Pre-processing Pipeline using Spark NLP

# Document Assembling

Spark NLP annotators operate on annotation columns, so the raw `headline_text` column must first be converted into a `document` column with the DocumentAssembler.

In [None]:
# Wrap each raw headline in a Spark NLP `document` annotation.
# "shrink" cleanup mode normalises whitespace in the text.
document_assembler = (
    DocumentAssembler()
    .setInputCol("headline_text")
    .setOutputCol("document")
    .setCleanupMode("shrink")
)



# Tokenizing

Split each document into tokens (an array of words).

In [None]:
# Split each `document` annotation into `token` annotations.
tokenizer = Tokenizer()
tokenizer.setInputCols(["document"])
tokenizer.setOutputCol("token")


# Normalizing

Remove unwanted characters (by default, anything that is not a letter) from each token.

In [None]:
# Clean each token, writing the result to the `normalized` column.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalized")


# Stopwords removal

Remove stopwords

In [None]:
# Drop stopwords from the normalized tokens, ignoring letter case.
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols("normalized")
    .setOutputCol("cleanTokens")
    .setCaseSensitive(False)
)


# Stemming

Stem the words to reduce them to a common root form (e.g. "decides" → "decid").

In [None]:
# Reduce each remaining token to its stem, written to the `stem` column.
stemmer = Stemmer()
stemmer = stemmer.setInputCols(["cleanTokens"]).setOutputCol("stem")

# Finishing

The Finisher is a key final step. Spark NLP adds annotation structure when each row of the DataFrame is converted to a document; the Finisher converts those annotations back into ordinary Spark columns, here an array of token strings.

In [None]:
# Turn the `stem` annotations back into a plain array column named `tokens`.
# Intermediate annotation columns are kept (cleanAnnotations=False); the
# columns actually needed are selected explicitly after the pipeline runs.
finisher = (
    Finisher()
    .setInputCols(["stem"])
    .setOutputCols(["tokens"])
    .setOutputAsArray(True)
    .setCleanAnnotations(False)
)


# Build the ML Pipeline

We build an ML pipeline so that each stage is executed in sequence. The same pipeline can later be reused to process new text when testing the model.

In [None]:
# Chain the pre-processing stages; Spark runs them in this order when the
# pipeline is fitted and applied. Fitting/transforming is done once, in the
# "Train and Apply the ML Pipeline" step, rather than duplicated here
# (it processes the full ~1M-row dataset, so running it twice is costly).
nlp_pipeline = Pipeline(
    stages=[
        document_assembler,
        tokenizer,
        normalizer,
        stopwords_cleaner,
        stemmer,
        finisher,
    ]
)


+------------+--------------------+
|publish_date|              tokens|
+------------+--------------------+
|    20030219|[aba, decid, comm...|
|    20030219|[act, fire, wit, ...|
|    20030219|[g, call, infrast...|
|    20030219|[air, nz, staff, ...|
|    20030219|[air, nz, strike,...|
|    20030219|[ambiti, olsson, ...|
|    20030219|[antic, delight, ...|
|    20030219|[aussi, qualifi, ...|
|    20030219|[aust, address, u...|
|    20030219|[australia, lock,...|
|    20030219|[australia, contr...|
|    20030219|[barca, take, rec...|
|    20030219|[bathhous, plan, ...|
|    20030219|[big, hope, launc...|
|    20030219|[big, plan, boost...|
|    20030219|[blizzard, buri, ...|
|    20030219|[brigadi, dismiss...|
|    20030219|[british, combat,...|
|    20030219|[bryant, lead, la...|
|    20030219|[bushfir, victim,...|
+------------+--------------------+
only showing top 20 rows



# Train and Apply the ML Pipeline

Fit the pipeline and apply it to transform the dataset. The Spark NLP pipeline creates intermediate columns that we don’t need, so we select only the columns we do need. Only the first 10,000 processed headlines are kept for topic modeling.

In [None]:
# Fit the pre-processing pipeline on the headlines and apply it.
nlp_model = nlp_pipeline.fit(df)
processed_df = nlp_model.transform(df)
# Number of processed headlines used for topic modeling.
SAMPLE_SIZE = 10000
# limit() without an ordering does not guarantee which rows are returned, and
# every later action (CountVectorizer, each LDA fit and evaluation) would
# otherwise re-run the whole Spark NLP pipeline. Caching lets those steps
# reuse one computed sample instead of recomputing it.
tokens_df = processed_df.select('publish_date', 'tokens').limit(SAMPLE_SIZE).cache()
tokens_df.show()


+------------+--------------------+
|publish_date|              tokens|
+------------+--------------------+
|    20030219|[aba, decid, comm...|
|    20030219|[act, fire, wit, ...|
|    20030219|[g, call, infrast...|
|    20030219|[air, nz, staff, ...|
|    20030219|[air, nz, strike,...|
|    20030219|[ambiti, olsson, ...|
|    20030219|[antic, delight, ...|
|    20030219|[aussi, qualifi, ...|
|    20030219|[aust, address, u...|
|    20030219|[australia, lock,...|
|    20030219|[australia, contr...|
|    20030219|[barca, take, rec...|
|    20030219|[bathhous, plan, ...|
|    20030219|[big, hope, launc...|
|    20030219|[big, plan, boost...|
|    20030219|[blizzard, buri, ...|
|    20030219|[brigadi, dismiss...|
|    20030219|[british, combat,...|
|    20030219|[bryant, lead, la...|
|    20030219|[bushfir, victim,...|
+------------+--------------------+
only showing top 20 rows



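# Feature Engineering

We use Spark MLlib’s CountVectorizer to generate bag-of-words features from the tokens. Latent Dirichlet Allocation needs a data-specific vocabulary to perform topic modeling; here the vocabulary is limited to the 500 most frequent terms that appear in at least 3 headlines.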

In [None]:
from pyspark.ml.feature import CountVectorizer

# Bag-of-words features: keep the 500 most frequent terms that appear in at
# least 3 documents (headlines).
cv = CountVectorizer(inputCol="tokens", outputCol="features",
                     vocabSize=500, minDF=3.0)
# train the model (builds the vocabulary)
cv_model = cv.fit(tokens_df)
# transform the data. Output column name will be features.
# Cached because each LDA fit and metric evaluation below scans this DataFrame
# again; without caching every scan recomputes the upstream transformations.
vectorized_tokens = cv_model.transform(tokens_df).cache()

# Build the LDA Model

The two key LDA hyperparameters are k (the number of topics) and maxIter (the number of iterations). Spark provides defaults for both, but they should be tuned: try different values of k and maxIter to see which combination best suits your data.

In [None]:
from pyspark.ml.clustering import LDA

# Fixed seed: PySpark's default LDA seed is derived from Python's string hash,
# which is randomized per process unless PYTHONHASHSEED is set, so without it
# the scores and topics below can change between kernel restarts.
SEED = 42
num_topics = 3
lda = LDA(k=num_topics, maxIter=10, seed=SEED)
model = lda.fit(vectorized_tokens)
# Both metrics are evaluated on the same documents the model was trained on.
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: " +
      str(ll))
print("The upper bound on perplexity: " + str(lp))


The lower bound on the log likelihood of the entire corpus: -179061.72753969542
The upper bound on perplexity: 6.311212728735916


# Visualize the topics

After completing the training, we can view the words that represent each topic using the
following code

In [None]:
# extract vocabulary from CountVectorizer
vocab = cv_model.vocabulary
topics = model.describeTopics()
topics_rdd = topics.rdd
topics_words = topics_rdd\
  .map(lambda row: row['termIndices'])\
  .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
  .collect()
for idx, topic in enumerate(topics_words):
  print("topic: {}".format(idx))
  print("*"*25)
  for word in topic:
    print(word)
print("*"*25)


topic: 0
*************************
man
new
plan
war
take
charg
court
face
australia
mai
topic: 1
*************************
iraq
win
fire
council
water
back
boost
call
fund
mp
topic: 2
*************************
u
polic
war
iraqi
iraq
kill
baghdad
claim
world
warn
*************************



Try different values of k and maxIter, using at least five combinations, to see which combination best suits the data; show the results and explain why the chosen one is best.


## num_topics = 5, maxIter = 10

In [None]:
from pyspark.ml.clustering import LDA

# Fixed seed: PySpark's default LDA seed is derived from Python's per-process
# randomized string hash, so without it these scores change between restarts.
SEED = 42
num_topics = 5
lda = LDA(k=num_topics, maxIter=10, seed=SEED)
model = lda.fit(vectorized_tokens)
# Both metrics are evaluated on the training documents.
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: " +
      str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -183895.3794197562
The upper bound on perplexity: 6.481579706039624


## num_topics = 7, maxIter = 20

In [None]:
from pyspark.ml.clustering import LDA

# Fixed seed: PySpark's default LDA seed is derived from Python's per-process
# randomized string hash, so without it these scores change between restarts.
SEED = 42
num_topics = 7
lda = LDA(k=num_topics, maxIter=20, seed=SEED)
model = lda.fit(vectorized_tokens)
# Both metrics are evaluated on the training documents.
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: " +
      str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -183902.50900390453
The upper bound on perplexity: 6.4818309954851445


## num_topics = 11, maxIter = 23

In [None]:
from pyspark.ml.clustering import LDA

# Fixed seed: PySpark's default LDA seed is derived from Python's per-process
# randomized string hash, so without it these scores change between restarts.
SEED = 42
num_topics = 11
lda = LDA(k=num_topics, maxIter=23, seed=SEED)
model = lda.fit(vectorized_tokens)
# Both metrics are evaluated on the training documents.
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: " +
      str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -189365.2321814272
The upper bound on perplexity: 6.674370230559256


## num_topics = 15, maxIter = 25

In [None]:
from pyspark.ml.clustering import LDA

# Fixed seed: PySpark's default LDA seed is derived from Python's per-process
# randomized string hash, so without it these scores change between restarts.
SEED = 42
num_topics = 15
lda = LDA(k=num_topics, maxIter=25, seed=SEED)
model = lda.fit(vectorized_tokens)
# Both metrics are evaluated on the training documents.
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: " +
      str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -194546.90333226265
The upper bound on perplexity: 6.857003501066638


## num_topics = 18, maxIter = 28

In [None]:
from pyspark.ml.clustering import LDA

# Fixed seed: PySpark's default LDA seed is derived from Python's per-process
# randomized string hash, so without it these scores change between restarts.
SEED = 42
num_topics = 18
lda = LDA(k=num_topics, maxIter=28, seed=SEED)
model = lda.fit(vectorized_tokens)
# Both metrics are evaluated on the training documents.
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: " +
      str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -198008.73156073922
The upper bound on perplexity: 6.979019158351164


**A higher log-likelihood (lower bound) and a lower perplexity (upper bound) indicate a better model fit.**

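Hence, among the five additional combinations, num_topics = 5, maxIter = 10 gave the best results. Its lower bound on the log likelihood of the entire corpus was -183895.3794197562 and its upper bound on perplexity was 6.481579706039624.

Note that the baseline model (num_topics = 3, maxIter = 10) scored better still, with log likelihood -179061.72753969542 and perplexity 6.311212728735916. In this run, perplexity rises steadily as the number of topics grows.

These comparisons have three caveats:
- Both metrics are computed on the same 10,000 headlines the models were trained on.
- k and maxIter were changed together, so their individual effects cannot be separated.
- No explicit random seed was set.

A held-out evaluation set and a fixed seed would give a more reliable comparison.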

==================================THE END=======================================