# 2. LDA in PySpark

In this notebook an LDA is carried out on all four periods. First, however, a pipeline is built to convert the raw text into the right format for LDA. First a Bag of Words approach is used, using a CountVectorizer. Then the speed of this approach is compared to that of using the output of the Bert Embeddings from 1. 

To run this on Google Cloud Platform, the followings instructions are to be followed.

1. First, make a bucket. Upload spark-nlp-init.sh and json dataset into that bucket.
2. Then, create a cluster: gcloud dataproc clusters create laurens-cluster --bucket=laurens-bucket --subnet default --zone europe-west2-a --master-machine-type n1-standard-4 --master-boot-disk-size 500 --num-workers 2 --worker-machine-type n1-standard-4 --worker-boot-disk-size 500 --image-version 1.3-deb9 --initialization-actions 'gs://dataproc-initialization-actions/jupyter/jupyter.sh,gs://dataproc-initialization-actions/python/pip-install.sh,gs://laurens-bucket/spark-nlp-init.sh' --metadata 'PIP_PACKAGES=sklearn nltk pandas numpy textblob spark-nlp'
3. Then run "gcloud beta compute ssh --zone "europe-west2-a" "laurens-cluster-m" --project "seminar-work-st446""
4. Within the cluster, gsutil cp gs://laurens-bucket/* .
4. Then do hadoop fs -mkdir txtdata, then hadoop fs -put climateChangeArticlesGuardian0019.json txtdata
5. In tab #0, gcloud compute ssh laurens-cluster-m --project=seminar-work-st446 --zone=europe-west2-a -- -D 1080 -N
6. In tab #1, /Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --proxy-server=socks5://localhost:1080 --user-data-dir=/tmp/laurens-cluster-m http://laurens-cluster-m:8123

### 2.0 Importing packages and data

In [1]:
import nltk # test
import time
import numpy as np

from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from nltk.stem.wordnet import WordNetLemmatizer
from pyspark.ml.feature import CountVectorizer
from pyspark.sql.functions import monotonically_increasing_id
import string

sc.defaultParallelism

4

In [66]:
jsonfolder = "hdfs:///user/superlaut/txtdata/climateChangeArticlesGuardian0019.json"

from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType

schema = StructType([
    StructField("bodyText", StringType(), True),
    StructField("webPublicationDate", TimestampType(), True),
    StructField("wordcount", FloatType(), True)
])

df = spark.read.json(jsonfolder, schema)

df.printSchema()

root
 |-- bodyText: string (nullable = true)
 |-- webPublicationDate: timestamp (nullable = true)
 |-- wordcount: float (nullable = true)



In [67]:
# Split the dataframe into four periods
df0004 = df.where(df.webPublicationDate.between('2000-01-01','2004-12-31'))
df0509 = df.where(df.webPublicationDate.between('2005-01-01','2009-12-31'))
df1014 = df.where(df.webPublicationDate.between('2010-01-01','2014-12-31'))
df1519 = df.where(df.webPublicationDate.between('2015-01-01','2019-12-04'))

In [36]:
# This operation is carried out to transform the df into an RDD. 
news = df.select("bodyText")
news = news.toJSON()
print('Number of partitions: ', news.getNumPartitions())
n = news.count()
print('Number of documents n = ', n)

Number of partitions:  6
Number of documents n =  21001


### 2.1 LDA

First, we find the stopwords using the approach from the seminar. The doc_stop_words selects all words that appear more than 5,000 times in the raw text. From these, I manually select the words that I do not consider relevant for topic modelling.

In [None]:
stop_words = set(stopwords.words('english'))
table = str.maketrans('', '', string.punctuation)
lmtzr = WordNetLemmatizer()
def get_tokens(line):
    ###
    import nltk
#     nltk.download('all')
    ###
    tokens = word_tokenize(line)
    # convert to lower case
    tokens = [w.lower() for w in tokens]
    # remove punctuations from each word
    stripped = [w.translate(table) for w in tokens]
    # remove remaining tokens that are not alphabetic
    words = [word for word in stripped if word.isalpha()]
    # filter out stop words
    words = [w for w in words if not w in stop_words]
    # lemmatizing the words, see https://en.wikipedia.org/wiki/Lemmatisation
    words = [lmtzr.lemmatize(w) for w in words]
    return (words)

news_rdd = news.map(lambda line: (1, get_tokens(line)))
doc_stop_words = news_rdd.flatMap(lambda r: r[1]).map(lambda r: (r,1)).reduceByKey(lambda a,b: a+b)
doc_stop_words = doc_stop_words.filter(lambda a: a[1]>5000).map(lambda r: r[0]).collect()

In [1]:
doc_stop_words = ['something', 'le', 'un', 'could', 'keep', 'thing', 'must', 'find', 'however', 
                  'one', 'really', 'put', 'given', 'whether', 'make', 'way', 'nt', 'told', 'come',
                  'would', 'three', 'give', 'mr', 'like', 'also', 'u', 'rather', 'made', 'mean', 
                  'may', 'seen', 'bodytext', 'set', 'much', 'become', 'two', 'even', 'might', 
                  'around', 'going', 'per', 'many', 'said', 'say']

#### Pipeline LDA
Here I create a pipeline to convert the raw text into cleaned tokens. 

In [72]:
from pyspark.sql.functions import udf
import re
from nltk.tag import pos_tag
from nltk.corpus import stopwords

# remove non ASCII characters. Not necessarily relevant, but the text might contain some ASCII characters.
def strip_non_ascii(data_str):
    ''' Returns the string without non ASCII characters'''
    stripped = (c for c in data_str if 0 < ord(c) < 127)
    return ''.join(stripped)

# Using user-defined functions, again, so that PySpark sql recognises the function.
strip_non_ascii_udf = udf(strip_non_ascii, StringType())

# Removing features
def remove_features(data_str):
    # compile regex
    url_re = re.compile('https?://(www.)?\w+\.\w+(/\w+)*/?')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile('(\\d+)')
    mention_re = re.compile('@(\w+)')
    alpha_num_re = re.compile("^[a-z0-9_.]+$")
    # convert to lowercase
    data_str = data_str.lower()
    # remove hyperlinks
    data_str = url_re.sub(' ', data_str)
    # remove @mentions
    data_str = mention_re.sub(' ', data_str)
    # remove puncuation
    data_str = punc_re.sub(' ', data_str)
    # remove numeric 'words'
    data_str = num_re.sub(' ', data_str)
    # remove non a-z 0-9 characters and words shorter than 1 characters
    list_pos = 0
    cleaned_str = ''
    for word in data_str.split():
        if list_pos == 0:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = word
            else:
                cleaned_str = ' '
        else:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = cleaned_str + ' ' + word
            else:
                cleaned_str += ' '
        list_pos += 1
    # remove unwanted space, *.split() will automatically split on
    # whitespace and discard duplicates, the " ".join() joins the
    # resulting list into one string.
    return " ".join(cleaned_str.split())

remove_features_udf = udf(remove_features, StringType())

# Changing abbreviations into full words. Probably not relevant, as these words
# are probably not topic modelled.
def fix_abbreviation(data_str):
    data_str = data_str.lower()
    data_str = re.sub(r'\bthats\b', 'that is', data_str)
    data_str = re.sub(r'\bive\b', 'i have', data_str)
    data_str = re.sub(r'\bim\b', 'i am', data_str)
    data_str = re.sub(r'\bya\b', 'yeah', data_str)
    data_str = re.sub(r'\bcant\b', 'can not', data_str)
    data_str = re.sub(r'\bdont\b', 'do not', data_str)
    data_str = re.sub(r'\bwont\b', 'will not', data_str)
    data_str = re.sub(r'\bid\b', 'i would', data_str)
    data_str = re.sub(r'wtf', 'what the fuck', data_str)
    data_str = re.sub(r'\bwth\b', 'what the hell', data_str)
    data_str = re.sub(r'\br\b', 'are', data_str)
    data_str = re.sub(r'\bu\b', 'you', data_str)
    data_str = re.sub(r'\bk\b', 'OK', data_str)
    data_str = re.sub(r'\bsux\b', 'sucks', data_str)
    data_str = re.sub(r'\bno+\b', 'no', data_str)
    data_str = re.sub(r'\bcoo+\b', 'cool', data_str)
    data_str = re.sub(r'rt\b', '', data_str)
    data_str = data_str.strip()
    return data_str

fix_abbreviation_udf = udf(fix_abbreviation, StringType())

# Removing stop words
def remove_stops(data_str):
    # expects a string
    stops = list(set(stopwords.words("english"))) + ['something', 'le', 'un', 'could', 'keep', 'thing', 'must', 'find', 'however', 
                  'one', 'really', 'put', 'given', 'whether', 'make', 'way', 'nt', 'told', 'come',
                  'would', 'three', 'give', 'mr', 'like', 'also', 'u', 'rather', 'made', 'mean', 
                  'may', 'seen', 'bodytext', 'set', 'much', 'become', 'two', 'even', 'might', 
                  'around', 'going', 'per', 'many', 'said', 'say']
    
    list_pos = 0
    cleaned_str = ''
    text = data_str.split()
    for word in text:
        if word not in stops:
            # rebuild cleaned_str
            if list_pos == 0:
                cleaned_str = word
            else:
                cleaned_str = cleaned_str + ' ' + word
            list_pos += 1
    return cleaned_str

remove_stops_udf = udf(remove_stops, StringType())

# Lemmatizing, using WordNetLemmatizer
def lemmatize(data_str):
    # expects a string
    list_pos = 0
    cleaned_str = ''
    lmtzr = WordNetLemmatizer()
    text = data_str.split()
    tagged_words = pos_tag(text)
    for word in tagged_words:
        if 'v' in word[1].lower():
            lemma = lmtzr.lemmatize(word[0], pos='v')
        else:
            lemma = lmtzr.lemmatize(word[0], pos='n')
        if list_pos == 0:
            cleaned_str = lemma
        else:
            cleaned_str = cleaned_str + ' ' + lemma
        list_pos += 1
    return cleaned_str

lemmatize_udf = udf(lemmatize, StringType())

In [69]:
from pyspark.ml.feature import IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import SQLTransformer
from pyspark.ml.clustering import LDA

# Configure an ML pipeline, which consists of tree stages: tokenizer, vectorizer, and lda.
tokenizer = Tokenizer(inputCol="lemm_text", outputCol="tokens")
vectorizer = CountVectorizer(inputCol= "tokens", outputCol="features")

# I have finetuned the LDA parameters, leading to a low perplexity for df0004.
# By assumption, this works for the other dataframes too.
lda = LDA(k=10, maxIter=30, optimizer = 'em', learningOffset=1024.0, 
          learningDecay=0.4, subsamplingRate=0.05, 
          optimizeDocConcentration=True, seed = 1)
pipeline = Pipeline(stages=[tokenizer, vectorizer, lda])

# I could have included the udf as part of the pipeline, but this would lead
# to really ugly code, including a lot of SQLTransformers. Instead, 
# I wrote a function.
def LDApipeline(df_0):
    start = time.time()
    df_0 = df_0.withColumn('text_non_asci',strip_non_ascii_udf(df_0['bodyText']))
    df_0 = df_0.withColumn('removed',remove_features_udf(df_0['text_non_asci']))
    df_0 = df_0.withColumn('non_abbrev',fix_abbreviation_udf(df_0['removed']))
    df_0 = df_0.withColumn('stop_text',remove_stops_udf(df_0['non_abbrev']))
    df_0 = df_0.withColumn('lemm_text',lemmatize_udf(df_0['stop_text']))
    
    model = pipeline.fit(df_0)
    end = time.time()
    dt = end - start
    print('Operation cost',dt,'seconds')
    return model

In [29]:
model0004 = LDApipeline(df0004)

Operation cost 126.04530048370361 seconds


In [35]:
model0509 = LDApipeline(df0509)

Operation cost 434.1915535926819 seconds


In [39]:
model1014 = LDApipeline(df1014)

Operation cost 407.7295799255371 seconds


In [40]:
model1519 = LDApipeline(df1519)

Operation cost 616.1662304401398 seconds


Transforming the data and creating the LDA models takes very long. This is understandable, as the dataframes have huge vocabulaires (> 26k, 67k even for the period from 2015-2019). 

In [41]:
lda_0004 = model0004.stages[-1]
print('Vocab size 00-04 =',lda_0004.vocabSize())
lda_0509 = model0509.stages[-1]
print('Vocab size 05-09 =',lda_0509.vocabSize())
lda_1014 = model1014.stages[-1]
print('Vocab size 00-04 =',lda_1014.vocabSize())
lda_1519 = model1519.stages[-1]
print('Vocab size 00-04 =',lda_1519.vocabSize())

Vocab size 00-04 = 26148
Vocab size 05-09 = 44922
Vocab size 00-04 = 46250
Vocab size 00-04 = 67406


The code below describes the topics.

In [32]:
# Describe topics
topics0004 = lda_0004.describeTopics(7)

print("The topics described by their top-weighted terms:")
# Shows the results
topic_i = topics0004.select("termIndices").rdd.map(lambda r: r[0]).collect()
for i in topic_i:
    print(np.array(model0004.stages[-2].vocabulary)[i])

The topics described by their top-weighted terms:
['people' 'water' 'island' 'town' 'home' 'kill' 'mile']
['year' 'climate' 'change' 'warm' 'global' 'scientist' 'ice']
['people' 'world' 'government' 'africa' 'country' 'aid' 'food']
['climate' 'country' 'world' 'change' 'emission' 'global' 'kyoto']
['day' 'get' 'time' 'people' 'work' 'say' 'take']
['weather' 'rain' 'flood' 'yesterday' 'south' 'north' 'east']
['energy' 'government' 'environment' 'power' 'development' 'summit'
 'minister']
['u' 'bush' 'world' 'global' 'president' 'state' 'american']
['flood' 'water' 'year' 'area' 'river' 'risk' 'home']
['earthquake' 'people' 'city' 'rescue' 'building' 'quake' 'kill']


In [38]:
# Describe topics
topics0509 = lda_0509.describeTopics(7)

print("The topics described by their top-weighted terms:")
# Shows the results
topic_i = topics0509.select("termIndices").rdd.map(lambda r: r[0]).collect()
for i in topic_i:
    print(np.array(model0509.stages[-2].vocabulary)[i])

The topics described by their top-weighted terms:
['change' 'climate' 'government' 'need' 'political' 'take' 'issue']
['use' 'energy' 'think' 'people' 'get' 'say' 'home']
['power' 'car' 'government' 'plan' 'new' 'green' 'station']
['energy' 'oil' 'climate' 'change' 'world' 'carbon' 'government']
['people' 'police' 'city' 'flood' 'area' 'water' 'day']
['year' 'water' 'food' 'tree' 'ice' 'world' 'land']
['climate' 'country' 'emission' 'change' 'u' 'global' 'target']
['year' 'people' 'say' 'change' 'day' 'time' 'climate']
['carbon' 'emission' 'government' 'energy' 'uk' 'reduce' 'year']
['climate' 'change' 'global' 'scientist' 'warm' 'rise' 'temperature']


In [42]:
# Describe topics
topics1014 = lda_1014.describeTopics(7)

print("The topics described by their top-weighted terms:")
# Shows the results
topic_i = topics1014.select("termIndices").rdd.map(lambda r: r[0]).collect()
for i in topic_i:
    print(np.array(model1014.stages[-2].vocabulary)[i])

The topics described by their top-weighted terms:
['government' 'green' 'environment' 'change' 'plan' 'climate' 'uk']
['country' 'emission' 'climate' 'develop' 'china' 'world' 'u']
['climate' 'government' 'policy' 'carbon' 'change' 'australia' 'energy']
['energy' 'carbon' 'gas' 'uk' 'power' 'emission' 'use']
['climate' 'change' 'water' 'flood' 'year' 'obama' 'people']
['climate' 'warm' 'change' 'temperature' 'year' 'global' 'ice']
['world' 'change' 'oil' 'climate' 'fossil' 'fuel' 'year']
['climate' 'change' 'repo' 'global' 'world' 'new' 'u']
['get' 'think' 'know' 'time' 'people' 'go' 'year']
['climate' 'science' 'change' 'scientist' 'people' 'scientific' 'public']


In [43]:
# Describe topics
topics1519 = lda_1519.describeTopics(7)

print("The topics described by their top-weighted terms:")
# Shows the results
topic_i = topics1519.select("termIndices").rdd.map(lambda r: r[0]).collect()
for i in topic_i:
    print(np.array(model1519.stages[-2].vocabulary)[i])

The topics described by their top-weighted terms:
['climate' 'change' 'government' 'country' 'australia' 'people' 'say']
['emission' 'carbon' 'energy' 'country' 'climate' 'paris' 'gas']
['climate' 'people' 'take' 'change' 'action' 'crisis' 'u']
['climate' 'change' 'u' 'people' 'trump' 'global' 'year']
['fire' 'year' 'record' 'city' 'temperature' 'water' 'weather']
['climate' 'change' 'fossil' 'global' 'company' 'world' 'fuel']
['year' 'ice' 'change' 'warm' 'temperature' 'sea' 'reef']
['year' 'water' 'food' 'tree' 'plant' 'people' 'say']
['climate' 'university' 'change' 'science' 'london' 'guardian' 'letter']
['government' 'energy' 'policy' 'coal' 'australia' 'climate' 'minister']


### 2.2 Embeddings versus Bag of Words
The code below imports the Bert Embeddings from the Spark NLP file 1. An LDA model of the same type is made for both output from the df0004 dataframe, after transforming its output into a format readable by LDA. 

In [61]:
jsonfolder2 = "hdfs:///user/superlaut/guardian_features.json"

df_vec = spark.read.json(jsonfolder2)
df_vec = df_vec.withColumnRenamed('features', 'featureslist')
df_vec.printSchema()

root
 |-- featureslist: array (nullable = true)
 |    |-- element: double (containsNull = true)



In [62]:
from pyspark.ml.linalg import Vector, Vectors, VectorUDT
from pyspark.sql.functions import udf

# The vector has to be in VectorUDT format for PySpark, thus I convert it here.
to_vector = udf(lambda a: Vectors.dense(a), VectorUDT())
df_vec_feat = df_vec.withColumn("features", to_vector(df_vec["featureslist"]))

In [63]:
start = time.time()
lda0004 = lda.fit(df_vec_feat)
end = time.time()
dt = end - start
print('Operation cost',dt,'seconds')

Operation cost 34.5931761264801 seconds


In [64]:
lda0004.vocabSize()

768

In [74]:
# Run the entire operation from the pipeline, without fitting the LDA, so that the LDA operation can be fairly timed.
tokenizer = Tokenizer(inputCol="lemm_text", outputCol="tokens")
vectorizer = CountVectorizer(inputCol= "tokens", outputCol="features")
lda = LDA(k=10, maxIter=30, optimizer = 'em', learningOffset=1024.0, 
          learningDecay=0.4, subsamplingRate=0.05, 
          optimizeDocConcentration=True, seed = 1)

df_0 = df0004
df_0 = df_0.withColumn('text_non_asci',strip_non_ascii_udf(df_0['bodyText']))
df_0 = df_0.withColumn('removed',remove_features_udf(df_0['text_non_asci']))
df_0 = df_0.withColumn('non_abbrev',fix_abbreviation_udf(df_0['removed']))
df_0 = df_0.withColumn('stop_text',remove_stops_udf(df_0['non_abbrev']))
df_0 = df_0.withColumn('lemm_text',lemmatize_udf(df_0['stop_text']))
tokenised = tokenizer.transform(df_0)
cv_model = vectorizer.fit(tokenised)

In [79]:
vectorized_2 = cv_model.transform(tokenised)

In [81]:
start = time.time()
lda_old = lda.fit(vectorized_2)
end = time.time()
dt = end - start
print('Operation cost',dt,'seconds')

Operation cost 105.92472839355469 seconds


Clearly, the LDA on the embeddings is much faster. In fact, it is 3x faster. This is understandable as, instead of a large vector containing a count for all different words within each document (more than 26,000 in total), the LDA model is fed a vector of size 768 for each document.