In [1]:
!scala -version

Scala code runner version 2.12.10 -- Copyright 2002-2019, LAMP/EPFL and Lightbend, Inc.


In [2]:
from google.cloud import bigquery
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [3]:
spark = SparkSession.builder \
  .appName('Reddit LDA Topics')\
  .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar') \
  .getOrCreate()

spark.version

'2.4.5'

In [4]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

# Retrieve Reddit Data from BigQuery

In [5]:
QUERY = """
SELECT *
FROM `fh-bigquery.reddit_posts.2018_*`
WHERE score>0 
and subreddit in (select subr from `fh-bigquery.reddit.top20`)
-- and subreddit = 'technology' 
"""

In [6]:
spark = SparkSession.builder.appName('Query Results').getOrCreate()
bq = bigquery.Client()

In [7]:
print('Querying BigQuery')
table_id = "cptsrewards-hrd.Jason_temp.test_tmp_table"

job_config = bigquery.QueryJobConfig(
    allow_large_results=True, destination=table_id, use_legacy_sql=False
)
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

query_job = bq.query(QUERY, job_config=job_config)
query_job.result()

Querying BigQuery


<google.cloud.bigquery.table.RowIterator at 0x7fa2e14288d0>

In [8]:
df = spark.read.format('bigquery') \
    .option('dataset', query_job.destination.dataset_id) \
    .load(query_job.destination.table_id)

# Remove Special Characters

In [9]:
def ascii_ignore(x):
    return x.encode('ascii', 'ignore').decode('ascii')

ascii_udf = udf(ascii_ignore)

In [10]:
df_titles = df.withColumn("title_no_ascii", ascii_udf('title')) \
.withColumn("title_no_spaces", trim(col("title_no_ascii"))) \
.filter('length(title_no_spaces) > 10') \
.select('title')\
.cache()

In [11]:
df_titles.count()

6713450

# Text Prepping

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover, StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

In [13]:
# http://spark.apache.org/docs/latest/ml-features.html#tf-idf
# https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3741049972324885/3783546674231782/4413065072037724/latest.html

tokenizer = Tokenizer(inputCol="title", outputCol="words")
df_tokenizer = tokenizer.transform(df_titles)

remover = StopWordsRemover(inputCol="words", outputCol="filtered")
df_remover = remover.transform(df_tokenizer)

vectorizer = CountVectorizer(inputCol="filtered", outputCol="features",
                             minDF=5, vocabSize=100).fit(df_remover)

df_titles_out = vectorizer.transform(df_remover)

# hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=100)

# idf = IDF(inputCol="rawFeatures", outputCol="features")

# pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf])

# model = pipeline.fit(df_titles)
# df_titles_out = model.transform(df_titles)

In [14]:
df_titles_out

title,words,filtered,features
MRW I don't know ...,"[mrw, i, don't, k...","[mrw, know, react...","(100,[21],[1.0])"
Faith &amp; unfal...,"[faith, &amp;, un...","[faith, &amp;, un...","(100,[83],[1.0])"
Look at this so c...,"[look, at, this, ...","[look, called, ""c...","(100,[72],[1.0])"
Anon makes an alp...,"[anon, makes, an,...","[anon, makes, alp...","(100,[82],[1.0])"
/g/ anon values h...,"[/g/, anon, value...","[/g/, anon, value...","(100,[],[])"
MRW I look outsid...,"[mrw, i, look, ou...","[mrw, look, outsi...","(100,[49,72],[1.0..."
MRW I'm finally d...,"[mrw, i'm, finall...","[mrw, finally, do...","(100,[56],[1.0])"
The group least l...,"[the, group, leas...","[group, least, li...","(100,[15],[1.0])"
Anon deals with bi,"[anon, deals, wit...","[anon, deals, bi]","(100,[],[])"
MRW my post doesn...,"[mrw, my, post, d...","[mrw, post, take,...","(100,[68],[1.0])"


In [15]:
from pyspark.ml.clustering import LDA

In [None]:
lda = LDA(featuresCol = 'features', k=10, maxIter =10)

model = lda.fit(df_titles_out)


In [None]:
# ll = model.logLikelihood(df_titles_out)
# lp = model.logPerplexity(df_titles_out)
# print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
# print("The upper bound on perplexity: " + str(lp))


In [None]:
# Describe topics.
# https://www.zstat.pl/2018/02/07/scala-spark-get-topics-words-from-lda-model/
topics = model.describeTopics(5)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

The topics described by their top-weighted terms:
+-----+--------------------+----------------------------------------------------------------------------------------------------------+
|topic|termIndices         |termWeights                                                                                               |
+-----+--------------------+----------------------------------------------------------------------------------------------------------+
|0    |[2, 7, 20, 23, 25]  |[0.19126216914263053, 0.1283388851512674, 0.07794244276993893, 0.06326187778884601, 0.06222672209580321]  |
|1    |[6, 4, 13, 38, 45]  |[0.18737980224453477, 0.17002687019598225, 0.1301715023513198, 0.07206668730650158, 0.06843630829071246]  |
|2    |[9, 19, 28, 31, 62] |[0.20974030918213601, 0.14496233517595006, 0.12366635019573312, 0.08430531602993338, 0.06996069179356383] |
|3    |[37, 41, 58, 48, 64]|[0.17263959105818222, 0.15921470615794156, 0.13890383010809923, 0.1384581753459524, 0.12087454567734648]  

In [None]:
topicIndices = model.describeTopics(maxTermsPerTopic = 5)
vocabList = vectorizer.vocabulary

In [None]:
topics.select("termIndices").collect()[1][0]

[6, 4, 13, 38, 45]

In [None]:
# Helper function
n_top_words = 3
def print_topics(topics, vectorizer, num_topics=10):
    words = vectorizer.vocabulary
    for i in range(n_top_words):
        indices = topics.select("termIndices").collect()[i][0]
        print(i)
        print(indices)
        wordsList = []
        for j in indices:
            wordsList.append(words[j])
        print(wordsList)

print_topics(topics, vectorizer, 10)

0
[2, 7, 20, 23, 25]
['like', 'new', 'make', 'life', 'got']
1
[6, 4, 13, 38, 45]
['ever', 'reddit,', 'time', 'want', 'anyone']
2
[9, 19, 28, 31, 62]
['get', '[serious]', '|', 'it?', 'help']


In [None]:
type(topicIndices)

pyspark.sql.dataframe.DataFrame

In [None]:
print_topics(my_df,vectorizer, 10)

NameError: name 'my_df' is not defined

In [None]:
topics = topicIndices.map(lambda (terms, termWeights): terms.map(vocabList(_)).zip(termWeights))


In [None]:
topicIndices

In [None]:
# Shows the result
transformed = model.transform(df_titles_out)
transformed.show(truncate=False)

In [None]:
transformed.select("topicDistribution").show(10,False)

In [None]:
model.vocabulary