In [288]:
import bz2
import itertools
import os
import pickle
import re
from collections import Counter
from multiprocessing import cpu_count

import mwparserfromhell as mwparser
import mwxml
import sqlalchemy_utils
from joblib import Parallel, delayed

In [289]:
from sqlalchemy import (create_engine, Column, MetaData, Table, BigInteger, DateTime, 
                        Integer, String, Boolean, ForeignKey, PickleType)

In [290]:
# Location of the bz2-compressed enwiki "pages-articles" dump (20180901 run),
# relative to the notebook's working directory.
input_file = (
    "../data/dumps.wikipedia.org/enwiki/20180901/"
    "enwiki-20180901-pages-articles.xml.bz2"
)

In [291]:
# Shared schema registry: every table defined in this notebook is attached to
# this MetaData so drop_all()/create_all() can manage them as a group.
metadata = MetaData()

# One row per revision, holding the flattened output of ``revision_to_dict``
# plus the pickled parse tree added by ``process_revision``.
table_revisions = Table('revisions', metadata,
    Column('rev_id', BigInteger(), primary_key=True),
    Column('timestamp', DateTime()),
    Column('user_id', Integer()),
    Column('user_text', String(255)),
    Column('page_id', BigInteger()),
    Column('page_title', String()),
    Column('page_namespace', Integer()),
    Column('page_redirect', String(255)),
    Column('is_redirect', Boolean()),
    Column('minor', Boolean()),
    Column('comment', String(767)),
    Column('text', String()),
    Column('sha1', String(40)),
    # A parent id is itself a revision id, so it uses the same width as rev_id.
    Column('parent_id', BigInteger()),
    # Deletion markers are stored as flags, consistently with deleted_text.
    # NOTE(review): assumes the dump reader reports every ``deleted`` entry as
    # a boolean -- confirm against the mwxml documentation.
    Column('deleted_text', Boolean()),
    Column('deleted_comment', Boolean()),
    Column('deleted_user', Boolean()),
    Column('parsed_text', PickleType()))


In [292]:
# One row per (revision, link target): how many times the revision links to
# ``name`` and whether that target is a file/image or a category link.
table_wikilinks = Table('wikilinks', metadata,
    # Same width as revisions.rev_id so every referenced id fits in the column.
    Column('rev_id', BigInteger(), ForeignKey('revisions.rev_id'), primary_key=True),
    Column('name', String(), primary_key=True),
    Column('count', Integer()),
    Column('file', Boolean()),
    Column('category', Boolean())
    )

In [293]:
# One row per (revision, template name) with the number of times the template
# is used in that revision.
table_templates = Table('templates', metadata,
    # Same width as revisions.rev_id so every referenced id fits in the column.
    Column('rev_id', BigInteger(), ForeignKey('revisions.rev_id'), primary_key=True),
    Column('name', String(), primary_key=True),
    # Occurrence counts are integers (they come from a Counter), as in wikilinks.
    Column('count', Integer()))

In [294]:
# Database URL. Set ENWIKI_DATABASE_URL to point the notebook at another server
# or to keep real credentials out of the notebook; the fallback is the original
# local development database.
# NOTE(review): the fallback still embeds a password -- only acceptable for a
# throwaway local container.
# The dialect is spelled ``postgresql`` because the ``postgres`` alias is not
# accepted by newer SQLAlchemy releases.
DATABASE_URL = os.environ.get(
    "ENWIKI_DATABASE_URL",
    "postgresql://postgres:password12345@db:5432/enwiki",
)
engine = create_engine(DATABASE_URL)

In [295]:
# Make sure the target database exists before any table is created in it.
database_missing = not sqlalchemy_utils.database_exists(engine.url)
if database_missing:
    sqlalchemy_utils.create_database(engine.url)

In [None]:
# Rebuild the schema from scratch: drop every table registered on `metadata`
# (this discards any rows loaded by a previous run), then create them again.
# The drop runs first so the tables are recreated from the current definitions.
metadata.drop_all(engine)
metadata.create_all(engine)

In [None]:
def revision_to_dict(rev):
    """Flatten a revision into a single-level dict.

    Parameters
    ----------
    rev : object with a ``to_json()`` method
        ``to_json()`` must return a dict containing an ``id`` entry and,
        optionally, nested ``page``, ``user`` and ``deleted`` mappings.

    Returns
    -------
    dict
        The ``to_json()`` dict (modified in place) where

        * ``id`` is renamed to ``rev_id``;
        * ``page`` is replaced by ``page_id``, ``page_title``,
          ``page_namespace`` and ``page_redirect``;
        * ``user`` is replaced by ``user_id`` and ``user_text``;
        * ``deleted`` is replaced by one ``deleted_<key>`` entry per key;
        * ``is_redirect`` is True when the page has a redirect target.

        Nested fields that are absent (or whose container is ``None``) are
        reported as ``None``.

    Raises
    ------
    KeyError
        If the revision has no ``id``.
    """
    flat = rev.to_json()
    flat['rev_id'] = flat.pop('id')

    # Page attributes live in the nested "page" mapping, not at the top level.
    page = flat.pop("page", None) or {}
    for k in ("id", "title", "namespace", "redirect"):
        flat[f"page_{k}"] = page.get(k)

    deleted = flat.pop("deleted", None) or {}
    for k, v in deleted.items():
        flat[f"deleted_{k}"] = v

    # The user mapping may be missing entirely; treat that as "unknown user".
    user = flat.pop("user", None) or {}
    for k in ("id", "text"):
        flat[f"user_{k}"] = user.get(k)

    # Matches the ``is_redirect`` column: True only when a redirect target exists.
    flat['is_redirect'] = flat['page_redirect'] is not None
    return flat

In [None]:
def process_revision(revision):
    """Turn one revision into rows for the three tables.

    The revision is flattened with ``revision_to_dict``, its ``text`` is parsed
    as wikicode (the parse result is stored under ``parsed_text``), and the
    templates and wikilinks found in it are counted by stripped name.

    Returns a dict with the keys ``revisions`` (a one-element list holding the
    flattened revision), ``wikilinks`` and ``templates`` (one dict per distinct
    name, each carrying the revision id and the occurrence count; wikilinks
    also carry ``category`` and ``file`` flags derived from the name prefix).
    """
    record = revision_to_dict(revision)
    wikicode = mwparser.parse(record['text'])
    record['parsed_text'] = wikicode

    # Tally templates by whitespace-stripped name.
    template_counts = Counter()
    for tpl in wikicode.filter_templates():
        template_counts[str(tpl.name).strip()] += 1
    template_rows = []
    for tpl_name, n_uses in template_counts.items():
        template_rows.append({'rev_id': record['rev_id'], 'name': tpl_name, 'count': n_uses})

    # Tally link targets the same way, then classify each target by prefix.
    link_counts = Counter()
    for link in wikicode.filter_wikilinks():
        link_counts[str(link.title).strip()] += 1
    link_rows = []
    for target, n_uses in link_counts.items():
        is_category = re.match("^category:", target, re.I) is not None
        is_file = re.match("^(file|image):", target, re.I) is not None
        link_rows.append({'rev_id': record['rev_id'], 'name': target, 'count': n_uses,
                          'category': is_category,
                          'file': is_file})

    return {'revisions': [record], 'wikilinks': link_rows, 'templates': template_rows}

In [None]:
def get_revisions(filename, max_pages=None):
    """Yield every revision of a bz2-compressed MediaWiki XML dump.

    Parameters
    ----------
    filename : str or path-like
        Path of the ``.xml.bz2`` dump to read.
    max_pages : int, optional
        Stop after this many pages; ``None`` (the default) reads all pages.

    Yields
    ------
    The revisions of each page, page by page, in dump order.
    """
    # Read the file that was asked for (not the notebook-level ``input_file``)
    # and decode it as UTF-8 regardless of the locale's default encoding.
    with bz2.open(filename, "rt", encoding="utf-8") as f:
        dump = mwxml.Dump.from_file(f)
        for page in itertools.islice(dump.pages, max_pages):
            for rev in page:
                yield rev

In [None]:
def split_seq(iterable, size):
    """Lazily cut ``iterable`` into consecutive lists of at most ``size`` items.

    Every yielded list has ``size`` elements except possibly the last one;
    nothing is yielded for an empty iterable.
    """
    source = iter(iterable)
    while True:
        chunk = list(itertools.islice(source, size))
        if not chunk:
            # Source exhausted: end the generator without an empty chunk.
            return
        yield chunk
    

In [None]:
# Batching / parallelism settings.
# batchsize: number of revisions handed to a worker in one chunk.
# max_pages: number of pages read from the dump.
batchsize = 10
max_pages = 20
# Leave one core free, but never ask for fewer than one worker: on a
# single-core machine ``cpu_count() - 1`` would be 0, which joblib rejects.
workers = max(1, cpu_count() - 1)

def process_chunk(revisions):
    """Process a batch of revisions and pool the resulting rows per table.

    Each revision goes through ``process_revision``; the rows it produces are
    appended, in order, to the matching list of the returned dict (keys
    ``revisions``, ``wikilinks`` and ``templates``).
    """
    collected = {'revisions': [], 'wikilinks': [], 'templates': []}
    for rev in revisions:
        rows_by_table = process_revision(rev)
        for table_name in rows_by_table:
            collected[table_name].extend(rows_by_table[table_name])
    return collected

In [None]:
# Tables that receive rows, listed so that ``revisions`` is written before the
# tables whose foreign keys reference it.
table_names = ('revisions', 'wikilinks', 'templates')
# One prepared INSERT per table, plus the column names each table accepts.
inserts = {k: metadata.tables[k].insert() for k in table_names}
table_columns = {k: [col.name for col in metadata.tables[k].columns] for k in table_names}

pool = Parallel(n_jobs=workers)
with engine.connect() as conn:
    revisions = split_seq(get_revisions(input_file, max_pages=max_pages), batchsize)
    for x in pool(delayed(process_chunk)(rev) for rev in revisions):
        for k in table_names:
            v = x.get(k)
            if not v:
                continue
            # Keep only keys that are real columns (a flattened revision has
            # extra entries) and give every row the same key set, filling
            # gaps with None, so the multi-row insert is well-formed.
            rows = [{c: row.get(c) for c in table_columns[k]} for row in v]
            conn.execute(inserts[k], rows)