Sample LDA model in pyspark. Clusters data into topics and maps the words associated with each topic.

In [2]:
dbutils.library.installPyPI('nltk', '3.4.3')
dbutils.library.installPyPI('xlrd')

In [3]:
import nltk
import re
from operator import attrgetter
from string import punctuation
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.sql.types import StringType, ArrayType, FloatType, IntegerType
from pyspark.sql.functions import array, concat, concat_ws, split, lit, col, array_max, array_position
from pyspark.ml.clustering import LDA

nltk.download('stopwords')
stopwords = nltk.corpus.stopwords.words('english')
ps = nltk.PorterStemmer()

In [4]:
# Text Parser {

def strip_digits(strval):
    return re.sub(r'\d+','',strval) 

def strip_punctuation(strval):
    punc_free = ''.join(' ' if char in punctuation else char
                        for char in strval)
    return punc_free
  
def tokenize_text(text):
    '''Tokenizes strings, removes stopwords and stems.'''
    
    digit_free = strip_digits(text)
    punc_free = strip_punctuation(digit_free)
    tokens = re.split('\W+', punc_free)
    result = list(
        filter(
            None, 
            [ps.stem(word).lower() for word in tokens if word not in stopwords]
        )
    )
    return result

# } Topic Extractor {

def indices_to_terms(vocabulary):
    def indices_to_terms(index, count):
        return [vocabulary[int(x)] for x in index][:count]
    return udf(indices_to_terms, ArrayType(StringType()))

# Create udf functions
StringArrayType = ArrayType(StringType(), False)
tokenize_udf = udf(tokenize_text, StringArrayType)
to_array = udf(lambda v: v.toArray().tolist(), ArrayType(FloatType()))
array_index = udf(lambda x,y: [i for i, e in enumerate(x) if e==y ][0])

In [5]:
df = pd.read_excel('http://help.incites.clarivate.com/incitesLiveESI/10678-TRS/version/default/part/AttachmentData/data/ESIMasterJournalList-122018.xlsx')

df = spark.createDataFrame(df)

In [6]:
# display(df)

In [7]:
# Tokenize Text
df_token = df.select('Full title').withColumn("text_data", tokenize_udf('Full title'))

# Make tf-idf vectorizer
cv = CountVectorizer(
    inputCol="text_data",
    outputCol="count_vec", minDF=1
)
# transform the data
cvModel = cv.fit(df_token)
featurizedData = cvModel.transform(df_token)
idf = IDF(inputCol="count_vec", outputCol="features")

idfModel = idf.fit(featurizedData)
df_feature = idfModel.transform(featurizedData)

In [8]:
# display(df_feature)

In [9]:
# Run the LDA model
lda = LDA(k=20, seed=1)
model = lda.fit(df_feature)

In [10]:
# display(model.describeTopics(maxTermsPerTopic = 15))

In [11]:
# Map the term indices to the topic vocabulary (numbers to words).
df_terms = model.describeTopics(maxTermsPerTopic = 15).withColumn(
    "topic_words", indices_to_terms(cvModel.vocabulary)("termIndices", lit(5)))

In [12]:
# display(df_terms)

In [13]:
# Map the topic number to the highest probability 
df_model = model.transform(df_feature).withColumn('test', to_array('topicDistribution')).withColumn('topic', array_index('test', array_max('test')))

In [14]:
# display(df_model)

In [15]:
display(df_model.join(df_terms, ['topic']).select('topic', 'Full title', 'topic_words'))

topic,Full title,topic_words
19,2D Materials,"List(appli, econom, journal, inform, of)"
11,3 Biotech,"List(environ, work, associ, disord, ac)"
19,3D Printing and Additive Manufacturing,"List(appli, econom, journal, inform, of)"
1,4OR-A Quarterly Journal of Operations Research,"List(acta, pharmaceut, design, famili, e)"
5,AAPG BULLETIN,"List(scienc, review, nurs, plant, of)"
10,AAPS Journal,"List(method, pharmacolog, conserv, china, univers)"
10,AAPS PHARMSCITECH,"List(method, pharmacolog, conserv, china, univers)"
15,AATCC Journal of Research,"List(intern, studi, law, languag, polit)"
15,AATCC REVIEW,"List(intern, studi, law, languag, polit)"
7,Abacus-A Journal of Accounting Finance and Business Studies,"List(de, revista, water, busi, therapeut)"
