In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, regexp_replace, lower, col, explode, regexp_extract
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Reuse the active SparkSession if one exists, otherwise create one with the
# builder's default configuration.
spark = SparkSession.builder.getOrCreate()

In [None]:
# Load the posts dump (stored as ORC) and inspect the inferred column types.
df = spark.read.format('orc').load('data/dataframe.orc')
df.printSchema()

In [None]:
# Total number of rows loaded (triggers a full Spark job).
df.count()

In [None]:
# Column meanings are documented in the public data-dump schema:
# https://meta.stackexchange.com/questions/2677/database-schema-documentation-for-the-public-data-dump-and-sede
df.select([
    'Title',
    'Body',
    'Score',
    'Tags',
    'PostTypeId',
]).show()

In [None]:
# Keep only the rows whose PostTypeId is 1 (questions), then count them.
questions = df.where(col('PostTypeId') == 1)
questions.count()

In [None]:
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover
from pyspark.ml import Pipeline
import string

In [None]:
# Text preprocessing, applied in order:
#   1. split the post body into tokens,
#   2. drop the default stop words,
#   3. drop tokens that are empty or exactly one punctuation character.
tokenizer = Tokenizer(
    inputCol="Body",
    outputCol="words",
)
stopwords = StopWordsRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol="stopwords",
)
punctuation = StopWordsRemover(
    inputCol=stopwords.getOutputCol(),
    outputCol="filtered",
    stopWords=['', *string.punctuation],
)

stages = [tokenizer, stopwords, punctuation]
pipeline = Pipeline(stages=stages)

In [None]:
# Run the preprocessing pipeline over the questions; adds the `words`,
# `stopwords` and `filtered` columns.
# NOTE: `pipeline` is rebound further down (extended with CountVectorizer and
# IDF), so re-running this cell after that point would use the extended pipeline.
words = pipeline.fit(questions).transform(questions)

In [None]:
%%time
word_count = words.select(explode(col('filtered')).alias('word')).groupby('word').count().cache()

In [None]:
%%time
word_count.sort(col('count').desc()).show(truncate=False)

In [None]:
# Number of distinct tokens in the corpus.
word_count.count()

In [None]:
# Size of the hand-built vocabulary: keep only the most frequent tokens and
# bring them to the driver as a pandas DataFrame (columns: word, count).
VOCAB_SIZE = 100000
vocab = (
    word_count
    .orderBy(col('count').desc())
    .limit(VOCAB_SIZE)
    .toPandas()
)

In [None]:
# Display the collected vocabulary table.
vocab

In [None]:
# Token frequency by rank (rows were sorted by descending count before collection).
vocab['count'].plot()

In [None]:
# Same curve restricted to row positions 3..999, i.e. skipping the three most
# frequent tokens.
vocab.iloc[3:1000]['count'].plot()

In [None]:
# Ten most frequent tokens.
vocab.head(10)

In [None]:
# Map every vocabulary word to its row index in `vocab` (0 = most frequent).
lookup = dict(zip(vocab['word'], vocab.index.values))

In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.linalg import Vectors, SparseVector, DenseVector
from pyspark.ml.feature import HashingTF, Tokenizer, IDF


import numpy as np

In [None]:
def bag_of_words(words: list) -> list:
    """Turn a token list into a dense count vector over the notebook vocabulary.

    Args:
        words: tokens of one document.

    Returns:
        A list of ``VOCAB_SIZE`` integers in which position ``lookup[word]``
        holds how many times ``word`` occurs in ``words``. Tokens that are not
        keys of ``lookup`` are ignored.
    """
    counts = [0] * VOCAB_SIZE

    for token in words:
        slot = lookup.get(token)
        if slot is not None:
            counts[slot] += 1

    return counts

In [None]:
# Spark return type of the UDF: an array of 64-bit integers.
SparkBag = T.ArrayType(elementType=T.LongType())

# Wrap the pure-Python counter so it can be applied to a DataFrame column.
count_vectorizer = F.udf(bag_of_words, returnType=SparkBag)

In [None]:
# Attach the hand-rolled bag-of-words vector computed from the filtered tokens.
vectorized = words.withColumn(
    'vector',
    count_vectorizer(words['filtered']),
)

In [None]:
# Preview each preprocessing stage next to the resulting count vector.
vectorized.select('Title', 'Body', 'words', 'stopwords', 'filtered', 'vector').show()

In [None]:
# Built-in replacement for the hand-rolled vectorizer: term counts over a
# vocabulary capped at 50000 terms (each term must occur in at least 2
# documents), followed by IDF re-weighting.
cv = CountVectorizer(
    inputCol=punctuation.getOutputCol(),
    outputCol="counts",
    minDF=2.0,
    vocabSize=50000,
)
idf = IDF(
    inputCol=cv.getOutputCol(),
    outputCol="features",
)

# Preprocessing stages followed by the two feature stages.
pipeline = Pipeline(stages=[*stages, cv, idf])

In [None]:
# Fit the extended pipeline (preprocessing + CountVectorizer + IDF) on the questions.
tfidf = pipeline.fit(questions)

In [None]:
# Apply the fitted pipeline; adds the `counts` and `features` columns.
features = tfidf.transform(questions)

In [None]:
# Inspect the columns produced by the pipeline.
features.printSchema()

In [None]:
# Preview every pipeline stage's output column side by side.
features.select('Body', 'words', 'stopwords', 'filtered', 'counts', 'features').show()

In [None]:
# The last stage of the fitted pipeline is the fitted IDF model.
idfModel = tfidf.stages[-1]

In [None]:
# Per-term inverse document frequency vector learned by the IDF stage.
inverseFreq = idfModel.idf

In [None]:
# The 20 terms with the lowest IDF, i.e. those present in the most documents.
# The IDF vector is aligned with the fitted CountVectorizer's vocabulary
# (second-to-last stage of `tfidf`). `vocab` cannot be used here: in a
# top-to-bottom run it is still the pandas word-count table, which has a
# different size and ordering and cannot be indexed by an integer array.
np.array(tfidf.stages[-2].vocabulary)[inverseFreq.toArray().argsort()[:20]]

In [None]:
# The 20 terms with the highest IDF, i.e. the rarest terms kept in the
# vocabulary. The IDF vector is aligned with the fitted CountVectorizer's
# vocabulary (second-to-last stage of `tfidf`), so index that rather than
# `vocab`, which in a top-to-bottom run is still the pandas word-count table.
np.array(tfidf.stages[-2].vocabulary)[inverseFreq.toArray().argsort()[-20:]]

## LDA

In [None]:
from pyspark.ml.clustering import LDA, LocalLDAModel, DistributedLDAModel
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# LDA over the raw term counts (not the IDF-weighted features): 30 topics,
# online variational optimizer, fixed seed for reproducibility.
lda = LDA(
    k=30,
    featuresCol="counts",
    seed=1,
    optimizer='online',
    maxIter=10,
    optimizeDocConcentration=True,
)

In [None]:
# Fit the topic model; the estimator reads the `counts` column of `features`.
model = lda.fit(features)

In [None]:
# Persist the fitted model. A plain `save` raises if the target path already
# exists, which breaks every re-run of the notebook; overwrite it explicitly.
model.write().overwrite().save('lda')

In [None]:
# Reload the persisted model from disk. The estimator above uses the 'online'
# optimizer, which per the Spark docs yields a LocalLDAModel.
model = LocalLDAModel.load('lda')

In [None]:
# Evaluate the fitted model on the training corpus.
ll = model.logLikelihood(features)
lp = model.logPerplexity(features)

print(f"The lower bound on the log likelihood of the entire corpus: {ll}")
print(f"The upper bound on perplexity: {lp}")

In [None]:
# Top five terms (as vocabulary indices) and their weights for each topic.
topics = model.describeTopics(maxTermsPerTopic=5)
topics.show(truncate=70)

In [None]:
# Score every question against the topics; adds a `topicDistribution` column.
categorized = model.transform(features)
(
    categorized
    .select('Title', 'topicDistribution')
    .show(truncate=70)
)

## Interpreting LDA

In [None]:
# Bring the topic descriptions to the driver as a pandas DataFrame.
topicsDF = topics.toPandas()

In [None]:
# List the stages of the (unfitted) extended pipeline to see their order.
pipeline.getStages()

In [None]:
# Positional unpacking of the five fitted stages: three preprocessing stages,
# then the fitted CountVectorizer (kept), then the fitted IDF model.
_, _, _, vectorizerModel, _ = tfidf.stages

In [None]:
# First ten terms of the CountVectorizer vocabulary.
vectorizerModel.vocabulary[:10]

In [None]:
# NOTE: this rebinds `vocab`. Earlier in the notebook it is the pandas
# word-count DataFrame; from here on it is a NumPy array of the
# CountVectorizer terms, indexable by term index.
vocab = np.array(vectorizerModel.vocabulary)

In [None]:
# Translate each topic's term indices into the corresponding vocabulary words.
topicsDF['words'] = topicsDF['termIndices'].apply(
    lambda term_ids: vocab[term_ids]
)

In [None]:
# Topics with their term indices, term weights and resolved words.
topicsDF

## Cross Validation

In [None]:
# Candidate topic counts to compare.
num_topics = [5, 20, 50]

# One param map per candidate value of `k`.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lda.k, num_topics)
    .build()
)

In [None]:
# NOTE: `fitMultiple` does not return fitted models directly. It returns an
# iterator that yields (index, model) pairs, where `index` is the position of
# the param map in `paramGrid`; each model is only fitted when the iterator is
# advanced.
modelVal = lda.fitMultiple(features, paramMaps = paramGrid)