In [1]:
!scala -version

Scala code runner version 2.12.10 -- Copyright 2002-2019, LAMP/EPFL and Lightbend, Inc.


In [2]:
from datetime import datetime
from google.cloud import bigquery
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,trim,udf,regexp_replace,lower,hash,countDistinct,array_max

In [3]:
spark = SparkSession.builder \
  .appName('Covid19 News LDA Topics')\
  .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar') \
  .getOrCreate()

spark.version

'2.4.5'

In [4]:
sc = spark.sparkContext
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

# Retrieve COVID-19 News Data from BigQuery

In [5]:
QUERY = """
select distinct NET.REG_DOMAIN(URL) domain, Title, CONCAT(Title, " ", Context) text
from `gdelt-bq.covid19.onlinenews`
where rand()<0.1
"""

In [6]:
spark = SparkSession.builder.appName('Query Results').getOrCreate()
bq = bigquery.Client()

In [None]:
print('Querying BigQuery')
table_id = "data-analysis-202319.jy_covid19_analysis.test_tmp_table"

job_config = bigquery.QueryJobConfig(
    allow_large_results=True, destination=table_id, use_legacy_sql=False
)
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

query_job = bq.query(QUERY, job_config=job_config)
query_job.result()

Querying BigQuery


<google.cloud.bigquery.table.RowIterator at 0x7f740ad3b6d0>

In [None]:
df = spark.read.format('bigquery') \
    .option('dataset', query_job.destination.dataset_id) \
    .load(query_job.destination.table_id)

# Remove Special Characters

In [20]:
def ascii_ignore(x):
    return x.encode('ascii', 'ignore').decode('ascii')

ascii_udf = udf(ascii_ignore)

In [21]:
def removePunctuation(column):
     return trim(lower(regexp_replace(column,'[^\sa-zA-Z0-9]', '')))

In [22]:
df_text = df.withColumn("text_no_ascii", ascii_udf('text')) \
.withColumn("text_no_special", removePunctuation(col("text_no_ascii"))) \
.withColumn("text_lower", lower(col("text_no_special"))) \
.withColumn("text_hashed", hash('text_lower')) \
.filter('length(text_lower) > 10') 
# .limit(500000)
# limit to x0k records for testing pipeline

In [23]:
df_text_unique = df_text.select("text_lower","text_hashed")\
    .dropDuplicates()

In [24]:
df_text.count()

2807552

In [25]:
df_text_unique.count()

2190020

From the outputs of the last two cells, we see that a large amount of our news records are duplicates, or articles that were syndicated among news outlets. As a result, we drop the duplicates for our topic modeling.

# Top Domains

In [26]:
# Create Dataframe with top domains
df_domains_top = df_text\
.groupBy("domain")\
.count()\
.sort(col("count").desc()) \
.limit(100)

In [27]:
# Create Dataframe with domain-text_hashed mapping for only top domains
df_domain = df_text\
.join(df_domains_top, on=['domain'])\
.select("domain","text_hashed")

# Text Prepping

In [28]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover

In [29]:
# http://spark.apache.org/docs/latest/ml-features.html#tf-idf
# https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3741049972324885/3783546674231782/4413065072037724/latest.html

# remove "" from array https://sparkbyexamples.com/spark/working-with-spark-dataframe-filter/

tokenizer = Tokenizer(inputCol="text_lower", outputCol="words")
df_tokenizer = tokenizer.transform(df_text_unique)


# Add Custom StopWords https://stackoverflow.com/questions/43623400/how-to-add-custom-stop-word-list-to-stopwordsremover
stopwordList = ["", "coronavirus","covid19","pandemic", "said", "also", "ap", "due"] 
stopwordList.extend(StopWordsRemover().getStopWords())

remover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=stopwordList)
df_remover = remover.transform(df_tokenizer)

vectorizer = CountVectorizer(inputCol="filtered", outputCol="features",
                             vocabSize=5000).fit(df_remover)

df_text_out = vectorizer.transform(df_remover)

# hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=100)

# idf = IDF(inputCol="rawFeatures", outputCol="features")

# pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf])

# model = pipeline.fit(df_titles)
# df_titles_out = model.transform(df_titles)

In [30]:
df_text_out

text_lower,text_hashed,words,filtered,features
de blasio says ny...,1640177803,"[de, blasio, says...","[de, blasio, says...","(5000,[1,6,9,13,1..."
rupee vs dollar r...,-1773633840,"[rupee, vs, dolla...","[rupee, vs, dolla...","(5000,[0,2,5,15,2..."
baidus value will...,128485907,"[baidus, value, w...","[baidus, value, u...","(5000,[3,10,20,27..."
south koreans ret...,1451117704,"[south, koreans, ...","[south, koreans, ...","(5000,[4,8,9,10,2..."
stock markets plu...,516600583,"[stock, markets, ...","[stock, markets, ...","(5000,[1,39,67,10..."
trading briefly h...,-98579109,"[trading, briefly...","[trading, briefly...","(5000,[0,1,2,6,15..."
powell recovery m...,-1364474195,"[powell, recovery...","[powell, recovery...","(5000,[5,10,74,11..."
coronavirus south...,-1350252603,"[coronavirus, sou...","[south, attleboro...","(5000,[1,3,5,11,2..."
trump churches ar...,440719351,"[trump, churches,...","[trump, churches,...","(5000,[23,24,38,5..."
families connect ...,2016617532,"[families, connec...","[families, connec...","(5000,[3,16,37,39..."


# LDA

In [31]:
from pyspark.ml.clustering import LDA

In [None]:
# https://gist.github.com/feynmanliang/3b6555758a27adcb527d
# https://databricks.com/blog/2015/09/22/large-scale-topic-modeling-improvements-to-lda-on-apache-spark.html

numTopics = 10
lda = LDA(featuresCol = 'features', k=numTopics, maxIter =30, optimizer="online")

model = lda.fit(df_text_out)


In [None]:
# ll = model.logLikelihood(df_titles_out)
# lp = model.logPerplexity(df_titles_out)
# print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
# print("The upper bound on perplexity: " + str(lp))


In [None]:
# model.estimatedDocConcentration()
# model.topicsMatrix()

# LDA Topic Description

In [None]:
# Describe topics.
# https://www.zstat.pl/2018/02/07/scala-spark-get-topics-words-from-lda-model/
topics = model.describeTopics(5)

In [None]:
from pyspark.sql.types import *

In [None]:
def DFindices_to_DFnames(topics, vectorizer, num_topics=10):
    words = vectorizer.vocabulary
    cSchema = StructType([StructField('termIndices',ArrayType(LongType(),True),True),
                          StructField('termNames',ArrayType(StringType(),True),True)])
    df_termNames = spark.createDataFrame(sc.emptyRDD(), schema=cSchema)
    
    for i in range(num_topics):
        
        indices = topics.select("termIndices").collect()[i][0]
        wordsList = []
        for j in indices:
            wordsList.append(words[j])
        input_list = [(indices,wordsList)]
        df_i = spark.createDataFrame(input_list)
        df_out = df_i.withColumnRenamed("_1","termIndices") \
            .withColumnRenamed("_2","termNames")

        df_termNames = df_termNames.union(df_out)
    return df_termNames

In [None]:
df_termNames = DFindices_to_DFnames(topics, vectorizer, num_topics=numTopics)
topics_with_names = topics.join(df_termNames, on=['termIndices'])
topics_with_names.select("topic", "termNames", "termIndices").show(10,False)

+-----+-------------------------------------------+---------------------+
|topic|termNames                                  |termIndices          |
+-----+-------------------------------------------+---------------------+
|0    |[2020, school, new, time, students]        |[21, 140, 1, 27, 213]|
|1    |[market, us, economic, global, economy]    |[95, 5, 104, 75, 111]|
|2    |[cases, new, deaths, confirmed, number]    |[0, 1, 12, 17, 15]   |
|3    |[positive, tested, news, home, people]     |[8, 25, 7, 19, 3]    |
|4    |[health, county, state, cases, public]     |[2, 22, 6, 0, 13]    |
|5    |[patients, hospital, health, care, medical]|[33, 57, 2, 46, 51]  |
|6    |[people, virus, health, vaccine, spread]   |[3, 4, 2, 334, 11]   |
|7    |[new, china, lockdown, people, travel]     |[1, 34, 14, 3, 96]   |
|8    |[help, support, food, people, new]         |[39, 86, 132, 3, 1]  |
|9    |[trump, us, people, president, health]     |[53, 5, 3, 59, 2]    |
+-----+-------------------------------

In [None]:
topics_with_names.schema

StructType(List(StructField(termIndices,ArrayType(IntegerType,false),true),StructField(topic,IntegerType,false),StructField(termWeights,ArrayType(DoubleType,false),true),StructField(termNames,ArrayType(StringType,true),true)))

# Topic Distribution

In [None]:
# Shows the result
transformed = model.transform(df_text_out)
transformed

text_lower,text_hashed,words,filtered,features,topicDistribution
de blasio says ny...,1640177803,"[de, blasio, says...","[de, blasio, says...","(5000,[1,6,9,13,1...",[0.33444016287311...
rupee vs dollar r...,-1773633840,"[rupee, vs, dolla...","[rupee, vs, dolla...","(5000,[0,2,5,15,2...",[0.00168033910113...
baidus value will...,128485907,"[baidus, value, w...","[baidus, value, u...","(5000,[3,10,20,27...",[0.00287827704010...
south koreans ret...,1451117704,"[south, koreans, ...","[south, koreans, ...","(5000,[4,8,9,10,2...",[0.60250762043066...
stock markets plu...,516600583,"[stock, markets, ...","[stock, markets, ...","(5000,[1,39,67,10...",[0.00156216042141...
trading briefly h...,-98579109,"[trading, briefly...","[trading, briefly...","(5000,[0,1,2,6,15...",[0.00164921744532...
powell recovery m...,-1364474195,"[powell, recovery...","[powell, recovery...","(5000,[5,10,74,11...",[0.00228583131459...
coronavirus south...,-1350252603,"[coronavirus, sou...","[south, attleboro...","(5000,[1,3,5,11,2...",[0.18304239081944...
trump churches ar...,440719351,"[trump, churches,...","[trump, churches,...","(5000,[23,24,38,5...",[0.00228575271392...
families connect ...,2016617532,"[families, connec...","[families, connec...","(5000,[3,16,37,39...",[0.78465695252273...


In [None]:
# https://stackoverflow.com/questions/38384347/how-to-split-vector-into-columns-using-pyspark
def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    # Important: asNondeterministic requires Spark 2.3 or later
    # It can be safely removed i.e.
    # return udf(to_array_, ArrayType(DoubleType()))(col)
    # but at the cost of decreased performance
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)


In [None]:
df_transformed = transformed \
.withColumn("topic_array", to_array("topicDistribution")) \
.withColumn("topic_max", array_max("topic_array")) 
# .withColumn("topic_index", array_position(col("topic_array"), "topic_max"))

In [None]:
# UDF to assign index to list items 
# https://html.developreference.com/article/10897735/Get+index+of+item+in+array+that+is+a+column+in+a+Spark+dataframe
max_topic_index = udf(lambda x,y: [i for i, e in enumerate(x) if e==y ])

In [None]:
df_transformed2 = df_transformed \
.withColumn("topicPred", max_topic_index(col("topic_array"),col("topic_max")))\
.withColumn("topicPredStr",col("topicPred").substr(2,1))\
.withColumn("topic", col("topicPredStr").cast(IntegerType()))

In [None]:
df_transformed_names = topics_with_names.join(df_transformed2, on=['topic'])\
.select("topic", "topic_max", "termNames","text_hashed")

In [None]:
df_transformed_names_agg = df_transformed_names\
    .groupBy("topic", "termNames")\
    .count()\
    .orderBy("topic").cache()

df_transformed_names_agg.show(10,False)

+-----+-------------------------------------------+------+
|topic|termNames                                  |count |
+-----+-------------------------------------------+------+
|0    |[2020, school, new, time, students]        |281955|
|1    |[market, us, economic, global, economy]    |252257|
|2    |[cases, new, deaths, confirmed, number]    |253840|
|3    |[positive, tested, news, home, people]     |158069|
|4    |[health, county, state, cases, public]     |257269|
|5    |[patients, hospital, health, care, medical]|157643|
|6    |[people, virus, health, vaccine, spread]   |207877|
|7    |[new, china, lockdown, people, travel]     |225033|
|8    |[help, support, food, people, new]         |254329|
|9    |[trump, us, people, president, health]     |141748|
+-----+-------------------------------------------+------+



In [None]:
# Create timestamp for saving table records
save_time = datetime.now().strftime("%Y%m%d_%H_%M")

In [None]:
df_transformed_names_agg.select("topic", 
                                col("termNames").cast(StringType()).alias("termNamesStr"),
                                "count")\
    .write.format('bigquery') \
    .option("temporaryGcsBucket","data-analysis-jy") \
    .save('data-analysis-202319.jy_covid19_analysis.topic_counts_'+str(save_time))

# Topics by Domains

In [None]:
df_transformed_domains = df_transformed_names.join(df_domain, on=['text_hashed'])\
    .groupBy("topic","termNames", "domain")\
    .count()\
    .orderBy("topic","domain").cache()

In [None]:
df_transformed_domains.show(30,False)

+-----+-----------------------------------+---------------------+-----+
|topic|termNames                          |domain               |count|
+-----+-----------------------------------+---------------------+-----+
|0    |[2020, school, new, time, students]|680news.com          |367  |
|0    |[2020, school, new, time, students]|abc.net.au           |485  |
|0    |[2020, school, new, time, students]|abs-cbn.com          |225  |
|0    |[2020, school, new, time, students]|accesswdun.com       |354  |
|0    |[2020, school, new, time, students]|aljazeera.com        |204  |
|0    |[2020, school, new, time, students]|allafrica.com        |803  |
|0    |[2020, school, new, time, students]|aninews.in           |214  |
|0    |[2020, school, new, time, students]|apnews.com           |338  |
|0    |[2020, school, new, time, students]|bbc.co.uk            |466  |
|0    |[2020, school, new, time, students]|bbc.com              |245  |
|0    |[2020, school, new, time, students]|business-standard.com

In [None]:
df_transformed_domains.select("topic", 
                              col("termNames").cast(StringType()).alias("termNamesStr"),
                              "domain",
                              "count")\
    .write.format('bigquery') \
    .option("temporaryGcsBucket","data-analysis-jy") \
    .save('data-analysis-202319.jy_covid19_analysis.topic_domain_counts_'+str(save_time))


# Save LDA Model

In [53]:
model.save("gs://data-analysis-jy/covid19LDA/lda-model-20200719")

# toPandas

In [None]:
pdf = df_transformed_domains.toPandas()

In [None]:
# pandas csv
path = "gs://data-analysis-jy/covid19LDA/pandas_df.csv"

pdf.to_csv(path)