In [None]:
%load_ext autoreload
%autoreload 2
import pandas as pd
import numpy as np
import bz2
import csv
import io
import json
import re
import time
import random
import requests
import datetime
from pathlib import Path
from pprint import pprint
from typing import List, Dict
from dateutil.relativedelta import relativedelta
import lsde2021.csv as csvutils
import lsde2021.utils as utils
import lsde2021.download as dl
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

In [None]:
# One memory budget, reused below for the executor, the driver and the
# driver's maximum result size.
MAX_MEMORY = "30G"

# Build the Spark session (or pick up one that already exists) with the
# console progress bar switched off.
spark = (
    SparkSession.builder
    .appName("parse-wikipedia-sql-dumps")
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .config("spark.driver.maxResultSize", MAX_MEMORY)
    .config("spark.ui.showConsoleProgress", "false")
    .getOrCreate()
)
sc = spark.sparkContext

In [None]:
# Language table: read the CSV (first line is the header), derive the
# "<code>.wikipedia" identifier and keep only the renamed/selected columns.
languages = (
    spark.read.format("csv")
    .options(header="True")
    .load("./data/languages.csv")
    .withColumn("wiki_code", F.concat(F.col("code"), F.lit(".wikipedia")))
    .select(
        F.col("name").alias("language"),
        "dbname",
        "group",
        "code",
        "wiki_code",
    )
)
languages.limit(100).show()

# Distinct database names, flattened from single-column rows into a plain list.
distinct_dbnames = languages.select(F.col("dbname")).distinct()
language_dbnames = distinct_dbnames.rdd.flatMap(lambda row: row).collect()
print(language_dbnames)

In [None]:
# Declared layout of the topics table: a page id plus four array-of-string
# topic columns (topics1 .. topics4), all non-nullable.
# NOTE(review): this schema is not passed to the reader below.
raw_english_topics_schema = T.StructType(
    [T.StructField("page_id", T.IntegerType(), False)]
    + [
        T.StructField(f"topics{level}", T.ArrayType(T.StringType()), False)
        for level in range(1, 5)
    ]
)


# Load the topics table and show a sample plus the number of distinct page ids.
raw_english_topics = (
    spark.read
    .format("parquet")
    .load("../nvme/en_topics/topics_final.parquet")
)
raw_english_topics.limit(20).show()
print(
    "page ids with topics:",
    raw_english_topics.select("page_id").distinct().count(),
)

In [None]:
# Declared layout of the page table; every field is nullable.
# NOTE(review): this schema is not passed to the reader below.
page_schema = T.StructType([
    T.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("page_id", T.IntegerType()),
        ("page_namespace", T.IntegerType()),
        ("page_title", T.StringType()),
        ("page_restrictions", T.StringType()),
        ("page_is_redirect", T.BooleanType()),
        ("page_is_new", T.BooleanType()),
        ("page_random", T.FloatType()),
        ("page_touched", T.TimestampType()),
        ("page_links_updated", T.TimestampType()),
        ("page_latest", T.StringType()),
        ("page_len", T.IntegerType()),
        ("page_content_model", T.StringType()),
        ("page_lang", T.StringType()),
    ]
])

# Raw English page table as stored in the converted SQL dump.
raw_english_pages = (
    spark.read.format("parquet")
    .options(inferSchema="True")
    .option("mergeSchema", "true")
    .load("../nvme/wikipedia_sql_dumps/enwiki/20211001/enwiki-20211001-page.sql.parquet")
)

# Cast the flag/number columns and parse the two timestamp columns, which are
# formatted as yyyyMMddHHmmss.
english_pages = (
    raw_english_pages
    .withColumn("page_is_redirect", F.col("page_is_redirect").cast(T.BooleanType()))
    .withColumn("page_is_new", F.col("page_is_new").cast(T.BooleanType()))
    .withColumn("page_random", F.col("page_random").cast(T.FloatType()))
    .withColumn("page_touched", F.to_timestamp("page_touched", "yyyyMMddHHmmss"))
    .withColumn("page_links_updated", F.to_timestamp("page_links_updated", "yyyyMMddHHmmss"))
    .withColumn("page_len", F.col("page_len").cast(T.IntegerType()))
)
english_pages.limit(20).show()

In [None]:
# Attach the topic columns to the page table. The full outer join on page_id
# keeps rows that exist on only one of the two sides.
# NOTE(review): english_pages is reassigned from itself, so running this cell
# a second time would join the topics onto the already joined frame.
english_pages = english_pages.join(
    raw_english_topics,
    on="page_id",
    how="outer",
)
english_pages.limit(20).show()

In [None]:
# Coverage report: how many pages exist and how many lack a topic.
print("total page ids", english_pages.select("page_id").distinct().count())

# Restrict to namespace 0 pages that are not redirects.
is_ns0_non_redirect = (F.col("page_is_redirect") == 0) & (F.col("page_namespace") == 0)
english_pages_ns0_non_redirect = english_pages.filter(is_ns0_non_redirect)
print(
    "total page ids in namespace 0 which are not redirects",
    english_pages_ns0_non_redirect.select("page_id").distinct().count(),
)

# A row counts as "without topics" when its first topic level is null.
# These two figures count rows, not distinct page ids.
lacks_topics = F.col("topics1").isNull()
print(
    "page ids without topics:",
    english_pages.filter(lacks_topics).count(),
)
print(
    "page ids in namespace 0 without topics:",
    english_pages_ns0_non_redirect.filter(lacks_topics).count(),
)

In [None]:
# Keep only the namespace-0, non-redirect pages for which all four topic
# levels are present.
has_every_topic_level = (
    F.col("topics1").isNotNull()
    & F.col("topics2").isNotNull()
    & F.col("topics3").isNotNull()
    & F.col("topics4").isNotNull()
)
english_pages_with_topics = english_pages_ns0_non_redirect.filter(has_every_topic_level)

topic_page_count = english_pages_with_topics.count()
print("writing %d pages with topics" % topic_page_count)
# The save itself is currently disabled, so this cell only reports the count:
# english_pages_with_topics.write.format("parquet").mode("overwrite").save(f"../nvme/wikipedia_sql_dumps/enwiki/20211001/enwiki-20211001-page-topics-ns0-nonredirect.sql.parquet")

In [None]:
# Find the number of distinct categories in level 1, 2, 3 and 4.
# NOTE(review): this reads the parquet file from disk; the cell that would
# write it has its save commented out, so the file must already exist.
topic_pages = spark.read.format("parquet").load(
    "../nvme/wikipedia_sql_dumps/enwiki/20211001/enwiki-20211001-page-topics-ns0-nonredirect.sql.parquet"
)

# Explode every level on its own. Chaining the four explodes on one frame
# turns each row into the cross product of its four arrays, and explode()
# emits no row for an empty array, so a page with one empty level would be
# dropped from the counts of all other levels as well.
for topic_level in (1, 2, 3, 4):
    topic_column = f"topics{topic_level}"
    distinct_topics = (
        topic_pages
        .select(F.explode(topic_column).alias(topic_column))
        .distinct()
    )
    print(f"level {topic_level}", distinct_topics.count())

# `test` stays bound to the fully exploded four-column frame, as before, in
# case later cells refer to it. Spark evaluates lazily, so defining it here
# costs nothing unless an action is run on it.
test = topic_pages
for topic_column in ("topics1", "topics2", "topics3", "topics4"):
    test = test.withColumn(topic_column, F.explode(topic_column))
test = test.select("topics1", "topics2", "topics3", "topics4")

In [None]:
# Columns read from every per-wiki langlinks table.
langlinks_schema = T.StructType([
    T.StructField("page_id", T.IntegerType(), True),
    T.StructField("lang", T.StringType(), True),
    T.StructField("lang_title", T.StringType(), True),
])


def _read_en_langlinks(dbname):
    """Return the langlinks rows of wiki `dbname` whose `lang` is "en".

    The rows are read with `langlinks_schema` and tagged with a literal
    `dbname` column so they stay attributable after the union.
    """
    source = f"../nvme/wikipedia_sql_dumps/{dbname}/20211001/{dbname}-20211001-langlinks.sql.parquet"
    return (
        spark.read.format("parquet")
        .schema(langlinks_schema)
        .load(source)
        .filter(F.col("lang") == "en")
        .withColumn("dbname", F.lit(dbname))
    )


# Stack the English links of all wikis into one frame.
en_langlinks = None
for dbname in language_dbnames:
    wiki_links = _read_en_langlinks(dbname)
    en_langlinks = wiki_links if en_langlinks is None else en_langlinks.union(wiki_links)

# Drop links without a title and expose the title as `en_title`.
en_langlinks = (
    en_langlinks
    .filter(F.col("lang_title").isNotNull())
    .select("page_id", "dbname", F.col("lang_title").alias("en_title"))
)
en_langlinks.limit(20).show()
# Show whatever rows (if any) come from the English wiki itself.
en_langlinks.filter(F.col("dbname") == "enwiki").limit(20).show()

In [None]:
# en_langlinks.write.format("parquet").mode("overwrite").save("../nvme/wikipedia_sql_dumps/en_langlinks.parquet")

In [None]:
# Where the raw daily dumps live and where the processed parquet output goes.
pageview_complete_src = Path("../hdd/pageview_complete")
pageview_complete_dest = Path("../nvme/pageview_complete_processed")
end_date = datetime.date(2021, 10, 1)

# Collect the days of each selected year that lie before end_date.
daily_pageview_files = []
for year in [2018]: # 2019, 2020, 2021]:
    daily_range = [
        d
        for d in dl.date_range(datetime.date(year, 1, 1), datetime.date(year, 12, 31))
        if (end_date - d).total_seconds() > 0
    ]
    daily_pageview_files.extend(daily_range)

# NOTE(review): this assignment replaces the list built above with two
# hard-coded days, so only those two days are processed.
daily_pageview_files = [datetime.date(2021, 4, 21), datetime.date(2021, 4, 22)]


def _source_and_output_path(date):
    """Return the (raw input file, processed .parquet output) pair for `date`."""
    local_file = Path("/".join(dl.wikimedia_pageview_complete_local_file(date, monthly=False)))
    return (
        pageview_complete_src / local_file,
        pageview_complete_dest / local_file.with_suffix(".parquet"),
    )


daily_pageview_files = [_source_and_output_path(date) for date in daily_pageview_files]
pprint(daily_pageview_files[:10])

In [None]:
# Position of every lowercase letter in the alphabet (a -> 0 ... z -> 25).
# The hourly coding only uses the first 24 letters (a..x) as hour indices.
alphabet = {c: i for i, c in enumerate("abcdefghijklmnopqrstuvwxyz")}
assert len(alphabet) == 26

def hourly_pageviews_handler(s):
    """Decode a letter-coded hourly pageview string into 24 hourly counts.

    Hours 0 to 23 are written as letters A to X, each followed by the count
    for that hour, e.g. "F234I12" means 234 views in hour 5 and 12 in hour 8.
    Letters are case-insensitive; whitespace and "+" characters are removed
    before parsing.

    Args:
        s: the coded string, or None / a float NaN for "no data".

    Returns:
        A list of 24 Python ints, one per hour, zero where no count is given.

    Raises:
        KeyError: if a count is preceded by something that is not a single
            letter (for example a string starting with digits).
        IndexError: if the letter is Y or Z, which lie outside the 24 hours.
        TypeError: if `s` is neither a string nor None/NaN.
    """
    ans = np.zeros(24, dtype=int)
    # Treat None and *any* float NaN as missing. An identity check against
    # np.nan only recognises that one singleton object.
    if s is None or (isinstance(s, (float, np.floating)) and np.isnan(s)):
        return ans.tolist()
    # Raw strings keep "\s" and "\d" from being read as (invalid) string escapes.
    s = re.sub(r'[\s+]', '', s)
    # Splitting on a captured digit run yields [letter, digits, letter, digits, ..., tail].
    parts = re.split(r'(\d+)', s)
    for i in range(0, len(parts) - 1, 2):
        ans[alphabet[parts[i].lower()]] = int(parts[i + 1])
    return ans.tolist()

def hourly_coding(**coded):
    """Build the 24-slot hourly list from keyword arguments.

    Example: hourly_coding(F=234, I=12) sets hour 5 to 234 and hour 8 to 12.
    This is a best-effort helper: an entry whose key is not an hour letter
    (A..X) or whose value cannot be converted to an int is skipped.

    Returns:
        A list of 24 Python ints.
    """
    ans = np.zeros(24, dtype=int)
    for c, val in coded.items():
        try:
            ans[alphabet[c.lower()]] = int(val)
        except (KeyError, IndexError, TypeError, ValueError, OverflowError):
            # Deliberately ignore entries that cannot be stored.
            continue
    return ans.tolist()

assert hourly_pageviews_handler("F234I12") == hourly_coding(F=234, I=12)

# Spark UDF wrapper around hourly_pageviews_handler; the result is declared
# as an array of integers.
hourly_pageviews_udf = F.udf(
    hourly_pageviews_handler,
    returnType=T.ArrayType(T.IntegerType()),
)

In [None]:
# Column layout of a space-delimited daily pageview file.
pageview_schema = T.StructType([
    T.StructField("wiki_code", T.StringType(), True),
    T.StructField("page_title", T.StringType(), True),
    T.StructField("page_id", T.IntegerType(), True),
    T.StructField("user_client", T.StringType(), True),
    T.StructField("daily_total", T.IntegerType(), True),
    T.StructField("hourly_count", T.StringType(), True),
])

# Set to True to skip days whose processed output already exists on disk.
# False reproduces the previous behaviour of always recomputing.
REUSE_EXISTING_OUTPUT = False

# The topic lookup does not depend on the day, so it is built once: English
# pages with topics, keyed by title.
english_topics_by_title = english_pages_with_topics.select(
    F.col("page_title").alias("en_title"),
    F.col("page_id").alias("en_page_id"),
    "topics1",
    "topics2",
    "topics3",
    "topics4",
)

for daily_file, daily_processed_output_file in daily_pageview_files:
    start = time.time()

    if REUSE_EXISTING_OUTPUT and daily_processed_output_file.exists():
        print(f"using existing file {daily_processed_output_file}")
        continue

    df = (
        spark.read.format("csv")
        .options(delimiter=" ", header="false")
        .schema(pageview_schema)
        .load(str(daily_file))
    )

    # combine daily pageviews for different user clients
    df = (
        df
        .filter((F.col("page_id").isNotNull()) & (F.col("wiki_code").isNotNull()))
        .groupBy(["page_id", "page_title", "wiki_code"])
        .agg(F.sum("daily_total").alias("daily_total"))
    )

    # Disabled: the aggregation above no longer carries an hourly_count column.
    # df = df.withColumn("hourly_count", hourly_pageviews_udf(df['hourly_count']))

    # keep only the wikis listed in the language table
    df = df.join(languages, on="wiki_code", how="inner")

    # Join the english lang title. A left join keeps every pageview row,
    # including rows that have no langlink entry. A full outer join would
    # additionally pull in every langlink row without a pageview that day;
    # those rows have null daily_total / wiki_code / group and would end up
    # in the output under a null "group" partition.
    df = df.join(en_langlinks, on=["page_id", "dbname"], how="left")

    # set en title to be the same for the english pages
    df = df.withColumn(
        "en_title",
        F.when(F.col("dbname") == "enwiki", F.col("page_title")).otherwise(F.col("en_title")),
    )
    df = df.filter(F.col("en_title").isNotNull())

    # join the english page id using the english page title
    # NOTE(review): titles are matched as plain strings; confirm that the
    # langlinks titles and the page titles use the same space/underscore
    # convention.
    df = df.join(english_topics_by_title, on="en_title", how="inner")

    df.write.format("parquet").mode("overwrite").partitionBy("group").save(str(daily_processed_output_file))
    print("wrote %s in %.2f minutes" % (daily_processed_output_file, (time.time() - start) / 60))