In [12]:
from dotenv import load_dotenv
load_dotenv()

import sys
sys.path.append('../')

In [25]:
SAMPLE = "../data/2025 대한민국 경제.pdf"

In [26]:
from typing import TypedDict

class GraphState(TypedDict):
    filepath: str
    filetype: str
    page_numbers: list[int]
    batch_size: int
    split_filepaths: list[str]
    analyzed_files: list[str]
    page_elements: dict[int, dict[str, list[dict]]]
    page_metadata: dict[int, dict]
    page_summary: dict[int, str]
    images: list[str]
    images_summary: list[str]
    tables: list[str]
    table_sumamry: dict[int, str]
    table_markdown: dict[int, str] # 신규 추가
    texts: list[str]
    text_summary: list[str]
    table_summary_data_batches: list[dict] # 신규 추가
    language: str

In [27]:
import os
import pymupdf
import json
import requests

In [28]:
dict_color = {"table": "red",
              "figure": "dodgerblue",
              "chart": "limegreen",
              "heading1": "darkorange",
              "header": "orangered",
              "footer": "mediumorchid",
              "caption": "deeppink",
              "paragraph": "crimson", 
              "equation": "darkturquoise",
              "list": "chocolate",
              "index": "forestgreen",
              "footnote": "navy"
              }


# https://console.upstage.ai/docs/capabilities/document-parse?utm_campaign=dp-launch-mkt&utm_source=web-blog&utm_medium=text-link
# https://matplotlib.org/stable/users/explain/colors/colors.html

In [29]:
def split_pdf(state: GraphState):
    """
    입력 PDF를 여러 개의 작은 PDF 파일로 분할합니다.

    :param state: GraphState 객체, PDF 파일 경로와 배치 크기 정보를 포함
    :return: 분할된 PDF 파일 경로 목록을 포함한 GraphState 객체
    """
    # PDF 파일 경로와 배치 크기 추출
    filepath = state["filepath"]
    batch_size = state["batch_size"]

    # PDF 파일 열기
    input_pdf = pymupdf.open(filepath)
    num_pages = len(input_pdf)
    print(f"총 페이지 수: {num_pages}")

    ret = []
    # PDF 분할 작업 시작
    for start_page in range(0, num_pages, batch_size):
        # 배치의 마지막 페이지 계산 (전체 페이지 수를 초과하지 않도록)
        end_page = min(start_page + batch_size, num_pages) - 1

        # 분할된 PDF 파일명 생성
        input_file_basename = os.path.splitext(filepath)[0]
        output_file = f"{input_file_basename}_{start_page:04d}_{end_page:04d}.pdf"
        print(f"분할 PDF 생성: {output_file}")

        # 새로운 PDF 파일 생성 및 페이지 삽입
        with pymupdf.open() as output_pdf:
            output_pdf.insert_pdf(input_pdf, from_page=start_page, to_page=end_page)
            output_pdf.save(output_file)
            ret.append(output_file)

    # 원본 PDF 파일 닫기
    input_pdf.close()

    # 분할된 PDF 파일 경로 목록을 포함한 GraphState 객체 반환
    return GraphState(split_filepaths=ret)

In [30]:
state = GraphState(filepath=SAMPLE, batch_size=11)
state_out = split_pdf(state)
state.update(state_out)
state

총 페이지 수: 21
분할 PDF 생성: ../data/2025 대한민국 경제_0000_0010.pdf
분할 PDF 생성: ../data/2025 대한민국 경제_0011_0020.pdf


{'filepath': '../data/2025 대한민국 경제.pdf',
 'batch_size': 11,
 'split_filepaths': ['../data/2025 대한민국 경제_0000_0010.pdf',
  '../data/2025 대한민국 경제_0011_0020.pdf']}

In [31]:
class DocumentParser:
    def __init__(self, api_key):
        """
        LayoutAnalyzer 클래스의 생성자

        :param api_key: Upstage API 인증을 위한 API 키
        """
        self.api_key = api_key
    
    def _upstage_document_parser(self, input_file):
        """
        Upstage의 Document Parser API를 호출하여 문서 분석 수행합니다.
        
        :param input_file: 분석할 PDF 파일의 경로
        :return: 분석 결과가 저장된 JSON 파일의 경로
        """
    
        # API 요청 헤더 설정
        header = {"Authorization": f"Bearer {self.api_key}"}

        # API 요청 데이터 설정
        data = {"ocr": "auto", "output_formats": "['html', 'text']"}

        # 분석할 PDF 파일 열기
        files = {"document": open(input_file, 'rb')}

        # API 요청 보내기
        response = requests.post(
            "https://api.upstage.ai/v1/document-ai/document-parse",
            headers=header,
            data=data,
            files=files,
        )

        # API 응답 처리 및 결과 저장
        if response.status_code == 200:
            # 분석 결과를 저장할 JSON 파일 경로 생성
            output_file = os.path.splitext(input_file)[0] + ".json"

            # 분석 결과를 JSON 파일로 저장
            with open(output_file, "w") as f:
                json.dump(response.json(), f, ensure_ascii=False)

            return output_file

        else:
            # API 요청이 실패한 경우 예외 발생
            print(response)
            raise ValueError(f"API 요청 실패. 상태 코드: {response.status_code}")
        
    def excute(self, input_file):
        """
        주어진 입력 파일에 대해 Document Parser를 실행합니다.

        :param input_file: 분석할 PDF 파일의 경로
        :return: 분석 결과가 저장된 JSON 파일의 경로
        """
        return self._upstage_document_parser(input_file)


In [32]:
def document_parser(state: GraphState):
    # 분활된 PDF 파일 목록을 가져옵니다.
    split_files = state["split_filepaths"]

    # DocumentParser 객체를 생성합니다. API 키는 환경 변수에서 가져옵니다.
    parser = DocumentParser(os.environ.get("UPSTAGE_API_KEY"))

    # 분석된 파일의 경로를 저장할 리스트를 초기화합니다.
    analyzed_files = []

    # 각 분할된 PDF 파일에 대해 Document Parser를 수행합니다.
    for file in split_files:
        # Document Parser를 실행하고 결과 파일 경로를 리스트에 추가합니다.
        analyzed_files.append(parser.excute(file))

    # 분석된 파일 경로들을 정렬하여 새로운 GraphState 객체를 생성하고 반환합니다.
    # 정렬은 파일들의 순서를 유지하기 위해 수행됩니다.
    return GraphState(analyzed_files=sorted(analyzed_files))

In [33]:
state_out = document_parser(state)
state.update(state_out)
state

{'filepath': '../data/2025 대한민국 경제.pdf',
 'batch_size': 11,
 'split_filepaths': ['../data/2025 대한민국 경제_0000_0010.pdf',
  '../data/2025 대한민국 경제_0011_0020.pdf'],
 'analyzed_files': ['../data/2025 대한민국 경제_0000_0010.json',
  '../data/2025 대한민국 경제_0011_0020.json']}

In [34]:
import PIL
from PIL import ImageDraw, ImageFont

class ImageCropper:
    @staticmethod
    def pdf_to_image(pdf_file, page_num, dpi=300):
        """
        PDF 파일의 특정 페이지를 이미지로 변환하는 메서드

        :param page_num: 변환할 페이지 번호 (1부터 시작)
        :param dpi: 이미지 해상도 (기본값: 300)
        :return: 변환된 이미지 객체
        """
        with pymupdf.open(pdf_file) as doc:
            page = doc[page_num].get_pixmap(dpi=dpi)
            target_page_size = [page.width, page.height]
            page_img = PIL.Image.frombytes("RGB", target_page_size, page.samples)
        return page_img
    
    @staticmethod
    def crop_image(img, coordinates, output_file):
        """
        이미지를 주어진 좌표에 따라 자르고 저장하는 정적 메서드

        :param img: 자를 이미지 객체
        :param coordinates: 자를 영역의 좌표 리스트
        :param output_file: 저장할 이미지 파일 경로
        """
        img_width, img_height = img.size
        x0, y0, x1, y1 = [int(coord * dim) for coord, dim in zip(coordinates, [img_width, img_height] * 2)]
        cropped_img = img.crop((x0, y0, x1, y1))
        cropped_img.save(output_file)

    @staticmethod
    def visualize_bbox(img, img_size, element):
        # 폰트에 대한 내용을 더 생각해봐야 함
        """
        이미지에 하나의 element에 대한 바운딩 박스와 annotation을 시각화하는 정적 메서드

        :param img: 시각화할 이미지 객체
        :param element: 
        :return: 
        """
        # 실제 좌표로 변경
        (x0, y0 ), (x1, y1) = [
            (coord['x'] * img_size[0], coord['y'] * img_size[1])
            for coord in [element['coordinates'][0], element['coordinates'][2]]
            ]
            
        draw = ImageDraw.Draw(img)
        draw.rectangle([x0, y0, x1, y1], outline=dict_color[element['category']], width=10)

        # font = ImageFont.truetype("Gargi", 40)
        font = ImageFont.load_default(40)
        text = "  " + element['category'] + f"   id: {element['id']}  " 
        text_box = draw.textbbox((x0, y0), text, font=font)
        text_width, text_height = text_box[2] - text_box[0], text_box[3] - text_box[1] + 10

        draw.rectangle([x0, y0 - text_height, x0 + text_width, y0], fill=dict_color[element['category']])
        draw.text((x0, y0 - text_height), text, fill="white", font=font)

        return img

In [37]:
from collections import deque
from abc import ABC, abstractmethod

class BaseNode(ABC):
    def __init__(self, verbose=False, **kwargs):
        self.name = self.__class__.__name__
        self.verbose = verbose

    @abstractmethod
    def execute(self, state: GraphState) -> GraphState:
        pass

    def log(self, message: str, **kwargs):
        if self.verbose:
            print(f"[{self.name}] {message}")
            for key, value in kwargs.items():
                print(f"  {key}: {value}")

    def __call__(self, state: GraphState) -> GraphState:
        return self.execute(state)


class DocumentParserVisualizerNode(BaseNode):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.name = "DocumentParserVisulizerNode"

    def execute(self, state: GraphState) -> GraphState:
        for i, json_path in enumerate(state["analyzed_files"]):
            with open(json_path) as jsonFile:
                json_data = json.load(jsonFile)

            page_list = []
            elements_deque = deque(json_data['elements'])

            for page in range(0, state["batch_size"]):
                try: 
                    img = ImageCropper.pdf_to_image(state["split_filepaths"][i], page)
                except:
                    break
                img_size = img.size

                for element in json_data['elements']:
                    if element['page'] == page + 1:
                        img = ImageCropper.visualize_bbox(img, img_size, element)
                        elements_deque.popleft()
                        
                page_list.append(img)

            page_list[0].save(os.path.splitext(json_path)[0] + "_bounding.pdf", save_all=True, append_images=page_list[1:])

        return state

In [38]:
node = DocumentParserVisualizerNode()
state = node.execute(state)
