In [1]:
import os
from time import time
import numpy as np
from nltk.corpus import stopwords
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover, IDF
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import explode, size
from pyspark.sql.types import StringType
import pyLDAvis

stop_words = set(stopwords.words('english'))


def init_spark():
    spark = SparkSession.builder.appName("HW3-Coord-data").getOrCreate()
    return spark


def read_json_files(root_path, spark):
    json_dir = root_path + "document_parses/pdf_json/"
    filenames = os.listdir(json_dir)

    all_json = [json_dir + filename for filename in filenames]
    # todo - for now restrict this to 100 files
    all_json = all_json[:100]

    data = spark.read.json(all_json, multiLine=True)
    data.createOrReplaceTempView("data")
    return data


def get_body_text(spark, data):
    # Select text columns
    # todo - add more columns
    covid_sql = spark.sql(
        """
            SELECT
                body_text.text AS body_text,
                paper_id
            FROM data
            """)
    return covid_sql


def topic_render(topic, wordNumbers, vocabArray):  # specify vector id of words to actual words
    terms = topic[1]
    result = []
    for i in range(wordNumbers):
        term = vocabArray[terms[i]]
        result.append(term)
    return result


def clean_up_sentences(sentence):
    matches = [word for word in sentence.split(' ') if word.isalnum()]
    matches = [word.lower() for word in matches]
    matches = [word for word in matches if word not in stop_words]
    matches = [word for word in matches if len(word) >= 3]
    return matches


def clean_up(document):
    cleaned = [clean_up_sentences(w) for w in document]
    joined = [' '.join(w) for w in cleaned]
    return joined

def format_data_to_pyldavis(cleaned_DataFrame, cvmodel, lda_transformed, lda_model):
    counts = cleaned_DataFrame.select((explode(cleaned_DataFrame.filtered)).alias("tokens")).groupby("tokens").count()
    wc = {i['tokens']: i['count'] for i in counts.collect()}
    wc = [wc[x] for x in cvmodel.vocabulary]


    data = {'topic_term_dists': np.array(lda_model.topicsMatrix().toArray()).T,
            'doc_topic_dists': np.array([x.toArray() for x in lda_transformed.select(["topicDistribution"]).toPandas()['topicDistribution']]),
            'doc_lengths': [x[0] for x in cleaned_DataFrame.select(size(cleaned_DataFrame.filtered)).collect()],
            'vocab': cvmodel.vocabulary,
            'term_frequency': wc}

    return data

# todo - need to change this code - straight from stackoverflow
def filter_bad_docs(data):
    bad = 0
    doc_topic_dists_filtrado = []
    doc_lengths_filtrado = []

    for x,y in zip(data['doc_topic_dists'], data['doc_lengths']):
        if np.sum(x)==0:
            bad+=1
        elif np.sum(x) != 1:
            bad+=1
        elif np.isnan(x).any():
            bad+=1
        else:
            doc_topic_dists_filtrado.append(x)
            doc_lengths_filtrado.append(y)

    data['doc_topic_dists'] = doc_topic_dists_filtrado
    data['doc_lengths'] = doc_lengths_filtrado


def main():
    start = time()
    root_path = '../data/archive/'
    spark = init_spark()
    json_files = read_json_files(root_path, spark)
    data = get_body_text(spark, json_files)

    # clean the data
    word_clean_up_F = F.udf(lambda x: clean_up(x), StringType())
    data = data.withColumn("body_text_cleaned", word_clean_up_F("body_text"))

    # tokenize documents
    tokenizer = Tokenizer(inputCol="body_text_cleaned", outputCol="words")
    token_DataFrame = tokenizer.transform(data)

    # Remove stopwords
    remover = StopWordsRemover(inputCol="words", outputCol="filtered")
    cleaned_DataFrame = remover.transform(token_DataFrame)

    # Count vectorizer
    cv_tmp = CountVectorizer(inputCol="filtered", outputCol="count_features")
    cvmodel = cv_tmp.fit(cleaned_DataFrame)
    count_dataframe = cvmodel.transform(cleaned_DataFrame)

    # TF-IDF Vectorizer
    tfidf = IDF(inputCol="count_features", outputCol="features")
    tfidfmodel = tfidf.fit(count_dataframe)
    tfidf_dataframe = tfidfmodel.transform(count_dataframe)


    # Fit the LDA Model
    num_topics = 10
    max_iterations = 50
    lda = LDA(seed=1, optimizer="em", k=num_topics, maxIter=max_iterations)
    lda_model = lda.fit(tfidf_dataframe)
    lda_transformed = lda_model.transform(tfidf_dataframe
                                          )
    print("done fitting")
    # joblib.dump(lda_model, 'lda.csv')

    # Get terms per topic
    topics = lda_model.topicsMatrix()
    vocabArray = cvmodel.vocabulary

    wordNumbers = 15  # number of words per topic
    topicIndices = lda_model.describeTopics(maxTermsPerTopic=wordNumbers).rdd.map(tuple)

    topics_final = topicIndices.map(lambda topic: topic_render(topic, wordNumbers, vocabArray)).collect()

    for topic in range(len(topics_final)):
        print("Topic" + str(topic) + ":")
        print(topics_final[topic])
        
    
    # Data vizualization
    data = format_data_to_pyldavis(cleaned_DataFrame, cvmodel, lda_transformed, lda_model)
    filter_bad_docs(data)
    
    return data

In [3]:
data = main()
data



done fitting




Topic0:
['ultrasound', 'phage', 'niclosamide', 'telerobotic', 'junv', 'cells', 'pedv', 'peptide', 'exams', 'denv', 'ifn', 'abs', 'peptides', 'vero', 'protein']
Topic1:
['hpv', 'rhinitis', 'patients', 'nasal', 'bronchiolitis', 'chest', 'aerosolization', 'participants', 'pediatric', 'species', 'acps', 'leptospirosis', 'children', 'plasma', 'kdh']
Topic2:
['ibv', 'chickens', 'rndv', 'ims', 'optimized', 'ndv', 'mvs', 'immunized', 'expressing', 'des', 'codon', 'virulent', 'gene', 'pcv2', 'protein']
Topic3:
['ehrs', 'lipid', 'ata', 'mam', 'lqts', 'smooth', 'hcq', 'rough', 'repurposed', 'student', 'calcium', 'pay', 'drugs', 'access', 'fatty']
Topic4:
['aviation', 'emissions', 'wearable', 'u937', 'reovirus', 'healthcare', 'emission', 'cat', 'etco', 'intention', 'ivh', 'icu', 'international', 'participants', 'infants']
Topic5:
['business', 'aging', 'chiasma', 'mrna', 'ribosome', 'translation', 'cell', 'mcs', 'tcs', 'linguistic', 'localization', 'chromosome', 'rna', 'habronattus', 'lncrnas']
Top

{'topic_term_dists': array([[7.32595645e+00, 2.03213562e+02, 1.76965756e+01, ...,
         6.79755118e-05, 3.24171015e-05, 8.05564779e-05],
        [5.51637074e+00, 1.71932117e+01, 1.89885966e+01, ...,
         6.73345899e-05, 3.10044037e-05, 6.38356778e-05],
        [5.52417188e+00, 9.70355157e+01, 9.95415443e+00, ...,
         6.67210030e-05, 3.14888025e-05, 8.38815913e-05],
        ...,
        [6.48341464e+00, 6.70171454e+00, 1.10193369e+01, ...,
         9.32052060e-05, 3.77114935e-05, 3.92123980e+00],
        [6.08246016e+00, 7.94414187e+01, 1.04785458e+01, ...,
         7.45807242e-05, 3.92164534e+00, 7.42534375e-05],
        [5.88472699e+00, 2.59156262e+01, 4.84772086e+00, ...,
         3.92124368e+00, 4.82194682e-05, 9.55394644e-05]]),
 'doc_topic_dists': [array([1.21736982e-03, 1.34384664e-03, 1.27289679e-03, 1.21364342e-03,
         1.16948672e-03, 9.89749618e-01, 1.03527576e-03, 9.71340252e-04,
         9.94256416e-04, 1.03226593e-03]),
  array([0.00161207, 0.98664095, 0.00

In [4]:
# Try PyLDAviz
py_lda_prepared_data = pyLDAvis.prepare(**data)

In [5]:
pyLDAvis.display(py_lda_prepared_data)