In [15]:
# Step 1 — Raw PDF Extraction (all pages into a single JSON file)
from pathlib import Path
import pdfplumber
import json

pdf_path = Path("data/julius-caesar.pdf")
out_file = Path("data/raw_pages.json")

# Walk the PDF once, keeping one {"page_num", "text"} record per page.
# Page numbers are 1-based; a page with no extractable text is stored as "".
with pdfplumber.open(pdf_path) as pdf:
    pages = [
        {
            "page_num": number,
            "text": page.extract_text(x_tolerance=2, y_tolerance=2) or "",
        }
        for number, page in enumerate(pdf.pages, start=1)
    ]

# Persist every page into a single UTF-8 JSON file (non-ASCII characters kept as-is).
serialized_pages = json.dumps(pages, ensure_ascii=False, indent=2)
out_file.write_text(serialized_pages, encoding="utf-8")


161571

In [16]:
from pathlib import Path
import pdfplumber
import json

In [17]:
# Step 3 — Page-level Act/Scene detection (header-first assumption) — single cell
from pathlib import Path
import json
import re

DATA_DIR = Path("data")
CLEANED_PAGES = DATA_DIR / "cleaned_pages.json"
RAW_PAGES = DATA_DIR / "raw_pages.json"
OUT_FILE = DATA_DIR / "step3_structured_pages.json"

# Candidate inputs in priority order: artifact-removed pages first, raw extraction as fallback.
source_path = next(
    (candidate for candidate in (CLEANED_PAGES, RAW_PAGES) if candidate.exists()),
    None,
)
if source_path is None:
    raise FileNotFoundError("No source pages json found. Run extraction (Step 1) first.")

pages = json.loads(source_path.read_text(encoding="utf-8"))

# regex patterns (robust to variants)
# NOTE(review): re.IGNORECASE also applies to the numeral class, so an ordinary word made
# only of the letters i/v/x/l/c/d/m that follows "act"/"scene" (e.g. "act did") matches too.
# Confirm this is acceptable for the pages being processed.
act_pat = re.compile(r'\bACT\b[\s\.\-:]*([IVXLCDM]+|\d+)\b', re.IGNORECASE)
scene_pat = re.compile(r'\b(?:SC(?:\.|ENE)?|SCENE|Scene)\b[\s\.\-:]*([IVXLCDM]+|\d+)\b', re.IGNORECASE)

TOP_N_LINES = 12  # only inspect top N lines of each page (header-first assumption)

structured = []
current_act = None      # last act label seen so far (upper-cased numeral or digits)
current_scene = None    # last scene label seen so far, reset whenever the act changes
first_act_page = None   # page_num of the first page whose header names an act

for pg in pages:
    page_num = pg.get("page_num")
    text = pg.get("clean_text") or pg.get("text") or ""
    # top N non-empty lines, stripped
    top_lines = [ln.strip() for ln in text.splitlines() if ln.strip()][:TOP_N_LINES]

    found_act = None
    found_scene = None
    heading_lines = []

    # Scan only the top lines. When several lines match, the last match wins.
    for ln in top_lines:
        m_act = act_pat.search(ln)
        m_scene = scene_pat.search(ln)
        if m_act:
            found_act = m_act.group(1).upper()
        if m_scene:
            found_scene = m_scene.group(1).upper()
        if m_act or m_scene:
            # record the line once, even when it names both an act and a scene
            heading_lines.append(ln)

    # Header-first policy:
    # - a new act label updates current_act and clears the scene carried over from the
    #   previous act (it stays None until a scene header appears)
    # - a scene label in the top lines updates current_scene
    if found_act:
        if found_act != current_act:
            current_scene = None
        current_act = found_act
        if first_act_page is None:
            first_act_page = page_num
    if found_scene:
        current_scene = found_scene

    # Everything before the first detected act is treated as front matter.
    is_front_matter = current_act is None

    # assign page-level metadata (propagate last seen act/scene for play-content pages)
    structured.append({
        "page_num": page_num,
        "clean_text": text,
        "act": current_act if not is_front_matter else None,
        "scene": current_scene if not is_front_matter else None,
        "heading_lines_top": heading_lines,
        "is_front_matter": is_front_matter
    })

# Save step3 output (stable filename)
OUT_FILE.write_text(json.dumps(structured, ensure_ascii=False, indent=2), encoding="utf-8")

# minimal print
print(f"Step 3 done: {len(structured)} pages processed, first_act_page = {first_act_page}")


Step 3 done: 106 pages processed, first_act_page = 2


In [18]:
# Step 4 — Remove Folger artifacts & reconstruct structured blocks -> data/step4.jsonl
from pathlib import Path
import json
import re

DATA_DIR = Path("data")
IN_JSON, OUT_JSONL = DATA_DIR / "step3_structured_pages.json", DATA_DIR / "step4.jsonl"

# Step 3 output is the only accepted input; stop early when it is missing.
if IN_JSON.exists():
    pages = json.loads(IN_JSON.read_text(encoding="utf-8"))
else:
    raise FileNotFoundError("step3_structured_pages.json not found")

# --- artifact patterns stripped from every page ---
ftln_pat = re.compile(r'\bFTLN\b\s*\d{1,4}', re.IGNORECASE)
page_header_num_title_pat = re.compile(r'^\s*\d+\s+Julius\s+Caesar.*$', re.IGNORECASE)
page_num_only_pat = re.compile(r'^\s*(?:Page\s*)?\d{1,4}\s*$', re.IGNORECASE)

# --- structural patterns used by the block parser ---
# A speaker line opens with an all-caps name (digits, hyphens and spaces allowed), optionally
# followed by "." or ":" and then the first words of the speech, e.g. "BRUTUS", "FIRST SOLDIER".
speaker_line_full = re.compile(r'^\s*([A-Z][A-Z0-9\-\s]+?)[\.:]?\s*(?:\s+(.+))?$')
# Lines opening with an entrance/exit keyword, an ACT/SCENE heading, or a word ending in "ing".
stage_dir_pat = re.compile(r'^\s*(Enter|Exit|Exeunt|SCENE|ACT|Stage|Stage Directions|[A-Z][a-z]+ing)\b', re.IGNORECASE)

# Accumulators filled by the parser further down in this cell.
block_id = 0
records = []


def clean_page_text(raw_text):
    """Strip page artifacts from one page of extracted text.

    FTLN tokens are removed, running-header lines ("<number> Julius Caesar ...") and
    lines holding only a page number are dropped, trailing whitespace is trimmed from
    every line, and runs of three or more newlines are collapsed to two.

    Returns the cleaned text, stripped at both ends and terminated by a single newline
    (so empty or missing input yields "\n").
    """
    without_ftln = ftln_pat.sub("", raw_text or "")
    trimmed_lines = (line.rstrip() for line in without_ftln.splitlines())
    # Blank lines survive the filter (neither pattern can match them) and keep
    # marking paragraph boundaries.
    kept_lines = [
        line
        for line in trimmed_lines
        if not (page_header_num_title_pat.match(line) or page_num_only_pat.fullmatch(line))
    ]
    collapsed = re.sub(r'\n{3,}', '\n\n', "\n".join(kept_lines))
    return collapsed.strip() + "\n"

# Page-level act/scene lookup keyed by page number; every block inherits the metadata of
# the page it starts on.
page_meta = { p['page_num']: p for p in pages }


def _new_block(speaker, first_line, pnum, block_type):
    """Return a fresh in-progress block that starts (and so far ends) on page `pnum`."""
    return {
        "speaker": speaker,
        "lines": [first_line] if first_line else [],
        "start_page": pnum,
        "end_page": pnum,
        "block_type": block_type,
    }


def _iter_blocks(stream):
    """Group (page_num, line_text) pairs into blocks and yield each one when it is complete.

    A blank line closes the open block. A line matching `speaker_line_full` either extends
    the open block (same speaker) or closes it and opens a new one. Any other line continues
    the open block, or opens a speaker-less "narration" block when none is open.
    """
    current = None
    for pnum, ln in stream:
        if not ln.strip():
            # blank line => close the open block (if any)
            if current:
                yield current
                current = None
            continue

        m = speaker_line_full.match(ln)
        is_stage = bool(stage_dir_pat.match(ln))
        # NOTE(review): stage_dir_pat only matters for lines that already matched the
        # all-caps speaker pattern, so mixed-case cues such as "Enter Caesar" fall through
        # to the continuation branch below. Kept as-is; Step 5 re-classifies by text.
        if m and (m.group(1).isupper() or is_stage):
            speaker = m.group(1).strip()
            rest = (m.group(2) or "").strip()
            if current and current["speaker"] == speaker:
                # same speaker repeated: keep accumulating into the open block
                if rest:
                    current["lines"].append(rest)
                current["end_page"] = pnum
            else:
                if current:
                    yield current
                current = _new_block(speaker, rest, pnum,
                                     "stage_direction" if is_stage else "speech")
        elif current:
            # continuation of the open block
            current["lines"].append(ln)
            current["end_page"] = pnum
        else:
            # orphan line: open an anonymous narration block (e.g. stage direction or prose)
            current = _new_block(None, ln, pnum, "narration")

    if current:
        yield current


# iterate pages, build continuous stream of lines with page info
line_stream = []  # list of (page_num, line_text)
for pg in pages:
    pnum = pg.get("page_num")
    cleaned = clean_page_text(pg.get("clean_text") or pg.get("text") or "")
    line_stream.extend((pnum, ln) for ln in cleaned.splitlines())

# parse the stream into records, numbering blocks in emission order
for blk in _iter_blocks(line_stream):
    block_id += 1
    meta = page_meta.get(blk["start_page"]) or {}
    records.append({
        "block_id": block_id,
        "act": meta.get('act'),
        "scene": meta.get('scene'),
        "speaker": blk["speaker"],
        "text": "\n".join(blk["lines"]).strip(),
        "start_page": blk["start_page"],
        "end_page": blk["end_page"],
        "block_type": blk["block_type"],
    })

# merge very short fragments (< 6 words) into the previous block when the speaker is the
# same, to fix table splits; joined with a plain newline so no stray leading space is left
merged = []
for r in records:
    if merged and r['speaker'] == merged[-1]['speaker'] and len(r['text'].split()) < 6:
        merged[-1]['text'] = merged[-1]['text'] + "\n" + r['text']
        merged[-1]['end_page'] = r['end_page']
    else:
        merged.append(r)

# write to JSONL
with OUT_JSONL.open('w', encoding='utf-8') as f:
    for r in merged:
        f.write(json.dumps(r, ensure_ascii=False) + "\n")


In [19]:
# Step 5 — Merge continuations & isolate stage directions -> data/step5.jsonl
from pathlib import Path
import json
import re

DATA_DIR = Path("data")
IN_FILE, OUT_FILE = DATA_DIR / "step4.jsonl", DATA_DIR / "step5.jsonl"

if not IN_FILE.exists():
    raise FileNotFoundError("data/step4.jsonl not found. Run Step 4 first.")

# Load records: one JSON object per non-blank line
recs = [
    json.loads(raw_line)
    for raw_line in IN_FILE.read_text(encoding="utf-8").splitlines()
    if raw_line.strip()
]

# Patterns shared by the helpers and the merge passes in this cell
stage_dir_pat = re.compile(
    r'^\s*(Enter|Exit|Exeunt|Exit,|Enter,|Exeunt,|[Tt]he\s+[A-Z][a-z]+\s+(comes|comes forward|comes on|enters|appears)|\[|{).*$',
    re.IGNORECASE)
act_scene_pat = re.compile(r'^\s*ACT\b|^\s*SC(?:\.|ENE)?\b', re.IGNORECASE)
speaker_heading_pat = re.compile(r'^[A-Z][A-Z0-9\-\s]+$')  # loose check for "BRUTUS", "FIRST SOLDIER"


def is_stage_direction_text(txt):
    """Heuristically decide whether `txt` reads like a stage direction.

    Returns True when the stripped text
    - matches `stage_dir_pat` ("Enter ...", "The Soothsayer comes forward.", "[...", "{..."), or
    - is wrapped in parentheses, e.g. "(Aside)", or
    - has at most five words, starts with an uppercase character and ends with a period
      (e.g. "Sennet.").
    Non-string, empty or whitespace-only input returns False.
    """
    if not isinstance(txt, str):
        return False
    candidate = txt.strip()
    if not candidate:
        return False

    matches_cue = stage_dir_pat.match(candidate) is not None
    is_parenthetical = candidate.startswith("(") and candidate.endswith(")")
    is_short_sentence = (
        len(candidate.split()) <= 5
        and candidate[0].isupper()
        and candidate.endswith(".")
    )
    return matches_cue or is_parenthetical or is_short_sentence


def clean_whitespace(s):
    """Replace each whitespace run that ends in a newline with one newline, then strip both ends."""
    text = s if s else ""
    return re.sub(r'\s+\n', '\n', text).strip()

def _spoken_text_is_stage_direction(text):
    """Stricter stage-direction check for blocks that already carry a speaker.

    Only an explicit cue (`stage_dir_pat`) or a fully parenthesised text counts. The
    "short capitalised sentence ending in a period" rule of `is_stage_direction_text`
    is deliberately skipped here: applied to spoken blocks it would retype short
    speeches such as "I will." as stage directions.
    """
    stripped = (text or "").strip()
    if not stripped:
        return False
    if stage_dir_pat.match(stripped):
        return True
    return stripped.startswith("(") and stripped.endswith(")")


# Pass 1: normalize fields and classify stage directions
merged = []
for r in recs:
    # normalize fields ("None"/"" speakers become None, pages become ints)
    r['speaker'] = None if r.get('speaker') in (None, "None", "") else r.get('speaker')
    r['text'] = (r.get('text') or "").strip()
    r['start_page'] = int(r.get('start_page') or 0)
    r['end_page'] = int(r.get('end_page') or r['start_page'])

    if r['speaker'] and is_stage_direction_text(r['speaker']):
        # the "speaker" itself is a direction: retype the block and drop the speaker
        r['block_type'] = 'stage_direction'
        r['speaker'] = None
    elif r['speaker']:
        # spoken block: keep the speaker, retype only on an explicit cue
        if _spoken_text_is_stage_direction(r['text']):
            r['block_type'] = 'stage_direction'
    elif is_stage_direction_text(r['text']):
        r['block_type'] = 'stage_direction'
    merged.append(r)

# Pass 2: merge continuations into the previous block when safe
out = []
for cur in merged:
    if not out:
        out.append(cur)
        continue
    prev = out[-1]

    # Conditions to merge cur into prev:
    # 1) cur has no speaker AND prev has a speaker
    # 2) pages are the same or consecutive (cur starts on prev.end_page or the page after)
    # 3) cur is not a stage direction and does not open with an ACT/SCENE heading
    cond_pages = cur['start_page'] in (prev['end_page'], prev['end_page'] + 1)
    cur_is_orphan = not cur.get('speaker')
    cur_is_stage = (cur.get('block_type') == 'stage_direction') or is_stage_direction_text(cur.get('text', ""))
    cur_is_act_scene = bool(act_scene_pat.search(cur.get('text', "")[:40]))

    if cur_is_orphan and prev.get('speaker') and cond_pages and (not cur_is_stage) and (not cur_is_act_scene):
        prev_text = prev.get('text', "").rstrip()
        cur_text = cur.get('text', "").lstrip()
        if prev_text.endswith("-"):
            # hyphenation fix: drop the trailing hyphen and join without a separator
            prev['text'] = prev_text[:-1] + cur_text
        else:
            prev['text'] = prev_text + "\n" + cur_text
        prev['end_page'] = max(prev['end_page'], cur['end_page'])
        # prev keeps its block_type
        continue

    # small heuristic: a very short block (<6 words) by the same speaker as prev is folded in
    same_speaker = bool(cur.get('speaker')) and cur.get('speaker') == prev.get('speaker')
    if same_speaker and len(cur.get('text', '').split()) < 6:
        prev['text'] = prev.get('text', '').rstrip() + "\n" + cur.get('text', '').lstrip()
        prev['end_page'] = max(prev['end_page'], cur['end_page'])
        continue
    out.append(cur)

# Final pass: a short, non-stage orphan block at the very start is prepended to the next
# block, but only when that next block actually has a speaker.
final = list(out)
if len(final) > 1 and not final[0].get('speaker'):
    orphan, nxt = final[0], final[1]
    orphan_text = orphan.get('text') or ""
    if (nxt.get('speaker')
            and len(orphan_text.split()) <= 8
            and not is_stage_direction_text(orphan_text)):
        nxt['text'] = orphan_text.rstrip() + "\n" + nxt.get('text', '')
        nxt['start_page'] = min(nxt.get('start_page', 9999), orphan.get('start_page', 9999))
        final = final[1:]

# Save to step5.jsonl
with OUT_FILE.open('w', encoding='utf-8') as f:
    for rec in final:
        # light whitespace cleanup just before writing
        rec['text'] = clean_whitespace(rec.get('text', ''))
        f.write(json.dumps(rec, ensure_ascii=False) + "\n")

print("Step 5 done:", len(final), "blocks -> data/step5.jsonl")


Step 5 done: 854 blocks -> data/step5.jsonl


In [23]:
# Step 6: Add metadata → produce final RAG-ready chunks
# Input:  data/step5.jsonl
# Output: data/step6.jsonl
# RULE: SPEAKER IS NEVER MODIFIED.

import json, re
from pathlib import Path

DATA = Path("data")
IN_PATH = DATA / "step5.jsonl"
OUT_PATH = DATA / "step6.jsonl"

# Fail fast when the Step 5 output is missing. The message reports the path that was
# actually checked (relative "data" directory, not "/data").
if not IN_PATH.exists():
    raise FileNotFoundError(f"{IN_PATH} not found. Run Step 5 first.")

# Roman → int mapping (fast path for the common small numerals)
roman_map = {
    'I':1,'II':2,'III':3,'IV':4,'V':5,'VI':6,'VII':7,'VIII':8,'IX':9,'X':10,
    'XI':11,'XII':12,'XIII':13,'XIV':14,'XV':15
}

# Canonical Roman numerals up to 3999 (MMMCMXCIX); non-canonical forms like "IIII" or "DID" are rejected.
_ROMAN_NUMERAL_RE = re.compile(r'M{0,3}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})')
_ROMAN_DIGIT_VALUES = {'I': 1, 'V': 5, 'X': 10, 'L': 50, 'C': 100, 'D': 500, 'M': 1000}

def roman_to_int(s):
    """Convert an act/scene label to an int.

    Accepts decimal strings ("3"), ints, and Roman numerals in any letter case ("iv", "XVI").
    Surrounding whitespace is ignored.

    Returns None for falsy input and for anything that is neither a decimal number nor a
    canonical Roman numeral.
    """
    if not s:
        return None
    s = str(s).strip().upper()
    # isdecimal() (unlike isdigit()) only accepts characters that int() can parse
    if s.isdecimal():
        return int(s)
    if s in roman_map:
        return roman_map[s]
    # the pattern is fully optional, so guard against the empty string explicitly
    if not s or not _ROMAN_NUMERAL_RE.fullmatch(s):
        return None
    # Right-to-left scan: a symbol smaller than its right neighbour is subtractive.
    total = 0
    right_value = 0
    for symbol in reversed(s):
        value = _ROMAN_DIGIT_VALUES[symbol]
        total += value if value >= right_value else -value
        right_value = value
    return total

# -------------------------
# LOAD BLOCKS (speaker copied through untouched)
# -------------------------
blocks = []
with IN_PATH.open("r", encoding="utf-8") as f:
    for rec in (json.loads(raw_line) for raw_line in f if raw_line.strip()):
        body = (rec.get("text") or "").strip()
        first_page = int(rec.get("start_page") or 0)
        # a missing end page falls back to the start page, then to 0
        last_page = int(rec.get("end_page") or rec.get("start_page") or 0)

        blocks.append({
            "block_type": rec.get("block_type"),
            "speaker": rec.get("speaker"),          # keep EXACT value
            "text": body,
            "act_raw": rec.get("act"),
            "scene_raw": rec.get("scene"),
            "act": roman_to_int(rec.get("act")),
            "scene": roman_to_int(rec.get("scene")),
            "start_page": first_page,
            "end_page": last_page,
            "word_count": len(body.split()),
        })

# Stable sort for consistent chunking; a missing act/scene sorts as 0
blocks.sort(key=lambda b: (b["act"] or 0, b["scene"] or 0, b["start_page"], b["end_page"]))

# -------------------------
# GROUP BY SCENE
# -------------------------
# Blocks with neither act nor scene are left out of the scene groups.
scene_groups = {}
for b in blocks:
    if b["act"] is None and b["scene"] is None:
        continue
    scene_groups.setdefault((b["act"], b["scene"]), []).append(b)

# -------------------------
# BUILD CHUNKS
# -------------------------
SOLILOQUY_MIN_WORDS = 120        # a speech at least this long is flagged as a soliloquy
ISOLATED_SPEECH_MIN_WORDS = 40   # strict minimum for the "framed by stage directions" rule

chunks = []
chunk_id = 0

# ---- SCENE CHUNKS ----
# One chunk per (act, scene) holding the concatenated text of all its blocks.
for (act, scene), items in scene_groups.items():
    scene_text = "\n\n".join([x["text"] for x in items if x["text"]])

    chunk_id += 1
    chunks.append({
        "chunk_id": chunk_id,
        "chunk_type": "scene",
        "act": act,
        "scene": scene,
        "start_page": min(x["start_page"] for x in items),
        "end_page": max(x["end_page"] for x in items),
        "text": scene_text,
        "word_count": len(scene_text.split()),
        "speakers": sorted(set(x["speaker"] for x in items if x.get("speaker")))
    })

# Distinct speakers per scene, computed once instead of once per block.
scene_speakers = {
    key: {x["speaker"] for x in items if x["speaker"]}
    for key, items in scene_groups.items()
}

# ---- SPEECH/STAGE CHUNKS ----
# enumerate() gives each block's real position; list.index() compares by equality and
# would return the first of several identical blocks, giving the wrong neighbours.
for idx, b in enumerate(blocks):

    # Stage direction → separate chunk
    if b["block_type"] == "stage_direction":
        chunk_id += 1
        chunks.append({
            "chunk_id": chunk_id,
            "chunk_type": "stage_direction",
            "act": b["act"],
            "scene": b["scene"],
            "speaker": b["speaker"],              # unchanged
            "text": b["text"],
            "start_page": b["start_page"],
            "end_page": b["end_page"],
            "word_count": b["word_count"],
            "is_soliloquy": False
        })
        continue

    # Speech block
    wc = b["word_count"]
    is_sol = wc >= SOLILOQUY_MIN_WORDS  # long monologues

    # Scene-only speaker rule: the scene has exactly one distinct speaker and it is this one
    speakers_here = scene_speakers.get((b["act"], b["scene"]))
    if speakers_here is not None and len(speakers_here) == 1 and b["speaker"] in speakers_here:
        is_sol = True

    # Isolated rule (stage direction, or nothing at all, both before and after)
    prev_b = blocks[idx - 1] if idx > 0 else None
    next_b = blocks[idx + 1] if idx + 1 < len(blocks) else None
    framed_before = prev_b is None or prev_b["block_type"] == "stage_direction"
    framed_after = next_b is None or next_b["block_type"] == "stage_direction"
    if framed_before and framed_after and wc > ISOLATED_SPEECH_MIN_WORDS:
        is_sol = True

    chunk_id += 1
    chunks.append({
        "chunk_id": chunk_id,
        "chunk_type": "speech",
        "act": b["act"],
        "scene": b["scene"],
        "speaker": b["speaker"],               # EXACT AS IN step5
        "text": b["text"],
        "start_page": b["start_page"],
        "end_page": b["end_page"],
        "word_count": wc,
        "is_soliloquy": is_sol
    })

# -------------------------
# SAVE OUTPUT FILE
# -------------------------
with OUT_PATH.open("w", encoding="utf-8") as f:
    for c in chunks:
        f.write(json.dumps(c, ensure_ascii=False) + "\n")

print(f"done → {OUT_PATH} (total chunks = {len(chunks)})")


done → data\step6.jsonl (total chunks = 871)
