### Importing Libraries
**************************

First of all we will be importing all the packages.

In [44]:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pandas import ExcelWriter
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import concat, col, lit
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
import pyLDAvis.gensim


import warnings
warnings.filterwarnings('ignore')

import xlrd
import csv
import sys 
import nltk
import xlwt
import gensim
import pandas as pd

from CleanTransformer import CleanTransformer
from StopWordsLoad import StopWords

from functools import reduce

### Creating Spark Context
**********

The SparkContext holds the connection to the Spark cluster manager. Every Spark application runs as an independent set of processes, coordinated by the SparkContext in the driver program.

In [45]:
# Local Spark configuration with two worker threads. The application name is
# set on the conf so it is carried along whichever way the context is obtained.
conf = SparkConf().setMaster("local[2]").setAppName("LatentDirichletAllocation")

# getOrCreate returns the already-running context when there is one, so this
# cell can be re-run without hitting "Cannot run multiple SparkContexts at
# once". Note: when a context already exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

***************
# 1-DATA LOADING
***************

### Loading Data
**************

1) Loading all the user-defined stopwords and punctuation.<br>
<i>StopWordsLoad can be found in the repository.</i><br>
2) Loading the CSV file, with '|' as the delimiter, into a PySpark DataFrame.


In [46]:
# Load the stop-word collection through the repository's StopWords helper.
obj = StopWords()
stopwords = obj.f_load()

# Read the '|'-delimited text file, using its first line as the header row.
csvFile = "bbchealth.txt"
df = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("delimiter", '|')
    .load(csvFile)
)
df.show(5)

+------------------+--------------------+--------------------+
|         entity.id|    entity.timestamp|  entity.description|
+------------------+--------------------+--------------------+
|585947808772960257|Wed Apr 08 23:30:...|GP workload harmi...|
|585947807816650752|Wed Apr 08 23:30:...|Short people's 'h...|
|585866060991078401|Wed Apr 08 18:05:...|New approach agai...|
|585794106170839041|Wed Apr 08 13:19:...|Coalition 'underm...|
|585733482413891584|Wed Apr 08 09:18:...|Review of case ag...|
+------------------+--------------------+--------------------+
only showing top 5 rows



### Renaming Column Names
********

In [47]:
# Column names exactly as they were read from the file.
oldColumns = df.schema.names

# Cleaned-up names: surrounding whitespace trimmed and '.' replaced by '_'.
newColumns = [each.strip().replace('.', '_') for each in df.columns]
print("The old column names -{} are replaced by \nThe new column names -{}".format(oldColumns,newColumns))

# Rename one column at a time, pairing each old name with its replacement.
df_renamed = df
for old_name, new_name in zip(oldColumns, newColumns):
    df_renamed = df_renamed.withColumnRenamed(old_name, new_name)
df_renamed.show(5)

The old column names -['entity.id', 'entity.timestamp', 'entity.description'] are replaced by 
The new column names -['entity_id', 'entity_timestamp', 'entity_description']
+------------------+--------------------+--------------------+
|         entity_id|    entity_timestamp|  entity_description|
+------------------+--------------------+--------------------+
|585947808772960257|Wed Apr 08 23:30:...|GP workload harmi...|
|585947807816650752|Wed Apr 08 23:30:...|Short people's 'h...|
|585866060991078401|Wed Apr 08 18:05:...|New approach agai...|
|585794106170839041|Wed Apr 08 13:19:...|Coalition 'underm...|
|585733482413891584|Wed Apr 08 09:18:...|Review of case ag...|
+------------------+--------------------+--------------------+
only showing top 5 rows



**************
## 2-DATA CLEANING
**********

### Loading the Transformer
***
1)The transformer is loaded and then the description field is processed when an action is called.<br>
<i>CleanTransformer can be found in the repository.</i><br>
2)Selecting the required columns<br>

In [48]:
# Configure the repository's CleanTransformer to read the raw description
# column and write its result to a new "Cleansed_" column.
clean_transform = CleanTransformer(
    inputCol="entity_description",
    outputCol="Cleansed_entity_description",
    stopwords=stopwords,
)

# Apply the transformer and keep only the id plus the raw and cleansed text.
df_cleaned = (
    clean_transform
    .transform(df_renamed)
    .select(col('entity_id'), col('entity_description'), col('Cleansed_entity_description'))
)
df_cleaned.show(5)

+------------------+--------------------+---------------------------+
|         entity_id|  entity_description|Cleansed_entity_description|
+------------------+--------------------+---------------------------+
|585947808772960257|GP workload harmi...|       workload harming ...|
|585947807816650752|Short people's 'h...|       short people hear...|
|585866060991078401|New approach agai...|       new approach hiv ...|
|585794106170839041|Coalition 'underm...|       coalition undermi...|
|585733482413891584|Review of case ag...|       review case nhs m...|
+------------------+--------------------+---------------------------+
only showing top 5 rows



*********************
### The output from this is the cleaned text which can be further used for model building by converting it into features.
*********************

***********
## 3-FEATURE EXTRACTION
*********

### Data Table 
*******
1)Creating Table view for sql query based filtering.<br>
2)Converting the pyspark dataframe to pandas dataframe.<br>
3)Getting the list of all documents.<br>

In [49]:
# Expose the cleaned DataFrame to SQL under the temporary view name "SYSTEM".
df_cleaned.createOrReplaceTempView("SYSTEM")
# Select every row and column of that view (no filtering is applied here).
sqlDF = sqlContext.sql("SELECT * FROM SYSTEM")
# Convert the Spark DataFrame into a pandas DataFrame.
df_cleaned_pd=sqlDF.toPandas()
# Plain Python list holding the cleansed description of each row.
df_cleaned_list=df_cleaned_pd['Cleansed_entity_description'].tolist()
                                                            

### Creating some helper functions
*****

1) Getting all the tokens in the documents.<br>
2) Preparing a dictionary from those tokens.<br>
3) Creating the corpus from the dictionary using bag of words.<br>

In [50]:
def list_token(mf):
    """Tokenise documents by splitting each one on single spaces.

    Parameters
    ----------
    mf : iterable of (str or None)
        The documents; ``None`` entries are skipped.

    Returns
    -------
    list of list of str
        One token list per non-``None`` document, in input order.

    Notes
    -----
    A fresh list is built on every call. The previous version appended to a
    shared module-level list, so a second call returned the tokens of both
    calls and the corpus ended up containing every document twice.
    """
    # split(' ') (not split()) is kept so tokens are unchanged from before.
    return [each.split(' ') for each in mf if each is not None]


def _build_dictionary(token_lists):
    """Create a gensim Dictionary from tokenised documents and print its size."""
    dictionary = gensim.corpora.Dictionary(token_lists)
    num_words = len(dictionary)
    print("The Number of words: {}".format(num_words))
    return dictionary


def getdictionary(list_of_documents):
    """Tokenise the documents and return the gensim Dictionary built from them.

    Prints the number of distinct words in the dictionary as a side effect.
    """
    return _build_dictionary(list_token(list_of_documents))


def create_corpus(list_of_documents):
    """Build the dictionary and the bag-of-words corpus for the documents.

    Parameters
    ----------
    list_of_documents : iterable of (str or None)
        The documents; ``None`` entries are skipped.

    Returns
    -------
    tuple
        ``(dictionary, corpus)`` where ``corpus`` holds one
        ``dictionary.doc2bow(...)`` result per non-``None`` document.
    """
    # Tokenise once and reuse the result for both the dictionary and the
    # corpus, so the two always describe exactly the same documents.
    token_lists = list_token(list_of_documents)
    dictionary = _build_dictionary(token_lists)
    return dictionary, [dictionary.doc2bow(each) for each in token_lists]

### Getting the dictionary and corpus 
*********

In [51]:
dictionary,corpus=create_corpus(df_cleaned_list)


The Number of words: 8217


********
## 4-MODEL BUILDING
***********


### Topic Modelling using Latent Dirichlet Allocation
**************

1) Create an instance of the model.<br>
2) Defining various model parameters. <br>
3) Visualizing the output.

In [52]:
# Model settings gathered in one place so they are easy to find and tune.
NUM_TOPICS = 10        # number of latent topics to extract
NUM_PASSES = 50        # number of passes over the corpus during training
LDA_RANDOM_SEED = 42   # fixed seed so the topics are reproducible across runs

Lda = gensim.models.ldamodel.LdaModel
ldamodel = Lda(
    corpus,
    num_topics=NUM_TOPICS,
    id2word=dictionary,
    passes=NUM_PASSES,
    random_state=LDA_RANDOM_SEED,
)

# Prepare the interactive pyLDAvis view; the final expression renders it.
lda_display = pyLDAvis.gensim.prepare(ldamodel, corpus, dictionary)
pyLDAvis.display(lda_display)


Saliency: a measure of how much the term tells you about the topic

Relevance: a weighted average of the (log) probability of the word given the topic and the (log) probability of the word given the topic normalized by the overall probability of the word (its lift).

Lambda of 1 means just look at how likely the word is given the topic. This is what we see in Gensim.

Lambda of 0 means just look at the normalized version (the lift).

$$\text{Relevance}(w \mid t) = \lambda \, \log P(w \mid t) + (1 - \lambda) \, \log \frac{P(w \mid t)}{P(w)}$$

### Stopping the Spark Context once the task is over.

In [53]:
# Stop the SparkContext created at the top of the notebook.
sc.stop()
# Marker printed once the context has been stopped.
print("Successful")

Successful
