# Wikipedia Edits ClickHouse Streaming ETL Pipeline

In [None]:
import ipaddress
import json
import os
import typing
from datetime import datetime
from datetime import timezone

import clickhouse_driver
import requests
import sseclient
from clickhouse_driver import Client
from IPython.display import display, clear_output

In [None]:
def get_geolocation(ip_address: str, timeout: float = 10.0) -> dict:
    """Look up coarse geolocation data for an IP address.

    Queries the ipgeolocationapi.com HTTP API and maps the JSON response onto
    the geolocation fields of a change record. The lookup is best-effort: a
    network failure or an unexpected response shape yields an empty dict
    instead of an exception.

    Args:
        ip_address: IP address to look up, in string form.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled lookup cannot block the caller indefinitely.

    Returns:
        A dict with the keys ``continent``, ``region``, ``subregion``,
        ``country`` (lower-cased ``alpha2`` code), ``lat`` and ``lon``
        (floats), or ``{}`` when the lookup fails.
    """
    url = f"https://api.ipgeolocationapi.com/geolocate/{ip_address}"
    try:
        response = requests.get(url, timeout=timeout)
        payload = json.loads(response.text)
        coordinates = payload["geo"]
        return {
            "continent": payload["continent"],
            "region": payload["region"],
            "subregion": payload["subregion"],
            "country": payload["alpha2"].lower(),
            "lat": float(coordinates["latitude_dec"]),
            "lon": float(coordinates["longitude_dec"]),
        }
    except (
        requests.RequestException,  # connection errors, timeouts, ...
        ValueError,  # malformed JSON or non-numeric coordinates
        KeyError,  # expected field missing from the response
        TypeError,  # response is not a mapping / coordinate is null
        AttributeError,  # "alpha2" is not a string
    ):
        # Deliberately not a bare ``except``: KeyboardInterrupt and SystemExit
        # must still propagate so the notebook cell can be interrupted.
        return {}

def geolocation_enricher_generator(generator: typing.Iterable[dict]) -> typing.Iterable[dict]:
    """Flag anonymous edits and enrich them with geolocation data.

    A change record whose ``"user"`` value parses as an IP address is treated
    as an anonymous edit: ``"anonymous"`` is set to ``True`` and the fields
    returned by ``get_geolocation`` are merged into the record. All other
    records (including those without a ``"user"`` key) get
    ``"anonymous" = False`` and are otherwise left untouched.

    Args:
        generator: Iterable of change-record dicts.

    Yields:
        The same dict objects, updated in place, in input order.
    """
    for change_record in generator:
        try:
            # Only used as a validity check; the parsed address is discarded.
            ipaddress.ip_address(change_record["user"])
        except (KeyError, ValueError):
            # Not an IP address (or no user at all): a named user.
            change_record["anonymous"] = False
        else:
            # IP address: anonymous user, look up the geolocation.
            change_record["anonymous"] = True
            change_record.update(get_geolocation(change_record["user"]))
        yield change_record

In [None]:
def wikipedia_change_message_generator() -> typing.Iterable[dict]:
    """Stream non-bot page edits and page creations from Wikimedia.

    Connects to the Wikimedia ``recentchange`` server-sent-events stream and
    converts each relevant message into a flat record. Empty messages,
    bot-generated changes and every change type other than ``"new"`` and
    ``"edit"`` are skipped.

    Yields:
        One dict per change. ``timestamp`` is a naive ``datetime`` expressed
        in UTC; ``length_old`` and ``revision_old`` are ``0`` for newly
        created pages. The geolocation fields and ``anonymous`` are
        initialised to ``None``.
    """
    sse_source_url = "https://stream.wikimedia.org/v2/stream/recentchange"
    recent_changes_stream = sseclient.SSEClient(sse_source_url)
    for raw_msg in recent_changes_stream:
        if not raw_msg.data:
            continue  # skip empty responses
        json_msg = json.loads(raw_msg.data)
        if json_msg["bot"]:
            continue  # skip bot-generated edits
        change_type = json_msg["type"]
        if change_type not in ("new", "edit"):
            continue  # only keep page edit and new page events
        is_new_page = change_type == "new"
        # datetime.utcfromtimestamp() is deprecated since Python 3.12; build
        # the aware UTC value and strip tzinfo to keep the same naive result.
        timestamp = datetime.fromtimestamp(
            int(json_msg["timestamp"]), tz=timezone.utc
        ).replace(tzinfo=None)
        yield {
            "id":            json_msg["id"],
            "type":          change_type,
            "timestamp":     timestamp,
            "user":          json_msg["user"],
            "title":         json_msg["title"],
            "comment":       json_msg["comment"],
            "minor":         json_msg["minor"],
            # A new page has no previous length/revision; use 0 for both.
            "length_old":    0 if is_new_page else json_msg["length"]["old"],
            "length_new":    json_msg["length"]["new"],
            "revision_old":  0 if is_new_page else json_msg["revision"]["old"],
            "revision_new":  json_msg["revision"]["new"],
            "server_name":   json_msg["server_name"],
            "wiki":          json_msg["wiki"],
            "parsedcomment": json_msg["parsedcomment"],
            # Placeholders, so every record carries the same set of keys.
            "anonymous":     None,
            "continent":     None,
            "region":        None,
            "subregion":     None,
            "country":       None,
            "lat":           None,
            "lon":           None,
        }

In [None]:
# Smoke test for the first two pipeline stages: show each enriched change
# record as it arrives. The loop only ends when the source generator does, so
# interrupt the kernel to stop it.
for change_record in geolocation_enricher_generator(wikipedia_change_message_generator()):
    clear_output(wait=True)
    display(change_record)

In [None]:
def batch_generator(iterable: typing.Iterable, n: int = 1) -> typing.Iterable:
    """Group the items of an iterable into lists of ``n`` items.

    Args:
        iterable: Source of items; consumed lazily, one batch at a time.
        n: Batch size. A batch is emitted once it holds exactly ``n`` items,
            so a value below 1 never triggers an emit and all items end up in
            the single trailing batch.

    Yields:
        Lists of ``n`` consecutive items, followed by one shorter list holding
        the leftover items if there are any. An empty batch is never yielded.
    """
    current_batch = []
    for item in iterable:
        current_batch.append(item)
        if len(current_batch) == n:
            yield current_batch
            current_batch = []
    # Last, incomplete batch. Skipped when empty (empty input, or an item
    # count that is an exact multiple of n) so consumers can rely on batch[0].
    if current_batch:
        yield current_batch

In [None]:
# Smoke test for the batching stage. Despite its name, `change_record` is
# bound to a whole batch here: a list of up to 8 enriched change records.
for change_record in batch_generator(geolocation_enricher_generator(wikipedia_change_message_generator()), n=8):
    clear_output(wait=True)
    display(change_record)

In [None]:
# Create the ClickHouse client used by all later cells. Host and password can
# be overridden with the CLICKHOUSE_HOST / CLICKHOUSE_PASSWORD environment
# variables; the fallbacks keep the previous local-development behaviour.
# NOTE(review): the fallback password is still stored in source; drop it
# before sharing this notebook.
ch = Client(
    os.environ.get("CLICKHOUSE_HOST", "localhost"),
    password=os.environ.get("CLICKHOUSE_PASSWORD", "RussiaDoesNotLikeSlow"),
)

In [None]:
# Create the target database; IF NOT EXISTS makes this cell safe to re-run.
ch.execute("CREATE DATABASE IF NOT EXISTS clickhouse_experiments")

# Uncomment to drop the table and start from scratch (destroys stored rows).
#ch.execute("DROP TABLE IF EXISTS clickhouse_experiments.wikipedia_changes")

# Create the target table. Apart from `h3index`, the column names match the
# keys of the records produced by wikipedia_change_message_generator();
# `h3index` is MATERIALIZED, i.e. computed by ClickHouse from lat/lon
# (NULL when lat is NULL). Data is partitioned by month of `timestamp` and
# carries a TTL of 3 years after `timestamp`.
ch.execute("""
CREATE TABLE IF NOT EXISTS clickhouse_experiments.wikipedia_changes
(
    id             UInt64,
    type           LowCardinality(String),
    timestamp      DateTime('UTC'),
    user           String,
    title          String,
    comment        String,
    minor          UInt8,
    length_old     Nullable(UInt64),
    length_new     UInt64,
    revision_old   Nullable(UInt64),
    revision_new   UInt64,
    server_name    LowCardinality(String),
    wiki           LowCardinality(String),
    parsedcomment  String,
    anonymous      UInt8,
    continent      Nullable(String),
    region         Nullable(String),
    subregion      Nullable(String),
    country        Nullable(String),
    lat            Nullable(Float64),
    lon            Nullable(Float64),
    h3index        Nullable(UInt64) MATERIALIZED isNull(lat) ? NULL : geoToH3(lon, lat, 12)
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, wiki, id)
SAMPLE BY id
TTL timestamp + INTERVAL 3 YEAR
SETTINGS index_granularity=8192
""")

In [None]:
# Make clickhouse_experiments the default database for this client session.
ch.execute("USE clickhouse_experiments")

# List the tables of the current database to confirm the table exists.
ch.execute("SHOW TABLES")

In [None]:
# Run the full pipeline: stream changes, enrich them, group them into batches
# of 32 rows and insert each batch into ClickHouse. The loop only ends when
# the source generator does, so interrupt the kernel to stop it.
for row_batch in batch_generator(geolocation_enricher_generator(wikipedia_change_message_generator()), n=32):
    if not row_batch:
        # Nothing to insert, and row_batch[0] below would raise IndexError.
        continue
    ch.execute("INSERT INTO clickhouse_experiments.wikipedia_changes VALUES", row_batch)
    clear_output(wait=True)
    display(f"inserted a row_batch of len {len(row_batch)} with id {row_batch[0]['id']}")

In [None]:
# Sanity check: number of rows currently stored in the target table.
ch.execute('SELECT count() FROM clickhouse_experiments.wikipedia_changes')

In [None]:
# Close the ClickHouse connection once the notebook is done with it.
ch.disconnect()