# LDA using PySpark

In [1]:
from pyspark.sql.functions import instr,col

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark_LDA").enableHiveSupport().getOrCreate()

In [2]:
spark

In [3]:
# Repartition to 5 paritions - coalesce(5)
df = spark.sql("select * from cust_complains.cust_complaints_all_csv_raw")
df.printSchema()

root
 |-- date_received: string (nullable = true)
 |-- product: string (nullable = true)
 |-- sub_product: string (nullable = true)
 |-- issue: string (nullable = true)
 |-- sub_issue: string (nullable = true)
 |-- consumer_complaint_narrative: string (nullable = true)
 |-- company_public_response: string (nullable = true)
 |-- company: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- consumer_consent_provided: string (nullable = true)
 |-- submitted_via: string (nullable = true)
 |-- date_sent_to_company: string (nullable = true)
 |-- company_response: string (nullable = true)
 |-- timely_response: string (nullable = true)
 |-- consumer_disputed: string (nullable = true)
 |-- complaint_id: string (nullable = true)



In [14]:
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import CountVectorizer, IDF,RegexTokenizer, Tokenizer

from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import struct
import re
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer

In [4]:
df.createOrReplaceTempView("ComplaintData")
df.select(['consumer_complaint_narrative','company_public_response']).show(5, False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [6]:
from pyspark.sql.functions import monotonically_increasing_id

corpus =df.select('consumer_complaint_narrative')
corpus_df = corpus.withColumn("id", monotonically_increasing_id())

In [7]:
corpus_df.show()

+----------------------------+---+
|consumer_complaint_narrative| id|
+----------------------------+---+
|        Upon looking at m...|  0|
|        Upon opening up a...|  1|
|        I XXXX XXXX has g...|  2|
|        An account in my ...|  3|
|        I have a loan wit...|  4|
|        Febuary XXXX - XX...|  5|
|        Alert from XXXX o...|  6|
|        Back in XX/XX/201...|  7|
|        On XX/XX/XXXX, I ...|  8|
|        Experian intentio...|  9|
|        About six months ...| 10|
|        Banks website mal...| 11|
|        I called the coll...| 12|
|        On XX/XX/XXXX, I ...| 13|
|        This company is v...| 14|
|        XXXX XXXX alleges...| 15|
|        When reviewing my...| 16|
|        Today : XX/XX/201...| 17|
|        I reached out to ...| 18|
|        There are many mi...| 19|
+----------------------------+---+
only showing top 20 rows



In [10]:
tokenizer = Tokenizer(inputCol="consumer_complaint_narrative", outputCol="words")
countTokens = udf(lambda words: len(words), IntegerType())

regexTokenizer = RegexTokenizer(inputCol="consumer_complaint_narrative", 
                                outputCol="words",pattern="\\w+", gaps=False)

tokenized_df = regexTokenizer.transform(corpus_df)
tokenized_df.select("consumer_complaint_narrative", "words") \
    .withColumn("tokens", countTokens(col("words"))).show()

+----------------------------+--------------------+------+
|consumer_complaint_narrative|               words|tokens|
+----------------------------+--------------------+------+
|        Upon looking at m...|[upon, looking, a...|    63|
|        Upon opening up a...|[upon, opening, u...|   179|
|        I XXXX XXXX has g...|[i, xxxx, xxxx, h...|   216|
|        An account in my ...|[an, account, in,...|    39|
|        I have a loan wit...|[i, have, a, loan...|    90|
|        Febuary XXXX - XX...|[febuary, xxxx, x...|    78|
|        Alert from XXXX o...|[alert, from, xxx...|    55|
|        Back in XX/XX/201...|[back, in, xx, xx...|   328|
|        On XX/XX/XXXX, I ...|[on, xx, xx, xxxx...|   105|
|        Experian intentio...|[experian, intent...|    91|
|        About six months ...|[about, six, mont...|    73|
|        Banks website mal...|[banks, website, ...|    13|
|        I called the coll...|[i, called, the, ...|   180|
|        On XX/XX/XXXX, I ...|[on, xx, xx, xxxx...|   61

In [16]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
tokenized_df1 = remover.transform(tokenized_df)
tokenized_df1.show(5)

stopwordList = ["x","xx","xxx","xxxx","xxxxx","xxxxxx","xxxxxxx","mr","mrs","00","oh","one","also"]

remover=StopWordsRemover(inputCol="filtered", outputCol="filtered_more" ,stopWords=stopwordList)
tokenized_df2 = remover.transform(tokenized_df1)
tokenized_df2.show(5)

+----------------------------+---+--------------------+--------------------+
|consumer_complaint_narrative| id|               words|            filtered|
+----------------------------+---+--------------------+--------------------+
|        Upon looking at m...|  0|[upon, looking, a...|[upon, looking, c...|
|        Upon opening up a...|  1|[upon, opening, u...|[upon, opening, w...|
|        I XXXX XXXX has g...|  2|[i, xxxx, xxxx, h...|[xxxx, xxxx, got,...|
|        An account in my ...|  3|[an, account, in,...|[account, credit,...|
|        I have a loan wit...|  4|[i, have, a, loan...|[loan, bank, amer...|
+----------------------------+---+--------------------+--------------------+
only showing top 5 rows

+----------------------------+---+--------------------+--------------------+--------------------+
|consumer_complaint_narrative| id|               words|            filtered|       filtered_more|
+----------------------------+---+--------------------+--------------------+----------

In [17]:
cv = CountVectorizer(inputCol="filtered_more", outputCol="features", vocabSize = 10000)
cvmodel = cv.fit(tokenized_df2)
featurized_df = cvmodel.transform(tokenized_df2)
vocab = cvmodel.vocabulary
featurized_df.select('filtered_more','features','id').show(5)

+--------------------+--------------------+---+
|       filtered_more|            features| id|
+--------------------+--------------------+---+
|[upon, looking, c...|(10000,[0,2,6,8,1...|  0|
|[upon, opening, w...|(10000,[0,1,3,10,...|  1|
|[got, contact, tw...|(10000,[0,1,2,3,7...|  2|
|[account, credit,...|(10000,[0,1,2,13,...|  3|
|[loan, bank, amer...|(10000,[0,2,4,5,8...|  4|
+--------------------+--------------------+---+
only showing top 5 rows



In [18]:
countVectors = featurized_df.select('features','id')

## Train LDA model

In [19]:
lda = LDA(k=10, maxIter=10)
model = lda.fit(countVectors)

ll = model.logLikelihood(countVectors)
lp = model.logPerplexity(countVectors)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(countVectors)
transformed.show(truncate=False)

The lower bound on the log likelihood of the entire corpus: -8657642.026401002
The upper bound on perplexity: 7.458171790353653
The topics described by their top-weighted terms:
+-----+--------------+--------------------------------------------------------------------+
|topic|termIndices   |termWeights                                                         |
+-----+--------------+--------------------------------------------------------------------+
|0    |[0, 1, 4]     |[0.020185206405448037, 0.0186687411770416, 0.008985970914710156]    |
|1    |[2, 0, 82]    |[0.003970772391742825, 0.0038309009920069353, 0.0036029520317694495]|
|2    |[0, 1, 2]     |[0.003963178934989447, 0.0038974800560549246, 0.00235463783726663]  |
|3    |[0, 2, 3]     |[0.029344246029582546, 0.02212993975454568, 0.011351373867695603]   |
|4    |[1, 0, 2]     |[0.004577031505987845, 0.002359274330132775, 0.0016428745552727584] |
|5    |[11, 104, 21] |[0.017133051165292307, 0.012095843754325498, 0.01177870365641559

### Display words for top 10 topics 

In [20]:
topics = model.describeTopics()   
topics_rdd = topics.rdd

topics_words = topics_rdd\
       .map(lambda row: row['termIndices'])\
       .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
       .collect()

for idx, topic in enumerate(topics_words):
    print ("topic: ", idx)
    print ("----------")
    for word in topic:
       print (word)
    print ("----------")

topic:  0
----------
credit
account
payment
report
information
loan
received
debt
company
told
----------
topic:  1
----------
report
credit
without
appear
many
mistakes
understanding
account
loan
debt
----------
topic:  2
----------
credit
account
report
loan
verifying
unverified
time
payments
2018
reporting
----------
topic:  3
----------
credit
report
information
inquiry
identity
inquiries
please
accounts
remove
theft
----------
topic:  4
----------
account
credit
report
diversified
information
insurance
company
consultants
rental
called
----------
topic:  5
----------
card
chase
said
bank
account
called
money
told
back
credit
----------
topic:  6
----------
account
bonus
credit
money
told
said
received
days
pay
report
----------
topic:  7
----------
consumer
wells
fargo
loan
information
bank
account
paypal
us
mortgage
----------
topic:  8
----------
account
bank
credit
2017
company
insurance
late
escrow
mortgage
received
----------
topic:  9
----------
credit
loan
never
debt
compan