In [None]:
#PDF処理コード - Granite Docling

# 必要なライブラリのインストール
!pip install vllm
!pip install Pillow
!pip install torch torchvision


In [None]:
!pip install transformers


In [None]:
!pip install docling_core

In [None]:
!pip install pdf2image

In [None]:
import time
import os
from pathlib import Path
from typing import List, Optional
import tempfile
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed

# PDF処理用
from pdf2image import convert_from_path
from PIL import Image

# モデル関連
from vllm import LLM, SamplingParams
from transformers import AutoProcessor

# Docling関連
from docling_core.types.doc import DoclingDocument
from docling_core.types.doc.document import DocTagsDocument

class PDFDoclingProcessor:
    """PDFをGranite Doclingモデルで処理するクラス"""

    def __init__(self,
                 model_path: str = "ibm-granite/granite-docling-258M",
                 dpi: int = 200,
                 max_workers: int = 2):
        """
        初期化

        Args:
            model_path: Hugging Faceモデルのパス
            dpi: PDF→画像変換時のDPI（高いほど高品質だが処理時間増）
            max_workers: 並列処理のワーカー数
        """
        self.model_path = model_path
        self.dpi = dpi
        self.max_workers = max_workers

        # プロンプト設定
        self.prompt_text = "Convert this page to docling format with high accuracy. Preserve all text structure, tables, and formatting."
        self.messages = [
            {
                "role": "user",
                "content": [
                    {"type": "image"},
                    {"type": "text", "text": self.prompt_text},
                ],
            },
        ]

        # モデル初期化（遅延読み込み）
        self.llm = None
        self.processor = None
        self.sampling_params = None

    def _initialize_model(self):
        """モデルを初期化（初回実行時のみ）"""
        if self.llm is None:
            print("モデルを初期化中...")
            self.llm = LLM(
                model=self.model_path,
                revision="untied",
                limit_mm_per_prompt={"image": 1}
            )
            self.processor = AutoProcessor.from_pretrained(self.model_path)
            self.sampling_params = SamplingParams(
                temperature=0.0,
                max_tokens=8192,
                skip_special_tokens=False,
            )
            print("モデル初期化完了!")

    def pdf_to_images(self, pdf_path: str, temp_dir: Optional[str] = None) -> List[Image.Image]:
        """
        PDFをページ画像に変換

        Args:
            pdf_path: PDFファイルのパス
            temp_dir: 一時ディレクトリ（Noneの場合は自動生成）

        Returns:
            PIL Imageオブジェクトのリスト
        """
        print(f"PDFを画像に変換中: {pdf_path}")

        try:
            # PDFをページ画像に変換
            images = convert_from_path(
                pdf_path,
                dpi=self.dpi,
                fmt='RGB',
                thread_count=self.max_workers
            )

            # RGBに変換して統一
            processed_images = []
            for i, img in enumerate(images):
                if img.mode != 'RGB':
                    img = img.convert('RGB')
                processed_images.append(img)
                print(f"ページ {i+1}/{len(images)} 変換完了")

            print(f"PDF変換完了: {len(processed_images)} ページ")
            return processed_images

        except Exception as e:
            print(f"PDF変換エラー: {e}")
            raise

    def process_pdf(self,
                   pdf_path: str,
                   output_dir: str = "output",
                   batch_size: int = 4) -> str:
        """
        PDFファイルを処理してマークダウンを生成

        Args:
            pdf_path: 入力PDFファイルのパス
            output_dir: 出力ディレクトリ
            batch_size: バッチ処理サイズ

        Returns:
            出力されたマークダウンファイルのパス
        """
        # モデル初期化
        self._initialize_model()

        # 出力ディレクトリ作成
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # PDFファイル名から出力ファイル名を生成
        pdf_name = Path(pdf_path).stem

        start_time = time.time()

        # PDFを画像に変換
        images = self.pdf_to_images(pdf_path)
        total_pages = len(images)

        print(f"処理開始: {total_pages} ページを処理します")

        # 全ページ分のマークダウンを結合するリスト
        all_markdown_content = []

        # バッチ処理
        for batch_start in range(0, total_pages, batch_size):
            batch_end = min(batch_start + batch_size, total_pages)
            batch_images = images[batch_start:batch_end]

            print(f"バッチ処理中: ページ {batch_start+1}-{batch_end}")

            # バッチ用入力データ準備
            batched_inputs = []
            for image in batch_images:
                prompt = self.processor.apply_chat_template(
                    self.messages,
                    add_generation_prompt=True
                )
                batched_inputs.append({
                    "prompt": prompt,
                    "multi_modal_data": {"image": image}
                })

            # バッチ推論実行
            batch_outputs = self.llm.generate(batched_inputs, self.sampling_params)

            # バッチ結果を処理
            for page_idx, (output, input_data) in enumerate(zip(batch_outputs, batched_inputs)):
                page_num = batch_start + page_idx + 1
                doctags = output.outputs[0].text

                # DocTagsからDoclingDocumentに変換
                try:
                    doctags_doc = DocTagsDocument.from_doctags_and_image_pairs(
                        [doctags],
                        [input_data["multi_modal_data"]["image"]]
                    )
                    doc = DoclingDocument.load_from_doctags(
                        doctags_doc,
                        document_name=f"{pdf_name}_page_{page_num}"
                    )

                    # マークダウンコンテンツを取得
                    markdown_content = doc.export_to_markdown()

                    # ページ区切りを追加
                    if page_num > 1:
                        all_markdown_content.append(f"\n\n---\n**Page {page_num}**\n---\n\n")
                    else:
                        all_markdown_content.append(f"**Page {page_num}**\n---\n\n")

                    all_markdown_content.append(markdown_content)

                    print(f"ページ {page_num} 処理完了")

                except Exception as e:
                    print(f"ページ {page_num} 処理エラー: {e}")
                    # エラーの場合は raw doctags を追加
                    all_markdown_content.append(f"\n\n---\n**Page {page_num} (Raw)**\n---\n\n")
                    all_markdown_content.append(doctags)

        # 最終的なマークダウンファイルを保存
        final_markdown_path = output_path / f"{pdf_name}_complete.md"
        with open(final_markdown_path, "w", encoding="utf-8") as f:
            f.write("".join(all_markdown_content))

        # 処理時間を表示
        total_time = time.time() - start_time
        print(f"\n処理完了!")
        print(f"総処理時間: {total_time:.2f}秒")
        print(f"平均処理時間/ページ: {total_time/total_pages:.2f}秒")
        print(f"出力ファイル: {final_markdown_path}")

        return str(final_markdown_path)

# 使用例とメイン関数
def main():
    """メイン処理関数"""

    # PDFプロセッサーを初期化
    processor = PDFDoclingProcessor(
        dpi=200,  # 画質設定（150-300推奨）
        max_workers=2  # 並列処理数
    )

    # Google Colab用のファイルアップロード
    try:
        from google.colab import files
        print("PDFファイルをアップロードしてください...")
        uploaded = files.upload()

        # アップロードされたファイルを処理
        for filename in uploaded.keys():
            if filename.lower().endswith('.pdf'):
                print(f"処理開始: {filename}")

                # PDFを処理
                output_file = processor.process_pdf(
                    pdf_path=filename,
                    output_dir="pdf_output",
                    batch_size=4  # メモリに応じて調整
                )

                print(f"処理完了: {output_file}")

                # 結果をダウンロード
                files.download(output_file)

            else:
                print(f"PDFファイルではありません: {filename}")

    except ImportError:
        # Google Colab以外の環境での使用例
        print("Google Colab環境ではありません。ローカルファイルを使用してください。")

        # ローカルファイルでのテスト例
        pdf_path = "/content/2024.acl-long.554.pdf"  # ここにPDFファイルパスを指定
        if os.path.exists(pdf_path):
            output_file = processor.process_pdf(
                pdf_path=pdf_path,
                output_dir="pdf_output"
            )
            print(f"処理完了: {output_file}")
        else:
            print(f"ファイルが見つかりません: {pdf_path}")

# Google Colabで実行する場合
if __name__ == "__main__":
    main()