## CIS5560: PySpark LDA Text Analysis of "R" Answers and Tags

Tested in Python 2 with Spark 2.1

## Text Analysis using Latent Dirichlet Allocation (LDA)

### Importing Spark SQL and Spark ML Libraries
First, import the required libraries:

In [3]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA, BisectingKMeans
from pyspark.sql.functions import monotonically_increasing_id

import re

### Load Source Data
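Now load the Answers data into a DataFrame. Each row describes one answer: its owner (OwnerUserId), creation date (Year, Month, Day, CreationDate), the question it answers (ParentId), its Score, whether it was accepted (IsAcceptedAnswer), and the answer text (Body). The Tags data is loaded separately and joined to the answers.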

In [5]:
# Execution-environment switch: set to True when running on IBM Data Science
# Experience (DSX). When False, the data cells below read pre-registered tables
# through sqlContext.sql(...).
IS_DSX = False

if IS_DSX:
    # On DSX, pixiedust provides the display()-based visualization.
    from pixiedust.display import *

### Installing pixiedust for visualization

In [7]:
# Install pixiedust, the visualization package used on IBM DSX.
# NOTE: this is an IPython shell escape, not Python syntax, so it only runs
# inside a notebook kernel.
!pip install pixiedust

#### Create a table `r_answers_lda_csv` from the `R_Answers.csv` file
##### All columns are Integer except IsAcceptedAnswer (String), Body (String), and Tag (String)

In [9]:

# Load the R answers from the pre-registered "r_answers_lda_csv" table and
# drop every row that has a null in any column.
# The column order in this SELECT matters: cleanup_text reads the joined row
# positionally and expects Body at index 8.
# NOTE(review): when IS_DSX is True nothing here defines df_data_ans_clean, so
# the join cell fails unless DSX-specific loading code defines it.
if not IS_DSX:
    df_data_answers = sqlContext.sql(
        "select OwnerUserId,Year,Month,Day,CreationDate,ParentId,Score,IsAcceptedAnswer,Body from r_answers_lda_csv"
    )
    df_data_ans_clean = df_data_answers.dropna()
    df_data_ans_clean.show(5)


In [10]:
# Load the question tags from the pre-registered "r_tags_csv" table and drop
# every row that has a null in any column.
# NOTE(review): as with the answers cell, nothing defines df_data_tags_clean
# when IS_DSX is True.
if not IS_DSX:
    df_data_tags = sqlContext.sql("select * from r_tags_csv")
    df_data_tags_clean = df_data_tags.dropna()
    df_data_tags_clean.show(5)

In [11]:
# Inner-join answers to tags: an answer's ParentId is the Id of the question
# it answers. Presumably the tags table holds one row per (question, tag), so
# an answer repeats once per tag of its question. Verify against the data.
answer_to_question = df_data_ans_clean.ParentId == df_data_tags_clean.Id
joined = df_data_ans_clean.join(df_data_tags_clean, answer_to_question)
joined.show(5)

In [12]:
# The joined answers+tags DataFrame is the raw input for text processing.
rawdata = joined

# Preview the first five rows.
rawdata.show(n=5)

In [13]:
# Print data types
# Print each (column name, Spark type string) pair of the joined data.
# The loop variable is named `dtype` so it does not shadow the builtin
# `type` for the rest of the notebook. print(...) with a single argument
# produces the same output under Python 2 and Python 3.
for dtype in rawdata.dtypes:
    print(dtype)


## Text Pre-processing:
Typical steps:

1. Removing common words (with a stopword list)
1. Handling punctuation
1. Lowercase/uppercase normalization
1. Stemming (not applied in this notebook)
1. Part-of-Speech Tagging (nouns, verbs, adj, etc.) (not applied in this notebook)

In [15]:
# Default list of stopwords. Entries carry no apostrophes because tokens are
# stripped of punctuation before the stopword check ("don't" -> "dont").
_STOPWORDS_CORE = ['a', u'about', u'above', u'after', u'again', u'against', u'all', u'am', u'an', u'and', u'any', u'are', u'arent', u'as', u'at',
    u'be', u'because', u'been', u'before', u'being', u'below', u'between', u'both', u'but', u'by',
    u'can', 'cant', 'come', u'could', 'couldnt',
    u'd', u'did', u'didn', u'do', u'does', u'doesnt', u'doing', u'dont', u'down', u'during',
    u'each',
    u'few', 'finally', u'for', u'from', u'further',
    u'had', u'hadnt', u'has', u'hasnt', u'have', u'havent', u'having', u'he', u'her', u'here', u'hers', u'herself', u'him', u'himself', u'his', u'how',
    u'i', u'if', u'in', u'into', u'is', u'isnt', u'it', u'its', u'itself',
    u'just',
    u'll',
    u'm', u'me', u'might', u'more', u'most', u'must', u'my', u'myself',
    u'no', u'nor', u'not', u'now',
    u'o', u'of', u'off', u'on', u'once', u'only', u'or', u'other', u'our', u'ours', u'ourselves', u'out', u'over', u'own',
    u'r', u're',
    u's', 'said', u'same', u'she', u'should', u'shouldnt', u'so', u'some', u'such',
    u't', u'than', u'that', 'thats', u'the', u'their', u'theirs', u'them', u'themselves', u'then', u'there', u'these', u'they', u'this', u'those', u'through', u'to', u'too',
    u'under', u'until', u'up',
    u'very',
    u'was', u'wasnt', u'we', u'were', u'werent', u'what', u'when', u'where', u'which', u'while', u'who', u'whom', u'why', u'will', u'with', u'wont', u'would',
    u'y', u'you', u'your', u'yours', u'yourself', u'yourselves']

# Custom stopwords - add your own here. They are compared against cleaned
# tokens, so write them lowercase and with letters/digits only.
_STOPWORDS_CUSTOM = []

# Built once (not per row) as a set for O(1) membership tests.
_STOPWORDS = frozenset(word.lower() for word in _STOPWORDS_CORE + _STOPWORDS_CUSTOM)

# HTML tags (<p>, </code>, <a href="...">) and character entities (&quot;,
# &lt;, &#39;). Answer bodies are assumed to be HTML-formatted; confirm. On
# plain text, a stray "<...>" span would also be removed.
_MARKUP_RE = re.compile(r'<[^>]+>|&#?[a-zA-Z0-9]+;')
_NON_ALNUM_RE = re.compile(r'[^a-zA-Z0-9]')


def cleanup_text(record):
    """Turn the answer body of one joined row into a list of cleaned words.

    Args:
        record: the struct of all joined columns for one row; position 8 is
            expected to hold the answer Body text.

    Returns:
        Lowercase alphanumeric tokens longer than two characters that are
        not stopwords, in their original order. Returns [] when the body is
        missing.
    """
    text = record[8]
    if text is None:
        return []

    # Replace markup with a space *before* splitting, so a tag or entity
    # never fuses with the neighbouring word ("<p>The" would become "pthe").
    words = _MARKUP_RE.sub(' ', text).split()

    cleaned = [_NON_ALNUM_RE.sub('', word).lower() for word in words]
    return [word for word in cleaned if len(word) > 2 and word not in _STOPWORDS]



## Cleaning Text

In [17]:
# Run cleanup_text as a Spark UDF returning array<string>. It receives every
# column of the row packed into one struct, because it indexes fields by
# position.
udf_cleantext = udf(cleanup_text, ArrayType(StringType()))
whole_row = struct([rawdata[column_name] for column_name in rawdata.columns])
clean_text = rawdata.withColumn("words", udf_cleantext(whole_row))
clean_text.show(n=5)

## Vectorize the Text and Compute TF-IDF

In [19]:
# Term-frequency vectorization.
#
# Option 1 (not used) - HashingTF: hashes words into a fixed number of
# buckets and keeps no vocabulary:
#     hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
#     featurizedData = hashingTF.transform(clean_text)
#
# Option 2 (used) - CountVectorizer: learns a vocabulary of at most 1000
# terms, so topic term indices can later be mapped back to words.
cv = CountVectorizer(inputCol="words", outputCol="rawFeatures", vocabSize=1000)
cvmodel = cv.fit(clean_text)
featurizedData = cvmodel.transform(clean_text)

# Broadcast the vocabulary so UDFs running on executors can translate term
# indices into words.
vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)


### Term Frequency (TF)
### Inverse Document Frequency (IDF)

In [21]:
# Re-weight the raw term counts by inverse document frequency. Terms that
# appear in many answers then weigh less in the resulting "features" column.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(dataset=featurizedData)
rescaledData = idfModel.transform(dataset=featurizedData)

## LDA Clustering 
### Generate 25 Data-Driven Topics:

In [23]:
# Fit a 25-topic LDA model with the EM optimizer. The seed is fixed so runs
# are repeatable.
# NOTE(review): Spark documents LDA input as term-count vectors, but this fit
# uses the IDF-weighted "features" column. featuresCol="rawFeatures" may be
# intended. Confirm before changing it, because the topics will differ.
lda = LDA(k=25, seed=123, optimizer="em", featuresCol="features")
ldamodel = lda.fit(rescaledData)

# One row per topic, holding its top term indices and term weights.
ldatopics = ldamodel.describeTopics()

In [24]:
# Show all 25 topics; show() would otherwise stop at 20 rows.
ldatopics.show(n=25)

### LDA Clustering - Find Data-driven Topics

In [26]:


def map_termID_to_Word(termIndices):
    """Translate CountVectorizer term indices into their vocabulary words.

    Each index is looked up in the broadcast vocabulary. The words come back
    in the same order as ``termIndices``.
    """
    return [vocab_broadcast.value[term_id] for term_id in termIndices]


# Attach a human-readable word list to each topic and print all topics
# without truncating the column contents.
udf_map_termID_to_Word = udf(map_termID_to_Word, ArrayType(StringType()))
ldatopics_mapped = ldatopics.withColumn(
    "topic_desc", udf_map_termID_to_Word(ldatopics.termIndices)
)
topic_words = ldatopics_mapped.select(ldatopics_mapped.topic, ldatopics_mapped.topic_desc)
topic_words.show(50, False)


In [27]:
# Score every answer against the fitted model. This adds a per-document
# "topicDistribution" vector column.
ldaResults = ldamodel.transform(rescaledData)

preview_columns = [
    'OwnerUserId', 'Year', 'Month', 'Day', 'CreationDate', 'Score',
    'IsAcceptedAnswer', 'Tag', 'words', 'features', 'topicDistribution',
]
ldaResults.select(*preview_columns).show()

### Breakout LDA Topics for Modeling and Reporting

In [29]:
def breakout_array(index_number, record):
    """Return element ``index_number`` of a vector as a Python float.

    Args:
        index_number: position to extract, e.g. a 0-based topic number.
        record: any indexable vector or sequence, such as the
            ``topicDistribution`` column produced by the LDA model
            (DenseVector or SparseVector).

    Returns:
        The selected weight as a plain Python float, which is what the
        FloatType UDF wrapping this function expects.
    """
    # Index directly instead of converting the whole vector with tolist().
    # This reads one element instead of copying all of them, and it also
    # works for SparseVector, which has no tolist().
    return float(record[index_number])

# Spark UDF returning a single float taken from a vector column.
udf_breakout_array = udf(breakout_array, FloatType())

# Extract each document's weight for topics 12 and 20 (0-based positions in
# topicDistribution). Both columns use the same "Topic_NN" spelling that is
# selected below, so this also works with spark.sql.caseSensitive enabled.
enrichedData = (
    ldaResults
    .withColumn("Topic_12", udf_breakout_array(lit(12), ldaResults.topicDistribution))
    .withColumn("Topic_20", udf_breakout_array(lit(20), ldaResults.topicDistribution))
)

enrichedData.select('OwnerUserId', 'Year', 'Month', 'Day', 'CreationDate', 'Score',
                    'IsAcceptedAnswer', 'Tag', 'words', 'features', 'topicDistribution',
                    'Topic_12', 'Topic_20').show()

# Largest Topic_12 weight over all documents. `max` here is the Spark
# aggregate brought in by `from pyspark.sql.functions import *`, not the
# Python builtin.
enrichedData.agg(max("Topic_12")).show()

In [30]:
# Register the enriched DataFrame so SQL cells can query it as "enrichedData".
enrichedData.createOrReplaceTempView(name="enrichedData")

In [31]:
# Reporting columns plus the two topic weights, sorted by CreationDate in
# descending order. Each column is listed once, so no duplicate columns
# appear in the result.
report_columns = ['OwnerUserId', 'Year', 'Month', 'Day', 'CreationDate', 'Score',
                  'IsAcceptedAnswer', 'Tag', 'Topic_12', 'Topic_20']
topics = enrichedData.select(*report_columns).sort(desc("CreationDate"))

In [32]:
# Preview the first five report rows.
topics.show(n=5)

## Line Chart of Tags, IsAcceptedAnswer, and Topics.

In [34]:
# Chart the report DataFrame. On IBM DSX this needs pixiedust.
# NOTE(review): display() is imported from pixiedust only when IS_DSX is
# True. Otherwise it must come from the notebook environment (e.g.
# Databricks or IPython); confirm for the target platform.
display(topics)

In [35]:
#display(topics)

In [36]:
# Only for Databricks: paste the query below into its own cell. The %sql magic
# must start the cell. It reads the "enrichedData" temp view and filters
# answers by a Tag picked through a notebook widget. "ggplot2" is only an
# example default; replace it with a tag present in your data.
#
# %sql
# SELECT OwnerUserId, CreationDate, Year, Month, Day, Score, IsAcceptedAnswer, Tag, Topic_12, Topic_20
# FROM enrichedData WHERE Tag = "${tag=ggplot2}" ORDER BY CreationDate