In [0]:
# COMMAND ----------
%pip install pypdf


In [0]:
# COMMAND ----------
from dataclasses import dataclass
from typing import List
import re
import pypdf

@dataclass
class Chunk:
    text: str
    page_index: int  # 0-based
    page_number: int # 1-based


def load_pdf_and_chunk(pdf_path: str, max_chars: int = 1200) -> List[Chunk]:
    """
    PDF 1ファイルを複数チャンク（Chunk オブジェクトのリスト）にして返す簡易版。
    - pypdf でページごとにテキスト抽出
    - 空行で段落に分割
    - max_chars 文字まで段落を詰め込んで 1 チャンク
    """
    reader = pypdf.PdfReader(pdf_path)
    chunks: List[Chunk] = []

    for page_idx, page in enumerate(reader.pages):
        raw_text = page.extract_text() or ""
        # 連続改行を 2 つまでに圧縮
        normalized = re.sub(r"\n\s*\n+", "\n\n", raw_text.strip())

        # 空行で段落分割
        paragraphs = [p.strip() for p in normalized.split("\n\n") if p.strip()]
        if not paragraphs:
            continue

        buf = ""
        for para in paragraphs:
            # 段落を足しても max_chars 以内ならバッファに詰める
            if len(buf) + len(para) + 2 <= max_chars:
                buf = (buf + "\n\n" + para) if buf else para
            else:
                # いったんチャンクとして確定
                chunks.append(
                    Chunk(
                        text=buf,
                        page_index=page_idx,
                        page_number=page_idx + 1,
                    )
                )
                buf = para  # 次のチャンクを開始

        # 残りがあればチャンクとして追加
        if buf:
            chunks.append(
                Chunk(
                    text=buf,
                    page_index=page_idx,
                    page_number=page_idx + 1,
                )
            )

    return chunks


In [0]:
# COMMAND ----------
from pathlib import Path
from pyspark.sql import Row

PDF_DIR = "/dbfs/bronze/jsai2025/pdfs"
pdf_paths = [str(p) for p in Path(PDF_DIR).glob("*.pdf")]

# Delta の出力先（好きな場所に変えてOK）
CHUNK_TEXT_DIR = "dbfs:/silver/jsai2025/chunks_text"


def split_pdf_to_chunks(pdf_path: str):
    # ★ さっき定義した load_pdf_and_chunk をそのまま呼ぶ
    chunks = load_pdf_and_chunk(pdf_path)

    rows = []
    for i, chunk in enumerate(chunks):
        rows.append(
            Row(
                pdf_path=pdf_path,
                chunk_id=i,                    # PDF 内での通し番号
                page_index=chunk.page_index,   # 0 始まり
                page_number=chunk.page_number, # 1 始まり
                text=chunk.text,
            )
        )
    return rows


# PDF パスを RDD にばらまき、flatMap で 1 PDF → 複数チャンクに展開
pdf_rdd = spark.sparkContext.parallelize(pdf_paths, numSlices=len(pdf_paths))

chunk_df = pdf_rdd.flatMap(split_pdf_to_chunks).toDF()

display(chunk_df)

# ★ Delta / Parquet として保存（ここが「新しいコンテナに書き込む」部分）
(
    chunk_df
    .write
    .mode("overwrite")
    .format("delta")       # parquet でも OK
    .save(CHUNK_TEXT_DIR)
)
