## Topic Modelling using Spark NLP and Spark MLlib

You can download the dataset from the following link:

https://github.com/ravishchawla/topic_modeling/blob/master/data/abcnews-date-text.csv


We will use Spark NLP to preprocess the data. Spark NLP is an NLP library that provides the text preprocessing operations needed to clean the text. For more information, check out the website:

https://nlp.johnsnowlabs.com/

We will use the LDA model from Spark MLlib to extract topics from the dataset. Before loading the data, let's import the required libraries and initialize the Spark session. As I am using a single-node Spark setup, the memory settings need to be sized according to the host machine.

In [1]:
# Import Spark NLP
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

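# Start a Spark session with Spark NLP.
# sparknlp.start() is the shortcut; the session is built manually below to control the memory settings.
# spark = sparknlp.start()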

spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[4]")\
    .config("spark.driver.memory","8G")\
    .config("spark.driver.maxResultSize", "2G") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5")\
    .config("spark.kryoserializer.buffer.max", "1000M")\
    .getOrCreate()

### Load data into Spark Dataframe

In [2]:
# File location and type
file_location = r'E:\Machine Learning\data\abcnews_date_txt.csv'
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","


df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

# Verify the count
df.count()

1041793

In [3]:
# Verify the schema. We used inferSchema = "true".

df.printSchema()

root
 |-- publish_date: integer (nullable = true)
 |-- headline_text: string (nullable = true)

In [4]:
df.show()

+------------+--------------------+
|publish_date|       headline_text|
+------------+--------------------+
|    20030219|aba decides again...|
|    20030219|act fire witnesse...|
|    20030219|a g calls for inf...|
|    20030219|air nz staff in a...|
|    20030219|air nz strike to ...|
|    20030219|ambitious olsson ...|
|    20030219|antic delighted w...|
|    20030219|aussie qualifier ...|
|    20030219|aust addresses un...|
|    20030219|australia is lock...|
|    20030219|australia to cont...|
|    20030219|barca take record...|
|    20030219|bathhouse plans m...|
|    20030219|big hopes for lau...|
|    20030219|big plan to boost...|
|    20030219|blizzard buries u...|
|    20030219|brigadier dismiss...|
|    20030219|british combat tr...|
|    20030219|bryant leads lake...|
|    20030219|bushfire victims ...|
+------------+--------------------+
only showing top 20 rows



The data looks good and the count is adequate for the example. Let's go ahead and build the NLP pipeline using Spark NLP.

### NLP Pipeline

In [37]:


# Spark NLP requires the input column to be converted to a document annotation.
document_assembler = DocumentAssembler() \
    .setInputCol("headline_text") \
    .setOutputCol("document") \
    .setCleanupMode("shrink")

# Split each document into an array of tokens
tokenizer = Tokenizer() \
  .setInputCols(["document"]) \
  .setOutputCol("token")

# clean unwanted characters and garbage
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")

# remove stopwords
stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

# stem the words to bring them to the root form.
stemmer = Stemmer() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("stem")

# The Finisher is essential here. Spark NLP wraps each row in its own annotation structure when the text
# is converted to a document; the Finisher converts that structure back to a plain array of tokens.
finisher = Finisher() \
    .setInputCols(["stem"]) \
    .setOutputCols(["tokens"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)

# Build an ML pipeline so that each stage is executed in sequence. The fitted pipeline can also be reused on test data.
nlp_pipeline = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher])

# train the pipeline
nlp_model = nlp_pipeline.fit(df)

# apply the pipeline to transform dataframe.
processed_df  = nlp_model.transform(df)


# Keep the date and token columns, limited to 10,000 rows
tokens_df = processed_df.select('publish_date','tokens').limit(10000)

tokens_df.show()

+------------+--------------------+
|publish_date|              tokens|
+------------+--------------------+
|    20030219|[aba, decid, comm...|
|    20030219|[act, fire, wit, ...|
|    20030219|[g, call, infrast...|
|    20030219|[air, nz, staff, ...|
|    20030219|[air, nz, strike,...|
|    20030219|[ambiti, olsson, ...|
|    20030219|[antic, delight, ...|
|    20030219|[aussi, qualifi, ...|
|    20030219|[aust, address, u...|
|    20030219|[australia, lock,...|
|    20030219|[australia, contr...|
|    20030219|[barca, take, rec...|
|    20030219|[bathhous, plan, ...|
|    20030219|[big, hope, launc...|
|    20030219|[big, plan, boost...|
|    20030219|[blizzard, buri, ...|
|    20030219|[brigadi, dismiss...|
|    20030219|[british, combat,...|
|    20030219|[bryant, lead, la...|
|    20030219|[bushfir, victim,...|
+------------+--------------------+
only showing top 20 rows
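
To see why the `Finisher` stage matters, it can help to picture what an annotation column holds. The sketch below is plain Python, not Spark NLP code: the dictionaries mimic the fields of a Spark NLP annotation (`annotatorType`, `begin`, `end`, `result`, `metadata`), and `finish` is a hypothetical helper that keeps only the `result` text, which is roughly what `Finisher` with `setOutputAsArray(True)` passes on. The offsets and metadata values are illustrative.

```python
# Illustrative stand-in for the "stem" annotation column of one headline.
# Field names follow Spark NLP's annotation structure; values are made up.
stem_annotations = [
    {"annotatorType": "token", "begin": 0, "end": 2,
     "result": "aba", "metadata": {"sentence": "0"}},
    {"annotatorType": "token", "begin": 4, "end": 10,
     "result": "decid", "metadata": {"sentence": "0"}},
]


def finish(annotations):
    """Hypothetical helper: keep only the text of each annotation."""
    return [annotation["result"] for annotation in annotations]


tokens = finish(stem_annotations)
print(tokens)  # ['aba', 'decid']
```

A plain array of strings like this is what `CountVectorizer` expects as input, which is why the pipeline ends with the `Finisher`.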



In [38]:
tokens_df.count()

10000

In [39]:
from pyspark.ml.feature import CountVectorizer

# The LDA model expects a vector of token counts, so we use CountVectorizer from Spark MLlib.
cv = CountVectorizer(inputCol="tokens", outputCol="features", vocabSize=500, minDF=3.0)

In [40]:
cv_model = cv.fit(tokens_df)

In [41]:
vectorized_tokens = cv_model.transform(tokens_df)
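
The `vocabSize` and `minDF` parameters decide which terms reach the LDA model. The plain-Python sketch below approximates that selection on a toy corpus: terms that appear in fewer than `min_df` documents are dropped, the remaining terms are ranked by total frequency, and the top `vocab_size` are kept. The helper names and the alphabetical tie-break are assumptions made for this illustration; Spark's own implementation is distributed and may order ties differently.

```python
from collections import Counter


def fit_vocabulary(docs, vocab_size, min_df):
    """Pick the most frequent terms that occur in at least min_df documents."""
    term_freq = Counter(term for doc in docs for term in doc)
    doc_freq = Counter(term for doc in docs for term in set(doc))
    eligible = [term for term in term_freq if doc_freq[term] >= min_df]
    # Rank by corpus frequency; the alphabetical tie-break is an assumption.
    eligible.sort(key=lambda term: (-term_freq[term], term))
    return eligible[:vocab_size]


def count_vector(doc, vocabulary):
    """Count how often each vocabulary term occurs in one document."""
    index = {term: i for i, term in enumerate(vocabulary)}
    vector = [0] * len(vocabulary)
    for term in doc:
        if term in index:
            vector[index[term]] += 1
    return vector


toy_docs = [
    ["polic", "charg", "man"],
    ["polic", "fire", "polic"],
    ["man", "fire", "rain"],
]

toy_vocab = fit_vocabulary(toy_docs, vocab_size=2, min_df=2)
print(toy_vocab)                             # ['polic', 'fire']
print(count_vector(toy_docs[1], toy_vocab))  # [2, 1]
```

With `vocabSize=500` and `minDF=3.0` as in the cell above, rare stems are filtered out in the same way before the 500 most frequent ones are kept.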

### Build LDA Model

In [42]:
from pyspark.ml.clustering import LDA

num_topics = 3

lda = LDA(k=num_topics, maxIter=10)
model = lda.fit(vectorized_tokens)

ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)

print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Show the per-document topic distribution
transformed = model.transform(vectorized_tokens)
transformed.show(truncate=False)

The lower bound on the log likelihood of the entire corpus: -179294.34056462505
The upper bound on perplexity: 6.319411411413543
The topics described by their top-weighted terms:
+-----+-----------+------------------------------------------------------------------+
|topic|termIndices|termWeights                                                       |
+-----+-----------+------------------------------------------------------------------+
|0    |[4, 2, 16] |[0.02316016397290078, 0.018882193389718523, 0.01638984506775528]  |
|1    |[1, 6, 8]  |[0.023489583320613704, 0.021933736786838697, 0.019705093722386827]|
|2    |[0, 2, 15] |[0.03184761923877096, 0.016924401517207907, 0.016542356800711155] |
+-----+-----------+------------------------------------------------------------------+

[Output of transformed.show(truncate=False) truncated]
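
The two numbers printed above are related. As far as I can tell from the Spark source, `logPerplexity` is the negative log likelihood divided by the total number of tokens in the corpus, so the token count can be recovered from the printed values, and exponentiating the log perplexity gives the perplexity itself. The short check below only does that arithmetic on the printed values; the variable names are mine.

```python
import math

# Values copied from the output printed above.
log_likelihood = -179294.34056462505
log_perplexity = 6.319411411413543

# Assumed relation: log_perplexity = -log_likelihood / token_count
token_count = -log_likelihood / log_perplexity
perplexity = math.exp(log_perplexity)

print(round(token_count))  # 28372
print(perplexity)          # roughly 555
```

The implied count of 28,372 tokens covers only tokens that survived the `vocabSize` and `minDF` filters, which is why it averages under three tokens per headline.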

### Extract words to visualize the topics

In [43]:
vocab = cv_model.vocabulary

# describeTopics() without an argument returns the top 10 terms per topic
topics = model.describeTopics()
topics_rdd = topics.rdd

topics_words = topics_rdd\
       .map(lambda row: row['termIndices'])\
       .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
       .collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("----------")
    for word in topic:
        print(word)
    print("----------")

topic: 0
----------
govt
war
protest
iraq
polic
continu
warn
new
world
rain
----------
topic: 1
----------
iraq
plan
sai
council
win
u
claim
report
urg
water
----------
topic: 2
----------
u
war
charg
man
polic
call
fire
take
kill
death
----------


The pipeline above is basic, but it can be customized as needed. For example, you can add n-grams and see how the topics change (you may get more meaningful topics).
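
As a rough illustration of the n-gram idea, the plain-Python sketch below turns a list of stemmed tokens into bigrams. The `ngrams` helper is hypothetical and only shows what such a stage would produce. In the actual pipeline, one option would be to apply `pyspark.ml.feature.NGram` to the `tokens` column before `CountVectorizer`; that wiring is an assumption and not part of the notebook above.

```python
def ngrams(tokens, n=2, sep=" "):
    """Join every run of n consecutive tokens into one string."""
    return [sep.join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


headline_tokens = ["air", "nz", "strike"]
bigrams = ngrams(headline_tokens)
print(bigrams)  # ['air nz', 'nz strike']
```

Feeding bigrams like these to the vectorizer lets a phrase count as a single term, at the cost of a larger and sparser vocabulary.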