### Extracting from Kafka and putting in a raw RDD

In [None]:
import findspark
findspark.init()
import pyspark
import json
import pykafka
from pykafka import KafkaClient
# Connect to the Kafka broker at 127.0.0.1:9092 and look up the 'kafka_nyt' topic.
client = KafkaClient(hosts ="127.0.0.1:9092")
topic = client.topics[b'kafka_nyt']
# consumer_timeout_ms=1000 is passed so that iterating the consumer below ends
# instead of waiting for new messages forever
# (NOTE(review): per pykafka's documented semantics - confirm for the installed version).
consumer = topic.get_simple_consumer(consumer_timeout_ms=1000) 
# Decode every consumed message payload from JSON, skipping None entries.
# NOTE(review): the consumer is never stopped explicitly after this drain.
message_list = [json.loads(message.value) for message in consumer if message is not None]
from pyspark import SparkContext, SparkConf
# Create the SparkContext only on the first run of this cell; later runs reuse
# the existing global `sc` instead of trying to start a second context.
if 'sc' not in globals():
    conf = SparkConf()
    conf.setMaster('local[*]')  # run Spark in local mode
    sc = SparkContext(conf=conf)
# Distribute the decoded Kafka messages as an RDD (one element per message).
rdd = sc.parallelize(message_list)

In [None]:
from pyspark.sql.types import *

### Clean RDD and convert to DataFrame

In [None]:
def loopfunc(group):
    """Project every article in ``group`` onto the attributes kept for LDA.

    ``group`` is an iterable of article dicts (presumably the list of articles
    carried by one Kafka message). Each article is reduced to a new dict that
    holds only ``abstract``, ``first_published_date``, ``org_facet`` and
    ``title``; a ``KeyError`` is raised if an article lacks one of them.

    Returns a list with one reduced dict per article, in input order.
    """
    wanted = ('abstract', 'first_published_date', 'org_facet', 'title')
    return [{key: article[key] for key in wanted} for article in group]

In [None]:
# Reduce every article group (one RDD element) to the attributes needed for LDA.
reduced_rdd = rdd.map(loopfunc)

In [None]:
# Sanity check: pull the first element (one reduced article group) to the driver for inspection.
reduced_rdd.first() # Yields correct results

In [None]:
# Create a dataframe from the PipelinedRDD
from pyspark.sql import SQLContext
from pyspark.sql import Row
from collections import OrderedDict
# Create a SQLContext on top of the running SparkContext.
# NOTE(review): sqlContext is not referenced again in the visible code; presumably it is
# instantiated so that RDD.toDF() is available below - confirm.
sqlContext = SQLContext(sc)

def convert_to_row(d: dict) -> Row:
    """Build a Spark ``Row`` from ``d``, passing its fields in sorted key order."""
    fields_by_name = OrderedDict(sorted(d.items()))
    return Row(**fields_by_name)

# Flatten the per-message article lists into one RDD of article dicts, turn each
# dict into a Row, and let Spark derive the DataFrame schema from those Rows.
article_rows = reduced_rdd.flatMap(lambda articles: articles).map(convert_to_row)
df = article_rows.toDF()
df.printSchema()

In [None]:
def truncate_pbdate(date_as_string):
    """Return only the date part of a timestamp string such as ``2020-01-01T10:00:00``.

    Everything from the first ``"T"`` onwards is dropped; a string without a
    ``"T"`` is returned unchanged. ``None`` is passed through as ``None`` so
    that a null ``first_published_date`` does not crash the Spark UDF that
    wraps this function (Spark hands null column values to Python UDFs as
    ``None``).
    """
    if date_as_string is None:
        return None
    return date_as_string.split("T")[0]

# Wrap truncate_pbdate as a Spark UDF that returns a string, then overwrite the
# first_published_date column with its date-only value.
udfTruncatePbDate = pyspark.sql.functions.udf(truncate_pbdate, StringType())
df = df.withColumn("first_published_date", udfTruncatePbDate("first_published_date"))

In [None]:
# Preview five rows to check the truncated publication dates.
df.show(5)

### Start preprocessing for LDA

In [None]:
# Continue with preprocessing + LDA algorithm
'''filter the words on stopwords and on re'''
from nltk import word_tokenize
from nltk.corpus import stopwords
import string
import re

def remove_stop(sent):
    """Tokenize ``sent`` and return its cleaned, lower-cased content words.

    Steps, in order:
      1. lower-case and tokenize the text with NLTK's ``word_tokenize``;
      2. drop English stop words and single punctuation characters;
      3. strip every character that is not an ASCII letter or digit;
      4. drop tokens that end up with fewer than three characters.

    ``None`` or an empty string yields an empty list, so a missing abstract
    produces an empty token array instead of crashing the Spark UDF that
    wraps this function.
    """
    if not sent:
        return []

    # A set gives O(1) membership tests; the original list was scanned per token.
    stop = set(stopwords.words('english'))
    stop.update(string.punctuation)

    tokens = [tok for tok in word_tokenize(sent.lower()) if tok not in stop]
    stripped = (re.sub('[^a-zA-Z0-9]', '', tok) for tok in tokens)  # Remove special characters
    return [word for word in stripped if len(word) > 2]  # Keep words of 3+ characters

In [None]:
'''Calling the function that cleans the data of abstract '''
# Wrap remove_stop as a Spark UDF returning array<string>, then replace the raw
# abstract text with its list of cleaned tokens.
udf_cleantext = pyspark.sql.functions.udf(remove_stop , ArrayType(StringType()))
cleaned_text = df.withColumn("abstract", udf_cleantext('abstract'))

# TF-IDF
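TF: HashingTF is a Transformer which takes sets of terms and converts those sets into fixed-length feature vectors. In text processing, a “set of terms” might be a bag of words. The algorithm combines Term Frequency (TF) counts with the hashing trick for dimensionality reduction. Note: the code below uses CountVectorizer rather than HashingTF for the term-frequency step; CountVectorizer learns an explicit vocabulary, which is later used to map LDA term indices back to words.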

IDF: IDF is an Estimator which fits on a dataset and produces an IDFModel. The IDFModel takes feature vectors (generally created from HashingTF) and scales each column. Intuitively, it down-weights columns which appear frequently in a corpus.

In [None]:
'''Generate a TF-IDF (Term Frequency Inverse Document Frequency) Matrix'''

from pyspark.ml.feature import CountVectorizer 
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

# Term-frequency vectorization with CountVectorizer: learn a vocabulary capped at
# 5000 terms from the tokenized abstracts, then encode each abstract as a vector
# of term counts in the "rawFeatures" column.
cv = CountVectorizer(inputCol="abstract", outputCol="rawFeatures", vocabSize = 5000)
cvmodel = cv.fit(cleaned_text) # learn the vocabulary from the cleaned abstracts
featurizedData = cvmodel.transform(cleaned_text)

In [None]:
# Keep the learned vocabulary (a list indexed by term ID) and broadcast it so the
# term-ID-to-word UDF defined later can read it through vocab_broadcast.value.
vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)

In [None]:
# Fit IDF weights on the raw term counts and rescale them into the "features"
# column; rescaledData therefore carries the TF-IDF vectors.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData) # TFIDF

In [None]:
'''Processed data with features'''
# Preview 20 rows of the tokenized abstracts next to their TF-IDF vectors.
rescaledData.select('abstract','features').show(20)

### Applying LDA

In [None]:
'''Use LDA to Cluster the TF-IDF Matrix'''

from pyspark.ml.clustering import LDA

# Configure LDA with 25 topics, a fixed seed, and the "em" optimizer, reading the
# TF-IDF vectors from the "features" column, then fit it on the rescaled data.
lda = LDA(k=25, seed=123, optimizer="em", featuresCol="features") # 25 topics will result
ldamodel = lda.fit(rescaledData)

# Topic descriptions, requested with describeTopics()'s default arguments; the
# termIndices column is mapped back to words further down.
ldatopics = ldamodel.describeTopics()

### Converting topics to textual JSON to visualize in Kibana

In [None]:
def map_termID_to_Word(termIndices):
    """Translate vocabulary indices into their words.

    Each ID in ``termIndices`` is looked up in the broadcast vocabulary
    (``vocab_broadcast.value``); the words are returned as a list in the
    same order as the IDs.
    """
    return [vocab_broadcast.value[term_id] for term_id in termIndices]

# Wrap the lookup as a Spark UDF returning array<string>, add the readable
# "topic_desc" column derived from termIndices, and keep only the topic number
# together with its word list.
udf_map_termID_to_Word = pyspark.sql.functions.udf(map_termID_to_Word, ArrayType(StringType()))
ldatopics_mapped = (
    ldatopics
    .withColumn("topic_desc", udf_map_termID_to_Word(ldatopics["termIndices"]))
    .select("topic", "topic_desc")
)

# ldaResults = ldamodel.transform(rescaledData)

In [None]:
'''export as json'''
# Collect the topic rows to the driver as JSON strings and parse each into a dict.
topics_json_list = list(map(json.loads, ldatopics_mapped.toJSON().collect()))

In [None]:
# Build one Elasticsearch document per (topic, term) pair, grouped per topic.
output = []
for topic in topics_json_list:
    # The document id is the topic number concatenated with the term.
    # (Fixed: the id previously referenced an undefined name `example`,
    # which raised NameError; it must use the current `topic`.)
    item = [{"topic": topic['topic'],
      "term": term,
      "id": str(topic['topic']) + term} for term in topic['topic_desc']]
    output.append(item)

In [None]:
# Collapse the per-topic document lists into a single flat list of documents.
output_flat = [doc for topic_docs in output for doc in topic_docs]

In [None]:
from elasticsearch import Elasticsearch

In [None]:
# Client built with no arguments, i.e. with the library's default connection settings.
# NOTE(review): presumably a local node on the default port - confirm for the installed client version.
es = Elasticsearch()

In [None]:
# Index every (topic, term) document into the "lda" index, one request per document.
# The precomputed id is passed explicitly; presumably re-running this cell then
# overwrites existing documents instead of duplicating them - confirm.
for item in output_flat:
    es.index(index="lda", doc_type='term', id=item['id'], body=item)