In [239]:
#pip install pymupdf 
#pip install pdfplumber
#pip install pymupdf pillow numpy
#pip install paddlepaddle==2.5.2
#pip install paddleocr

In [None]:
from pathlib import Path

# 입력/출력 경로
PDF_PATH = Path("./test_paper.pdf")   # 분석할 PDF
OUT_ROOT = Path("./result")
DOC_NAME = PDF_PATH.stem
OUT_DIR = OUT_ROOT / DOC_NAME         # 문서 루트: ./result/<문서명>

# Step-1 산출물(그대로 유지)
PAGES_DIR  = OUT_DIR / "pages"        # page-level json/txt/png
IMAGES_DIR = OUT_DIR / "images"       # 추출된 원본 이미지
TABLES_DIR = OUT_DIR / "tables"       # 추출된 표(CSV/JSON)

# ★ 캡션 포함 저장본 폴더(새로 추가)
IMAGE_CAP_DIRNAME = "images_cap"
TABLE_CAP_DIRNAME = "tables_cap"
IMG_CAP_DIR = OUT_DIR / IMAGE_CAP_DIRNAME
TBL_CAP_DIR = OUT_DIR / TABLE_CAP_DIRNAME

# Step-1 manifest (Step-2에서 사용)
MANIFEST = OUT_DIR / "manifest.json"

# 디렉토리 생성
for d in [OUT_DIR, PAGES_DIR, IMAGES_DIR, TABLES_DIR, IMG_CAP_DIR, TBL_CAP_DIR]:
    d.mkdir(parents=True, exist_ok=True)

print("PDF:", PDF_PATH.resolve())
print("OUT_DIR:", OUT_DIR.resolve())

# --------- OCR 설정 ---------
USE_PADDLE = True          # False면 pytesseract 사용
OCR_LANG   = "korean"      # 'eng', 'korean' 등 문서에 맞게

# --------- 매칭 가중치/힌트 ---------
W_DIST      = 1000.0   # 거리 가중치(1/d)
W_CAPTION_T = 5.0      # 캡션에 term 포함 가중
W_OCR_T     = 4.0      # 이미지 OCR에 term 포함 가중
W_TABLE_T   = 1.2      # 표 텍스트 가중(이미지보다 약하게)
W_BASE_CAP  = 1.0
W_BASE_OCR  = 1.0
HINTS = ["figure","fig.","diagram","architecture","table","grid","chart","plot","그림","표"]

MIN_WORD_LEN = 2       # 클릭 대상 최소 길이

# --------- 후보 필터/스코어 보정 ---------
MIN_IMG_AREA_RATIO = 0.005   # 너무 작은 아이콘 제거(페이지 면적 대비)
MAX_IMG_AREA_RATIO = 0.40    # 너무 큰 영역 제거(거의 전체 페이지)
PAGE_GAP_PENALTY   = 300.0   # 다른 페이지일 때 감점(페이지 차이 * 값)

# --------- 캡션 탐지 관련 설정 ---------
CAPTION_SIDE_PAD        = 12   # 이미지/표 좌우 여백
CAPTION_BOTTOM_PAD      = 8    # 아래쪽 여백
CAPTION_TOP_PAD         = 6    # 위쪽 여백
CAPTION_RADIUS_IMG      = 80   # 이미지 아래쪽 캡션 탐지 반경
CAPTION_RADIUS_TBL_ABOVE= 50   # 표 위 캡션 탐지 반경
CAPTION_RADIUS_TBL_BELOW= 80   # 표 아래 캡션 탐지 반경

# --------- “저장본만” 사용(동기화 모드) ---------
STRICT_SAME_AS_STEP1 = True                 # 저장된 결과물만 사용
ALLOWED_IMAGE_DIRS   = [IMAGE_CAP_DIRNAME]  # 이미지 후보는 images_cap/만
ALLOWED_TABLE_DIRS   = [TABLE_CAP_DIRNAME]  # 표 후보는 tables_cap/만

# --------- 기타 토글 ---------
VFIG_ENABLE = False             # 가상 그림(벡터 도식 자동 추정) 사용 안 함
SAVE_CAPTION_COMPOSITE = False  # 캡션 포함 저장본을 생성(이미지/표 둘 다)


PDF: /home/dataplay/workspace/test_paper.pdf
OUT_DIR: /home/dataplay/workspace/result/test_paper


In [250]:
import json, math, re, base64

def _to_jsonable(obj):
    """PDF 추출물 안의 bytes/셋/복합객체를 JSON 직렬화 가능 형태로 변환"""
    if isinstance(obj, bytes):
        # 1) UTF-8로 읽히면 텍스트로 저장
        try:
            return obj.decode("utf-8")
        except UnicodeDecodeError:
            # 2) 아니면 base64로 안전 저장
            return "base64:" + base64.b64encode(obj).decode("ascii")
    if isinstance(obj, dict):
        return {str(k): _to_jsonable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_to_jsonable(v) for v in obj]
    if isinstance(obj, set):
        return [_to_jsonable(v) for v in obj]
    # PyMuPDF Rect 같은 객체가 들어오면 리스트로 변환
    if hasattr(obj, "__dict__") and obj.__class__.__name__ in ("Rect",):
        try:
            return [float(obj.x0), float(obj.y0), float(obj.x1), float(obj.y1)]
        except Exception:
            return str(obj)
    return obj

def save_json(path, obj):
    path.parent.mkdir(parents=True, exist_ok=True)
    safe = _to_jsonable(obj)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(safe, f, ensure_ascii=False, indent=2)

def bbox_center(b):
    x0,y0,x1,y1 = b
    return ((x0+x1)/2.0, (y0+y1)/2.0)

def bbox_distance(b1, b2):
    c1 = bbox_center(b1); c2 = bbox_center(b2)
    return math.hypot(c1[0]-c2[0], c1[1]-c2[1])

In [251]:
import fitz  # PyMuPDF

def parse_pdf(pdf_path: Path):
    doc = fitz.open(pdf_path)
    meta = {"page_count": len(doc), "pages": []}
    for pno in range(len(doc)):
        page = doc[pno]
        width, height = page.rect.width, page.rect.height

        # words: (x0,y0,x1,y1, word, block_no, line_no, word_no)
        words = page.get_text("words")
        word_items = [{"text": w[4], "bbox": [w[0], w[1], w[2], w[3]]} for w in words]

        # plain text
        plain = page.get_text("text")

        # blocks (optional, useful for grouping)
        blocks_raw = page.get_text("blocks")
        block_items = []
        for b in blocks_raw:
            # (x0, y0, x1, y1, text, block_no, block_type)
            x0,y0,x1,y1, text, *rest = b
            block_items.append({"bbox":[x0,y0,x1,y1], "text": text})

        # images with bbox via rawdict
        raw = page.get_text("rawdict")
        image_items = []
        def walk(block):
            t = block.get("type")
            if t == 1:  # image
                bbox = block.get("bbox")
                xref = block.get("image")
                if bbox and xref:
                    image_items.append({"xref": xref, "bbox": bbox})
            for k in ("blocks","lines","spans"):
                if isinstance(block.get(k, []), list):
                    for child in block.get(k, []):
                        walk(child)
        for b in raw.get("blocks", []):
            walk(b)

        # vector drawings (lines/rects/curves) → 표 라인 등 힌트
        drawings = page.get_drawings()  # list of dicts
        drawing_count = len(drawings)

        # render page to PNG (debug/visual)
        pix = page.get_pixmap(matrix=fitz.Matrix(2,2))  # 2x scale
        png_path = PAGES_DIR / f"page_{pno+1:04d}.png"
        pix.save(str(png_path))

        # save per-page txt
        txt_path = PAGES_DIR / f"page_{pno+1:04d}.txt"
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(plain)

        # collect page meta (image paths will be filled after export)
        meta["pages"].append({
            "page": pno,
            "width": width,
            "height": height,
            "words": word_items,
            "blocks": block_items,
            "images": image_items,   # {"xref":.., "bbox":[..]}
            "drawing_count": drawing_count,
            "png": str(png_path)
        })
    doc.close()
    return meta

doc_meta = parse_pdf(PDF_PATH)
print("Pages parsed:", doc_meta["page_count"])

Pages parsed: 10


In [252]:
def export_images(pdf_path: Path):
    doc = fitz.open(pdf_path)
    xref_to_path = {}
    for pno in range(len(doc)):
        page = doc[pno]
        for img_info in page.get_images(full=True):
            xref = img_info[0]
            if xref in xref_to_path:
                continue
            pix = fitz.Pixmap(doc, xref)
            if pix.n >= 5:  # CMYK → RGB
                pix = fitz.Pixmap(fitz.csRGB, pix)
            img_bytes = pix.tobytes("png")
            img_id = "img_xref_{}".format(xref)
            img_path = IMAGES_DIR / "{}.png".format(img_id)
            with open(img_path, "wb") as f:
                f.write(img_bytes)
            xref_to_path[xref] = str(img_path)
    doc.close()
    return xref_to_path

xref_to_path = export_images(PDF_PATH)

# add image file paths into doc_meta
for p in doc_meta["pages"]:
    for im in p["images"]:
        im["path"] = xref_to_path.get(im["xref"])

print("Exported images:", len(xref_to_path))

Exported images: 12


In [253]:
## 6) 표 추출 (가능 시 pdfplumber)
tables_found = 0
try:
    import pdfplumber
    with pdfplumber.open(str(PDF_PATH)) as pdf:
        for pno, page in enumerate(pdf.pages):
            # 표 탐지 시도
            try:
                # find_tables gives table objects with bbox; more informative than extract_tables
                tbls = page.find_tables()
            except Exception:
                tbls = []
            if not tbls:
                continue
            for ti, t in enumerate(tbls, 1):
                # CSV 저장
                csv_path = TABLES_DIR / "page_{:04d}_table_{:02d}.csv".format(pno+1, ti)
                try:
                    t.to_csv(str(csv_path))
                except Exception:
                    # fallback: rows as plain list
                    data = t.extract()
                    import csv
                    with open(csv_path, "w", newline="", encoding="utf-8") as f:
                        writer = csv.writer(f)
                        for row in data or []:
                            writer.writerow(row or [])
                # JSON 저장 (bbox 포함)
                tbl_json = {
                    "page": pno,
                    "bbox": list(t.bbox) if hasattr(t, "bbox") else None,
                    "csv": str(csv_path)
                }
                json_path = TABLES_DIR / "page_{:04d}_table_{:02d}.json".format(pno+1, ti)
                save_json(json_path, tbl_json)
                tables_found += 1
    print("Tables extracted (pdfplumber):", tables_found)
except Exception as e:
    print("[알림] pdfplumber 사용 불가 또는 표 미검출:", e)
    print("- 스캔 PDF의 경우 OCR 후 camelot/tabula 등 추가 도구가 필요할 수 있습니다.")

Tables extracted (pdfplumber): 7


In [254]:
## 7) 페이지별 JSON 저장 + manifest.json 생성
import json

# 페이지 JSON 저장
for p in doc_meta["pages"]:
    page_json = PAGES_DIR / "page_{:04d}.json".format(p["page"]+1)
    save_json(page_json, p)

# manifest
manifest = {
    "doc_name": DOC_NAME,
    "pdf": str(PDF_PATH),
    "out_dir": str(OUT_DIR),
    "page_count": doc_meta["page_count"],
    "pages": [],
    "images_total": sum(len(p["images"]) for p in doc_meta["pages"]),
    "tables_total": None,  # 아래에서 채움
}

# needs_ocr flag per page (heuristic: words < 5 or plain text file is near-empty)
for p in doc_meta["pages"]:
    txt_path = Path(p["png"]).with_suffix(".txt")
    # 실제 txt 경로는 earlier saved path; compute from known naming
    txt_path = PAGES_DIR / "page_{:04d}.txt".format(p["page"]+1)
    try:
        txt_sz = Path(txt_path).stat().st_size
    except Exception:
        txt_sz = 0
    needs_ocr = (len(p["words"]) < 5) or (txt_sz < 10)
    manifest["pages"].append({
        "page": p["page"],
        "png": p["png"],
        "json": str(PAGES_DIR / "page_{:04d}.json".format(p["page"]+1)),
        "txt": str(txt_path),
        "words": len(p["words"]),
        "images": len(p["images"]),
        "drawing_count": p["drawing_count"],
        "needs_ocr": bool(needs_ocr),
    })

# count tables from folder
try:
    table_jsons = list(TABLES_DIR.glob("*.json"))
    manifest["tables_total"] = len(table_jsons)
except Exception:
    manifest["tables_total"] = 0

save_json(OUT_DIR / "manifest.json", manifest)

print("Saved per-page JSON and manifest:")
print(json.dumps(manifest, ensure_ascii=False, indent=2)[:1000] + " ...")

Saved per-page JSON and manifest:
{
  "doc_name": "test_paper",
  "pdf": "test_paper.pdf",
  "out_dir": "result/test_paper",
  "page_count": 10,
  "pages": [
    {
      "page": 0,
      "png": "result/test_paper/pages/page_0001.png",
      "json": "result/test_paper/pages/page_0001.json",
      "txt": "result/test_paper/pages/page_0001.txt",
      "words": 423,
      "images": 1,
      "drawing_count": 2,
      "needs_ocr": false
    },
    {
      "page": 1,
      "png": "result/test_paper/pages/page_0002.png",
      "json": "result/test_paper/pages/page_0002.json",
      "txt": "result/test_paper/pages/page_0002.txt",
      "words": 636,
      "images": 0,
      "drawing_count": 2,
      "needs_ocr": false
    },
    {
      "page": 2,
      "png": "result/test_paper/pages/page_0003.png",
      "json": "result/test_paper/pages/page_0003.json",
      "txt": "result/test_paper/pages/page_0003.txt",
      "words": 434,
      "images": 2,
      "drawing_count": 2,
      "needs_ocr": fal

In [255]:
import json, re, os
from PIL import Image

def load_manifest(manifest_path: Path):
    with open(manifest_path, "r", encoding="utf-8") as f:
        mani = json.load(f)
    # 페이지 JSON 로드
    pages = []
    for p in mani["pages"]:
        with open(p["json"], "r", encoding="utf-8") as f:
            pj = json.load(f)
        # png size
        png_path = Path(p["png"])
        try:
            with Image.open(png_path) as im:
                png_w, png_h = im.size
        except Exception:
            png_w = png_h = None
        pj["_png"] = {"path": str(png_path), "w": png_w, "h": png_h}
        pages.append(pj)
    # 테이블 메타
    tables = []
    tables_dir = manifest_path.parent / "tables"
    if tables_dir.exists():
        for jf in tables_dir.glob("*.json"):
            try:
                with open(jf, "r", encoding="utf-8") as f:
                    tj = json.load(f)
                tj["_json"] = str(jf)
                tables.append(tj)
            except Exception:
                pass
    return mani, pages, tables

manifest, pages, tables_meta = load_manifest(MANIFEST)
(len(pages), len(tables_meta))

(10, 7)

In [256]:
ocr_engine = None
try:
    if USE_PADDLE:
        from paddleocr import PaddleOCR
        ocr_engine = PaddleOCR(use_angle_cls=True, lang=OCR_LANG)
        print("OCR: PaddleOCR loaded")
    else:
        import pytesseract
        ocr_engine = pytesseract
        print("OCR: pytesseract ready (엔진 설치 필요)")
except Exception as e:
    print("[경고] OCR 엔진 로드 실패:", e)
    print("OCR 없이도 매칭은 거리/캡션 기반으로 동작합니다.")

  ocr_engine = PaddleOCR(use_angle_cls=True, lang=OCR_LANG)
[32mCreating model: ('PP-LCNet_x1_0_doc_ori', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/home/dataplay/.paddlex/official_models/PP-LCNet_x1_0_doc_ori`.[0m
[32mCreating model: ('UVDoc', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/home/dataplay/.paddlex/official_models/UVDoc`.[0m
[32mCreating model: ('PP-LCNet_x1_0_textline_ori', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/home/dataplay/.paddlex/official_models/PP-LCNet_x1_0_textline_ori`.[0m
[32mCreating model: ('PP-OCRv5_server_det', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/home/dataplay/.paddlex/official_models/PP-OCRv5_server_det`.[0m
[32mCreating model: ('korean_PP-OCRv5_mob

OCR: PaddleOCR loaded


In [257]:
import csv

def ocr_image(path: str):
    if not path or not os.path.exists(path) or ocr_engine is None:
        return ""
    if USE_PADDLE:
        res = ocr_engine.ocr(path, cls=True)
        texts = []
        for line in res[0] if res else []:
            txt = line[1][0]
            if txt:
                texts.append(txt)
        return " ".join(texts)
    else:
        from PIL import Image
        return ocr_engine.image_to_string(Image.open(path))

def read_table_text(csv_path: str):
    try:
        rows = []
        with open(csv_path, newline="", encoding="utf-8") as f:
            for row in csv.reader(f):
                rows.append(" ".join([c for c in row if c]))
        return " ".join(rows)
    except Exception:
        return ""

import re
def score_contains(text: str, term: str):
    if not text: return 0
    try:
        return 1 if re.search(r"\b" + re.escape(term) + r"\b", text, flags=re.IGNORECASE) else 0
    except re.error:
        return (term.lower() in text.lower())*1

In [None]:
## 5) 이미지/표 보조 텍스트 + "캡션 포함 저장본" 생성 (virtual_figures 사용 안 함)
import os, csv, shutil
from pathlib import Path
from PIL import Image

# 디렉토리 보장
IMG_CAP_DIR.mkdir(parents=True, exist_ok=True)
TBL_CAP_DIR.mkdir(parents=True, exist_ok=True)

# ── 보조 함수들 ────────────────────────────────────────────────────────────────
def _union_bbox(ws):
    x0 = min(w["bbox"][0] for w in ws); y0 = min(w["bbox"][1] for w in ws)
    x1 = max(w["bbox"][2] for w in ws); y1 = max(w["bbox"][3] for w in ws)
    return [x0,y0,x1,y1]

def _to_png_scale(page):
    W, H = page["width"], page["height"]
    pm = page.get("_png", {})
    pw, ph = pm.get("w"), pm.get("h")
    if not (W and H and pw and ph): return None, None
    return pw / W, ph / H

def _save_fallback_copy(src_path: str | None, dst_dir: Path, base: str):
    if not src_path or not isinstance(src_path, (str, os.PathLike)) or not os.path.exists(src_path):
        return None
    dst = dst_dir / f"cap_{Path(base).name}"
    try:
        shutil.copy2(src_path, dst)
        return str(dst)
    except Exception:
        return None

# ── 이미지 저장 ──────────────────────────────────────────────────────────────
def _save_img_with_caption(page, img_bbox, basename, orig_img_path: str | None = None):
    sx, sy = _to_png_scale(page)
    png_path = page.get("_png", {}).get("path")
    if not (sx and sy and png_path and os.path.exists(png_path)):
        return _save_fallback_copy(orig_img_path, IMG_CAP_DIR, basename), ""

    W, H = page["width"], page["height"]
    x0,y0,x1,y1 = img_bbox

    caps = []
    for w in page.get("words", []):
        wb = w.get("bbox")
        if not wb: continue
        wy = (wb[1]+wb[3])/2
        if wy >= y1 and (wy - y1) <= CAPTION_RADIUS_IMG:
            if not (wb[2] < x0 or wb[0] > x1):
                caps.append(w)

    rx0 = max(0, x0 - CAPTION_SIDE_PAD)
    rx1 = min(W, x1 + CAPTION_SIDE_PAD)
    if caps:
        _, _, _, cy1 = _union_bbox(caps)
        ry1 = min(H, max(y1, cy1 + CAPTION_BOTTOM_PAD))
    else:
        ry1 = y1
    ry0 = max(0, y0)

    crop_box = (int(rx0*sx), int(ry0*sy), int(rx1*sx), int(ry1*sy))
    if crop_box[2]-crop_box[0] <= 8 or crop_box[3]-crop_box[1] <= 8:
        return _save_fallback_copy(orig_img_path, IMG_CAP_DIR, basename), ""

    out_path = IMG_CAP_DIR / f"cap_{basename}"
    try:
        with Image.open(png_path) as im:
            im.crop(crop_box).save(out_path)
        caption_text = " ".join(w.get("text","") for w in caps if w.get("text"))
        return str(out_path), caption_text
    except Exception as e:
        print("[warn] save_img_with_caption:", e)
        return _save_fallback_copy(orig_img_path, IMG_CAP_DIR, basename), ""

# ── 표 저장 ────────────────────────────────────────────────────────────────
def _save_table_with_caption(page, tbl_bbox, basename):
    sx, sy = _to_png_scale(page)
    png_path = page.get("_png", {}).get("path")
    if not (sx and sy and png_path and os.path.exists(png_path)):
        return None, ""

    W, H = page["width"], page["height"]
    x0,y0,x1,y1 = tbl_bbox

    above, below = [], []
    for w in page.get("words", []):
        wb = w.get("bbox")
        if not wb: continue
        wy = (wb[1]+wb[3])/2
        if wy <= y0 and (y0 - wy) <= CAPTION_RADIUS_TBL_ABOVE:
            if not (wb[2] < x0 or wb[0] > x1):
                above.append(w)
        if wy >= y1 and (wy - y1) <= CAPTION_RADIUS_TBL_BELOW:
            if not (wb[2] < x0 or wb[0] > x1):
                below.append(w)

    rx0 = max(0, x0 - CAPTION_SIDE_PAD)
    rx1 = min(W, x1 + CAPTION_SIDE_PAD)
    ry0 = max(0, y0)
    ry1 = min(H, y1)

    if above:
        _, ay0, _, _ = _union_bbox(above)
        ry0 = max(0, min(ry0, ay0 - CAPTION_TOP_PAD))
    if below:
        _, _, _, by1 = _union_bbox(below)


TypeError: expected str, bytes or os.PathLike object, not NoneType

In [241]:
def _save_img_no_caption(page, img_bbox, basename):
    sx, sy = _to_png_scale(page)
    png_path = (page.get("_png", {}) or {}).get("path")
    if not (sx and sy and png_path and os.path.exists(png_path)):
        # 페이지 PNG가 없으면 원본 이미지 파일을 그대로 복사 시도(없으면 None)
        return _save_fallback_copy(basename if os.path.exists(basename) else "", IMG_CAP_DIR, basename), ""

    x0,y0,x1,y1 = img_bbox
    # 페이지 PNG 좌표로 변환 후 저장 (캡션 확장 X)
    crop_box = (int(x0*sx), int(y0*sy), int(x1*sx), int(y1*sy))
    if crop_box[2]-crop_box[0] <= 8 or crop_box[3]-crop_box[1] <= 8:
        return _save_fallback_copy("", IMG_CAP_DIR, basename), ""

    out_path = IMG_CAP_DIR / f"cap_{basename}"
    try:
        with Image.open(png_path) as im:
            im.crop(crop_box).save(out_path)
        return str(out_path), ""   # ← 캡션 텍스트 없음
    except Exception as e:
        print("[warn] save_img_no_caption:", e)
        return _save_fallback_copy("", IMG_CAP_DIR, basename), ""

# ===== 페이지별 이미지 후보 구성 (캡션 여부 토글 반영) =====
page_image_aux = {}
for p in pages:
    W, H = p["width"], p["height"]
    area_total = max(1.0, W*H)
    cand = []
    for idx, img in enumerate(p.get("images", [])):
        bbox = img.get("bbox")
        if not bbox or len(bbox) != 4:
            continue
        x0,y0,x1,y1 = bbox

        # path None 대비 안전한 파일명
        raw_path = img.get("path")
        if isinstance(raw_path, str) and raw_path.strip():
            base = Path(raw_path).name
        else:
            base = f"p{int(p['page']):04d}_img{idx:03d}.png"

        # 토글에 따라 저장 방식 선택

        cap_path, cap_text = _save_img_no_caption(p, bbox, base)    # (신규, 캡션 없음)

        if not cap_path:
            continue

        # 크기 필터
        ar = (x1-x0)*(y1-y0)/area_total
        if ar < MIN_IMG_AREA_RATIO or ar > MAX_IMG_AREA_RATIO:
            continue

        # OCR(선택)
        try:
            ocr_text = ocr_image(cap_path) if cap_path else ""
        except Exception:
            ocr_text = ""

        cand.append({
            "bbox": bbox,
            "path": cap_path,
            "caption_text": cap_text,  # 캡션 비활성화면 "", 사용안함
            "ocr_text": ocr_text
        })
    page_image_aux[p["page"]] = cand

print("images_cap saved:", len(list(IMG_CAP_DIR.glob('*.png'))), "files")
print("tables_cap saved:", len(list(TBL_CAP_DIR.glob('*.png'))), "files")
len(page_image_aux)

images_cap saved: 11 files
tables_cap saved: 7 files


  res = ocr_engine.ocr(path, cls=True)


10

In [242]:
## 6) 매칭 스코어 함수
def bbox_center(b):
    x0,y0,x1,y1 = b
    return ( (x0+x1)/2.0, (y0+y1)/2.0 )

def distance_score(word_bbox, target_bbox):
    wx, wy = bbox_center(word_bbox)
    tx, ty = bbox_center(target_bbox)
    d = ((wx-tx)**2 + (wy-ty)**2) ** 0.5 + 1e-6
    return W_DIST / d

def hint_score(text: str):
    if not text: return 0.0
    t = text.lower()
    return sum(h in t for h in HINTS) * 1.0

def total_image_score(term, word_bbox, img_aux):
    s = 0.0
    s += distance_score(word_bbox, img_aux["bbox"])
    s += W_CAPTION_T * score_contains(img_aux.get("caption_text",""), term)
    s += W_OCR_T     * score_contains(img_aux.get("ocr_text",""), term)
    s += W_BASE_CAP  * hint_score(img_aux.get("caption_text",""))
    s += W_BASE_OCR  * hint_score(img_aux.get("ocr_text",""))
    return s

def total_table_score(term, word_bbox, tbl):
    s = 0.0
    if tbl.get("bbox"):
        s += distance_score(word_bbox, tbl["bbox"])
    s += W_TABLE_T * score_contains(tbl.get("text",""), term)
    return s

In [244]:
## 7) 단어 → top-k(이미지/표) 매칭 인덱스 생성 (모든 페이지 후보 + 캡션 포함 저장본만)
import os, math, json
from pathlib import Path
from datetime import datetime

# ── 안전한 기본값/헬퍼 ─────────────────────────────────────────────────────────
IMAGE_CAP_DIRNAME = globals().get("IMAGE_CAP_DIRNAME", "images_cap")
TABLE_CAP_DIRNAME = globals().get("TABLE_CAP_DIRNAME", "tables_cap")
ALLOWED_IMAGE_DIRS = globals().get("ALLOWED_IMAGE_DIRS", [IMAGE_CAP_DIRNAME])
ALLOWED_TABLE_DIRS = globals().get("ALLOWED_TABLE_DIRS", [TABLE_CAP_DIRNAME])

def _norm(p): 
    return (p or "").replace("\\", "/")

def _in_allowed_dirs(path_str, dirs):
    p = _norm(path_str)
    return any(f"/{d}/" in p for d in dirs)

def _bbox_center(b):
    x0,y0,x1,y1 = b
    return ((x0+x1)/2.0, (y0+y1)/2.0)

def _l2(a,b):
    return math.hypot(a[0]-b[0], a[1]-b[1])

# 가중치(없으면 기본값)
W_DIST      = globals().get("W_DIST", 1000.0)
W_CAPTION_T = globals().get("W_CAPTION_T", 5.0)
W_OCR_T     = globals().get("W_OCR_T", 4.0)
W_TABLE_T   = globals().get("W_TABLE_T", 1.2)
W_BASE_CAP  = globals().get("W_BASE_CAP", 1.0)
W_BASE_OCR  = globals().get("W_BASE_OCR", 1.0)
PAGE_GAP_PENALTY = globals().get("PAGE_GAP_PENALTY", 300.0)
HINTS = [h.lower() for h in globals().get("HINTS", ["figure","fig.","diagram","architecture","table","grid","chart","plot","그림","표"])]

MIN_WORD_LEN = globals().get("MIN_WORD_LEN", 2)
TOP_K = globals().get("TOP_K", 3)

# Step-5 산출물(필수): page_image_aux, tables_by_page
if 'page_image_aux' not in globals():
    raise RuntimeError("page_image_aux 가 없습니다. Step-5(캡션 포함 저장본 생성)를 먼저 실행하세요.")
if 'tables_by_page' not in globals():
    tables_by_page = {}

# ── 전 페이지 후보 풀 구성(캡션 저장본만 허용) ─────────────────────────────────────
all_imgs = []   # each: {page, bbox, path, caption_text, ocr_text}
for pno, items in page_image_aux.items():
    for it in items or []:
        path = it.get("path")
        if not path or not _in_allowed_dirs(path, ALLOWED_IMAGE_DIRS):
            continue
        all_imgs.append({**it, "page": pno})

all_tbls = []   # each: {page, bbox, png, csv, text, caption_text}
for pno, tlist in tables_by_page.items():
    for tb in tlist or []:
        png = tb.get("png")
        if not png or not _in_allowed_dirs(png, ALLOWED_TABLE_DIRS):
            continue
        all_tbls.append({**tb, "page": pno})

print(f"[info] image candidates: {len(all_imgs)} (from {ALLOWED_IMAGE_DIRS})")
print(f"[info] table candidates: {len(all_tbls)} (from {ALLOWED_TABLE_DIRS})")

# ── 스코어러(간단/견고 버전) ────────────────────────────────────────────────────
def _contains(text, term):
    if not text or not term: return False
    return term.lower() in text.lower()

def _hint_bonus(text):
    if not text: return 0.0
    t = text.lower()
    return sum(1.0 for h in HINTS if h in t) * W_BASE_CAP

def image_score(term, word_bbox, wpage, img):
    # 거리(같은 페이지일 때만), 다른 페이지는 0에 가까운 점수 + 페널티
    s = 0.0
    if img.get("page") == wpage and word_bbox and img.get("bbox"):
        d = max(1.0, _l2(_bbox_center(word_bbox), _bbox_center(img["bbox"])))
        s += W_DIST / d
    else:
        # 페이지 차이 페널티 (멀수록 감점)
        gap = abs((img.get("page") or 0) - (wpage or 0))
        s -= PAGE_GAP_PENALTY * gap

    # 캡션/ocr에 term 포함
    if _contains(img.get("caption_text",""), term):
        s += W_CAPTION_T
    if _contains(img.get("ocr_text",""), term):
        s += W_OCR_T

    # 힌트 일반 보너스
    s += _hint_bonus(img.get("caption_text","")) + _hint_bonus(img.get("ocr_text",""))
    return s

def table_score(term, word_bbox, wpage, tb):
    s = 0.0
    if tb.get("page") == wpage and word_bbox and tb.get("bbox"):
        d = max(1.0, _l2(_bbox_center(word_bbox), _bbox_center(tb["bbox"])))
        s += W_DIST / d
    else:
        gap = abs((tb.get("page") or 0) - (wpage or 0))
        s -= PAGE_GAP_PENALTY * gap

    # 표 텍스트/캡션 텍스트에 term 포함
    t_all = (tb.get("caption_text","") + " " + tb.get("text","")).strip()
    if _contains(t_all, term):
        s += W_TABLE_T

    s += _hint_bonus(tb.get("caption_text",""))  # 캡션 힌트
    return s

# ── 인덱스 생성 ────────────────────────────────────────────────────────────────
OUT_DIR.mkdir(parents=True, exist_ok=True)
LINKER_DIR = OUT_DIR / "linker"
LINKER_DIR.mkdir(parents=True, exist_ok=True)

manifest = globals().get("manifest", {"doc_name": DOC_NAME, "pdf": str(PDF_PATH)})

index = {
    "doc": manifest.get("doc_name"),
    "pdf": manifest.get("pdf"),
    "created": datetime.utcnow().isoformat()+"Z",
    "pages": [],
}

for p in pages:
    pno = p["page"]
    words = [w for w in p.get("words", []) if len(w.get("text","")) >= MIN_WORD_LEN]
    page_entry = {
        "page": pno,
        "png":  (p.get("_png",{}) or {}).get("path"),
        "png_w":(p.get("_png",{}) or {}).get("w"),
        "png_h":(p.get("_png",{}) or {}).get("h"),
        "width": p.get("width"),
        "height": p.get("height"),
        "items": []
    }

    for w in words:
        term = w["text"]
        wb = w["bbox"]

        # 이미지 후보 스코어링(모든 페이지)
        scored_imgs = []
        for im in all_imgs:
            try:
                sc = image_score(term, wb, pno, im)
            except Exception:
                sc = 0.0
            scored_imgs.append((sc, im))
        scored_imgs.sort(key=lambda x: x[0], reverse=True)
        top_imgs = [
            {"score": float(s), "path": im.get("path"), "bbox": im.get("bbox")}
            for (s, im) in scored_imgs[:TOP_K]
            if im.get("path")
        ]

        # 표 후보 스코어링(모든 페이지) — ★ png 포함!
        scored_tbls = []
        for tb in all_tbls:
            try:
                sc = table_score(term, wb, pno, tb)
            except Exception:
                sc = 0.0
            scored_tbls.append((sc, tb))
        scored_tbls.sort(key=lambda x: x[0], reverse=True)
        top_tbls = [
            {"score": float(s), "png": tb.get("png"), "csv": tb.get("csv"), "bbox": tb.get("bbox")}
            for (s, tb) in scored_tbls[:TOP_K]
            if tb.get("png")  # png가 있는 표만
        ]

        page_entry["items"].append({
            "word": term,
            "bbox": wb,
            "images": top_imgs,
            "tables": top_tbls
        })

    index["pages"].append(page_entry)

# 저장
INDEX_JSON = LINKER_DIR / "index_linker.json"
with open(INDEX_JSON, "w", encoding="utf-8") as f:
    json.dump(index, f, ensure_ascii=False, indent=2)

print("Saved index →", INDEX_JSON.resolve())
print("pages:", len(index["pages"]), 
      "| words sampled:", sum(len(p["items"]) for p in index["pages"]))
# 간단 검증: 첫 페이지 첫 단어의 후보 개수
if index["pages"] and index["pages"][0]["items"]:
    it0 = index["pages"][0]["items"][0]
    print("[sample] word:", it0["word"], 
          "| imgs:", len(it0["images"]), 
          "| tbls:", len(it0["tables"]))


[info] image candidates: 10 (from ['images_cap'])
[info] table candidates: 7 (from ['tables_cap'])
Saved index → /home/dataplay/workspace/result/test_paper/linker/index_linker.json
pages: 10 | words sampled: 3888
[sample] word: 논문 | imgs: 3 | tbls: 3


In [246]:
## 8) 오버레이 뷰어 HTML 생성 — 경로를 OUT_DIR 기준 상대경로로 정규화
from pathlib import Path
import json as _json_mod
import json

# 필수: CONFIG에서 정의됨
HTML_OUT = OUT_DIR / "viewer_overlay.html"

def to_rel(p: str) -> str | None:
    """OUT_DIR 기준 상대경로로 변환 (없으면 그대로)"""
    if not p:
        return p
    try:
        abs_p = Path(p).resolve()
    except Exception:
        p_str = str(p).replace("\\", "/")
        prefix = str(OUT_DIR.resolve()).replace("\\", "/") + "/"
        return p_str[len(prefix):] if p_str.startswith(prefix) else p_str

    try:
        rel = abs_p.relative_to(OUT_DIR.resolve())
        return str(rel).replace("\\", "/")
    except Exception:
        p_str = str(p).replace("\\", "/")
        prefix = str(OUT_DIR.resolve()).replace("\\", "/") + "/"
        return p_str[len(prefix):] if p_str.startswith(prefix) else p_str

def normalize_paths(idx: dict) -> dict:
    """index_linker.json 내 모든 파일 경로를 문서 루트 기준 상대경로로 변경"""
    for pg in idx.get("pages", []):
        if pg.get("png"):
            pg["png"] = to_rel(pg["png"])
        for it in pg.get("items", []):
            # 이미지
            for im in it.get("images", []):
                if im.get("path"):
                    im["path"] = to_rel(im["path"])
            # 표
            for tb in it.get("tables", []):
                if tb.get("png"):
                    tb["png"] = to_rel(tb["png"])
                if tb.get("csv"):
                    tb["csv"] = to_rel(tb["csv"])
    return idx

def _make_overlay_template(index):
    html_lines = [
"<!doctype html>",
"<html>",
"<head>",
"  <meta charset='utf-8'/>",
"  <title>PDF Overlay Viewer</title>",
"  <style>",
"    body { margin:0; font-family:system-ui, sans-serif; }",
"    .wrap { display:flex; height:100vh; }",
"    .left { width:70%; border-right:1px solid #ddd; padding:12px; box-sizing:border-box; overflow:auto; }",
"    .right { flex:1; padding:12px; overflow:auto; }",
"    .canvas { position:relative; background-size:100% 100%; background-repeat:no-repeat; background-position:top left; }",
"",
"    /* 버튼 기본: 완전 투명 (배경/보더 없음) */",
"    .wbtn {",
"      position:absolute;",
"      background: transparent;",
"      border: 1px solid transparent;",
"      cursor: pointer;",
"      outline: none;",
"      -webkit-user-select: none;",
"      -moz-user-select: none;",
"      user-select: none;",
"      -webkit-tap-highlight-color: transparent;",
"    }",
"    /* hover 시 얇은 윤곽선만 보이게 */",
"    .wbtn:hover {",
"      background: transparent;",
"      border-color: rgba(0,0,0,0.25);",
"      box-shadow: 0 0 0 2px rgba(0,0,0,0.06) inset;",
"    }",
"",
"    .placeholder { color:#888; }",
"    select { padding:6px 8px; }",
"    .popover{ position:absolute; max-width:45%; background:#fff; border:1px solid #ddd; border-radius:10px;",
"              box-shadow:0 10px 30px rgba(0,0,0,.15); padding:10px; z-index:10; }",
"    .popover img{ max-width:100%; height:auto; display:block; }",
"    .closepop{ position:absolute; top:4px; right:8px; cursor:pointer; opacity:.6; }",
"  </style>",
"</head>",
"<body>",
"  <div class='wrap'>",
"    <div class='left'>",
"      <div style=\"display:flex;gap:8px;align-items:center;margin-bottom:8px;\">",
"        <label for=\"pageSel\">Page:</label>",
"        <select id=\"pageSel\"></select>",
"      </div>",
"      <div id=\"stage\"></div>",
"    </div>",
"    <div class='right'>",
"      <h3>Matches</h3>",
"      <div id='matches' class='cards'><p class='placeholder'>단어를 클릭하세요.</p></div>",
"    </div>",
"  </div>",
"  <script>",
"    const data = __INDEX_JSON__;",
"    const stage = document.getElementById('stage');",
"    const sel = document.getElementById('pageSel');",
"    const matches = document.getElementById('matches');",
"    (function(){",
"      const frag = document.createDocumentFragment();",
"      for (const p of data.pages) {",
"        const opt = document.createElement('option');",
"        opt.value = String(p.page);",
"        opt.textContent = 'Page ' + (p.page+1);",
"        frag.appendChild(opt);",
"      }",
"      sel.appendChild(frag);",
"    })();",
"    function makeOverlayHTML(p){",
"      if(!(p.png && p.png_w && p.png_h && p.width && p.height)) {",
"        return '<p class=\"placeholder\">No PNG available.</p>';",
"      }",
"      const sx = p.png_w / p.width; const sy = p.png_h / p.height;",
"      let btns = '';",
"      for (const it of p.items) {",
"        const b = it.bbox;",
"        const l = Math.max(0, Math.round(b[0]*sx));",
"        const t = Math.max(0, Math.round(b[1]*sy));",
"        const w = Math.max(1, Math.round((b[2]-b[0])*sx));",
"        const h = Math.max(1, Math.round((b[3]-b[1])*sy));",
"        const payloadStr = JSON.stringify({word: it.word, images: it.images, tables: it.tables})",
"                           .replaceAll('\"','&quot;');",
"        btns += `<button class=\\\"wbtn\\\" style=\\\"left:${l}px;top:${t}px;width:${w}px;height:${h}px\\\" data-left=${l} data-top=${t} data-width=${w} data-height=${h} data-payload=\\\"${payloadStr}\\\" title=\\\"${it.word}\\\"></button>`;",
"      }",
"      return `<div class=\\\"canvas\\\" id=\\\"canvas\\\" style=\\\"width:${p.png_w}px;height:${p.png_h}px;background-image:url('${p.png}');\\\">${btns}</div>`;",
"    }",
"    function renderPage(pno){",
"      const p = data.pages.find(x => x.page == pno);",
"      if(!p){ stage.innerHTML = '<p class=\\\"placeholder\\\">No page</p>'; return; }",
"      stage.innerHTML = makeOverlayHTML(p);",
"      wireButtons();",
"      matches.innerHTML = '<p class=\"placeholder\">단어를 클릭하세요.</p>';",
"    }",
"    function closePopover(){ const old = document.querySelector('.popover'); if(old) old.remove(); }",
"    function showPopover(btn, payload){",
"      closePopover();",
"      const canvas = document.getElementById('canvas');",
"      const pop = document.createElement('div');",
"      pop.className = 'popover';",
"      pop.innerHTML = `<div class='closepop' title='close'>&times;</div>`;",
"      let html = '';",
"      const im = (payload.images||[]).find(x => x.path);",
"      if(im){ html += `<img src='${im.path}' alt='match'/>`; }",
"      else { const tb = (payload.tables||[]).find(x => x.png); if(tb){ html += `<img src='${tb.png}' alt='table'/>`; } else { html += `<div class='placeholder'>연결된 이미지/표 없음</div>`; } }",
"      pop.insertAdjacentHTML('beforeend', html);",
"      canvas.appendChild(pop);",
"      const l = parseInt(btn.dataset.left), t = parseInt(btn.dataset.top), w = parseInt(btn.dataset.width);",
"      const pad = 8, canvasW = canvas.clientWidth, popW = Math.min(Math.round(canvasW*0.45), 600);",
"      pop.style.width = popW + 'px';",
"      let left = l + w + pad; if (left + popW > canvasW) left = Math.max(0, l - popW - pad);",
"      pop.style.left = left + 'px'; pop.style.top  = t + 'px';",
"      pop.querySelector('.closepop').addEventListener('click', closePopover);",
"    }",
"    function wireButtons(){",
"      document.querySelectorAll('.wbtn').forEach(btn => {",
"        btn.addEventListener('click', () => {",
"          const payload = JSON.parse(btn.dataset.payload.replaceAll('&quot;','\\\"'));",
"          const im = (payload.images||[]).find(x => x.path);",
"          const tb = (payload.tables||[]).find(x => x.png);",
"          let rightHtml = '';",
"          if (im) rightHtml = `<div><img src='${im.path}' style='max-width:100%'/></div>`;",
"          else if (tb) rightHtml = `<div><img src='${tb.png}' style='max-width:100%'/></div>`;",
"          else rightHtml = `<div class='placeholder'>연결된 이미지/표 없음</div>`;",
"          matches.innerHTML = rightHtml;",
"          showPopover(btn, payload);",
"        });",
"      });",
"    }",
"    sel.addEventListener('change', () => { renderPage(parseInt(sel.value)); });",
"    renderPage(parseInt(sel.value || 0));",
"  </script>",
"</body>",
"</html>"
    ]
    return "\n".join(html_lines)

# 인덱스 로드 → 경로 정규화 → HTML 생성
INDEX_JSON = OUT_DIR / "linker" / "index_linker.json"
if not INDEX_JSON.exists():
    INDEX_JSON = OUT_DIR / "index_linker.json"

with open(INDEX_JSON, 'r', encoding='utf-8') as f:
    _idx_raw = json.load(f)

_idx = normalize_paths(_idx_raw)   # ★ 경로 정리

_tmpl = _make_overlay_template(_idx)
_html = _tmpl.replace('__INDEX_JSON__', _json_mod.dumps(_idx))

with open(HTML_OUT, 'w', encoding='utf-8') as f:
    f.write(_html)

print("Viewer saved →", HTML_OUT.resolve())


Viewer saved → /home/dataplay/workspace/result/test_paper/viewer_overlay.html
