# Downloading and preprocessing Wikipedia data

Download Wikipedia XML dump, decompress and filter articles to only include those with a geotag in Allegheny County.
This notebook was heavily inspired by [this notebook](https://github.com/WillKoehrsen/wikipedia-data-science/blob/master/notebooks/Downloading%20and%20Parsing%20Wikipedia%20Articles.ipynb).

Link to dumps: https://dumps.wikimedia.org/enwiki/

### Import packages

In [1]:
import os
import sys
import bz2
import subprocess
import io
import re
import gc
import json
import hashlib
from multiprocessing import Pool, set_start_method
from itertools import chain
from functools import partial
from timeit import default_timer as timer
import warnings
warnings.filterwarnings(action="ignore")

import requests                    # make http requests
import xml.sax                     # parse xml
import mwparserfromhell            # parse wikimedia
import pandas as pd                # data processing
from bs4 import BeautifulSoup      # parsing HTML
from tqdm.notebook import tqdm     # progress bars
from keras.utils import get_file   # downloading files

import multiprocessor_wiki         # mulitprocessing
from data_processor import *       # parsing coordinates

Define constants.

- ``PATH``: Path to the base data folder
- ``USA``: whether to filter for coordinates in the USA or Allegheny County
- ``CPU_CORES``: how many cpu cores to use, default = all

In [2]:
PATH = "C:\\Users\\Tim\\.keras\\datasets\\wikipedia_real_estate\\"
USA = True
CPU_CORES = os.cpu_count()

In [None]:
if USA:
    COORD_RANGE_LAT = (25.11667, 49.040000)
    COORD_RANGE_LONG = (-125.666666, -59.815000)
else:
    COORD_RANGE_LAT = (40.000000, 40.870000)
    COORD_RANGE_LONG = (-80.550000, -79.500000)

Show list of dumps.

In [3]:
base_url = 'https://dumps.wikimedia.org/enwiki/'
index = requests.get(base_url).text
soup_index = BeautifulSoup(index, 'html.parser')

# Find the links that are dates of dumps
dumps = [a['href'] for a in soup_index.find_all('a') if 
         a.has_attr('href')]
dumps

['../',
 '20210101/',
 '20210120/',
 '20210201/',
 '20210220/',
 '20210301/',
 '20210320/',
 '20210401/',
 'latest/']

In [4]:
dump_url = base_url + '20201201/'

# Retrieve the html
dump_html = requests.get(dump_url).text
dump_html[:10]

'<html>\r\n<h'

In [5]:
# Convert to a soup
soup_dump = BeautifulSoup(dump_html, 'html.parser')

# Find li elements with the class file
soup_dump.find_all('li', {'class': 'file'}, limit = 10)[:4]

[]

Iterate through files to find all downloadable files and show first 5.

In [6]:
files = []

# Search through all files
for file in soup_dump.find_all('li', {'class': 'file'}):
    text = file.text
    # Select the relevant files
    if 'pages-articles' in text:
        files.append((text.split()[0], text.split()[1:]))
        
files[:5]

[]

Select all compressed xml files.

In [7]:
files_to_download = [file[0] for file in files if '.xml-p' in file[0]]
files_to_download[-5:]

[]

Disregard multistream files.

In [8]:
files_to_download = [x for x in files_to_download if "multistream" not in x]
files_to_download[-5:]

[]

Download all relevant files. If file is already downloaded, display size of file.

In [9]:
data_paths = []
file_info = []

# create project dirs
if not os.path.exists(PATH):
        os.mkdir(PATH)
wiki_path = PATH + "wikipedia\\"
if not os.path.exists(wiki_path):
        os.mkdir(wiki_path)
compressed_path = wiki_path + "compressed\\"
if not os.path.exists(compressed_path):
        os.mkdir(compressed_path)

# Iterate through each file
for file in files_to_download:
    path = compressed_path + file
    
    
    # Check to see if the path exists (if the file is already downloaded)
    if not os.path.exists(path):
        # If not, download the file
        data_paths.append(get_file(file, dump_url + file))
        # Find the file size in MB
        file_size = os.stat(path).st_size / 1e6
        
        # Find the number of articles
        file_articles = int(file.split('p')[-1].split('.')[-2]) - int(file.split('p')[-2])
        file_info.append((file, file_size, file_articles))
        
    # If the file is already downloaded find some information
    else:
        data_paths.append(path)
        # Find the file size in MB
        file_size = os.stat(path).st_size / 1e6
        
        print(f"Found File {file}, size: {round(file_size, 2)} MB")
        
        # Find the number of articles
        file_number = int(file.split('p')[-1].split('.')[-2]) - int(file.split('p')[-2])
        file_info.append((file.split('-')[-1], file_size, file_number))

Download md5 checksums.

In [10]:
checksums = get_file(compressed_path + "md5_checksums", dump_url + "enwiki-20201201-md5sums.txt")

In [11]:
def md5(fname):
    """Returns md5 hash of file with name fname"""
    hash_md5 = hashlib.md5()
    with open(compressed_path + fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

Check if data was downloaded correctly.

In [12]:
with open(checksums) as file:
   data = file.read()

md5_lst = data.split("\n")
md5_lst = [x.split("  ") for x in md5_lst][:-1]
md5_lst = [file for file in md5_lst if ".xml-p" in file[1] and not "multistream" in file[1] and not "meta" in file[1]]

downloaded_md5 = []
if [x[1] for x in md5_lst] == files_to_download:
    
    for file in tqdm(files_to_download):
        downloaded_md5.append(md5(file))
        
    if [x[0] for x in md5_lst] == downloaded_md5:
        print("Downloads verified by MD5 checksums")
    else:
        print("Download was faulty, the following files could not be verified:")
        
        for file in [x for x in downloaded_md5 if x not in md5_lst]:
            print(file)
else:
    print("Files to downloaded are not equal to md5 checksum files")

Files to downloaded are not equal to md5 checksum files


Display total size and article count of downloaded dump.

In [13]:
file_sizes = [file[1] for file in file_info]
article_count = [file[2] for file in file_info]

print(f"The total size of files on disk is {round(sum(file_sizes) / 1e3, 2)} GB")
print(f"The total number of articles is {sum(article_count)}")

The total size of files on disk is 0.0 GB
The total number of articles is 0


Let's take a peek at the data.

In [14]:
data_path = data_paths[15]
data_path

lines = []
for i, line in enumerate(bz2.BZ2File(data_path, 'r')):
    lines.append(line)
    if i > 5e5:
        break

lines[27190:27210]

IndexError: list index out of range

Define content handler to parse XML.

In [15]:
class SimpleWikiXmlHandler(xml.sax.handler.ContentHandler):
    """Content handler for Wiki XML data using SAX"""
    def __init__(self):
        xml.sax.handler.ContentHandler.__init__(self)
        self._buffer = None
        self._values = {}
        self._current_tag = None
        self._pages = []

    def characters(self, content):
        """Characters between opening and closing tags"""
        if self._current_tag:
            self._buffer.append(content)

    def startElement(self, name, attrs):
        """Opening tag of element"""
        if name in ('title', 'text', 'timestamp'):
            self._current_tag = name
            self._buffer = []

    def endElement(self, name):
        """Closing tag of element"""
        if name == self._current_tag:
            self._values[name] = ' '.join(self._buffer)

        if name == 'page':
            self._pages.append((self._values['title'], self._values['text']))

Parse compressed files to find first 500 articles and display title for 10 of those.

In [16]:
# Object for handling xml
handler = SimpleWikiXmlHandler()

# Parsing object
parser = xml.sax.make_parser()
parser.setContentHandler(handler)

for i, line in enumerate(bz2.BZ2File(data_path, 'r')):
    
    parser.feed(line)
    
    # Stop when 500 articles have been found
    if len(handler._pages) > 500:
        break
        
print([x[0] for x in handler._pages][190:200])

NameError: name 'data_path' is not defined

Select one to take a closer look.

In [None]:
print(handler._pages[191][0])
page_text = handler._pages[191][1]
wiki = mwparserfromhell.parse(page_text)
wiki[:1000]

In [None]:
links = [x.title for x in wiki.filter_wikilinks()]
print(f"There are {len(links)} wikilinks in this article:")
links[:5]

Comments were not downloaded, so the following will always be empty.

In [None]:
print(wiki.filter_arguments())
print(wiki.filter_comments())

In [None]:
external_links = [x.url for x in wiki.filter_external_links()]
print(f'There are {len(external_links)} external links:')
external_links[:5]

In [None]:
templates = wiki.filter_templates()
print(f'There are {len(templates)} templates:')
templates[:5]

Look for coordinates.

In [None]:
infobox = wiki.filter_templates(matches="coord")[0]
print(infobox)
print(infobox.name.strip_code().strip().lower())

Test the extract coordinates method.

In [None]:
extract_coordinates(str(infobox))

Display main text of article.

In [None]:
wiki.strip_code().strip()

Define a method to extract all selected templates from an article and return them.

In [17]:
def process_article(title, text, timestamp, coord_range_lat, coord_range_long, template="coord"):
    """Process wikipedia article looking for template"""
    
    # Create a parsing object
    wikicode = mwparserfromhell.parse(text)
    
    # Search through templates for the template
    coord_matches = wikicode.filter_templates(matches=template)
    
    # Filter out errant matches
    coord_matches = [x for x in coord_matches if x.name.strip_code().strip().lower() == template.lower()]
    
    # check if match contains coordinates
    if len(coord_matches) >= 1:
        
        # extract coordinates
        coords = extract_coordinates(str(coord_matches[0]))
        
        # coords have wrong format
        if not coords:
            return None
        
        # check if coordinates are in Western Pennsylvania region
        if coord_range_lat[0] < coords[0] < coord_range_lat[1] and coord_range_long[0] < coords[1] < coord_range_long[1]:
            
            # Extract all templates
            all_templates = wikicode.filter_templates()
            
            infobox = [x for x in all_templates if "infobox" in x.name.strip_code().strip().lower()]
            
            if len(infobox) >= 1:
                # Extract information from infobox if existing
                properties = {param.name.strip_code().strip(): param.value.strip_code().strip() 
                              for param in infobox[0].params
                              if param.value.strip_code().strip()}
            else:
                properties = None

            # Extract internal wikilinks
            wikilinks = [x.title.strip_code().strip() for x in wikicode.filter_wikilinks()]
    
            # Extract external links
            exlinks = [x.url.strip_code().strip() for x in wikicode.filter_external_links()]
        
            # Find approximate length of article
            text_length = len(wikicode.strip_code().strip())

            return [title, coords, properties, wikilinks, exlinks, text_length]
        else:
            # object not in Western Pennsylvania region, disregard
            return None

Define a more complex content handler which can filter for articles from Allegheny County.

In [18]:
class WikiXmlHandler(xml.sax.handler.ContentHandler):
    """Parse through XML data using SAX"""
    def __init__(self):
        xml.sax.handler.ContentHandler.__init__(self)
        self._buffer = None
        self._values = {}
        self._current_tag = None
        self._articles = []
        self._article_count = 0
        self._non_matches = []

    def characters(self, content):
        """Characters between opening and closing tags"""
        if self._current_tag:
            self._buffer.append(content)

    def startElement(self, name, attrs):
        """Opening tag of element"""
        if name in ('title', 'text', 'timestamp'):
            self._current_tag = name
            self._buffer = []

    def endElement(self, name):
        """Closing tag of element"""
        if name == self._current_tag:
            self._values[name] = ' '.join(self._buffer)

        if name == 'page':
            self._article_count += 1
            # Search through the page to see if the geocoordinate is in Allegheny County
            article = process_article(**self._values, coord_range_lat=COORD_RANGE_LAT, coord_range_long=COORD_RANGE_LONG)
            # Append to the list of articles
            if article:
                self._articles.append(article)

Search for objects in Western Pennsylvania region in the 16th Wikipedia file and stop if 3 are found.

In [None]:
# Object for handling xml
handler = WikiXmlHandler()

# Parsing object
parser = xml.sax.make_parser()
parser.setContentHandler(handler)

print(f"Searching for articles in {data_path}...")
for i, line in enumerate(bz2.BZ2File(data_path, 'r')):
    parser.feed(line)
    # Stop when 3 objects have been found
    if len(handler._articles) > 2:
        break
       
if USA:
    print(f'Searched through {handler._article_count} articles to find {len(handler._articles)} objects in the USA')
else:
    print(f'Searched through {handler._article_count} articles to find {len(handler._articles)} objects in Western Pennsylvania region.')

Let's see what articles have been identified to be in Allegheny County.

In [None]:
print(*[article[0] for article in handler._articles], sep=", ")

## Process all articles with multiprocessing

Check if all files are correctly in the compressed path and display an exemplary data path.

In [19]:
partitions = [compressed_path + file for file in os.listdir(compressed_path) if 'xml-p' in file]
len(partitions), partitions[-1]

(59,
 'C:\\Users\\Tim\\.keras\\datasets\\wikipedia_real_estate\\wikipedia\\compressed\\enwiki-20201201-pages-articles9.xml-p2936261p4045402.bz2')

Run the script to process all compressed files and look for articles in Allegheny County.

In [20]:
multiprocessor_wiki.process(compressed_path, USA, CPU_CORES)

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=59.0), HTML(value='')))




## Joining the data back together

Read all json files containing information about locations from each partition.

In [21]:
def read_data(file_path):
    """Read in json data from `file_path`"""
    
    data = []
    # Open the file and load in json
    with open(file_path, 'r') as fin:
        for l in fin.readlines():
            data.append(json.loads(l))
            
    return data

In [22]:
if USA:
    uncompressed_path = os.path.dirname(os.path.dirname(compressed_path)) + "\\uncompressed_usa\\"
else:
    uncompressed_path = os.path.dirname(os.path.dirname(compressed_path)) + "\\uncompressed\\"

saved_files = [uncompressed_path + x for x in os.listdir(uncompressed_path)]  # find all data to read

articles = []
for file in saved_files:
    articles.extend(read_data(file))

Save all articles in one file.

In [25]:
if USA:
    f_path = os.path.dirname(os.path.dirname(uncompressed_path)) + "\\wikipedia_selected_usa.ndjson"  # create path to new file
else:
    f_path = os.path.dirname(os.path.dirname(uncompressed_path)) + "\\wikipedia_selected.ndjson"  # create path to new file

if not os.path.exists(f_path):
    with open(f_path, 'wt') as fout:
        json.dump(articles, fout)
    print('Articles saved.')
else:
    print('File already saved.')

Articles saved.


Assert whether data was successfully exported.

In [26]:
with open(f_path) as fin:
    data_loaded = json.load(fin)

assert data_loaded == articles