## Downloading the dataset

### Step 1: Get the (author ID, work ID) pairs used as path parameters downstream

I downloaded the index CSV file from the aozorabunko Git repository, where the frontend website code and supplementary documents are stored.

Then I extracted the author ID and work ID pairs from that file, to serve as the path parameters for the downstream steps.

In [2]:
# Run the rest of the notebook from the folder that holds the index CSV files.
import os

INDEX_DIR = '/Users/kawa/aozorabunko/index_pages'
os.chdir(INDEX_DIR)
print("Current working directory:", os.getcwd())

Current working directory: /Users/kawa/aozorabunko/index_pages


In [None]:


# Load the extended index table and keep each distinct
# (author ID, work ID) combination exactly once.
import pandas as pd

# dtype=str reads every column as text, so the zero-padded IDs stay intact.
df = pd.read_csv('list_person_all_extended_utf8.csv', dtype=str)
id_columns = ['人物ID', '作品ID']
unique_pairs = df[id_columns].drop_duplicates()
pairs = unique_pairs.reset_index(drop=True)
print(f"Total of {len(pairs)} （作者ID，作品ID）pairs，examples:\n", pairs.head())

Current working directory: /Users/kawa/aozorabunko/index_pages
Total of 19325 （作者ID，作品ID）pairs，examples:
      人物ID    作品ID
0  001257  059898
1  001257  056078
2  001257  060224
3  001257  060225
4  001257  060231


### Step 2: Testing the real URL of each work, built from the (author ID, work ID) pairs

Note: the leading zeros are stripped from the work ID when building the card page URL.

In [5]:
# Check the full URLs and status codes for the first 5 entries
import requests
from time import sleep

# Browser-like request headers; `headers` is also used by the download loop.
UA = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/136.0.0.0 Safari/537.36"
headers = {"User-Agent": UA, "Referer": "https://www.aozora.gr.jp/"}

# Seconds to wait for the server. Without a timeout, requests.get() can block
# indefinitely on a stalled connection.
REQUEST_TIMEOUT = 30

for author, work in pairs.head(5).values:
    # The card page name uses the work ID as a plain integer (no zero padding).
    work_nozero = str(int(work))
    page_url = f"https://www.aozora.gr.jp/cards/{author}/card{work_nozero}.html"
    try:
        resp = requests.get(page_url, headers=headers, timeout=REQUEST_TIMEOUT)
    except requests.RequestException as exc:
        # Report the failure and keep probing the remaining entries.
        print(f"{author}/{work} -> {page_url} (request failed: {exc})")
    else:
        print(f"{author}/{work} -> {page_url} (status {resp.status_code})")
    sleep(0.1)  # small pause between requests
    


001257/059898 -> https://www.aozora.gr.jp/cards/001257/card59898.html (status 200)
001257/056078 -> https://www.aozora.gr.jp/cards/001257/card56078.html (status 200)
001257/060224 -> https://www.aozora.gr.jp/cards/001257/card60224.html (status 200)
001257/060225 -> https://www.aozora.gr.jp/cards/001257/card60225.html (status 200)
001257/060231 -> https://www.aozora.gr.jp/cards/001257/card60231.html (status 200)


### Step 3: Download ZIP files using the correct URL format

Because this workflow checks whether each file already exists at its output path, it can be safely interrupted and resumed until every work covered by the (author ID, work ID) pairs has been downloaded.

In [None]:

import re

os.makedirs('zips', exist_ok=True)

# Seconds to wait on each HTTP request before giving up; without a timeout a
# stalled connection would block the whole run.
DOWNLOAD_TIMEOUT = 60

for author, work in pairs.values:
    # The card page name uses the work ID as a plain integer (no zero padding).
    work_nozero = str(int(work))
    page_url = f"https://www.aozora.gr.jp/cards/{author}/card{work_nozero}.html"
    try:
        resp = requests.get(page_url, headers=headers, timeout=DOWNLOAD_TIMEOUT)
    except requests.RequestException as exc:
        print(f" Failed to access page for {author}/{work_nozero}: {exc} -> {page_url}")
        continue
    if resp.status_code != 200:
        print(f" Failed to access page for {author}/{work_nozero}: {resp.status_code} -> {page_url}")
        continue

    # The ZIP file name is scraped from the card page HTML.
    m = re.search(r'([0-9]+_ruby_[0-9]+\.zip)', resp.text)
    if not m:
        print(f"ZIP filename not found at {page_url} for {author}/{work_nozero}")
        continue
    zipname = m.group(1)
    outpath = os.path.join('zips', zipname)
    if os.path.exists(outpath):
        print(f"Already exists, skipping: {zipname}")
        continue

    zip_url = f"https://www.aozora.gr.jp/cards/{author}/files/{zipname}"
    print(f"Downloading ZIP: {zipname} from {zip_url}")
    try:
        zip_resp = requests.get(zip_url, headers=headers, timeout=DOWNLOAD_TIMEOUT)
    except requests.RequestException as exc:
        print(f"Failed to download {zip_url}: {exc}")
        continue
    if zip_resp.status_code != 200:
        # Never store an error page under a .zip name.
        print(f"Failed to download {zip_url}: {zip_resp.status_code}")
        continue

    # Write under a temporary name and rename once complete, so an interrupted
    # run cannot leave a truncated file that the existence check above would
    # later mistake for a finished download.
    tmp_path = outpath + '.part'
    with open(tmp_path, 'wb') as f:
        f.write(zip_resp.content)
    os.replace(tmp_path, outpath)
    sleep(0.5)  # pause after each download

Already exists, skipping: 59898_ruby_70679.zip
Already exists, skipping: 56078_ruby_51155.zip
Already exists, skipping: 60224_ruby_73172.zip
Already exists, skipping: 60225_ruby_74180.zip
Already exists, skipping: 60231_ruby_74588.zip
Already exists, skipping: 60232_ruby_74587.zip
Already exists, skipping: 56033_ruby_50649.zip
Already exists, skipping: 60226_ruby_74760.zip
Already exists, skipping: 60233_ruby_74396.zip
Already exists, skipping: 60357_ruby_75861.zip
Already exists, skipping: 46658_ruby_44679.zip
Already exists, skipping: 60227_ruby_75260.zip
Already exists, skipping: 60228_ruby_75259.zip
Already exists, skipping: 60229_ruby_72965.zip
Already exists, skipping: 53680_ruby_69540.zip
Already exists, skipping: 60230_ruby_72966.zip
Already exists, skipping: 54333_ruby_67471.zip
Already exists, skipping: 46340_ruby_24806.zip
Already exists, skipping: 46511_ruby_25555.zip
ZIP filename not found at https://www.aozora.gr.jp/cards/001245/card57975.html for 001245/57975
ZIP filenam

In [6]:
from pathlib import Path
import zipfile

# Working directory of the current notebook
BASE_DIR = Path.cwd()
ZIP_DIR = BASE_DIR / "zips"

# Running totals (bytes) and the number of archives that raised an error
total_zip_size = 0
total_unzipped_size = 0
error_count = 0

print(f"📁 正在扫描文件夹：{ZIP_DIR.resolve()}")

# Walk over every .zip file in the folder
for zip_path in ZIP_DIR.glob("*.zip"):
    try:
        compressed_bytes = zip_path.stat().st_size
        total_zip_size += compressed_bytes
        with zipfile.ZipFile(zip_path, 'r') as archive:
            members = archive.infolist()
        # file_size is the uncompressed size recorded for each member
        for member in members:
            total_unzipped_size += member.file_size
    except Exception as err:
        error_count += 1
        print(f"⚠️ 错误：{zip_path.name} → {err}")

# Print the summary, converting bytes to MB
zip_mb = total_zip_size / (1024**2)
unzipped_mb = total_unzipped_size / (1024**2)
print(f"\n📦 压缩总大小：{zip_mb:.2f} MB")
print(f"🗂️ 解压预估总大小：{unzipped_mb:.2f} MB")
print(f"❌ 跳过出错文件：{error_count}")


📁 正在扫描文件夹：/Users/kawa/aozorabunko/index_pages/zips

📦 压缩总大小：371.44 MB
🗂️ 解压预估总大小：656.69 MB
❌ 跳过出错文件：0


### Step 4: Unzipping the downloaded files

In [8]:
import os, glob, zipfile
from tqdm import tqdm
from pathlib import Path

# Source and destination folders
ZIP_DIR = Path("zips")
TEXT_DIR = Path("texts")
TEXT_DIR.mkdir(exist_ok=True)

# Collect all .zip files
zip_files = list(ZIP_DIR.glob("*.zip"))
total = len(zip_files)

# Outcome bookkeeping
success_count = 0
fail_count = 0
failed_files = []

# Extract each archive into texts/<archive stem>/, with a progress bar
for zip_path in tqdm(zip_files, desc="📦 解压 ZIP 文件中", unit="file"):
    target_dir = TEXT_DIR / zip_path.stem
    target_dir.mkdir(exist_ok=True)

    # Stays None on success; otherwise holds the line destined for the log
    failure_entry = None
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(target_dir)
    except zipfile.BadZipFile:
        print(f"❌ 跳过无效 ZIP 文件：{zip_path.name}")
        failure_entry = zip_path.name
    except Exception as e:
        print(f"⚠️ 其他错误：{zip_path.name} → {e}")
        failure_entry = f"{zip_path.name} ({e})"

    if failure_entry is None:
        success_count += 1
    else:
        fail_count += 1
        failed_files.append(failure_entry)

# Print the summary
print("\n✅ 解压完毕")
print(f"✔️ 成功解压：{success_count}")
print(f"❌ 失败或损坏：{fail_count}")

# Save the failure log, one entry per line
if failed_files:
    with open("failed_zip_log.txt", "w", encoding="utf-8") as f:
        f.writelines(name + "\n" for name in failed_files)
    print("📝 错误列表已保存至 failed_zip_log.txt")


📦 解压 ZIP 文件中: 100%|██████████| 17206/17206 [00:11<00:00, 1560.06file/s]


✅ 解压完毕
✔️ 成功解压：17206
❌ 失败或损坏：0





In [9]:
from pathlib import Path

# Count the sub-folders that now exist directly under texts/
TEXT_DIR = Path("texts")
subdirs = list(filter(Path.is_dir, TEXT_DIR.iterdir()))

print(f"📁 解压后的子文件夹数量：{len(subdirs)}")


📁 解压后的子文件夹数量：17206


## Data Cleaning

Step 1: Decode as Shift-JIS while preserving unrecognized bytes

- The correct codec for my raw text: Shift-JIS
- Decode with errors="surrogateescape" so that no bytes are silently dropped.

Step 2: Post-process: Unicode normalization

> Unicode normalization is the decomposition and composition of characters. Some Unicode characters have the same appearance but multiple representations. For example, "â" can be represented as one code point for "â" (U+00E2) (precomposed character), and two decomposed code points for "a" (U+0061) and " ̂" (U+0302) (combining character sequence (CCS)). 

>Canonical equivalence normalizes while preserving visually and functionally equivalent characters. e.g. "â" <-> "a" + " ̂"

>Compatibility equivalence normalizes characters that have different semantic shapes. e.g. "ﬁ" -> "f" + "i"

NFC:
  Collapses any decomposed kana + combining marks into single code points, which makes downstream tokenization and matching reliable.
  Preserves any raw surrogates, so I can round-trip back to the original bytes if I need to.

In [9]:
import unicodedata
from pathlib import Path

In [10]:
# Reminder on Python's annotation syntax:
# def name(parameter: type hint) -> return type hint:

def decode_shift_jis_variant(raw: bytes) -> str:
    """Decode Shift-JIS bytes without discarding anything.

    Bytes the ``shift_jis`` codec rejects are kept as lone surrogate code
    points (``errors='surrogateescape'``) instead of being dropped.

    Args:
        raw: The undecoded file content.

    Returns:
        The decoded text.
    """
    return str(raw, encoding='shift_jis', errors='surrogateescape')
def normalize(text: str) -> str:
    """Return *text* in Unicode Normalization Form C (NFC)."""
    composed = unicodedata.normalize('NFC', text)
    return composed

In [11]:
def process_file(
    input_path: Path,
    output_path: Path,
    *,
    decode_fn=decode_shift_jis_variant,
    normalize_fn=normalize
):
    """Convert one raw text file to normalized UTF-8 and report the conversion.

    Args:
        input_path: File whose bytes are read and decoded.
        output_path: File that receives the re-encoded bytes.
        decode_fn: Callable that turns the raw bytes into ``str``.
        normalize_fn: Callable applied to the decoded text before encoding.
    """
    text = normalize_fn(decode_fn(input_path.read_bytes()))
    # Encoding with surrogateescape writes any escaped surrogate back as its
    # original raw byte, so nothing that failed to decode is lost here.
    output_path.write_bytes(text.encode('utf-8', errors='surrogateescape'))
    print(f" {input_path.name} → {output_path.name}")

In [12]:
INPUT_DIR = Path("texts")
OUTPUT_DIR = Path("normalized")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Recreate each .txt file's relative location under OUTPUT_DIR and convert it.
for txt_file in INPUT_DIR.rglob("*.txt"):
    out_file = OUTPUT_DIR / txt_file.relative_to(INPUT_DIR)
    out_file.parent.mkdir(parents=True, exist_ok=True)
    process_file(txt_file, out_file)


 nenmatsuno_ichinichi.txt → nenmatsuno_ichinichi.txt
 chikagorono_yurei.txt → chikagorono_yurei.txt
 toyono_aki.txt → toyono_aki.txt
 rip_van_winkle.txt → rip_van_winkle.txt
 sato_haruo_shi.txt → sato_haruo_shi.txt
 ababababa.txt → ababababa.txt
 hoonki.txt → hoonki.txt
 uogasi.txt → uogasi.txt
 otomino_teiso.txt → otomino_teiso.txt
 christmas_eve.txt → christmas_eve.txt
 basho_zakki.txt → basho_zakki.txt
 kabocha.txt → kabocha.txt
 kurisumasu_ibu.txt → kurisumasu_ibu.txt
 ryoya.txt → ryoya.txt
 gesaku_zanmai.txt → gesaku_zanmai.txt
 zasshin_issoku.txt → zasshin_issoku.txt
 harunohino_sashita.txt → harunohino_sashita.txt
 nikko_shohin.txt → nikko_shohin.txt
 bokuno_tomodachi_nisannin.txt → bokuno_tomodachi_nisannin.txt
 eikyuni_fuyukaina.txt → eikyuni_fuyukaina.txt
 haha.txt → haha.txt
 kikuno_newake.txt → kikuno_newake.txt
 uwagoto.txt → uwagoto.txt
 kosei.txt → kosei.txt
 umano_ashi.txt → umano_ashi.txt
 sarukani_gassen.txt → sarukani_gassen.txt
 bungeitekina_amarini.txt → bungeiteki

In [None]:
import difflib
import unicodedata
from pathlib import Path
import glob

def check_file(path: Path):
    """Print a report on undecodable bytes and NFC changes in one text file.

    The file is decoded as CP932 with ``errors='surrogateescape'`` and then
    NFC-normalized. Two kinds of findings are printed:

    * every escaped byte (at most the first 5 are listed), with its character
      index in the decoded text and the byte value;
    * the characters that NFC normalization removed or added (at most the
      first 10 are listed).

    If there are no findings, a single "nothing found" line is printed.

    Args:
        path: The text file to inspect.
    """
    raw = path.read_bytes()
    decoded = raw.decode('cp932', errors='surrogateescape')
    normalized = unicodedata.normalize('NFC', decoded)

    issues = []

    # surrogateescape represents an undecodable byte b as U+DC00 + b, so the
    # byte value is recovered from the code point itself. Indexing ``raw``
    # with the character index would return an unrelated byte as soon as a
    # two-byte character precedes the bad one.
    surrogates = [
        (i, ord(ch) - 0xDC00)
        for i, ch in enumerate(decoded)
        if '\uDC80' <= ch <= '\uDCFF'
    ]
    if surrogates:
        issues.append(f" {path.name} contains {len(surrogates)}  surrogateescape:")
        for idx, byte in surrogates[:5]:
            issues.append(f"  - index={idx}, raw byte=0x{byte:02X}")

    # The character-level diff is expensive on long texts; skip it when
    # normalization left the text unchanged (it would report nothing anyway).
    if decoded != normalized:
        diffs = difflib.ndiff(decoded, normalized)
        # Each diff entry is "<tag> <char>": keep removals and additions only.
        changes = [(d[0], d[2]) for d in diffs if d[0] in ('-', '+')]
        if changes:
            issues.append(f"ℹ {path.name} NFC produced {len(changes)//2} character differences:")
            for symbol, ch in changes[:10]:
                issues.append(f"  {symbol} '{ch}' (U+{ord(ch):04X})")

    if issues:
        print("\n".join(issues))
    else:
        print(f"{path.name} No surrogateescape nor NFC")

# Run the check over every .txt file found under texts/
TEXT_DIR = Path("texts")
files = [*TEXT_DIR.rglob("*.txt")]

for text_path in files:
    check_file(text_path)




nenmatsuno_ichinichi.txt No surrpgateescape nor NFC
chikagorono_yurei.txt No surrpgateescape nor NFC
toyono_aki.txt No surrpgateescape nor NFC
rip_van_winkle.txt No surrpgateescape nor NFC
sato_haruo_shi.txt No surrpgateescape nor NFC
ababababa.txt No surrpgateescape nor NFC
hoonki.txt No surrpgateescape nor NFC
uogasi.txt No surrpgateescape nor NFC
otomino_teiso.txt No surrpgateescape nor NFC
christmas_eve.txt No surrpgateescape nor NFC
basho_zakki.txt No surrpgateescape nor NFC
kabocha.txt No surrpgateescape nor NFC
kurisumasu_ibu.txt No surrpgateescape nor NFC
ryoya.txt No surrpgateescape nor NFC
gesaku_zanmai.txt No surrpgateescape nor NFC
zasshin_issoku.txt No surrpgateescape nor NFC
harunohino_sashita.txt No surrpgateescape nor NFC
nikko_shohin.txt No surrpgateescape nor NFC
bokuno_tomodachi_nisannin.txt No surrpgateescape nor NFC
eikyuni_fuyukaina.txt No surrpgateescape nor NFC
haha.txt No surrpgateescape nor NFC
kikuno_newake.txt No surrpgateescape nor NFC
uwagoto.txt No surrpg

Step 3: Mapping Aozora's gaiji (external characters)

- Situation: Aozora Bunko doesn't embed external characters (gaiji) directly.
  What they do instead: insert markup such as ［＃「鬱」を正しく表示するために外字置換］ and supply a separate gaiji ZIP with image files or numeric-entity tables.

- My approach: parse those bracketed notes and replace each one with either:
  the Unicode code point it corresponds to (if one exists), or a custom U+E0xx PUA mapping.


Ideal result after all 3 steps

a clean, fully-Unicode(UTF-8 specifically) string with no hidden encoding errors or dropped bytes.

In [14]:
from pathlib import Path
import re
import csv

AOZORA_ROOT = Path("texts")
# Matches a ［＃…］ note; group 1 is the text between the markers.
pattern = re.compile(r'［＃(.+?)］')

# Note texts that count as return marks (返り点) when the file header mentions them.
return_marks = {
    "レ", "一レ", "上レ",
    "上", "中", "下",
    "天", "地", "人",
    "一", "二", "三", "四", "五", "六", "七", "八", "九", "十",
    "甲", "乙", "丙", "丁",
    "元", "亨", "利", "貞",
    "乾", "坤",
}

def classify_annotation(annotation: str, has_header_return: bool) -> str:
    """Label a note as a return mark ('返り点') or an inputter note ('入力者注').

    Args:
        annotation: The text found between ［＃ and ］.
        has_header_return: Whether the file's header mentions 返り点.

    Returns:
        '返り点' only when the header flag is set and the text is one of
        ``return_marks``; '入力者注' in every other case.
    """
    is_return_mark = has_header_return and annotation in return_marks
    return '返り点' if is_return_mark else '入力者注'


rows = []

for txt_file in AOZORA_ROOT.rglob("*.txt"):
    rel_path = txt_file.relative_to(AOZORA_ROOT)

    # Number of lines starting with "----" seen so far. Lines between the
    # first and second such line form the header; annotations are indexed only
    # after the second one. Files with fewer than two contribute no rows.
    dashed = 0
    has_header_return = False

    # One pass is enough: the header flag is final by the time the second
    # separator is reached, which is before any body line is classified.
    # NOTE: errors='ignore' silently drops bytes the codec cannot decode.
    with txt_file.open('r', encoding='shift_jis', errors='ignore') as f:
        for lineno, line in enumerate(f, start=1):
            if dashed < 2:
                if line.startswith("----"):
                    dashed += 1
                elif dashed == 1 and "返り点" in line:
                    has_header_return = True
                continue

            for m in pattern.finditer(line):
                annotation = m.group(1)
                rows.append({
                    'file': str(rel_path),
                    'line': lineno,            # 1-based line number in the file
                    'column': m.start() + 1,   # 1-based column of the match start
                    'annotation': annotation,
                    'type': classify_annotation(annotation, has_header_return),
                })

# Write the index as CSV, one row per annotation
out_csv = 'commenting_index.csv'
with open(out_csv, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.DictWriter(
        csvfile,
        fieldnames=['file', 'line', 'column', 'annotation', 'type']
    )
    writer.writeheader()
    writer.writerows(rows)

print(f"Generated {out_csv}, indexed a total of {len(rows)} annotations. ")


Generated commenting_index.csv, indexed a total of 5943 annotations. 


In [15]:
import pandas as pd
import re

df = pd.read_csv('commenting_index.csv', encoding='utf-8')

# Keywords whose presence means a note is NOT labelled as a gaiji description.
comments = {
    "字上げ", "字下げ", "終わり", "小書き", "ママ", "傍点", "１段階", "見出し",
    "縦中横", "傍線", "底本では", "注記", "斜体", "太字", "感嘆符", "括弧",
    "改ページ", "横組み", "改段", "ここから", "上がり", "地付き", "レ",
}

# Only the rows already typed as inputter notes are categorised here.
df_input = df.loc[df['type'] == '入力者注'].copy()

# One alternation that matches any of the keywords anywhere in the text
pattern = "|".join(map(re.escape, comments))
text_pattern = re.compile(pattern)

def classify_annotation(txt: str) -> str:
    """Return '' when *txt* contains a known keyword, otherwise '外字の説明'."""
    if text_pattern.search(txt):
        return ''
    return '外字の説明'

annotation_text = df_input['annotation'].astype(str)
df_input['category'] = annotation_text.apply(classify_annotation)

out_file = 'commenting_index_classified.csv'
df_input.to_csv(out_file, index=False, encoding='utf-8')

print(f"Generated {out_file}，processed a total of {len(df_input)}「入力者注」。")


Generated commenting_index_classified.csv，processed a total of 4875「入力者注」。


In [2]:
from pathlib import Path
import re
import csv

AOZORA_ROOT = Path("texts")
# A gaiji note is a ［＃…］ note preceded by ※; group 1 is the note text.
pattern = re.compile(r'※［＃(.+?)］')

rows = []
file_count = 0

# Scan every text file line by line and record each match's position.
for file_count, txt_file in enumerate(AOZORA_ROOT.rglob("*.txt"), start=1):
    rel_name = str(txt_file.relative_to(AOZORA_ROOT))
    with txt_file.open('r', encoding='shift_jis', errors='ignore') as f:
        for lineno, line in enumerate(f, start=1):
            rows.extend(
                {
                    'file': rel_name,
                    'line': lineno,            # 1-based line number
                    'column': m.start() + 1,   # 1-based column of the ※
                    'annotation': m.group(1),
                }
                for m in pattern.finditer(line)
            )

# Write the index as CSV, one row per gaiji note
out_csv = 'gaiji_index.csv'
columns = ['file', 'line', 'column', 'annotation']
with open(out_csv, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=columns)
    writer.writeheader()
    writer.writerows(rows)

print(f" Total files scanned: {file_count}")
print(f"Generated {out_csv}, processed a total of {len(rows)} Gaiji Annotation.")


 Total files scanned: 17205
Generated gaiji_index.csv, processed a total of 58330 Gaiji Annotation.


In [4]:
from pathlib import Path
import re
import csv

AOZORA_ROOT = Path("texts")
pattern = re.compile(r'※［＃(.+?)］', re.DOTALL)  # DOTALL lets a note span several lines

rows = []
file_count = 0

for txt_file in AOZORA_ROOT.rglob("*.txt"):
    file_count += 1
    rel_path = txt_file.relative_to(AOZORA_ROOT)
    content = txt_file.read_text(encoding='shift_jis', errors='ignore')

    for m in pattern.finditer(content):
        start_pos, end_pos = m.span()

        # Line and column (both 1-based) of the match start, derived from the
        # newlines that precede it. rfind gives -1 on the first line, which
        # makes the column start_pos + 1.
        newlines_before = content.count("\n", 0, start_pos)
        last_newline = content.rfind("\n", 0, start_pos)

        rows.append({
            'file': str(rel_path),
            'line': newlines_before + 1,
            'column': start_pos - last_newline,
            'annotation': m.group(1),
            'start_pos': start_pos,
            'end_pos': end_pos
        })

# Write the CSV including the character offsets of each match
out_csv = 'gaiji_index_with_span.csv'
columns = ['file', 'line', 'column', 'annotation', 'start_pos', 'end_pos']
with open(out_csv, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=columns)
    writer.writeheader()
    writer.writerows(rows)

print(f" Total files scanned: {file_count}")
print(f" Generated {out_csv}, processed a total of {len(rows)} Gaiji Annotation.")


 Total files scanned: 17205
 Generated gaiji_index_with_span.csv, processed a total of 58330 Gaiji Annotation.


In [6]:
from pathlib import Path
import re
import json

AOZORA_ROOT = Path("texts")
pattern = re.compile(r'※［＃(.+?)］')

# Matches a Unicode code point written as U+XXXX
utf16_pat = re.compile(r"U\+[0-9A-Fa-f]{4,6}")

# Matches a JIS level designation (e.g. 「第3水準 1-85-5」)
jis_pat = re.compile(r"第 ?[1-4]水準 ?[0-9０-９\-ー－− ]+")

results = []
file_count = 0

for txt_file in AOZORA_ROOT.rglob("*.txt"):
    file_count += 1
    rel_path = txt_file.relative_to(AOZORA_ROOT)

    text = txt_file.read_text(encoding="shift_jis", errors="ignore")

    for match in pattern.finditer(text):
        full_annotation = match.group(1)
        start, end = match.span()

        # The note is split on 、 into description, code, location;
        # fields that are absent become None.
        parts = [p.strip() for p in full_annotation.split("、")]
        description, code, location = (parts + [None, None])[:3]

        # Label the code field by the first notation it matches, if any.
        if not code:
            code_type = None
        elif utf16_pat.search(code):
            code_type = "utf-16"
        elif jis_pat.search(code):
            code_type = "JIS-level"
        else:
            code_type = None

        item = {
            "id": len(results),
            "file": str(rel_path),
            "start_pos": start,
            "end_pos": end,
            "annotation": full_annotation,
            "description": description,
        }
        # Optional keys are only written when they carry a non-empty value.
        optional = {"code": code, "code_type": code_type, "location": location}
        item.update({key: value for key, value in optional.items() if value})

        results.append(item)

# Write the result as a JSON file
output_path = Path("gaiji_index.json")
payload = json.dumps(results, ensure_ascii=False, indent=2)
output_path.write_text(payload, encoding="utf-8")

print(f" Total files scanned: {file_count}")
print(f" Total annotations extracted: {len(results)}")
print(f" Output written to: {output_path}")


 Total files scanned: 17205
 Total annotations extracted: 58330
 Output written to: gaiji_index.json
