# 1 Building the text corpus

To build a large text corpus, we will scrape some newspaper articles from zeit.de which are publicly available and easy to parse. Newspaper articles in general are a good start to get familiar with topic modelling since we can more or less anticipate distinct topics.

To scrape, we will first navigate through zeit.de's sitemap and download all available urls to online articles for a given period. Then, we will download them, parse their content and save the resulting text in a text file, one article per line. 

We will be using Python's asyncio library, which allows us to write asynchronous code. This way, Python can request multiple articles from the server at once without having to wait for the first request to be completed.

Install third party libraries:

In [None]:
!pip install aiohttp==3.6.2
!pip install nest_asyncio==1.4.0
!pip install aiosqlite==0.15.0

Prepare folder structure:

In [None]:
from pathlib import Path
# Directory for the scraper's output; `exist_ok=True` makes re-running the
# cell harmless when the directory already exists.
article_dir = Path('articles')
article_dir.mkdir(exist_ok=True)

In [None]:
# Path of the SQLite database file, located inside the article directory
article_db = article_dir / 'articles.db'

We need this so that asyncio works inside an IPython notebook:

In [None]:
import nest_asyncio
nest_asyncio.apply()

In [None]:
import aiohttp
import asyncio
import aiosqlite
import sqlite3
import time
import tqdm
import traceback
from datetime import date, timedelta
from bs4 import BeautifulSoup
import re

In [None]:
# Create the table that holds one row per article URL.
# `IF NOT EXISTS` keeps this cell re-runnable: a plain CREATE TABLE raises
# sqlite3.OperationalError as soon as the database file already contains it.
db = sqlite3.connect(str(article_db))
try:
    db.execute("""
      CREATE TABLE IF NOT EXISTS articles
        (
          article_id INTEGER PRIMARY KEY,
          url TEXT UNIQUE,
          title TEXT,
          text TEXT,
          authors TEXT,
          publishing_date TEXT,
          topic_ref TEXT,
          tags TEXT,
          downloaded TEXT
        )
    """)
    db.commit()
finally:
    # Close the connection even if the statement fails
    db.close()

In [None]:
async def fetch_html(url, session):
    """Download `url` and return the response body as text.

    Args:
        url: Address to request.
        session: Client session used to issue the GET request.

    Returns:
        The decoded response body.

    Raises:
        Whatever `response.raise_for_status()` raises for an error status,
        plus any error raised while sending the request or reading the body.
    """
    # `async with` hands the connection back to the pool on every exit path,
    # including when `raise_for_status()` or `text()` raises.
    async with session.request(method="GET", url=url) as response:
        response.raise_for_status()
        return await response.text()

async def parse_html(url, session):
    """Fetch one sitemap page and return the URLs it lists.

    Args:
        url: Address of the sitemap page.
        session: Client session passed through to `fetch_html`.

    Returns:
        A list with the text of every `<loc>` element found inside a `<url>`
        element, or None if the page could not be downloaded.
    """
    try:
        html = await fetch_html(url=url, session=session)
    except (
        aiohttp.ClientError,
        aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        print(e)
        return None

    # `fetch_html` is redefined further down in this notebook to return None
    # on failure instead of raising; treat that as a failed download too.
    if html is None:
        return None

    soup = BeautifulSoup(html, features='lxml')
    # `find_all` matches the spelling used by the rest of this file.
    # Entries without a <loc> child are skipped instead of failing on None.
    return [
        entry.loc.text
        for entry in soup.find_all('url')
        if entry.loc is not None
    ]

async def write_urls_to_db(db_session, url, session):
    """Insert every URL found on the sitemap page `url` as not yet downloaded.

    Args:
        db_session: Database connection whose `execute` coroutine is awaited.
        url: Address of the sitemap page to parse.
        session: Client session passed through to `parse_html`.
    """
    article_urls = await parse_html(url=url, session=session)
    if not article_urls:
        # Download failed or the page listed nothing
        return

    insert_sql = "INSERT OR IGNORE INTO articles (url, downloaded) VALUES (?,?)"
    for article_url in article_urls:
        row = (f'{article_url}', 'false')
        try:
            await db_session.execute(insert_sql, row)
        except sqlite3.InterfaceError as err:
            # Report the bad row and carry on with the remaining URLs
            print(err)
 
async def bulk_crawl_and_write(article_db, start_date, end_date):
    """Collect article URLs from zeit.de's daily sitemap pages into the database.

    One sitemap page is requested per day in the range; every URL found is
    written to the `articles` table via `write_urls_to_db`. All inserts are
    committed once at the end.

    Args:
        article_db: Path of the SQLite database to write to.
        start_date: First day to crawl (inclusive).
        end_date: Day at which to stop (exclusive).
    """
    def daterange(start_date, end_date):
        '''Helper function to easily iterate over date range'''
        for n in range(int((end_date - start_date).days)):
            yield start_date + timedelta(n)

    base_url = "https://www.zeit.de/gsitemaps/index.xml?date="
    # We have to trick zeit.de into thinking we are running the requests
    # using the library requests:
    headers = {'User-Agent': 'python-requests/2.21.0'}
    con = aiohttp.TCPConnector(limit=5)

    # Start client session:
    async with aiohttp.ClientSession(
          connector=con,
          cookie_jar=aiohttp.CookieJar(),
          headers=headers
        ) as session:
        # Run a request to set a cookie for the session. The body is not
        # needed, so the response is released right away instead of keeping
        # one of the few pooled connections occupied.
        async with session.request(
            method="GET", url='https://www.zeit.de/gsitemaps/index.xml'
        ):
            pass

        async with aiosqlite.connect(article_db) as db:
            tasks = [
                write_urls_to_db(
                    db_session=db,
                    url=base_url + single_date.strftime("%Y-%m-%d"),
                    session=session,
                )
                for single_date in daterange(start_date, end_date)
            ]

            # as_completed lets the progress bar advance as each page finishes
            for finished in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks)):
                await finished

            await db.commit()

Retrieve all URLs listed in their sitemap for a defined period:

In [None]:
# Crawl window: start_date is inclusive, end_date is exclusive.
start_date = date(year=2020, month=1, day=1) # inclusive
end_date = date(year=2020, month=2, day=1) # exclusive

# Time the whole crawl and run the coroutine to completion on the event loop.
start_time = time.time()
loop = asyncio.get_event_loop()
result = loop.run_until_complete(bulk_crawl_and_write(
    article_db=article_db,
    start_date=start_date,
    end_date=end_date
))
duration = time.time() - start_time
print(f"\nDownloaded sites in {duration} seconds")

Let's see how many URLs we have collected:

In [None]:
db = sqlite3.connect(str(article_db))
try:
    cursor = db.cursor()
    # Let SQLite count the rows instead of loading every row into Python
    # only to take the length of the list.
    cursor.execute("SELECT COUNT(*) FROM articles WHERE downloaded = 'false'")
    print(cursor.fetchone()[0])
finally:
    db.close()

If we have a look at https://www.zeit.de/robots.txt, it tells us we should not touch URLs that contain:
* /zeit/
* /templates/
* /hp_channels/
* /send/
* /suche/
* /comment-thread
* /liveblog-backend

Most of these URLs won't be listed in the sitemap but we never know, so let's explicitly remove them from our list of URLs:

In [None]:
db = sqlite3.connect(str(article_db))
cursor = db.cursor()

# URL fragments that robots.txt asks crawlers to stay away from. The last two
# have no trailing slash in robots.txt, so none is added here either;
# otherwise e.g. ".../comment-thread?page=2" would not be removed.
disallowed_fragments = [
    '/zeit/',
    '/templates/',
    '/hp_channels/',
    '/send/',
    '/suche/',
    '/comment-thread',
    '/liveblog-backend',
]

try:
    for fragment in disallowed_fragments:
        # `_` is a single-character wildcard in LIKE; escape it so that
        # "hp_channels" only matches a literal underscore.
        pattern = '%' + fragment.replace('_', '\\_') + '%'
        cursor.execute(
            "DELETE FROM articles WHERE url LIKE ? ESCAPE '\\'",
            (pattern,)
        )
    db.commit()
finally:
    db.close()

In [None]:
def extract_content(tag):
    """Return all text below `tag`, depth-first, each string prefixed by a space.

    Children that are strings contribute ' ' plus the string; any other child
    is descended into recursively through its own `contents`.
    """
    return ''.join(
        ' ' + child if isinstance(child, str) else extract_content(child)
        for child in tag.contents
    )


def clean_text(text):
    """Flatten `text` onto one line and normalise its whitespace.

    Line feeds and carriage returns become spaces, every run of two or more
    whitespace characters collapses into a single space, and leading and
    trailing whitespace is stripped.
    """
    # Line breaks first, so they take part in the whitespace collapse below
    single_line = re.sub(r'[\n\r]', ' ', text)
    return re.sub(r'\s{2,}', ' ', single_line).strip()


def parse_article(raw_html):
    """Extract the fields of interest from an article page.

    Args:
        raw_html: Markup of the article page.

    Returns:
        A dict with the keys 'text', 'title', 'tags', 'authors' and
        'publishing_date'. A field whose element is not found stays an empty
        string. Tags and authors are space-separated, with the spaces inside
        each single entry replaced by underscores.
    """
    soup = BeautifulSoup(raw_html, 'html.parser')

    date_node = soup.find('time', class_='metadata__date')
    publishing_date = date_node.get('datetime', '') if date_node else ''

    title_node = soup.find('span', class_='article-heading__title')
    title = title_node.text if title_node else ''

    # An empty result set simply joins to ''
    tag_links = soup.find_all('a', class_='article-tags__link')
    tags = ' '.join(link.text.replace(' ', '_') for link in tag_links)

    authors = ''
    byline = soup.find('div', class_='byline')
    if byline:
        name_spans = byline.find_all('span', itemprop="name")
        authors = ' '.join(span.text.replace(' ', '_') for span in name_spans)

    body = ''.join(
        extract_content(paragraph)
        for paragraph in soup.find_all('p', class_='paragraph article__item')
    )

    return {
        'text': clean_text(body),
        'title': title,
        'tags': tags,
        'authors': authors,
        'publishing_date': publishing_date,
    }

async def fetch_html(url, session):
    """Download `url` and return its body as text, or None if the request fails.

    Unlike the earlier definition of the same name in this file, this version
    does not raise on client or HTTP errors: it prints the traceback and
    returns None so that one failed article does not abort a bulk download.

    Args:
        url: Address to request.
        session: Client session used to issue the GET request.

    Returns:
        The decoded response body, or None on a client or HTTP error.
    """
    try:
        # `async with` releases the connection on every exit path, including
        # when `raise_for_status()` raises for a 4xx/5xx answer.
        async with session.request(method="GET", url=url) as response:
            response.raise_for_status()
            return await response.text()
    except (
        # ClientResponseError is a subclass of ClientError and so is covered
        aiohttp.ClientError,
        aiohttp.http_exceptions.HttpProcessingError,
    ):
        traceback.print_exc()
        return None


async def download_and_parse_article_url(url, article_id, session, db):
    """Download one article, parse it and store the result in its database row.

    If the page links to a single-page view ("komplettansicht"), that view is
    downloaded and parsed instead. Nothing is written when a download fails.

    Args:
        url: Address of the article.
        article_id: Value matched against `article_id` in the UPDATE.
        session: Client session passed through to `fetch_html`.
        db: Database connection whose `execute` coroutine is awaited.
    """
    page = await fetch_html(url=url, session=session)
    if not page:
        return

    if 'komplettansicht" data-ct-label="all"' in str(page):
        # Multi-page article: fetch the single-page version instead
        page = await fetch_html(url=url + '/komplettansicht', session=session)
        if not page:
            return

    parsed = parse_article(page)
    values = (
        parsed['title'],
        parsed['text'],
        parsed['authors'],
        parsed['publishing_date'],
        parsed['tags'],
        'true',
        article_id,
    )
    await db.execute("""
                UPDATE articles SET
                  title = ?,
                  text = ?,
                  authors = ?,
                  publishing_date = ?,
                  tags = ?,
                  downloaded = ?
                  WHERE article_id = ?
                """, values)

async def download_and_parse_text_of_all_article_urls(articles_db):
    """Download and parse every article still marked as not downloaded.

    Selects all rows with downloaded = 'false', processes them concurrently
    through `download_and_parse_article_url` and commits all updates once at
    the end.

    Args:
        articles_db: Path of the SQLite database holding the `articles` table.
    """
    con = aiohttp.TCPConnector(limit=30)
    # total=None is passed as the overall timeout of the session
    timeout = aiohttp.ClientTimeout(total=None)
    headers = {'User-Agent': 'python-requests/2.21.0'}
    async with aiohttp.ClientSession(
          connector=con,
          cookie_jar=aiohttp.CookieJar(),
          headers=headers,
          timeout=timeout
        ) as session:
        # Run initial request to set a cookie. The body is not needed, so the
        # response is released right away instead of holding a connection.
        async with session.request(method="GET", url='https://www.zeit.de'):
            pass

        async with aiosqlite.connect(articles_db) as db:
            cursor = await db.execute("SELECT rowid, url FROM articles WHERE downloaded = 'false'")
            rows = await cursor.fetchall()
            tasks = [
                download_and_parse_article_url(
                    url=url, article_id=article_id, session=session, db=db
                )
                for article_id, url in rows
            ]
            # as_completed lets the progress bar advance as each article finishes
            for finished in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks)):
                await finished
            await db.commit()

In [None]:
# Time the article download and run the coroutine to completion.
start_time = time.time()
loop = asyncio.get_event_loop()
result = loop.run_until_complete(download_and_parse_text_of_all_article_urls(articles_db=article_db))
duration = time.time() - start_time
print(f"\nDownloaded sites in {duration} seconds")

Depending on the time of day and the capacity of the server, we might trigger some 503 errors, meaning the server ran out of resources to fulfill our request. However, we can just rerun the cell above to redownload those that failed in the first run. There might also be an occasional 404 error indicating a dead link.

In [None]:
db = sqlite3.connect(str(article_db))
try:
    cursor = db.cursor()
    # COUNT(*) avoids pulling every full article row into memory just to
    # take the length of the resulting list.
    num_downloaded = cursor.execute(
        "SELECT COUNT(*) FROM articles WHERE downloaded = 'true'"
    ).fetchone()[0]
    num_not_downloaded = cursor.execute(
        "SELECT COUNT(*) FROM articles WHERE downloaded = 'false'"
    ).fetchone()[0]
finally:
    db.close()

print(f'URLs downloaded: {num_downloaded}')
print(f'URLs not downloaded: {num_not_downloaded}')

In [None]:
db = sqlite3.connect(str(article_db))
try:
    cursor = db.cursor()

    # COUNT(*) instead of materialising every row (including the full text)
    num_rows_before = cursor.execute("SELECT COUNT(*) FROM articles").fetchone()[0]

    # Delete everything that has not been downloaded at this point
    # (probably only dead URLs are left)
    cursor.execute("DELETE FROM articles WHERE downloaded = 'false'")

    # Delete articles without text:
    cursor.execute("DELETE FROM articles WHERE downloaded = 'true' AND text = '' ")

    # Delete articles in English (they carry a pointer to the German version):
    cursor.execute("DELETE FROM articles WHERE text LIKE '%Lesen Sie diesen Text auf Deutsch%'")

    db.commit()

    num_rows_after = cursor.execute("SELECT COUNT(*) FROM articles").fetchone()[0]
finally:
    db.close()

print(num_rows_before)
print(num_rows_after)

# 2 Topic Modelling using Latent Dirichlet Allocation

## 2.1 Preparations:

Note: in Colab the third-party packages are already installed. If you run this notebook locally, you might have to install them first.

In [None]:
import re
from pprint import pprint

import numpy as np
import pandas as pd
import gensim
import spacy
import nltk

In [None]:
!python -m spacy download de_core_news_sm
nltk.download('stopwords')

In [None]:
import de_core_news_sm
nlp = de_core_news_sm.load()

In [None]:
!wget https://raw.githubusercontent.com/solariz/german_stopwords/master/german_stopwords_full.txt

In [None]:
# Combine stopwords from spacy and nltk and solariz:
stopwords_solariz = set()
# Read explicitly as UTF-8: the list contains umlauts, and without an explicit
# encoding `open` falls back to a platform-dependent default.
with open('german_stopwords_full.txt', encoding='utf-8') as f:
    for line in f:
        word = line.strip()
        # Skip blank lines and the lines starting with ';'
        if word and not word.startswith(';'):
            stopwords_solariz.add(word)

stopwords_spacy = spacy.lang.de.STOP_WORDS

stopwords_nltk = nltk.corpus.stopwords.words('german')

# A few extra words added by hand on top of the three lists
stopwords_custom = {'hauptsache', 'jetzig', 'mittlerweile', 'freilich', 'fortan'}

stopwords = stopwords_spacy | set(stopwords_nltk) | stopwords_solariz | stopwords_custom

## 2.2 Data preprocessing:

Data preprocessing steps:

1. remove newline characters and multiple consecutive whitespaces
2. remove quotation marks
3. remove punctuation
4. remove numerals
5. lowercase
6. tokenization
7. remove stopwords
8. Lemmatization
9. bigram and trigram collocation detection

In [None]:
# Create new table for preprocessed text.
# `IF NOT EXISTS` keeps the cell re-runnable; a plain CREATE TABLE raises
# sqlite3.OperationalError when the table is already there.
db = sqlite3.connect(str(article_db))
try:
    db.execute('CREATE TABLE IF NOT EXISTS articles_preprocessed (article_id INTEGER UNIQUE, preprocessed_text TEXT)')
    db.commit()
finally:
    db.close()

In [None]:
def preprocessing(text, stopwords, min_len=2, max_len=15):
    """Turn raw article text into a list of lemmas.

    Steps: tokenise and lowercase with gensim's `simple_preprocess`, drop
    stopwords, then lemmatise with the module-level spaCy pipeline `nlp` and
    keep only nouns, adjectives, verbs and adverbs.

    Args:
        text: Raw article text.
        stopwords: Collection of lowercase tokens to discard.
        min_len: Shortest token length kept by `simple_preprocess`.
        max_len: Longest token length kept by `simple_preprocess`. The default
            matches gensim's own default, which silently drops longer tokens;
            raise it to keep long German compound words.

    Returns:
        List of lemma strings.
    """
    # Tokenisation also covers preprocessing steps 1.-5. of the list above
    tokens = gensim.utils.simple_preprocess(
        text, deacc=False, min_len=min_len, max_len=max_len
    )

    # Remove stopwords
    tokens = [token for token in tokens if token not in stopwords]

    # Lemmatization, restricted to content-bearing parts of speech
    allowed_postags = {'NOUN', 'ADJ', 'VERB', 'ADV'}
    doc = nlp(" ".join(tokens))
    return [token.lemma_ for token in doc if token.pos_ in allowed_postags]

In [None]:
db = sqlite3.connect(str(article_db))
try:
    cursor_iter = db.cursor()
    cursor_writer = db.cursor()

    # Count in SQL instead of loading every article only to measure the list
    num_rows = cursor_iter.execute("SELECT COUNT(*) FROM articles").fetchone()[0]

    # Name the two needed columns explicitly rather than relying on their
    # position in `SELECT *`.
    rows = cursor_iter.execute("SELECT article_id, text FROM articles")
    for article_id, text in tqdm.tqdm(rows, total=num_rows):
        entry = (article_id, ' '.join(preprocessing(text, stopwords)))
        cursor_writer.execute(
            "INSERT OR IGNORE INTO articles_preprocessed (article_id, preprocessed_text) VALUES (?,?)",
            entry
        )

    db.commit()
finally:
    db.close()

In [None]:
def iterate_over_processed_documents(article_db):
    """Yield each preprocessed article as a list of tokens.

    Reads `preprocessed_text` from the `articles_preprocessed` table and
    splits it on single spaces, so the documents are streamed rather than
    held in memory.

    Args:
        article_db: Path of the SQLite database.

    Yields:
        One list of token strings per stored article.
    """
    db = sqlite3.connect(str(article_db))
    try:
        cursor = db.execute("SELECT preprocessed_text FROM articles_preprocessed")
        for (preprocessed_text,) in cursor:
            yield preprocessed_text.split(' ')
    finally:
        # Also runs when the consumer stops early and the generator is
        # closed, so the connection is never left open.
        db.close()

Bigram and Trigram collocation detection:

In [None]:
# Learn two-token collocations from the preprocessed documents, scored with
# NPMI, then freeze the result into a Phraser.
bigram = gensim.models.Phrases(
    iterate_over_processed_documents(article_db),
    min_count=10,
    threshold=0.6,
    scoring='npmi'
)
bigram_mod = gensim.models.phrases.Phraser(bigram)

# Second pass over the documents after bigram merging, with a stricter
# threshold, so that a merged bigram can be joined with a further token.
trigram = gensim.models.Phrases(
    bigram_mod[iterate_over_processed_documents(article_db)],
    min_count=10,
    threshold=0.8,
    scoring='npmi'
)  
trigram_mod = gensim.models.phrases.Phraser(trigram)

Mark bigrams and trigrams:

In [None]:
db = sqlite3.connect(str(article_db))
try:
    cursor = db.cursor()

    # Materialise the rows before updating. SQLite leaves it undefined
    # whether a running SELECT sees rows changed on the same connection, and
    # applying the phrase models twice to one row is not idempotent.
    rows = cursor.execute(
        "SELECT article_id, preprocessed_text FROM articles_preprocessed"
    ).fetchall()
    num_rows = len(rows)

    for article_id, preprocessed_text in tqdm.tqdm(rows, total=num_rows):
        text = preprocessed_text.split(' ')
        trigrammed_text = trigram_mod[bigram_mod[text]]
        data = (' '.join(trigrammed_text), article_id)
        cursor.execute(
            "UPDATE articles_preprocessed SET preprocessed_text = ? WHERE article_id = ?",
            data
        )

    db.commit()
finally:
    db.close()

In [None]:
# Create the token <-> id mapping from the streamed documents
dictionary = gensim.corpora.Dictionary(iterate_over_processed_documents(article_db))

# Drop tokens that occur in fewer than 10 documents or in more than 60% of all documents.
# NOTE(review): filter_extremes presumably also applies its default vocabulary
# size cap (keep_n) — confirm against the installed gensim version.
dictionary.filter_extremes(no_below=10, no_above=0.6)

In [None]:
# Load the tokenised documents once; they are needed in memory anyway for the
# coherence computation, so the corpus is built from them rather than from a
# second pass over the database.
documents = list(iterate_over_processed_documents(article_db))

# Create bag of words corpus
corpus = [dictionary.doc2bow(text) for text in documents]

In [None]:
# Show the first document's bag of words as (token, frequency) pairs
[[(dictionary[token_id], count) for token_id, count in bow] for bow in corpus[:1]]

In [None]:
# Vocabulary size after filtering, and number of documents in the corpus
print(f'Number of unique tokens: {len(dictionary)}')
print(f'Number of documents: {len(documents)}')

## 2.3 Training the LDA model:

In [None]:
from gensim.models.callbacks import PerplexityMetric
from gensim.models.callbacks import CoherenceMetric


# Log the perplexity and coherence score at the end of each epoch:
perplexity_logger = PerplexityMetric(corpus=corpus, logger='shell')
coherence_logger = CoherenceMetric(corpus=corpus, texts=documents, coherence="c_v", logger="shell")

# `gensim.models.LdaModel` is the same class that was previously reached
# through `gensim.models.ldamulticore.LdaModel`. Naming it directly makes
# clear that this is the single-core trainer and not LdaMulticore.
lda_model = gensim.models.LdaModel(
    corpus=corpus,
    id2word=dictionary,
    num_topics=20,
    random_state=100,
    chunksize=1500,
    passes=20,
    alpha='auto',
    per_word_topics=True,
    iterations=2000,
    callbacks=[coherence_logger, perplexity_logger]
)

# Persist the trained model under the name 'lda_model'
lda_model.save('lda_model')

Alternatively, you could load a model:

In [None]:
# Load the model saved by the training cell. `LdaModel` is never imported as a
# bare name in this notebook, so it is reached through the `gensim` package.
lda_model = gensim.models.LdaModel.load('lda_model')

In [None]:
# Print 30 keywords for each of the 20 topics
pprint(lda_model.print_topics(20, 30))

In [None]:
# Compute Perplexity
# NOTE(review): log_perplexity presumably returns a log-scale bound, not the
# perplexity itself — confirm against the gensim documentation.
print(f'Perplexity: {lda_model.log_perplexity(corpus)}')

# Compute Coherence Score ('c_v' measure) from the tokenised documents
coherence_model_lda = gensim.models.CoherenceModel(
    model=lda_model,
    texts=documents,
    dictionary=dictionary,
    coherence='c_v'
)
coherence_lda = coherence_model_lda.get_coherence()
print(f'Coherence Score: {coherence_lda}')

## 2.4 Training the LDA model with mallet:

Mallet is a different implementation that uses Gibbs sampling, which is a bit more accurate:


In [None]:
!wget http://mallet.cs.umass.edu/dist/mallet-2.0.8.zip
!unzip -a "mallet-2.0.8.zip"

In [None]:
# Path to the mallet launcher inside the archive unpacked above
mallet_path = Path('mallet-2.0.8') / 'bin' / 'mallet'
# Train a 20-topic model through gensim's Mallet wrapper.
# NOTE(review): `gensim.models.wrappers` is presumably only available in
# gensim 3.x — confirm the installed version.
ldamallet = gensim.models.wrappers.LdaMallet(
    mallet_path=str(mallet_path),
    corpus=corpus,
    num_topics=20,
    id2word=dictionary,
    iterations=2000,
)

In [None]:
# Show 30 keywords for each of the 20 Mallet topics
pprint(ldamallet.show_topics(20, 30))

In [None]:
# Compute Coherence Score ('c_v' measure) for the Mallet model, using the same
# documents and dictionary as for the gensim model above
coherence_model_ldamallet = gensim.models.CoherenceModel(model=ldamallet, texts=documents, dictionary=dictionary, coherence='c_v')
coherence_ldamallet = coherence_model_ldamallet.get_coherence()
print(f'Coherence Score: {coherence_ldamallet}')

# 3 LDA Visualization

## 3.1 Word clouds

In [None]:
from matplotlib import pyplot as plt
from wordcloud import WordCloud, STOPWORDS
import matplotlib.colors as mcolors

# One fixed colour per subplot
cols = [color for name, color in mcolors.TABLEAU_COLORS.items()]  # more colors: 'mcolors.XKCD_COLORS'

# With formatted=False every entry is (topic_id, [(word, weight), ...]).
# When fewer topics are requested than the model has, the returned ids are a
# selection and need not be 0..n-1, so the id is taken from the entry itself.
topics = ldamallet.show_topics(num_topics=10, num_words=20, formatted=False)

fig, axes = plt.subplots(4, 2, figsize=(20,20), sharex=True, sharey=True)

# zip stops at the shorter sequence, so fewer topics than subplots is fine
for i, (ax, (topic_id, topic_terms)) in enumerate(zip(axes.flatten(), topics)):
    topic_color = cols[i % len(cols)]
    cloud = WordCloud(stopwords=stopwords,
                      background_color='white',
                      width=1800,
                      height=1000,
                      max_words=15,
                      colormap='tab10',
                      # Bind the colour now instead of reading a global loop
                      # variable when the function is eventually called
                      color_func=lambda *args, _color=topic_color, **kwargs: _color,
                      prefer_horizontal=1.0)
    cloud.generate_from_frequencies(dict(topic_terms), max_font_size=200)
    ax.imshow(cloud)
    ax.set_title('Topic ' + str(topic_id), fontdict=dict(size=24))
    ax.axis('off')


plt.subplots_adjust(wspace=0.3, hspace=0.3)
plt.axis('off')
plt.margins(x=0, y=0)

plt.savefig('word_clouds.png')

## 3.2 pyLDAvis


In [None]:
!pip install pyLDAvis==2.1.2

In [None]:
# Visualize the topics
import pyLDAvis
import pyLDAvis.gensim

# Render the visualisation inline in the notebook
pyLDAvis.enable_notebook()
# Prepare the visualisation data from the gensim LDA model, the bag-of-words
# corpus and the dictionary; the bare `vis` expression displays it.
vis = pyLDAvis.gensim.prepare(lda_model, corpus, dictionary)
vis