In [None]:
# !pip install pdfplumber==0.11.0 pdfminer.six==20240706
# pip install pdfplumber pandas XlsxWriter

In [None]:
import re
import pdfplumber
import pandas as pd
import numpy as np
from typing import Any, Dict, List, Tuple

PDF_PATH = "별표 2_건설공사 품질시험기준.pdf"
OUT_XLSX = "parsed_tables_all_pages_with_ordered_triggers.xlsx"
OUT_csv = "parsed_tables_all_pages_with_ordered_triggers.csv"


# 요청 스키마

COLS = ["페이지","공종","종별","종별_상세1",
        "시험종목_1","시험종목_2","시험종목_3",
        "시험방법","시험빈도","비고"]

def norm(s: Any) -> str:
    s = "" if s is None else str(s)
    s = s.replace("\r","").replace("\t"," ")
    parts = [re.sub(r"[ ]{2,}", " ", p.strip()) for p in s.split("\n")]
    return "\n".join(parts)

def squash_cjk_spaces(s: str) -> str:
    # 트리거 판정용: 한글-한글 사이 공백/줄바꿈 제거 → '반\n입' → '반입'
    return re.sub(r'([가-힣])\s+([가-힣])', r'\1\2', s)

# ===== 공종 전환 트리거(정해진 순서) =====
PAREN_L = r'[\(\uFF08]'  # ( 또는 전각（
PAREN_R = r'[\)\uFF09]'  # ) 또는 전각）

# ===== 공종 전환 트리거 (정해진 순서, 처음 등장 시점부터 적용) =====
TRIGGER_FLOW: List[Tuple[str, re.Pattern]] = [
    # 공통
    ("토공사 및 기초공사", re.compile(r"성토용\s*흙")),
    ("철근콘크리트공사",   re.compile(r"콘크리트용\s*골재(?:\s*\(\s*KS\s*F\s*2527\s*\))?")),
    ("철강구조물공사", re.compile(
        r"강재\s*"+PAREN_L+r"\s*용접부\s*반?\s*입\s*검사\s*"+PAREN_R)),
    ("가설기자재",         re.compile(r"강재\s*파이프\s*서포트|강재\s*파이프서포트")),
    ("기타",               re.compile(r"석재")),
    # 토목
    ("도로공사",           re.compile(r"노체")),
    ("수공구조물공사",     re.compile(r"(흙댐|용수로|배수로)(?=.*일반성토.*표토)?")),
    # 건축
    ("조적공사",           re.compile(r"콘크리트\s*벽돌(?:\s*\(\s*KS\s*F\s*4004\s*\))?")),
    ("방수공사",           re.compile(r"시멘트계\s*액체형\s*방수제")),
]

# 허용 컬럼(여기에서만 트리거 검사; 비고/빈도 등은 제외)
WHITELIST_FIELDS = ["종별","종별_상세1","시험종목_1","시험종목_2","시험종목_3"]

# ===== 표 추출: 기본 extract_tables()만 =====
def extract_tables_single(page):
    try:
        return page.extract_tables() or []
    except Exception:
        return []

def advance_gongjong(current_idx: int, fields_text: str) -> int:
    """
    현재 공종 인덱스(current_idx)에서, 허용 컬럼 텍스트 fields_text를 기준으로
    앞으로 나아갈 수 있는 가장 앞선(가장 가까운) 트리거를 찾으면 그 위치로 점프.
    뒤로 가거나 동일 인덱스로 회귀하지 않음.
    """
    for i in range(current_idx + 1, len(TRIGGER_FLOW)):
        _, pat = TRIGGER_FLOW[i]
        if pat.search(fields_text):
            return i
    return current_idx

def parse_tables_from_page(page, page_no: int, current_idx: int) -> Tuple[pd.DataFrame, int]:
    frames = [pd.DataFrame(t) for t in extract_tables_single(page)]
    records: List[Dict[str,str]] = []

    for df in frames:
        if df.empty:
            continue

        # 1) 헤더 행 탐지
        header_idx = None
        for r in range(min(4, len(df))):
            row_vals = " ".join(map(str, df.iloc[r].tolist()))
            if any(k in row_vals for k in ("시험방법","시험종목","종별","시험빈도","비고")):
                header_idx = r; break
        if header_idx is None:
            continue

        headers = [str(x) for x in df.iloc[header_idx].tolist()]
        # 2) 열 인덱스 매핑
        idx_map: Dict[str,int] = {}
        for c, name in enumerate(headers):
            n = str(name)
            if "종별" in n:       idx_map["종별"] = c
            if "시험종목" in n:   idx_map["시험종목"] = c
            if "시험방법" in n:   idx_map["시험방법"] = c
            if "시험빈도" in n:   idx_map["시험빈도"] = c
            if "비고"   in n:     idx_map["비고"]   = c

        used = set(idx_map.values())
        all_idx = list(range(len(headers)))
        remain = [i for i in all_idx if i not in used]

        idx_jong      = idx_map.get("종별", -1)
        idx_item_main = idx_map.get("시험종목", -1)

        # 종별_상세1: 종별 오른쪽이면서 시험종목 왼쪽인 첫 열
        jong_detail_idx = -1
        if idx_jong != -1:
            cand = [i for i in remain if (idx_item_main == -1 or i < idx_item_main) and i > idx_jong]
            if cand:
                jong_detail_idx = cand[0]
                remain.remove(jong_detail_idx)

        method_idx = idx_map.get("시험방법", -1)
        freq_idx   = idx_map.get("시험빈도", -1)
        note_idx   = idx_map.get("비고", -1)

        # 시험종목_1~3 배분
        item1_idx = idx_item_main if idx_item_main is not None else -1
        ban = {method_idx, freq_idx, note_idx, idx_jong, jong_detail_idx}
        ban.discard(-1)
        other_for_item = [i for i in remain if i not in ban]
        item2_idx = other_for_item[0] if len(other_for_item) > 0 else -1
        item3_idx = other_for_item[1] if len(other_for_item) > 1 else -1

        prev_jong, prev_freq = "", ""

        # 3) 데이터 행
        for r in range(header_idx+1, len(df)):
            row = df.iloc[r].tolist()
            get = lambda i: norm(row[i]) if (i!=-1 and i < len(row)) else ""

            jong   = get(idx_jong)
            jong_d = get(jong_detail_idx)
            item1  = get(item1_idx)
            item2  = get(item2_idx)
            item3  = get(item3_idx)
            mtd    = get(method_idx)
            freq   = get(freq_idx)
            note   = get(note_idx)

            # 완전 빈 행 스킵
            if not any([jong, jong_d, item1, item2, item3, mtd, freq, note]):
                continue

            # 표 블록 내 전방 채움(종별/빈도만)
            if not jong: jong = prev_jong
            if not freq: freq = prev_freq
            if jong: prev_jong = jong
            if get(freq_idx): prev_freq = freq

            # === 공종 전환: 허용 컬럼 텍스트로만 검사 ===
            # 비고/빈도는 포함하지 않음 (오탐 방지)
            fields_text = " ".join([
                norm(jong), norm(jong_d), norm(item1), norm(item2), norm(item3)
            ])
            new_idx = advance_gongjong(current_idx, fields_text)
            current_idx = new_idx  # 필요 시 앞으로 전진

            current_gj = TRIGGER_FLOW[current_idx][0]

            records.append(dict(
                페이지=page_no, 공종=current_gj, 종별=jong, 종별_상세1=jong_d,
                시험종목_1=item1, 시험종목_2=item2, 시험종목_3=item3,
                시험방법=mtd, 시험빈도=freq, 비고=note
            ))

    if not records:
        return pd.DataFrame(columns=COLS), current_idx

    out = pd.DataFrame(records)
    out = out[COLS].drop_duplicates(keep="first").reset_index(drop=True)
    return out, current_idx

# === (1) 보조 정규식/파서: 베이스/KS 분해 ===
KS_RE     = re.compile(r"\(\s*KS\s*[A-Z]\s*\d{3,5}\s*\)")
KS_ONLYRE = re.compile(r"^\(\s*KS\s*[A-Z]\s*\d{3,5}\s*\)$")

def _parse_base_ks(s: str):
    s = (str(s) if s is not None else "").strip()
    if not s:
        return "", ""
    m = KS_RE.search(s)
    if m:
        base = (s[:m.start()] + s[m.end():]).strip()
        return base, m.group(0)
    if KS_ONLYRE.fullmatch(s):
        return "", s
    return s, ""

# === (2) 페이지 경계 전후로 끊어진 종별을 병합/통일 ===
def unify_jong_by_page_split(df: pd.DataFrame, col: str = "종별") -> pd.DataFrame:
    # 전제: df 인덱스는 0..N-1 연속, '페이지' 컬럼 존재
    pages = df["페이지"].tolist()
    vals  = df[col].tolist()
    n = len(df)
    if n == 0:
        return df

    def scan_left_block(end_idx):
        """end_idx에서 위로 올라가며 같은 base를 가진 연속 블록 구간 [lo, hi]와 base/ks를 반환"""
        hi = end_idx
        b, k = _parse_base_ks(vals[hi])
        base_ref = b if b else None
        lo = hi
        while lo-1 >= 0:
            b2, k2 = _parse_base_ks(vals[lo-1])
            # base가 있으면 base가 같을 때만 같은 블록으로 간주
            if base_ref is not None:
                if b2 == base_ref:
                    lo -= 1
                else:
                    break
            else:
                # base가 없으면(=KS-only 또는 빈값) → 같은 "무-base"가 연속인 구간까지만
                if b2 == "":
                    lo -= 1
                else:
                    break
        # 기준 base는 블록 끝의 base(가능하면 문자열)로
        b, k = _parse_base_ks(vals[end_idx])
        return lo, hi, (b or ""), k

    def scan_right_block(start_idx, expect_ks_only=False, expect_base=None):
        """
        start_idx에서 아래로 내려가며:
         - expect_ks_only=True 이면 KS-only 연속 구간을 수집
         - expect_base=문자열 이면 해당 base가 연속되는 구간을 수집
        """
        lo = start_idx
        hi = start_idx
        while hi+1 < n:
            b2, k2 = _parse_base_ks(vals[hi+1])
            if expect_ks_only:
                if b2 == "" and k2:
                    hi += 1
                else:
                    break
            elif expect_base is not None:
                if b2 == expect_base:
                    hi += 1
                else:
                    break
            else:
                break
        b, k = _parse_base_ks(vals[start_idx])
        return lo, hi, (b or ""), k

    # 경계 인덱스(새 페이지의 첫 행들)
    boundaries = [i for i in range(1, n) if pages[i] != pages[i-1]]

    for b in boundaries:
        left_end   = b - 1
        right_start = b

        # 왼쪽 블록 & 오른쪽 블록 상황 파악
        l_lo, l_hi, l_base, l_ks = scan_left_block(left_end)
        r_b, r_k = _parse_base_ks(vals[right_start])

        # 케이스 A) 왼쪽은 base 있고, 오른쪽은 KS-only → 양쪽을 base+KS로 통일
        if l_base and (r_b == "" and r_k):
            # 오른쪽에서 KS-only 연속 구간 찾기
            r_lo, r_hi, _, _ = scan_right_block(right_start, expect_ks_only=True)
            combo = f"{l_base}{r_k}"
            for i in range(l_lo, l_hi+1):
                cb, ck = _parse_base_ks(vals[i])
                vals[i] = combo
            for i in range(r_lo, r_hi+1):
                vals[i] = combo
            continue

        # 케이스 B) 왼쪽이 KS-only, 오른쪽은 base → 양쪽을 base+KS로 통일
        if (l_base == "" and l_ks) and r_b:
            # 왼쪽에서 KS-only 연속 구간 확장
            # (scan_left_block는 끝점을 기준으로 base가 없는 연속만 포함하므로 lo..hi 그대로 사용)
            r_lo, r_hi, r_base, _ = scan_right_block(right_start, expect_base=r_b)
            combo = f"{r_b}{l_ks}"
            for i in range(l_lo, l_hi+1):
                vals[i] = combo
            for i in range(r_lo, r_hi+1):
                vals[i] = combo
            continue

        # 그 외: 아무 것도 하지 않음 (이미 base+KS이거나, 두 쪽 모두 base-only 등)

    df[col] = vals
    return df

# === (3) 같은 종별 구간 내에서 상세1·시험종목_1 빈값을 위 행 값으로 채움 ===
def fill_details_within_same_jong(df: pd.DataFrame,
                                  jong_col: str = "종별",
                                  detail_cols = ("종별_상세1", "시험종목_1","시험방법")) -> pd.DataFrame:
    """
    같은 '종별'이 연속되는 구간에서만 detail_cols의 빈값을
    바로 위 행 값으로 채운다. (in-place 성격, df를 반환)
    순차 스캔 방식.
    """
    def _empty(x):
        return pd.isna(x) or str(x).strip() == ""

    if df.empty or jong_col not in df.columns:
        return df

    for i in range(1, len(df)):
        cur_j = df.at[i, jong_col] if jong_col in df.columns else ""
        prv_j = df.at[i-1, jong_col] if jong_col in df.columns else ""
        if (not _empty(cur_j)) and (not _empty(prv_j)) and (str(cur_j).strip() == str(prv_j).strip()):
            for col in detail_cols:
                if col in df.columns and _empty(df.at[i, col]):
                    df.at[i, col] = df.at[i-1, col]
    return df


In [13]:

# ===== 실행: 문서 전체 =====
def main():
    all_frames = []
    with pdfplumber.open(PDF_PATH) as pdf:
        # 시작 공종은 무조건 "토공사 및 기초공사"
        current_idx = 0  # TRIGGER_FLOW[0] = 토공사 및 기초공사
        for i, page in enumerate(pdf.pages, start=1):
            dfp, current_idx = parse_tables_from_page(page, i, current_idx)
            if not dfp.empty:
                all_frames.append(dfp)

    if all_frames:
        merged = pd.concat(all_frames, ignore_index=True)
        # 전역 중복 제거(심플): 전체 컬럼 기준
        merged = merged.drop_duplicates(keep="first").reset_index(drop=True)

        # 줄바꿈을 띄워쓰기로 변환
        merged = merged.applymap(lambda x: str(x).replace("\n", " ") if pd.notna(x) else x)

        #페이지 경계에서 끊어진 '종별' 병합/통일
        merged = unify_jong_by_page_split(merged, col="종별")
        
        # 종별 빈값을 위 행 값으로 채우기
        merged["종별"] = merged["종별"].replace(r'^\s*$', pd.NA, regex=True).ffill()

        # 종별이 같을 때, 상세1·시험종목_1 컬럼 빈값을 위 행 값으로 채움
        merged = fill_details_within_same_jong(merged, jong_col="종별", detail_cols=("종별_상세1", "시험종목_1", "시험방법"))

    else:
        merged = pd.DataFrame(columns=COLS)

    # 저장
    merged.to_excel(OUT_XLSX, index=False, engine="xlsxwriter")
    merged.to_csv(OUT_csv, index=False, encoding="utf-8-sig")

    print(f"[DONE] {OUT_XLSX} (rows={len(merged)})")
    print(f"[DONE] {OUT_csv} (rows={len(merged)})")

    return merged   # DataFrame 반환


In [14]:
if __name__ == "__main__":
    df_final = main()
    # 마지막에 DataFrame 출력
    pd.set_option("display.max_rows", 200)   # 필요시 행 제한 늘리기
    pd.set_option("display.max_columns", None)
    pd.set_option("display.width", None)
    df_final


[DONE] parsed_tables_all_pages_with_ordered_triggers.xlsx (rows=1852)
[DONE] parsed_tables_all_pages_with_ordered_triggers.csv (rows=1852)


In [6]:
df_final

Unnamed: 0,페이지,공종,종별,종별_상세1,시험종목_1,시험종목_2,시험종목_3,시험방법,시험빈도,비고
0,1,토공사 및 기초공사,성토용 흙,,함수비,,,KS F 2306,·토취장마다 ·재질변화시마다,
1,1,토공사 및 기초공사,성토용 흙,,입도,,,KS F 2302,·토취장마다 ·재질변화시마다,
2,1,토공사 및 기초공사,성토용 흙,,세립토 비율,,,KS F 2309,·토취장마다 ·재질변화시마다,
3,1,토공사 및 기초공사,성토용 흙,,밀 도,,,KS F 2308,·토취장마다 ·재질변화시마다,
4,1,토공사 및 기초공사,성토용 흙,,액성한계·소성한계,,,KS F 2303,·토취장마다 ·재질변화시마다,
...,...,...,...,...,...,...,...,...,...,...
1847,52,방수공사,초산비닐수지 에멀션목재 접착제 (KS M 3700),,겉모양,,,KS M 3704,·제조회사별 ·제품규격별,
1848,52,방수공사,초산비닐수지 에멀션목재 접착제 (KS M 3700),,점도,,,,·제조회사별 ·제품규격별,
1849,52,방수공사,초산비닐수지 에멀션목재 접착제 (KS M 3700),,회분,,,,·제조회사별 ·제품규격별,
1850,52,방수공사,초산비닐수지 에멀션목재 접착제 (KS M 3700),,pH,,,KS M 3705,·제조회사별 ·제품규격별,
