In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pandas as pd
from pathlib import Path
import re
from tqdm import tqdm
from typing import List, Optional

# Root directory that holds the tagged CSV output. Each SUBFOLDERS entry is
# resolved relative to it and searched recursively for *.csv files by
# process_subfolder().
BASE_DIR = Path(
    "/local/scratch/group/guldigroup/climate_change/wiki_history_rosie/date_tagging_pipeline/"
    "tagged_output_2025-09-20_deduped"
)
# Sub-directories of BASE_DIR to process; main() handles them in this order and
# each one produces its own "parsed_years_<subfolder>.csv" result file.
SUBFOLDERS = [
    "historical_objects_tagged",
    "history_of_ideologies_tagged",
    "history_of_sports_tagged",
]

# Directory the per-subfolder result CSVs are written to.
OUT_DIR = Path.home() / "Desktop"

# Inclusive bounds for an accepted year; BC years are stored as negative numbers.
MIN_YEAR = -9999
MAX_YEAR = 2050

# Content of a <date>...</date> tag (case-insensitive, tolerant of inner spaces).
DATE_TAG_RE = re.compile(r"<\s*date\s*>(.*?)<\s*/\s*date\s*>", re.IGNORECASE)
# 1-4 digit number explicitly marked BC/BCE.
YEAR_BC_RE = re.compile(r"\b(\d{1,4})\s*(BC|BCE)\b", re.IGNORECASE)
# Any 1-4 digit number, optionally marked AD/CE (group 2 is empty when unmarked).
YEAR_AD_RE = re.compile(r"\b(\d{1,4})(?:\s*(AD|CE))?\b", re.IGNORECASE)
# "<day> <word> <year>", e.g. "12 March 1999"; group 2 is the year.
DATE_FULL_RE = re.compile(r"\b(\d{1,2})\s+[A-Za-z]+\s+(\d{2,4})\b")
# ISO-style "YYYY-MM-DD".
ISO_DATE_RE = re.compile(r"\b(\d{4})-(\d{2})-(\d{2})\b")

# Year ranges ("X - Y", "X – Y", "X to Y") with or without era markers.
# A tag matching any of these is dropped entirely by extract_years().
YEAR_SPAN_BC_AD = re.compile(r"\d+\s*(BC|BCE)\s*(?:-|–|to)\s*\d+\s*(AD|CE)", re.IGNORECASE)
YEAR_SPAN_AD_BC = re.compile(r"\d+\s*(AD|CE)\s*(?:-|–|to)\s*\d+\s*(BC|BCE)", re.IGNORECASE)
YEAR_SPAN_BC_BC = re.compile(r"\d+\s*(BC|BCE)\s*(?:-|–|to)\s*\d+\s*(BC|BCE)", re.IGNORECASE)
YEAR_SPAN_AD_AD = re.compile(r"\d+\s*(AD|CE)\s*(?:-|–|to)\s*\d+\s*(AD|CE)", re.IGNORECASE)
YEAR_SPAN_GENERAL = re.compile(r"\b\d{1,4}\s*(?:-|–|to)\s*\d{1,4}\b")

# Decades such as "1980s" or "'80s".  The abbreviated form must not be anchored
# with a leading \b: an apostrophe is a non-word character, so "\b'" only
# matches when a word character directly precedes it and a free-standing
# "'80s" would never be recognised.
DECADE_RE = re.compile(r"\b(\d{3,4})0s\b|['’]\d{2}s\b", re.IGNORECASE)
# "5th century", "12 century", ...
CENTURY_RE = re.compile(r"\b\d{1,2}(st|nd|rd|th)?\s+century\b", re.IGNORECASE)
# Dynasty / era names used instead of a numeric year.
DYNASTY_RE = re.compile(r"\b(?:han|qin|tang|song|yuan|ming|qing|tokugawa|edo|meiji|taisho|showa|heisei|reiwa)\b", re.IGNORECASE)


def extract_years(s: str, stats: dict) -> List[int]:
    """Extract a single representative year from the text of one date tag.

    Args:
        s: Text of a date tag (surrounding markup is tolerated).
        stats: Mutable counters updated in place.  ``"total_tags"`` is
            incremented for every non-empty input; ``"big_numbers_removed"``,
            ``"span_dropped"``, ``"decade_dropped"``, ``"century_dropped"`` and
            ``"dynasty_dropped"`` are incremented when the matching rule fires.

    Returns:
        An empty list when the input is empty, is a span / decade / century /
        dynasty expression, or contains no year inside
        ``[MIN_YEAR, MAX_YEAR]``.  Otherwise a one-element list holding the
        candidate year with the largest absolute value (the earliest found
        candidate wins ties).  BC/BCE years are negative.
    """
    if not s:
        return []

    stats["total_tags"] += 1

    # Remove thousands separators so "4,000 BC" is read as "4000 BC".
    s = re.sub(r"(\d),(?=\d{3}\b)", r"\1", s)

    # Numbers with five or more digits cannot be years; strip them and count them.
    s, removed = re.subn(r"\b\d{5,}\b", "", s)
    if removed:
        stats["big_numbers_removed"] += removed

    # Ranges carry no single year.  "YYYY-MM-DD" also looks like "N-N", so the
    # era-less span pattern is ignored when an ISO date is present.
    if (
        YEAR_SPAN_BC_AD.search(s)
        or YEAR_SPAN_AD_BC.search(s)
        or YEAR_SPAN_BC_BC.search(s)
        or YEAR_SPAN_AD_AD.search(s)
        or (YEAR_SPAN_GENERAL.search(s) and not ISO_DATE_RE.search(s))
    ):
        stats["span_dropped"] += 1
        return []

    if DECADE_RE.search(s):
        stats["decade_dropped"] += 1
        return []

    if CENTURY_RE.search(s):
        stats["century_dropped"] += 1
        return []

    if DYNASTY_RE.search(s):
        stats["dynasty_dropped"] += 1
        return []

    # Collect BC magnitudes up front: every later rule must know which bare
    # numbers are really BC years so it does not re-add them as positive years.
    bc_values = [int(num) for num, _era in YEAR_BC_RE.findall(s)]
    bc_magnitudes = set(bc_values)

    years = [int(y) for y, _m, _d in ISO_DATE_RE.findall(s)]
    # Once an ISO date is found, the looser numeric patterns are skipped so the
    # month/day fields are not mistaken for years.
    matched_iso = bool(years)

    if not matched_iso:
        for _day, year in DATE_FULL_RE.findall(s):
            y = int(year)
            # y <= 32 is too likely to be a day number.  A year that is marked
            # BC ("15 March 44 BC") must not be added as a positive year here,
            # otherwise it would tie with -44 and win the final selection.
            if y > 32 and y not in bc_magnitudes:
                years.append(y)

    years.extend(-val for val in bc_values)

    if not matched_iso:
        for num, era in YEAR_AD_RE.findall(s):
            val = int(num)
            # An explicit AD/CE marker always counts; a bare number counts only
            # if it was not already consumed as a BC year.
            if era or val not in bc_magnitudes:
                years.append(val)

    years = [y for y in years if MIN_YEAR <= y <= MAX_YEAR]
    if years:
        return [max(years, key=abs)]
    return []


# A comma that sits between a digit and a group of exactly three digits, i.e. a
# thousands separator as in "4,000" or "1,000,000".
_THOUSANDS_SEP_RE = re.compile(r"(?<=\d),(?=\d{3}(?!\d))")


def extract_dates(cell: Optional[str], stats: dict) -> List[int]:
    """Parse every ``<date>`` tag in a cell and collect the extracted years.

    Args:
        cell: Raw cell text; anything that is not a non-empty string yields
            an empty result.
        stats: Counter dictionary forwarded to ``extract_years`` for each tag.

    Returns:
        The years returned by ``extract_years`` for each tag, concatenated in
        tag order.  Tags that produce no year contribute nothing.
    """
    if not isinstance(cell, str) or not cell:
        return []

    # Only thousands separators are removed.  Deleting every comma would fuse
    # neighbouring tokens ("1990,1991" -> "19901991", "1066,William" ->
    # "1066William") and make otherwise valid years unparseable.
    cell = _THOUSANDS_SEP_RE.sub("", cell)

    out = []
    for tag in DATE_TAG_RE.findall(cell):
        out.extend(extract_years(tag, stats))
    return out


def process_subfolder(subfolder: str):
    """Parse all tagged CSVs below one subfolder and write a per-file year list.

    Every ``*.csv`` found recursively under ``BASE_DIR / subfolder`` is read
    (columns ``filename`` and ``date_tagged`` only).  The years extracted from
    each ``date_tagged`` cell are accumulated per ``filename`` and written to
    ``OUT_DIR / "parsed_years_<subfolder>.csv"`` with the columns ``filename``
    and ``parsed_years`` (a ", "-joined string).  A summary and the cleaning
    counters are printed afterwards.

    Args:
        subfolder: Name of the directory below ``BASE_DIR`` to scan.
    """
    stats = {
        "span_dropped": 0,
        "big_numbers_removed": 0,
        "decade_dropped": 0,
        "century_dropped": 0,
        "dynasty_dropped": 0,
        "total_tags": 0,
    }

    folder_path = BASE_DIR / subfolder
    all_csvs = list(folder_path.rglob("*.csv"))
    results = {}
    total_years = 0

    for csv_file in tqdm(all_csvs, desc=f"Scanning {subfolder}", unit="file"):
        try:
            df = pd.read_csv(csv_file, low_memory=False, usecols=["filename", "date_tagged"])
        except (OSError, ValueError) as exc:
            # Unreadable files are still skipped (best effort), but visibly.
            # ValueError covers pandas' parser / empty-data errors, a missing
            # required column and undecodable bytes; OSError covers file
            # access problems.  KeyboardInterrupt is no longer swallowed.
            tqdm.write(f"Skipping {csv_file}: {exc}")
            continue
        if df.empty:
            continue

        # Iterate the two columns directly: cheaper than iterrows() and does
        # not coerce both values of a row to a common dtype.
        for fname, cell in zip(df["filename"], df["date_tagged"]):
            years = extract_dates(str(cell), stats)
            if not years:
                continue
            results.setdefault(fname, []).extend(years)
            total_years += len(years)

    final_rows = [
        {"filename": fname, "parsed_years": ", ".join(map(str, years))}
        for fname, years in results.items()
    ]

    # Explicit columns keep the header row even when nothing was parsed.
    df_final = pd.DataFrame(final_rows, columns=["filename", "parsed_years"])
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    outfile = OUT_DIR / f"parsed_years_{subfolder}.csv"
    # quoting=1 is csv.QUOTE_ALL: every field is quoted.
    df_final.to_csv(outfile, index=False, encoding="utf-8", quoting=1)

    print(f"\n✅ {subfolder} done → {len(results)} files, total {total_years} years, output: {outfile}")
    print("=== Cleaning stats ===")
    print(f"Total tags: {stats['total_tags']}")
    print(f"Spans dropped: {stats['span_dropped']}")
    print(f"Decades dropped: {stats['decade_dropped']}")
    print(f"Centuries dropped: {stats['century_dropped']}")
    print(f"Dynasties dropped: {stats['dynasty_dropped']}")
    print(f"Big numbers removed: {stats['big_numbers_removed']}")


def main():
    """Run ``process_subfolder`` once for each entry of ``SUBFOLDERS``, in order."""
    for subfolder_name in SUBFOLDERS:
        process_subfolder(subfolder_name)


def _test_extract():
    """Print what ``extract_years`` returns for a fixed set of sample tags.

    This is a manual smoke check: nothing is asserted, the parsed result of
    each sample and the accumulated counters are only printed.
    """
    counter_names = (
        "span_dropped",
        "big_numbers_removed",
        "decade_dropped",
        "century_dropped",
        "dynasty_dropped",
        "total_tags",
    )
    # Fresh counters, all starting at zero, in the same key order as above.
    test_stats = dict.fromkeys(counter_names, 0)

    samples = [
        "<DATE>2012-02-29</DATE>",
        "<DATE>2000-3000</DATE>",
        "<DATE>2000 BC–3000 AD</DATE>",
        "<DATE>4000-2000 BCE</DATE>",
        "<DATE>3100-2890 BC</DATE>",
        "<DATE>9500 BC to 1800 AD</DATE>",
        "<DATE>1980s</DATE>",
        "<DATE>5th century BC</DATE>",
        "<DATE>Han dynasty</DATE>",
        "<DATE>4,000 BC</DATE>",
        "<DATE>3035 BC to 2890 BC</DATE>",
    ]
    for sample in samples:
        parsed = extract_years(sample, test_stats)
        print(f"Input: {sample} → Parsed: {parsed}")

    print("\nStats:", test_stats)


if __name__ == "__main__":
    # Both calls belong under the guard: with main() at module level the whole
    # pipeline (directory scan and CSV writes) would also run on a plain import.
    _test_extract()
    main()