In [12]:
!wget -c -O /lakehouse/default/Files/jawiki/jawiki-latest-pages-articles.xml.bz2 \
  https://dumps.wikimedia.org/jawiki/latest/jawiki-latest-pages-articles.xml.bz2

StatementMeta(, bb7f1636-36e1-4bc4-8583-74cb306edd28, 14, Submitted, Running, Running)

--2026-01-07 09:40:49--  https://dumps.wikimedia.org/jawiki/latest/jawiki-latest-pages-articles.xml.bz2
Resolving dumps.wikimedia.org... 208.80.154.71, 2620:0:861:3:208:80:154:71
Connecting to dumps.wikimedia.org|208.80.154.71|:443... connected.
HTTP request sent, awaiting response... 206 Partial Content
Length: 4550114280 (4.2G), 4111809512 (3.8G) remaining [application/octet-stream]
Saving to: ‘/lakehouse/default/Files/jawiki/jawiki-latest-pages-articles.xml.bz2’


2026-01-07 10:00:45 (3.28 MB/s) - ‘/lakehouse/default/Files/jawiki/jawiki-latest-pages-articles.xml.bz2’ saved [4550114280/4550114280]



In [16]:
# Read the bz2-compressed dump with the spark-xml data source.
# Each <page> element becomes one DataFrame row.
pages = spark.read.load(
    "Files/jawiki/jawiki-latest-pages-articles.xml.bz2",
    format="com.databricks.spark.xml",
    rowTag="page",
)

StatementMeta(, bb7f1636-36e1-4bc4-8583-74cb306edd28, 18, Finished, Available, Finished)

In [17]:
%%sql
-- Schema that holds every table built by this notebook.
CREATE SCHEMA IF NOT EXISTS wiki

StatementMeta(, bb7f1636-36e1-4bc4-8583-74cb306edd28, 19, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [21]:
# Persist the raw <page> rows as the Delta table wiki.raw_pages, replacing any
# previous contents. Data is redistributed into 200 partitions before writing.
pages.repartition(200).write.saveAsTable(
    "wiki.raw_pages",
    format="delta",
    mode="overwrite",
)

StatementMeta(, bb7f1636-36e1-4bc4-8583-74cb306edd28, 23, Finished, Available, Finished)

In [38]:
%%sql
DROP TABLE IF EXISTS wiki.parsed_raw_pages;

CREATE TABLE wiki.parsed_raw_pages AS (
    -- Flatten the nested page / revision structs produced by spark-xml
    -- into one flat row per page.
    SELECT
        -- page-level fields
        id                                AS page_id,
        ns                                AS ns,
        title                             AS title,

        -- redirect element (redirect_title is used downstream to exclude redirects)
        redirect._VALUE                   AS redirect_value,
        redirect._title                   AS redirect_title,

        -- basic revision fields
        revision.id                       AS revision_id,
        revision.timestamp                AS revision_timestamp,
        revision.model                    AS revision_model,
        revision.format                   AS revision_format,
        revision.sha1                     AS revision_sha1,
        revision.parentid                 AS revision_parent_id,
        revision.origin                   AS revision_origin,
        revision.minor                    AS revision_minor,

        -- edit comment
        revision.comment._VALUE           AS comment,
        revision.comment._deleted         AS comment_deleted,

        -- contributor
        revision.contributor.id           AS contributor_id,
        revision.contributor.username     AS contributor_username,
        revision.contributor.ip           AS contributor_ip,
        revision.contributor._deleted     AS contributor_deleted,

        -- wikitext body and its attributes
        revision.text._VALUE              AS text,
        revision.text._bytes              AS text_bytes,
        revision.text._sha1               AS text_sha1,
        revision.text.`_xml:space`        AS text_xml_space
    FROM wiki.raw_pages
)

StatementMeta(, bb7f1636-36e1-4bc4-8583-74cb306edd28, 42, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [1]:
%%sql
-- Spot-check the flattened rows in namespace 0.
SELECT * FROM wiki.parsed_raw_pages WHERE ns = 0

StatementMeta(, d68042dd-d6dd-4725-8e2b-f7c472b677fb, 2, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 23 fields>

In [32]:
import re
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, StringType

# Matches level-2 headings ("== Title ==") only; "===" and deeper levels are excluded.
# Because `\s` also matches newlines, a match may begin on a preceding blank line;
# callers strip each section, so that is harmless.
H2_RE = re.compile(r"(?m)^\s*==(?!=)\s*(.+?)\s*==(?!=)\s*$")

def split_mediawiki_text_h2_only(text: str) -> list[str]:
    """Split MediaWiki markup into sections at level-2 headings.

    The text before the first H2 (the lead) becomes its own section. Each later
    section starts at its heading and runs up to the next H2 (H3+ stay inside).

    Every section is whitespace-stripped, and empty ones are dropped.

    Edge cases:
      - Text with no H2 at all is returned unchanged as a single section.
      - Non-string input and "" yield [].
    """
    if not (isinstance(text, str) and text):
        return []

    heading_starts = [m.start() for m in H2_RE.finditer(text)]
    if not heading_starts:
        # No H2 heading: keep the whole (unstripped) text as one section.
        return [text]

    # Cut points are the document start, every H2 start, and the document end.
    # The first slice is the lead; each subsequent slice is one H2 block.
    cuts = [0, *heading_starts, len(text)]
    pieces = (text[lo:hi].strip() for lo, hi in zip(cuts, cuts[1:]))
    return [piece for piece in pieces if piece]

# Vectorized pandas UDF returning array<string>: one list of sections per input value.
@pandas_udf(ArrayType(StringType()))
def split_mediawiki_text_h2_only_udf(col: pd.Series) -> pd.Series:
    """Apply split_mediawiki_text_h2_only to every value of a batch (nulls become "")."""
    without_nulls = col.fillna("")
    return without_nulls.map(split_mediawiki_text_h2_only)

# Expose the UDF to %%sql cells as split_mediawiki_h2(text).
spark.udf.register("split_mediawiki_h2", split_mediawiki_text_h2_only_udf)

StatementMeta(, 99ff5700-456d-445c-bfbe-2a060cf3140d, 44, Finished, Available, Finished)

<pyspark.sql.udf.UserDefinedFunction at 0x76ffa516bd10>

In [33]:
%%sql
DROP TABLE IF EXISTS wiki.raw_sections;

CREATE TABLE wiki.raw_sections AS (
    -- One row per H2 section of each non-redirect namespace-0 page with text.
    -- pos is the 0-based index of the section within its page.
    SELECT
        page_id,
        posexplode(split_mediawiki_h2(text)) AS (pos, text)
    FROM wiki.parsed_raw_pages
    WHERE ns = 0
      AND text IS NOT NULL
      AND redirect_title IS NULL
)

StatementMeta(, 99ff5700-456d-445c-bfbe-2a060cf3140d, 46, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [34]:
import re
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

H2_LINE_RE = re.compile(r"(?m)^\s*==(?!=)\s*(.+?)\s*==(?!=)\s*$")

def parse_h2_title(section_text: str) -> str | None:
    if not isinstance(section_text, str) or not section_text:
        return None
    m = H2_LINE_RE.search(section_text)
    return m.group(1).strip() if m else None

@pandas_udf(StringType())
def parse_header_pandas(col: pd.Series) -> pd.Series:
    """Vectorized wrapper around parse_h2_title for one batch.

    Nulls (None/NaN) are replaced with "" first, which parse_h2_title maps to None.
    """
    safe_text = col.fillna("")
    return safe_text.map(parse_h2_title)

# Make it callable from Spark SQL as parse_header(text).
spark.udf.register("parse_header", parse_header_pandas)

StatementMeta(, 99ff5700-456d-445c-bfbe-2a060cf3140d, 47, Finished, Available, Finished)

<pyspark.sql.udf.UserDefinedFunction at 0x76ffa5168710>

In [35]:
def render_mediawiki(x: str) -> str | None:
    """Strip MediaWiki markup from ``x`` and return the plain text.

    Uses mwparserfromhell's ``strip_code()``. Returns None if parsing raises,
    so that a single malformed section does not abort the whole Spark job.
    """
    # No notebook-level import of mwparserfromhell is visible. Without one, the
    # previous bare `except:` silently converted the NameError into None for every
    # row. Importing here makes the dependency explicit and resolves on executors.
    # NOTE(review): mwparserfromhell must be installed in the Spark environment — confirm.
    import mwparserfromhell

    try:
        return mwparserfromhell.parse(x).strip_code()
    except Exception:
        # Best-effort per row; unlike a bare except, this does not swallow
        # KeyboardInterrupt/SystemExit.
        return None

@pandas_udf(StringType())
def render_pandas(col: pd.Series) -> pd.Series:
    """Vectorized wrapper: apply render_mediawiki to every value of the batch."""
    return col.map(render_mediawiki)

# Make it callable from Spark SQL as render(text).
spark.udf.register("render", render_pandas)

StatementMeta(, 99ff5700-456d-445c-bfbe-2a060cf3140d, 48, Finished, Available, Finished)

<pyspark.sql.udf.UserDefinedFunction at 0x76ffa5161710>

In [36]:
%%sql
DROP TABLE IF EXISTS wiki.sections;

CREATE TABLE wiki.sections AS (
    -- A window with no PARTITION BY is evaluated in a single task. The original
    -- query pushed every section body through that one task, and also evaluated
    -- the parse_header and render UDFs above it. Number only the narrow key
    -- columns here, then join back, so the heavy work stays parallel.
    -- The ids are unchanged because (page_id, pos) identifies a section.
    WITH section_ids AS (
        SELECT
            page_id,
            pos,
            row_number() OVER (ORDER BY page_id, pos) AS section_id
        FROM wiki.raw_sections
    )
    SELECT
        i.section_id,
        s.page_id,
        s.pos                  AS section_order,
        parse_header(s.text)   AS header,
        render(s.text)         AS plaintext_body,
        s.text                 AS mediawiki_body
    FROM wiki.raw_sections AS s
    INNER JOIN section_ids AS i
        ON s.page_id = i.page_id
       AND s.pos = i.pos
)

StatementMeta(, 99ff5700-456d-445c-bfbe-2a060cf3140d, 50, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [19]:
%%sql
DROP TABLE IF EXISTS wiki.pages;

CREATE TABLE wiki.pages AS (
    -- id/title lookup for non-redirect pages in namespace 0 that have text.
    SELECT page_id, title
    FROM wiki.parsed_raw_pages
    WHERE ns = 0
      AND text IS NOT NULL
      AND redirect_title IS NULL
)

StatementMeta(, 99ff5700-456d-445c-bfbe-2a060cf3140d, 28, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [37]:
%%sql
-- Spot-check the rendered sections.
SELECT * FROM wiki.sections

StatementMeta(, 99ff5700-456d-445c-bfbe-2a060cf3140d, 51, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 6 fields>

In [1]:
# --- Category extraction UDF (pandas_udf): self-contained, paste-as-one-cell ---
import re
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, StringType

# Category links handled:
#   - [[Category:Name]] and [[Category:Name|sort key]]
#   - the Japanese alias [[カテゴリ:Name]]
# Matching is case-insensitive. A leading colon ([[:Category:X]]) is a plain link
# to the category page, not a categorization, and is deliberately not matched.
CATEGORY_RE = re.compile(r"\[\[\s*(?:Category|カテゴリ)\s*:\s*([^\]|]+)", re.IGNORECASE)
# HTML comments; an unterminated "<!--" hides everything after it.
HTML_COMMENT_RE = re.compile(r"<!--.*?(?:-->|\Z)", re.DOTALL)
# Runs of whitespace/underscores, which MediaWiki treats as one space in page titles.
TITLE_SPACE_RE = re.compile(r"[\s_]+")

def extract_categories(text: str) -> list[str]:
    """Return the distinct category names linked from wikitext ``text``.

    Processing steps:
      - Commented-out links (inside <!-- -->) are ignored.
      - Names are normalized so that "Foo_bar", "Foo bar" and " Foo  bar "
        collapse to "Foo bar".
      - Duplicates are removed, keeping first-occurrence order.

    Non-string input and "" yield [].
    """
    if not isinstance(text, str) or not text:
        return []

    visible = HTML_COMMENT_RE.sub("", text)

    seen: set[str] = set()
    out: list[str] = []
    for raw_name in CATEGORY_RE.findall(visible):
        name = TITLE_SPACE_RE.sub(" ", raw_name).strip()
        if name and name not in seen:
            seen.add(name)
            out.append(name)
    return out

@pandas_udf(ArrayType(StringType()))
def extract_categories_udf(col: pd.Series) -> pd.Series:
    """Vectorized wrapper: extract_categories for each value, nulls treated as ""."""
    text_values = col.fillna("")
    return text_values.map(extract_categories)

# Register so %%sql cells can call extract_categories(text).
spark.udf.register("extract_categories", extract_categories_udf)


StatementMeta(, e77a4863-0347-49ad-bd2f-c11eef83ad11, 5, Finished, Available, Finished)

<pyspark.sql.udf.UserDefinedFunction at 0x7a8a30a53f90>

In [5]:
%%sql
DROP TABLE IF EXISTS wiki.categories;

CREATE TABLE wiki.categories AS (
    WITH distinct_names AS (
        -- Every distinct category name linked from non-redirect namespace-0 pages.
        SELECT DISTINCT
            explode(extract_categories(text)) AS category_name
        FROM wiki.parsed_raw_pages
        WHERE ns = 0
          AND text IS NOT NULL
          AND redirect_title IS NULL
    )
    SELECT
        -- Consecutive ids starting at 1, assigned in category_name sort order.
        row_number() OVER (ORDER BY category_name) AS category_id,
        category_name
    FROM distinct_names
)


StatementMeta(, e77a4863-0347-49ad-bd2f-c11eef83ad11, 10, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [7]:
%%sql
-- Spot-check the category dictionary in id order.
SELECT * FROM wiki.categories ORDER BY category_id;

StatementMeta(, e77a4863-0347-49ad-bd2f-c11eef83ad11, 12, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 2 fields>

In [10]:
%%sql
DROP TABLE IF EXISTS wiki.page_categories;

CREATE TABLE wiki.page_categories AS (
    WITH page_category_names AS (
        -- (page, category name) pairs, using the same page filter as wiki.pages.
        SELECT DISTINCT
            page_id,
            explode(extract_categories(text)) AS category_name
        FROM wiki.parsed_raw_pages
        WHERE ns = 0
          AND text IS NOT NULL
          AND redirect_title IS NULL
    )
    SELECT
        p.page_id,
        c.category_id
    FROM page_category_names AS p
    -- LEFT JOIN keeps a pair even if its name is absent from wiki.categories
    -- (its category_id would then be null).
    LEFT JOIN wiki.categories AS c
        ON p.category_name = c.category_name
    ORDER BY page_id, category_id
)

StatementMeta(, e77a4863-0347-49ad-bd2f-c11eef83ad11, 16, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [13]:
%%sql
DROP TABLE IF EXISTS wiki.corpus;

CREATE TABLE wiki.corpus AS (
    WITH page_category_arrays AS (
        -- One row per page with the names of all of its categories.
        -- NOTE(review): collect_list has no ordering guarantee, so array order may vary between runs.
        SELECT
            pc.page_id,
            collect_list(c.category_name) AS category_array
        FROM wiki.page_categories AS pc
        INNER JOIN wiki.categories AS c
            ON pc.category_id = c.category_id
        GROUP BY pc.page_id
    )
    SELECT
        p.page_id,
        p.title,
        s.section_id,
        s.section_order,
        s.header          AS section_header,
        s.plaintext_body  AS body,
        pca.category_array
    FROM wiki.pages AS p
    INNER JOIN wiki.sections AS s
        ON p.page_id = s.page_id
    -- NOTE(review): this INNER JOIN drops every page that has no category at all.
    -- Use LEFT JOIN if uncategorized pages should stay in the corpus.
    INNER JOIN page_category_arrays AS pca
        ON p.page_id = pca.page_id
)

StatementMeta(, e77a4863-0347-49ad-bd2f-c11eef83ad11, 22, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [4]:
# Export wiki.corpus as JSON under Files/jawiki/json/v1, replacing any previous
# export. Data is redistributed into 4096 (2**12) partitions before writing.
spark.table("wiki.corpus").repartition(4096).write.json(
    "Files/jawiki/json/v1",
    mode="overwrite",
)


StatementMeta(, 7abccd37-d344-4904-9a7d-c0e7f784e2fb, 8, Finished, Available, Finished)