In [1]:
%matplotlib qt

In [6]:
# parser_elarge_fallback.py
import re, time
from pathlib import Path
import numpy as np
import pandas as pd
from tqdm import tqdm
import fitz  # PyMuPDF
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# ---------------- SETTINGS ----------------
# Input table read with pandas further down; every column is loaded as text.
INPUT_CSV  = "TDK_E_and_Toroid_Cores_no_catalog.csv"
# Column holding the part number / ordering code (used to name cached PDFs).
ID_COL     = "Part No."
# Column holding the datasheet URL; only URLs ending in ".pdf" are downloaded.
URL_COL    = "Catalog / Data Sheet"
# Numeric columns cleaned below; the header text carries the source unit
# (nH/N², mT, mm) and the script scales them by 1e-9, 1e-3 and 1e-3.
AL_COL     = "AL Value / (nH / N²)"
BS_COL     = "Saturation Magnetic Flux Density Bs / mT"
GAP_COL    = "Air Gap / mm"
MUI_COL    = "Initial Permeability μi"
MAT_COL    = "Material Name"
# Candidate names for the core-shape column; the first one present is used.
SHAPE_COLS = ["Core Shape", "Shape"]

# Directory where downloaded datasheets are stored as "<ordering code>.pdf".
CACHE_DIR  = Path("pdf_cache")
# Seconds to sleep after each processed row (0 disables the pause).
SLEEP_BETWEEN = 0.0
# When True, a row whose PDF cannot be obtained aborts the run via SystemExit.
STOP_ON_DOWNLOAD_FAILURE = False
# Output path for the cleaned table joined with the extracted geometry.
CLEANED_CSV = "TDK_E_and_Toroid_Cores_no_catalog_cleaned.csv"
# -------------------------------------------

# Create the cache directory up front so downloads can write into it.
CACHE_DIR.mkdir(parents=True, exist_ok=True)

# --- HTTP session with retries ---
# One shared Session so every download reuses the same adapter and retry
# policy.
session = requests.Session()
# Retry budget: up to 5 attempts in total (3 for connect errors, 5 for read
# errors) with backoff factor 1.5; the listed HTTP status codes are also
# retried, but only for the listed request methods.
retries = Retry(total=5, connect=3, read=5, backoff_factor=1.5,
                status_forcelist=[429, 500, 502, 503, 504],
                allowed_methods=["GET", "HEAD", "OPTIONS"])
# Small connection pool; the same adapter serves both URL schemes.
adapter = HTTPAdapter(max_retries=retries, pool_connections=2, pool_maxsize=2)
session.mount("https://", adapter)
session.mount("http://", adapter)

# Request headers sent with every datasheet download.
# NOTE(review): presumably the Referer/User-Agent are needed for the TDK site
# to serve the PDF — confirm before changing them.
HEADERS = {
    "User-Agent": "Mozilla/5.0",
    "Referer": "https://product.tdk.com/",
    "Accept": "application/pdf,*/*;q=0.9",
}

def download_pdf(url: str, dest: Path) -> bool:
    """Download ``url`` to ``dest`` unless a plausible copy already exists.

    A file at ``dest`` larger than 1024 bytes is treated as an existing
    download and returned as success without any network access.  Otherwise
    the response is streamed into a ``<dest>.part`` sibling and renamed onto
    ``dest`` only after the whole body has been written, so an interrupted
    transfer can never leave a truncated file that later passes the size
    check.

    Args:
        url: Address of the PDF.
        dest: Target path of the cached PDF.

    Returns:
        True if ``dest`` is available afterwards, False on any failure
        (HTTP error, non-PDF content type, I/O error).  Failures are printed,
        never raised.
    """
    tmp = None
    try:
        if dest.exists() and dest.stat().st_size > 1024:
            return True
        tmp = dest.with_name(dest.name + ".part")
        with session.get(url, headers=HEADERS, timeout=(10, 180), stream=True) as r:
            r.raise_for_status()
            content_type = r.headers.get("Content-Type")
            if "pdf" not in (content_type or "").lower():
                raise ValueError(f"Non-PDF content-type: {content_type}")
            with open(tmp, "wb") as f:
                for chunk in r.iter_content(65536):
                    if chunk:
                        f.write(chunk)
        # Publish the file only once it is complete.
        tmp.replace(dest)
        return True
    except Exception as e:
        # Deliberate best-effort boundary: report and let the caller decide.
        print(f"Download failed: {url} -> {e}")
        if tmp is not None:
            try:
                tmp.unlink(missing_ok=True)
            except OSError:
                pass
        return False

# ========= Patterns & cleaners (robust to OCR and grouping) =========
# Unit fragments; whitespace is allowed between the letters and before the
# exponent, and the exponent may be written as "2", "^2" or "²" (same for 3).
PAT_MM1 = r"m\s*m"
PAT_MM2 = r"m\s*m\s*(?:2|\^?\s*2|²)"
PAT_MM3 = r"m\s*m\s*(?:3|\^?\s*3|³)"

# wide number (one capture group): 102 000 (space or NBSP grouping) |
# 102,000 | 102000 | 683.5 — optional sign, optional "." or "," decimals
PAT_NUM_WIDE = r"([-+]?(?:\d{1,3}(?:[ \u00A0,]\d{3})+|\d+)(?:[.,]\d+)?)"
# Optional single separator between a label and its value.
PUNCT_OPT = r"(?:[:=≈])?"

def _lab(tag):
    """Build a regex fragment for a two-letter label such as 'ae'.

    The two characters may be separated by optional whitespace and at most
    one underscore or space, e.g. 'ae' also matches 'A_e' or 'A e' when the
    pattern is compiled case-insensitively.
    """
    first, second = tag[0], tag[1]
    return "(?:" + first + r"\s*[_ ]?\s*" + second + ")"

def _lab_any_l_e():
    """Return the label fragment for 'le', tolerating l / I / ℓ as first letter."""
    leading = "[lIℓ]"
    return "(?:" + leading + r"\s*[_ ]?\s*e)"

def _clean_num_wide(s: str | None):
    if not s:
        return None
    s = (str(s)
         .replace("\u2212","-").replace("\u2013","-")
         .replace("\u00A0"," ").strip())
    s = s.replace(",", ".")                      # comma -> decimal point
    s = re.sub(r"(?<=\d)[ ](?=\d{3}\b)", "", s)  # remove thousand spaces
    s = re.sub(r"[^\d.+\-eE]", "", s)
    try:
        return float(s)
    except:
        return None

def read_pdf_fulltext(pdf_path: Path) -> str:
    """Return the text of every page of ``pdf_path`` joined by newlines.

    Superscript ²/³ become plain 2/3, the Unicode minus becomes "-" and
    non-breaking spaces become ordinary spaces so the unit and number
    patterns match.  A file that cannot be opened yields "".  The document
    is closed even if text extraction fails part-way, which the previous
    version did not guarantee.
    """
    try:
        doc = fitz.open(str(pdf_path))
    except Exception:
        # Unreadable or missing file: callers treat "" as "nothing found".
        return ""
    normalise = str.maketrans({"²": "2", "³": "3", "\u2212": "-", "\u00A0": " "})
    try:
        pages = [p.get_text("text").translate(normalise) for p in doc]
    finally:
        doc.close()
    return "\n".join(pages)

def _slice_magnetic_block(txt: str) -> str | None:
    m = re.search(r"Magnetic\s+characteristics", txt, flags=re.IGNORECASE)
    if not m:
        return None
    win = txt[m.end(): m.end()+12000]
    cut = re.search(r"\b(Gapped|Accessories|Calculation\s+factors|Coil\s+former|Symbols and terms)\b",
                    win, flags=re.IGNORECASE)
    return win[:cut.start()] if cut else win

# --- Generic E-family patterns (now wide-number + optional punctuation)
# Each pattern is: label, optional ":" / "=" / "≈", one captured number
# (group 1), then the unit — mm for le, mm2 for Ae, mm3 for Ve.
# Matching is case-insensitive.
GEN_LE = re.compile(rf"\b{_lab_any_l_e()}\s*{PUNCT_OPT}\s*{PAT_NUM_WIDE}\s*{PAT_MM1}\b",
                    re.IGNORECASE | re.DOTALL)
GEN_AE = re.compile(rf"\b{_lab('ae')}\s*{PUNCT_OPT}\s*{PAT_NUM_WIDE}\s*{PAT_MM2}\b",
                    re.IGNORECASE | re.DOTALL)
GEN_VE = re.compile(rf"\b{_lab('ve')}\s*{PUNCT_OPT}\s*{PAT_NUM_WIDE}\s*{PAT_MM3}\b",
                    re.IGNORECASE | re.DOTALL)

# --- E(Large) patterns (same as generic; kept separate in case PDFs vary)
# These are plain aliases of the generic patterns above, not copies.
EL_LE = GEN_LE
EL_AE = GEN_AE
EL_VE = GEN_VE

def parse_generic_e(full_text: str):
    """Extract (le, ae, ve) from datasheet text, as read from mm / mm2 / mm3.

    The search is limited to the 'Magnetic characteristics' block when one is
    found, otherwise the whole text is used.  The first match of each pattern
    wins.  If exactly one quantity is missing it is derived from the other
    two via ve = ae * le.

    Returns:
        Tuple ``(le, ae, ve)`` of floats; entries that could not be found or
        derived are None.
    """
    scope = _slice_magnetic_block(full_text) or full_text

    def first_value(pattern):
        hit = pattern.search(scope)
        return _clean_num_wide(hit.group(1) if hit else None)

    le = first_value(GEN_LE)
    ae = first_value(GEN_AE)
    ve = first_value(GEN_VE)

    # physics backfill: at most one of these can apply
    if ae is None and le is not None and ve is not None and le > 0:
        ae = ve / le
    elif le is None and ae is not None and ve is not None and ae > 0:
        le = ve / ae
    elif ve is None and ae is not None and le is not None:
        ve = ae * le
    return (le, ae, ve)

def parse_e_large_geom(pdf_path: Path):
    """Read ``pdf_path`` and return its E(Large) core geometry, scaled.

    Uses the EL_* patterns on the 'Magnetic characteristics' block (or on the
    whole text when the block is not found), takes the first match of each,
    and backfills a single missing quantity from ve = ae * le.

    Returns:
        Dict with keys ``Sigma_l_over_A_mm_inv`` (always None here),
        ``le_m``, ``Ae_m2`` and ``Ve_m3``; the parsed values are scaled by
        1e-3, 1e-6 and 1e-9 respectively, and missing ones are None.
    """
    whole_text = read_pdf_fulltext(pdf_path)
    scope = _slice_magnetic_block(whole_text) or whole_text

    def first_value(pattern):
        hit = pattern.search(scope)
        return _clean_num_wide(hit.group(1) if hit else None)

    def scaled(value, factor):
        return None if value is None else value * factor

    le = first_value(EL_LE)
    ae = first_value(EL_AE)
    ve = first_value(EL_VE)

    # at most one of these can apply
    if ae is None and le is not None and ve is not None and le > 0:
        ae = ve / le
    elif le is None and ae is not None and ve is not None and ae > 0:
        le = ve / ae
    elif ve is None and ae is not None and le is not None:
        ve = ae * le

    return {
        "Sigma_l_over_A_mm_inv": None,
        "le_m": scaled(le, 1e-3),
        "Ae_m2": scaled(ae, 1e-6),
        "Ve_m3": scaled(ve, 1e-9),
    }

# ========= Toroid (Ring) helpers (unchanged) =========
def _num_tolerant(s: str):
    """Return the first number found in ``s`` as a float, or None.

    Unicode minus and en dash count as "-", non-breaking spaces as spaces,
    and a comma as the decimal point.
    """
    if s is None:
        return None
    text = str(s)
    for old, new in (("\u2212", "-"), ("\u2013", "-"), ("\u00A0", " "), (",", ".")):
        text = text.replace(old, new)
    found = re.search(r"[-+]?\d+(?:\.\d+)?", text.strip())
    if found is None:
        return None
    return float(found.group(0))

def normalize_ordering_code(s: str) -> str:
    """Reduce a raw part-number cell to a bare ordering code.

    A trailing parenthesised remark is removed, then the first run of "B"
    followed by digits / capital letters in the upper-cased text is returned.
    If no such run exists, the first whitespace-separated word of the input
    is returned instead.

    Empty, whitespace-only or None input returns "" (it previously raised
    IndexError), so callers can write ``normalize_ordering_code(x) or x``.
    """
    text = re.sub(r"\s*\(.*?\)\s*$", "", str(s or ""))
    code = re.search(r"(B[0-9A-Z]+)", text.upper())
    if code:
        return code.group(1)
    words = text.split()
    return words[0] if words else ""

def read_pdf_text(pdf_path: Path) -> tuple[str, list[str]]:
    """Return ``(full_text, pages)`` for ``pdf_path``.

    ``pages`` holds the plain text of each page in order and ``full_text`` is
    those pages joined by newlines.  Errors from opening or reading the PDF
    propagate to the caller; the document is closed in either case (the
    previous version left it open when extraction raised).
    """
    # str() matches how read_pdf_fulltext opens the same files.
    doc = fitz.open(str(pdf_path))
    try:
        pages = [page.get_text("text") for page in doc]
    finally:
        doc.close()
    return "\n".join(pages), pages

def _quartet_after_code(text: str, code: str):
    """Find ``code`` followed by four whitespace-separated numbers.

    Returns the match object (named groups ``slA``, ``le``, ``Ae``, ``Ve``
    holding the raw number strings) or None.  The code is matched literally
    and case-insensitively.
    """
    number = r"[-+]?\d+(?:[.,]\d+)?"
    columns = "".join(
        rf"\s+(?P<{name}>{number})" for name in ("slA", "le", "Ae", "Ve")
    )
    return re.search(re.escape(code) + columns, text, flags=re.IGNORECASE)

def _plausible(g):
    """Validate a parsed quartet and return it as floats.

    ``g`` maps ``slA`` / ``le`` / ``Ae`` / ``Ve`` to raw strings.  Returns the
    tuple ``(slA, le, Ae, Ve)`` when every value parses and lies inside its
    accepted range, otherwise None.
    """
    limits = (
        ("slA", 0.05, 500),
        ("le", 1, 2000),
        ("Ae", 0.05, 1e4),
        ("Ve", 1, 1e8),
    )
    values = tuple(_num_tolerant(g.get(key)) for key, _, _ in limits)
    if any(v is None for v in values):
        return None
    for value, (_, low, high) in zip(values, limits):
        if value < low or value > high:
            return None
    return values

def _header_quartet_tokenwise(text: str):
    """Scan the numbers after 'Magnetic characteristics' for a geometry quartet.

    Looks at up to 12000 characters after the heading and slides a window of
    four consecutive numeric tokens over them.  The first window whose first
    three tokens contain a decimal separator and whose four values fall in
    the accepted ranges is returned as a tuple of floats; otherwise None.
    """
    heading = re.search(r"Magnetic\s+characteristics", text, flags=re.IGNORECASE)
    if heading is None:
        return None
    begin = heading.end()
    segment = text[begin:begin + 12000]
    tokens = re.findall(r"[-+]?\d+(?:[.,]\d+)?", segment)
    limits = ((0.05, 500), (1, 2000), (0.05, 1e4), (1, 1e8))

    for start in range(len(tokens) - 3):
        quad = tokens[start:start + 4]
        lead = quad[0]
        # a bare three-digit integer never starts a quartet
        if len(lead) == 3 and lead.isdigit():
            continue
        if not all(("." in tok or "," in tok) for tok in quad[:3]):
            continue
        try:
            numbers = tuple(float(tok.replace(",", ".")) for tok in quad)
        except ValueError:
            continue
        if all(low <= val <= high for val, (low, high) in zip(numbers, limits)):
            return numbers
    return None

def parse_toroid_magnetic_characteristics(pdf_path: Path, ordering_code_raw: str):
    """Extract the toroid geometry quartet for one ordering code from a PDF.

    Strategies, tried in order:
      1. the four numbers that directly follow the ordering code (raw, then
         normalised) anywhere in the document, if they pass the plausibility
         check;
      2. the first plausible quartet after 'Magnetic characteristics' on the
         first page that mentions the code;
      3. the same header scan over the whole document.

    Returns:
        Dict with ``Sigma_l_over_A_mm_inv``, ``le_m``, ``Ae_m2`` and
        ``Ve_m3``; all None when nothing was found.
    """
    def as_geometry(quartet):
        sl_a, le_mm, ae_mm2, ve_mm3 = quartet
        return {"Sigma_l_over_A_mm_inv": sl_a, "le_m": le_mm*1e-3,
                "Ae_m2": ae_mm2*1e-6, "Ve_m3": ve_mm3*1e-9}

    full_text, pages = read_pdf_text(pdf_path)
    part = ordering_code_raw.strip()
    norm = normalize_ordering_code(part)

    candidates = [part] if norm == part else [part, norm]
    for code in candidates:
        if not code:
            continue
        hit = _quartet_after_code(full_text, code)
        if not hit:
            continue
        checked = _plausible(hit.groupdict())
        if checked:
            return as_geometry(checked)

    # First page mentioning either spelling of the code, if any.
    page_idx = None
    for idx, page_text in enumerate(pages):
        if (part in page_text) or (norm and norm in page_text):
            page_idx = idx
            break

    if page_idx is not None:
        from_page = _header_quartet_tokenwise(pages[page_idx])
        if from_page:
            return as_geometry(from_page)

    from_document = _header_quartet_tokenwise(full_text)
    if from_document:
        return as_geometry(from_document)

    return {"Sigma_l_over_A_mm_inv": None, "le_m": None, "Ae_m2": None, "Ve_m3": None}

# ========= Planar ER detector & parser =========
PLANAR_HINTS = {"ER", "PLANAR", "EER", "EIR"}
# A hint only counts when it is not glued to other letters.  A plain
# substring test would fire on any text that merely contains "er" — for
# example "ferrite" in a datasheet URL.  Applied to upper-cased text.
_PLANAR_HINT_RE = re.compile(
    r"(?<![A-Z])(?:%s)(?![A-Z])"
    % "|".join(sorted((re.escape(h) for h in PLANAR_HINTS),
                      key=lambda h: (-len(h), h)))
)


def is_planar_er(row: pd.Series) -> bool:
    """Tell whether a CSV row looks like a planar / ER-family core.

    The shape-like columns are checked first, then the datasheet URL.  A hint
    from ``PLANAR_HINTS`` matches when it stands alone as a token (e.g.
    "ER 9.5", "EER28", "/er_11_5.pdf"), case-insensitively.

    The former part-number test (``B655xx`` / ``B656xx``) was dropped: it
    additionally required the URL hint, which had already returned True one
    line earlier, so it could never change the result.

    Args:
        row: A row of the input table (anything with a dict-like ``get``).

    Returns:
        True if any checked field carries a planar/ER hint.
    """
    for key in ["core_shape", "Family", "Core Shape", "Core Type", "Core type"]:
        val = str(row.get(key, "") or "").upper()
        if _PLANAR_HINT_RE.search(val):
            return True
    url = str(row.get(URL_COL, "") or "").upper()
    return bool(_PLANAR_HINT_RE.search(url))

# The planar ER parser reuses the generic E-family patterns unchanged; the
# separate names only leave room for ER-specific patterns later.
PAT_LE_ER = GEN_LE; PAT_AE_ER = GEN_AE; PAT_VE_ER = GEN_VE

# Thin alias for _clean_num_wide, used by the planar ER parser.
def _clean_float_relaxed(s): return _clean_num_wide(s)

def parse_planar_er_geom(pdf_path: Path):
    """Read ``pdf_path`` and return planar ER core geometry, scaled.

    Unlike the E-family parsers this searches the whole document text, not
    only the 'Magnetic characteristics' block.  The first match of each
    pattern is used and a single missing quantity is backfilled from
    ve = ae * le.

    Returns:
        Dict with ``Sigma_l_over_A_mm_inv`` (always None here), ``le_m``,
        ``Ae_m2`` and ``Ve_m3``; the parsed values are scaled by 1e-3, 1e-6
        and 1e-9 respectively, and missing ones are None.
    """
    text = read_pdf_fulltext(pdf_path)

    def first_value(pattern):
        hit = pattern.search(text)
        return _clean_float_relaxed(hit.group(1) if hit else None)

    def scaled(value, factor):
        return None if value is None else value * factor

    le = first_value(PAT_LE_ER)
    ae = first_value(PAT_AE_ER)
    ve = first_value(PAT_VE_ER)

    # at most one of these can apply
    if ae is None and le is not None and ve is not None and le > 0:
        ae = ve / le
    elif le is None and ae is not None and ve is not None and ae > 0:
        le = ve / ae
    elif ve is None and ae is not None and le is not None:
        ve = ae * le

    return {
        "Sigma_l_over_A_mm_inv": None,
        "le_m": scaled(le, 1e-3),
        "Ae_m2": scaled(ae, 1e-6),
        "Ve_m3": scaled(ve, 1e-9),
    }

# ========= Cache finder =========
def find_cached_pdf(part_raw: str, cache_dir: Path) -> Path | None:
    """Locate an already-downloaded datasheet for ``part_raw`` in ``cache_dir``.

    Lookup order:
      1. ``<code>.pdf`` exactly,
      2. the first PDF (sorted) whose name starts with the code,
      3. the first PDF whose name starts with the code's first 9 characters,
      4. the first PDF whose stem contains the code.

    Changes from the earlier version: every candidate (not only the exact
    hit) must be a file larger than 1024 bytes, so empty or truncated files
    are never returned; an empty code returns None instead of matching an
    arbitrary PDF; and the code is compared as plain text, so characters
    such as ``[`` or ``*`` in a part number are not read as glob wildcards.

    Returns:
        Path of the cached PDF, or None if nothing suitable exists.
    """
    oc = normalize_ordering_code(part_raw) or part_raw
    if not oc:
        return None

    def usable(candidate: Path) -> bool:
        try:
            return candidate.is_file() and candidate.stat().st_size > 1024
        except OSError:
            return False

    exact = cache_dir / f"{oc}.pdf"
    if usable(exact):
        return exact

    pdfs = sorted(q for q in cache_dir.glob("*.pdf") if usable(q))
    for prefix in (oc, oc[:9]):
        for q in pdfs:
            if q.name.startswith(prefix):
                return q
    for q in pdfs:
        if oc in q.stem:
            return q
    return None

# ---------- load & clean CSV ----------
# Everything is read as text; numeric columns are parsed explicitly below.
df = pd.read_csv(INPUT_CSV, dtype=str, low_memory=False)

# Material name: strip quotes and surrounding whitespace, and turn the
# textual placeholders "nan" / "None" / "" back into real NaN.
if MAT_COL in df.columns:
    df["material_name"] = (df[MAT_COL].astype(str)
                           .str.replace(r'["\']',"",regex=True)
                           .str.strip()
                           .replace({"nan":np.nan,"None":np.nan,"":np.nan}))
else:
    df["material_name"] = np.nan

# Core shape: same cleaning, taken from the first SHAPE_COLS entry present.
shape_src = next((c for c in SHAPE_COLS if c in df.columns), None)
if shape_src:
    df["core_shape"] = (df[shape_src].astype(str)
                        .str.replace(r'["\']',"",regex=True)
                        .str.strip()
                        .replace({"nan":np.nan,"None":np.nan,"":np.nan}))
else:
    df["core_shape"] = np.nan

# Clean CSV values → SI
# A_L: first unsigned number in the cell (decimal comma accepted), scaled by
# 1e-9.  NOTE(review): AL_COL and BS_COL are accessed without the presence
# check used for the other columns, so a missing column raises KeyError.
df["A_L_clean"] = (df[AL_COL].astype(str)
                    .str.replace("\u2212","-",regex=False)
                    .str.extract(r"([\d]+(?:[\,\.]\d+)?)")[0]
                    .str.replace(",",".",regex=False))
df["A_L_H_per_turn2"] = pd.to_numeric(df["A_L_clean"], errors="coerce") * 1e-9

# B_s: same extraction, scaled by 1e-3.
df["B_s_clean"] = (df[BS_COL].astype(str)
                    .str.replace("\u2212","-",regex=False)
                    .str.extract(r"([\d]+(?:[\,\.]\d+)?)")[0]
                    .str.replace(",",".",regex=False))
df["B_s_T"] = pd.to_numeric(df["B_s_clean"], errors="coerce") * 1e-3

# Air gap: first (optionally signed) number, scaled by 1e-3; NaN when the
# column is absent.
if GAP_COL in df.columns:
    df["gap_clean"] = (df[GAP_COL].astype(str)
                        .str.extract(r"([-+]?\d+(?:[.,]\d+)?)")[0]
                        .str.replace(",",".",regex=False))
    df["gap_m"] = pd.to_numeric(df["gap_clean"], errors="coerce") * 1e-3
else:
    df["gap_m"] = np.nan

# Initial permeability: parsed as-is; unparseable cells become NaN.
df["mu_i"] = pd.to_numeric(df[MUI_COL], errors="coerce") if MUI_COL in df.columns else np.nan

# ---------- PDF extraction loop ----------
# One result dict per CSV row, keyed by the row's index label under "row" so
# the results can be joined back onto df afterwards.
rows = []
for i, row in tqdm(df.iterrows(), total=len(df), desc="TDK PDFs"):
    url_raw  = str(row.get(URL_COL, "") or "")
    part_raw = str(row.get(ID_COL, f"row{i}") or "").strip()

    # Default cache location for this part, used if nothing is cached yet.
    oc = normalize_ordering_code(part_raw) or part_raw
    pdf_path = CACHE_DIR / f"{oc}.pdf"

    # Obtain the PDF: prefer any cached file, otherwise download — but only
    # when the URL itself ends in ".pdf".
    ok = False
    cached = find_cached_pdf(part_raw, CACHE_DIR)
    if cached:
        pdf_path = cached; ok = True
    elif pdf_path.exists() and pdf_path.stat().st_size > 1024:
        ok = True
    elif url_raw.lower().endswith(".pdf"):
        ok = download_pdf(url_raw, pdf_path)

    if not ok:
        if STOP_ON_DOWNLOAD_FAILURE:
            raise SystemExit(f"Failed to obtain PDF: {url_raw}")
        # No PDF: record an all-empty geometry row and move on.
        rows.append({"row": i, "Sigma_l_over_A_mm_inv": None, "le_m": None, "Ae_m2": None, "Ve_m3": None})
        continue

    try:
        # 1) Toroids first
        # Toroids are recognised from the URL text only.
        if "ring" in url_raw.lower() or "toroid" in url_raw.lower():
            geom = parse_toroid_magnetic_characteristics(pdf_path, part_raw)
        else:
            # 2) Generic E parse on the full text
            full_text = read_pdf_fulltext(pdf_path)
            le, ae, ve = parse_generic_e(full_text)
            # 3) If anything missing, try the E(Large) parser (works for that family style)
            # NOTE(review): the EL_* patterns are aliases of GEN_*, so this
            # currently re-reads the PDF and repeats the same search.
            if (le is None) or (ae is None) or (ve is None):
                geom = parse_e_large_geom(pdf_path)
            else:
                geom = {"Sigma_l_over_A_mm_inv": None,
                        "le_m": le*1e-3, "Ae_m2": ae*1e-6, "Ve_m3": ve*1e-9}
            # 4) If still missing and hints say planar ER, try ER parser
            if (geom["le_m"] is None or geom["Ae_m2"] is None or geom["Ve_m3"] is None) and is_planar_er(row):
                geom = parse_planar_er_geom(pdf_path)
            # 5) Last resort: toroid parse if URL hints it but earlier checks missed
            # NOTE(review): this branch only runs when the URL contains neither
            # "ring" nor "toroid", so the inner URL test below can never be
            # true — step 5 is effectively dead code.
            if (geom["le_m"] is None and geom["Ae_m2"] is None and geom["Ve_m3"] is None):
                if "ring" in url_raw.lower() or "toroid" in url_raw.lower():
                    geom = parse_toroid_magnetic_characteristics(pdf_path, part_raw)

    except Exception as e:
        # A parsing failure on one datasheet must not stop the whole run.
        print(f"Parse failed for {part_raw} ({pdf_path}): {e}")
        geom = {"Sigma_l_over_A_mm_inv": None, "le_m": None, "Ae_m2": None, "Ve_m3": None}

    geom["row"] = i
    rows.append(geom)
    if SLEEP_BETWEEN:
        time.sleep(SLEEP_BETWEEN)

# Align the extracted geometry with df by index label and attach it.
geom_df = pd.DataFrame(rows).set_index("row")
df = df.join(geom_df, how="left")

# ---------- Save ----------
df.to_csv(CLEANED_CSV, index=False)
print(f"Saved cleaned DataFrame with geometry → {CLEANED_CSV}")


TDK PDFs: 100%|██████████| 772/772 [00:46<00:00, 16.72it/s]

Saved cleaned DataFrame with geometry → TDK_E_and_Toroid_Cores_no_catalog_cleaned.csv



