In [82]:
import preprocess.pp_v4 as pp
from preprocess.pp_basic import BASE_DIR, RAW_DIR, DATA_DIR, docs

In [83]:
import re
import os
import json
from pathlib import Path
import pdfplumber

In [84]:
PDF_PATH = str(docs[0])

## 표 데이터 추출 이쁘게

In [85]:
import pandas as pd

# import docling.utils.model_downloader
# docling.utils.model_downloader.download_models()
from docling.datamodel.base_models import InputFormat
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.pipeline_options import EasyOcrOptions, PdfPipelineOptions, TableFormerMode

pipeline_options = PdfPipelineOptions(do_table_structure=True)
pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE  # use more accurate TableFormer model
pipeline_options.table_structure_options.do_cell_matching = True  # uses text cells predicted from table structure model

doc_converter = DocumentConverter(
    format_options={
        InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
    }
)

doc = doc_converter.convert(PDF_PATH).document

In [86]:
new_dict = dict()
for tuple_ in list(doc):
    new_dict[tuple_[0]] = tuple_[1]

In [87]:
# 헬퍼 함수 정의
def export_table_to_markdown(table):
    """테이블을 마크다운으로 변환"""
    if hasattr(table, 'export_to_markdown'):
        return table.export_to_markdown()
    return "[테이블 변환 불가]"

def calculate_distance(bbox1, bbox2):
    """두 bbox 중심점 간 거리 계산"""
    center1_x = (bbox1.l + bbox1.r) / 2
    center1_y = (bbox1.t + bbox1.b) / 2
    center2_x = (bbox2.l + bbox2.r) / 2
    center2_y = (bbox2.t + bbox2.b) / 2
    
    return ((center1_x - center2_x)**2 + (center1_y - center2_y)**2)**0.5

def is_nearby(bbox1, bbox2, window_size=50):
    """두 bbox가 가까운지 확인"""
    distance = calculate_distance(bbox1, bbox2)
    return distance < window_size

def count_by_label(label_name):
    """특정 라벨의 개수 세기"""
    return sum(1 for text in new_dict['texts'] 
               if hasattr(text, 'label') and str(text.label) == label_name)


def extract_hierarchical_structure(doc):
    """섹션 → 서브섹션 → 문단 → 리스트 계층 보존"""
    
    sections = []
    current_section = None
    
    for text in new_dict['texts']:
        if text.label == 'section_header':
            # 새 섹션 시작
            if current_section:
                sections.append(current_section)
            
            current_section = {
                'title': text.text,
                'page': text.prov[0].page_no,
                'content': [],
                'tables': [],
                'images': []
            }
        
        elif text.label == 'text':
            # 본문 텍스트
            if current_section:
                current_section['content'].append({
                    'text': text.text,
                    'page': text.prov[0].page_no,
                    'bbox': text.prov[0].bbox
                })
        
        elif text.label == 'list_item':
            # 리스트 항목
            if current_section:
                current_section['content'].append({
                    'text': f"- {text.text}",
                    'type': 'list_item',
                    'page': text.prov[0].page_no
                })
    
    # 테이블을 섹션에 매핑 (페이지 번호 기반)
    for i, (key, table) in enumerate(new_dict['tables'].items()):
        table_page = table['prov'][0].page_no
        
        # 해당 페이지의 섹션 찾기
        for section in sections:
            if section['page'] <= table_page:
                section['tables'].append({
                    'table_id': i,
                    'page': table_page,
                    'markdown': export_table_to_markdown(table)
                })
    
    return sections
def extract_with_spatial_context(doc, target_bbox, window_size=50):
    """특정 위치 주변의 텍스트/테이블 추출"""
    
    context = {
        'texts': [],
        'tables': [],
        'images': []
    }
    
    # 같은 페이지의 근처 텍스트
    for text in new_dict['texts']:
        if text.prov:
            text_bbox = text.prov[0].bbox
            
            # BBox 거리 계산
            if is_nearby(text_bbox, target_bbox, window_size):
                context['texts'].append({
                    'text': text.text,
                    'label': text.label,
                    'distance': calculate_distance(text_bbox, target_bbox)
                })
    
    # 거리순 정렬
    context['texts'].sort(key=lambda x: x['distance'])
    
    return context
def create_smart_chunks(doc, chunk_strategy='semantic'):
    """라벨 인식 기반 의미 단위 청킹"""
    
    chunks = []
    
    if chunk_strategy == 'semantic':
        # 섹션 헤더로 구분
        current_chunk = []
        
        for text in new_dict['texts']:
            if text.label == 'section_header':
                # 이전 청크 저장
                if current_chunk:
                    chunks.append({
                        'content': '\n'.join(current_chunk),
                        'type': 'section'
                    })
                current_chunk = [f"# {text.text}"]
            
            elif text.label in ['text', 'list_item']:
                current_chunk.append(text.text)
            
            elif text.label in ['page_header', 'page_footer']:
                # 노이즈 제거
                continue
    
    elif chunk_strategy == 'fixed_size':
        # 토큰 수 기반 (라벨은 메타데이터로)
        pass
    
    return chunks
def generate_multimodal_metadata(doc):
    """RAG 인덱싱용 풍부한 메타데이터"""
    
    metadata = {
        'document': {
            'filename': new_dict['name'],
            'pages': len(new_dict['pages']),
            'sections': count_by_label('section_header'),
            'tables': len(new_dict['tables']),
            'images': len(new_dict['pictures'])
        },
        
        'content_map': {
            'sections': [],
            'tables': [],
            'images': []
        },
        
        'search_index': []
    }
    
    # 섹션별 인덱스
    for text in new_dict['texts']:
        if text.label == 'section_header':
            metadata['content_map']['sections'].append({
                'title': text.text,
                'page': text.prov[0].page_no,
                'bbox': text.prov[0].bbox
            })
    
    # 테이블 인덱스 (내용 프리뷰 포함)
    for i, (key, table) in enumerate(new_dict['tables'].items()):
        preview_cells = [
            cell.text for cell in table['data'].table_cells[:5]
        ]
        
        metadata['content_map']['tables'].append({
            'table_id': i,
            'page': table['prov'][0].page_no,
            'dimensions': f"{table['data'].num_rows}x{table['data'].num_cols}",
            'preview': ' | '.join(preview_cells),
            'bbox': table['prov'][0].bbox
        })
    
    # 검색 인덱스 (플랫 구조)
    for text in new_dict['texts']:
        if text.label not in ['page_header', 'page_footer']:
            metadata['search_index'].append({
                'content': text.text,
                'page': text.prov[0].page_no if text.prov else None,
                'type': text.label,
                'context': 'text'
            })
    
    # 테이블 행도 검색 가능하게
    for i, (key, table) in enumerate(new_dict['tables'].items()):
        for row_idx, row in enumerate(table['data'].grid):
            metadata['search_index'].append({
                'content': ' | '.join([cell.text for cell in row]),
                'page': table['prov'][0].page_no,
                'type': 'table_row',
                'context': f'table_{i}_row_{row_idx}'
            })
    
    return metadata


In [88]:
import json
from collections import Counter, defaultdict

# =============================================================================
# 헬퍼 함수들
# =============================================================================

def resolve_ref(doc, ref_string):
    """RefItem을 실제 객체로 변환"""
    if not ref_string or not ref_string.startswith('#/'):
        return None
    
    parts = ref_string.split('/')
    if len(parts) < 3:
        return None
    
    collection = parts[1]
    idx = int(parts[2])
    
    try:
        if collection == 'texts':
            return doc.texts[idx]
        elif collection == 'groups':
            return doc.groups[idx]
        elif collection == 'tables':
            return doc.tables[idx]
        elif collection == 'pictures':
            return doc.pictures[idx]
    except (IndexError, AttributeError):
        return None
    
    return None

def safe_export_markdown(table):
    """테이블을 마크다운으로 안전하게 변환"""
    try:
        if hasattr(table, 'export_to_markdown'):
            # 메서드 호출!
            result = table.export_to_markdown()
            # 문자열 확인
            if isinstance(result, str):
                return result
    except Exception as e:
        print(f"Warning: 테이블 마크다운 변환 실패 - {e}")
    return None

def validate_json_serializable(obj, path="root"):
    """JSON 직렬화 가능한지 재귀적으로 검증"""
    if isinstance(obj, (str, int, float, bool, type(None))):
        return True
    elif isinstance(obj, dict):
        for key, value in obj.items():
            if not validate_json_serializable(value, f"{path}.{key}"):
                print(f"❌ 직렬화 불가: {path}.{key} = {type(value)}")
                return False
        return True
    elif isinstance(obj, list):
        for i, item in enumerate(obj):
            if not validate_json_serializable(item, f"{path}[{i}]"):
                return False
        return True
    else:
        print(f"❌ 직렬화 불가 타입: {path} = {type(obj)}")
        return False

def count_by_label(label_name):
    """특정 라벨의 개수 세기"""
    return sum(1 for text in new_dict['texts'] 
               if hasattr(text, 'label') and str(text.label) == label_name)

# =============================================================================
# Part 1: 기본 정보 및 통계 (플랫 접근)
# =============================================================================

print("="*80)
print("Part 1: 문서 기본 정보 및 통계")
print("="*80)

# 1-1. new_dict 구조
print("\n=== new_dict 구조 ===")
for key, value in new_dict.items():
    if isinstance(value, dict):
        print(f"{key}: dict ({len(value)} items)")
    elif isinstance(value, list):
        print(f"{key}: list ({len(value)} items)")
    else:
        print(f"{key}: {type(value).__name__}")

# 1-2. 문서 메타데이터
print("\n=== 문서 메타데이터 ===")
print(f"파일명: {new_dict['name']}")
print(f"스키마: {new_dict['schema_name']}")
print(f"버전: {new_dict['version']}")
print(f"총 페이지: {len(new_dict['pages'])}")
print(f"총 텍스트 요소: {len(new_dict['texts'])}")
print(f"총 테이블: {len(new_dict['tables'])}")
print(f"총 이미지: {len(new_dict['pictures'])}")
print(f"총 그룹: {len(new_dict['groups'])}")

# 2. 텍스트 라벨 분석
print("\n=== 텍스트 라벨 분포 ===")
labels = [str(text.label) for text in new_dict['texts'] if hasattr(text, 'label')]
label_counts = Counter(labels)

for label, count in label_counts.most_common():
    print(f"{label}: {count}")

# 3. 노이즈 제거
exclude_labels = ['page_header', 'page_footer', 'checkbox_unselected']
clean_texts = [
    t for t in new_dict['texts'] 
    if hasattr(t, 'label') and str(t.label) not in exclude_labels
]

print(f"\n=== 노이즈 제거 ===")
print(f"원본 텍스트: {len(new_dict['texts'])}")
print(f"정제된 텍스트: {len(clean_texts)}")
print(f"제거된 텍스트: {len(new_dict['texts']) - len(clean_texts)}")

# =============================================================================
# Part 2: 계층 구조 추출 (Body 기반)
# =============================================================================

print("\n" + "="*80)
print("Part 2: 계층 구조 추출")
print("="*80)

def extract_hierarchical_document(doc):
    """Body → Groups/Sections → Text/Lists/Tables 완전한 계층 구조 추출"""
    
    document_structure = {
        'metadata': {
            'filename': doc.name,
            'pages': len(doc.pages),
            'schema': doc.schema_name,
            'version': doc.version
        },
        'content': []
    }
    
    for child_ref in doc.body.children:
        if not hasattr(child_ref, 'cref'):
            continue
        
        child = resolve_ref(doc, child_ref.cref)
        if not child:
            continue
        
        child_type = type(child).__name__
        
        if child_type == 'SectionHeaderItem':
            section = {
                'type': 'section',
                'title': child.text,
                'page': child.prov[0].page_no if child.prov else None,
                'content': []
            }
            document_structure['content'].append(section)
        
        elif child_type == 'TextItem':
            if hasattr(child, 'label') and str(child.label) not in ['page_header', 'page_footer']:
                text_item = {
                    'type': 'text',
                    'content': child.text,
                    'page': child.prov[0].page_no if child.prov else None,
                    'label': str(child.label)
                }
                
                if document_structure['content'] and document_structure['content'][-1]['type'] == 'section':
                    document_structure['content'][-1]['content'].append(text_item)
                else:
                    document_structure['content'].append(text_item)
        
        elif child_type == 'ListGroup':
            list_group = {
                'type': 'list',
                'items': [],
                'enumerated': bool(getattr(child, 'first_item_is_enumerated', False))  # bool로 변환
            }
            
            for list_child_ref in child.children:
                if hasattr(list_child_ref, 'cref'):
                    list_child = resolve_ref(doc, list_child_ref.cref)
                    if list_child and hasattr(list_child, 'text'):
                        list_group['items'].append({
                            'text': list_child.text,
                            'page': list_child.prov[0].page_no if hasattr(list_child, 'prov') and list_child.prov else None
                        })
            
            if document_structure['content'] and document_structure['content'][-1]['type'] == 'section':
                document_structure['content'][-1]['content'].append(list_group)
            else:
                document_structure['content'].append(list_group)
        
        elif child_type == 'TableItem':
            # 마크다운 미리 추출 (메서드가 아닌 결과값)
            md_result = safe_export_markdown(child)
            
            table_item = {
                'type': 'table',
                'table_id': doc.tables.index(child),
                'page': child.prov[0].page_no if child.prov else None,
                'dimensions': {
                    'rows': child.data.num_rows if hasattr(child, 'data') else None,
                    'cols': child.data.num_cols if hasattr(child, 'data') else None
                },
                'markdown': md_result  # 이미 문자열
            }
            
            if document_structure['content'] and document_structure['content'][-1]['type'] == 'section':
                document_structure['content'][-1]['content'].append(table_item)
            else:
                document_structure['content'].append(table_item)
        
        elif child_type == 'PictureItem':
            image_item = {
                'type': 'image',
                'image_id': doc.pictures.index(child),
                'page': child.prov[0].page_no if child.prov else None,
                'bbox': {
                    'left': float(child.prov[0].bbox.l),
                    'top': float(child.prov[0].bbox.t),
                    'right': float(child.prov[0].bbox.r),
                    'bottom': float(child.prov[0].bbox.b)
                } if child.prov else None
            }
            
            if document_structure['content'] and document_structure['content'][-1]['type'] == 'section':
                document_structure['content'][-1]['content'].append(image_item)
            else:
                document_structure['content'].append(image_item)
    
    return document_structure

hierarchical_doc = extract_hierarchical_document(doc)

print(f"\n총 최상위 항목: {len(hierarchical_doc['content'])}")
sections_count = sum(1 for item in hierarchical_doc['content'] if item['type'] == 'section')
print(f"섹션 수: {sections_count}")

# =============================================================================
# Part 3: 테이블 메타데이터
# =============================================================================

print("\n" + "="*80)
print("Part 3: 테이블 상세 정보")
print("="*80)

table_metadata = []

for i, table in enumerate(new_dict['tables']):
    metadata = {
        'table_id': i,
        'page': table.prov[0].page_no if hasattr(table, 'prov') and table.prov else None,
        'dimensions': {
            'rows': table.data.num_rows if hasattr(table, 'data') else None,
            'cols': table.data.num_cols if hasattr(table, 'data') else None
        }
    }
    
    if hasattr(table, 'prov') and table.prov:
        metadata['bbox'] = {
            'left': float(table.prov[0].bbox.l),
            'top': float(table.prov[0].bbox.t),
            'right': float(table.prov[0].bbox.r),
            'bottom': float(table.prov[0].bbox.b)
        }
    
    if hasattr(table, 'data') and hasattr(table.data, 'table_cells'):
        preview_texts = [cell.text for cell in table.data.table_cells[:5]]
        metadata['preview'] = ' | '.join(preview_texts)
    
    table_metadata.append(metadata)

print(f"총 테이블: {len(table_metadata)}")

# =============================================================================
# Part 4: RAG용 검색 인덱스
# =============================================================================

print("\n" + "="*80)
print("Part 4: RAG 검색 인덱스 생성")
print("="*80)

search_index = []

for text in clean_texts:
    search_index.append({
        'content': text.text,
        'metadata': {
            'page': text.prov[0].page_no if text.prov else None,
            'type': 'text',
            'label': str(text.label)
        }
    })

for i, table in enumerate(new_dict['tables']):
    page_no = table.prov[0].page_no if hasattr(table, 'prov') and table.prov else None
    
    if hasattr(table, 'data') and hasattr(table.data, 'grid'):
        for row_idx, row in enumerate(table.data.grid):
            row_text = ' | '.join([cell.text.strip() for cell in row])
            
            search_index.append({
                'content': row_text,
                'metadata': {
                    'page': page_no,
                    'type': 'table_row',
                    'table_id': i,
                    'row': row_idx
                }
            })

print(f"총 인덱스 항목: {len(search_index)}")

# =============================================================================
# Part 5: 파일 저장 (검증 포함)
# =============================================================================

print("\n" + "="*80)
print("Part 5: 파일 저장")
print("="*80)

# 검증
print("\n데이터 검증 중...")
if validate_json_serializable(hierarchical_doc, "hierarchical_doc"):
    print("✓ hierarchical_doc 검증 통과")
else:
    print("❌ hierarchical_doc 검증 실패")

# 1. 계층 구조 JSON
with open('document_hierarchical.json', 'w', encoding='utf-8') as f:
    json.dump(hierarchical_doc, f, ensure_ascii=False, indent=2)
print("✓ document_hierarchical.json 저장 완료")

# 2. 통합 메타데이터 JSON
full_metadata = {
    'document': {
        'filename': new_dict['name'],
        'total_pages': len(new_dict['pages']),
    },
    'statistics': {
        'texts': len(new_dict['texts']),
        'clean_texts': len(clean_texts),
        'sections': sections_count,
        'tables': len(new_dict['tables']),
        'images': len(new_dict['pictures'])
    },
    'text_labels': dict(label_counts),
    'tables': table_metadata
}

with open('document_metadata.json', 'w', encoding='utf-8') as f:
    json.dump(full_metadata, f, ensure_ascii=False, indent=2)
print("✓ document_metadata.json 저장 완료")

# 3. 검색 인덱스 JSON
with open('search_index.json', 'w', encoding='utf-8') as f:
    json.dump(search_index, f, ensure_ascii=False, indent=2)
print("✓ search_index.json 저장 완료")

print("\n완료! 생성된 파일:")
print("  - document_hierarchical.json (계층 구조)")
print("  - document_metadata.json (메타데이터)")
print("  - search_index.json (RAG 인덱스)")


Usage of TableItem.export_to_markdown() without `doc` argument is deprecated.
Usage of TableItem.export_to_markdown() without `doc` argument is deprecated.
Usage of TableItem.export_to_markdown() without `doc` argument is deprecated.
Usage of TableItem.export_to_markdown() without `doc` argument is deprecated.
Usage of TableItem.export_to_markdown() without `doc` argument is deprecated.
Usage of TableItem.export_to_markdown() without `doc` argument is deprecated.
Usage of TableItem.export_to_markdown() without `doc` argument is deprecated.
Usage of TableItem.export_to_markdown() without `doc` argument is deprecated.
Usage of TableItem.export_to_markdown() without `doc` argument is deprecated.
Usage of TableItem.export_to_markdown() without `doc` argument is deprecated.
Usage of TableItem.export_to_markdown() without `doc` argument is deprecated.
Usage of TableItem.export_to_markdown() without `doc` argument is deprecated.
Usage of TableItem.export_to_markdown() without `doc` argument i

Part 1: 문서 기본 정보 및 통계

=== new_dict 구조 ===
schema_name: str
version: str
name: str
origin: DocumentOrigin
furniture: GroupItem
body: GroupItem
groups: list (138 items)
texts: list (1546 items)
pictures: list (9 items)
tables: list (172 items)
key_value_items: list (0 items)
form_items: list (0 items)
pages: dict (131 items)

=== 문서 메타데이터 ===
파일명: (사)벤처기업협회_2024년 벤처확인종합관리시스템 기능 고도화 용역사업 
스키마: DoclingDocument
버전: 1.9.0
총 페이지: 131
총 텍스트 요소: 1546
총 테이블: 172
총 이미지: 9
총 그룹: 138

=== 텍스트 라벨 분포 ===
text: 757
list_item: 485
section_header: 150
page_footer: 129
page_header: 11
caption: 5
checkbox_unselected: 5
footnote: 4

=== 노이즈 제거 ===
원본 텍스트: 1546
정제된 텍스트: 1401
제거된 텍스트: 145

Part 2: 계층 구조 추출

총 최상위 항목: 146
섹션 수: 146

Part 3: 테이블 상세 정보
총 테이블: 172

Part 4: RAG 검색 인덱스 생성
총 인덱스 항목: 2651

Part 5: 파일 저장

데이터 검증 중...
✓ hierarchical_doc 검증 통과
✓ document_hierarchical.json 저장 완료
✓ document_metadata.json 저장 완료
✓ search_index.json 저장 완료

완료! 생성된 파일:
  - document