# Clean the Corpus

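This notebook scans a corpus for story files whose filenames do not follow the naming convention (lowercase letters and digits separated by single underscores) and writes repaired copies to an output directory.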

## Imports

In [None]:
from __future__ import annotations

from dataclasses import dataclass, asdict
import datetime
import fnmatch
import json
import os
import pathlib
import random
import re
import sys


from pathlib import Path

from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm


Third-party modules

In [None]:
import dotenv
from openai import OpenAI
import tiktoken

Switch to the parent directory so paths can resolve and we write to the right directories.

In [None]:
# Directory the kernel started in; when that is notebooks/, the project root is its parent.
cwd = pathlib.Path.cwd().resolve()
project_root = cwd if cwd.name != "notebooks" else cwd.parent
scripts_dir = project_root / "scripts"
# Only move when the target looks like the project root (it has scripts/) and we are not already there.
if scripts_dir.is_dir() and cwd != project_root:
    print(f"Changing working directory from {cwd} to {project_root}")
    os.chdir(project_root)
print("Working directory:", pathlib.Path.cwd())

Add imports from within the project (depends on prior cell)

In [None]:
from lcats import constants
from lcats import stories
from lcats import utils

from lcats.analysis import corpus_surveyor
from lcats.analysis import graph_plotters
from lcats.analysis import llm_extractor
from lcats.analysis import scene_analysis
from lcats.analysis import story_analysis
from lcats.analysis import story_processors
from lcats.analysis import text_segmenter


In [None]:
from importlib import reload

# Project modules that reloader() passes to importlib.reload, in list order.
RELOAD_MODULES = [
    constants,
    stories,
    corpus_surveyor,
    graph_plotters,
    llm_extractor,
    scene_analysis,
    story_analysis,
    story_processors,
    text_segmenter,
    utils,
]
def reloader(modules=None):
    """Reload project modules in place and report each one as it is reloaded.

    Args:
        modules: Optional iterable of already-imported module objects to
            reload, in order. When omitted, every module in RELOAD_MODULES
            is reloaded.

    Note:
        importlib.reload re-executes a module but does not rebind names that
        were copied out of it with ``from module import name``; re-run such
        imports after reloading.
    """
    targets = RELOAD_MODULES if modules is None else modules
    for module in targets:
        print("Reloading", module)
        reload(module)
    print("Reloading complete.")


## Project Setup

### Path Setup

In [None]:
# Absolute, resolved directory the kernel is currently running in.
CURRENT_PATH = pathlib.Path.cwd().resolve()

# The working directory itself is the project root (it used to be the parent of notebooks/).
PROJECT_ROOT = CURRENT_PATH

# Corpus and output directories that live inside the project.
DEV_CORPUS = PROJECT_ROOT / "data"
DEV_OUTPUT = PROJECT_ROOT / "output"

# Resources kept beside the project, one directory above its root.
GIT_CORPUS = PROJECT_ROOT.parent / "corpora"
OPENIA_API_KEYS_ENV = PROJECT_ROOT.parent / ".secrets" / "openai_api_keys.env"

def check_path(path: pathlib.Path, description: str) -> None:
    """Print whether ``path`` exists, labelling the line with ``description``.

    Nothing is raised or returned; a missing path is only reported.
    """
    status, preposition = ("Found", "at") if path.exists() else ("Missing", "from")
    print(f"{status} {description} {preposition}: {path}")

# Report which of the configured locations exist; check_path only prints, it never raises.
check_path(DEV_CORPUS, "DEV_CORPUS")
check_path(DEV_OUTPUT, "DEV_OUTPUT")
check_path(GIT_CORPUS, "GIT_CORPUS")
check_path(OPENIA_API_KEYS_ENV, "OPENIA_API_KEYS_ENV")


In [None]:
# Use the checked-in corpora that sit beside the project (../corpora), as an absolute path.
# The working corpora under project_root / "data" are the alternative.
CORPORA_ROOT = (project_root / ".." / "corpora").resolve()

print("Corpora root:", CORPORA_ROOT)
print("Corpora top-level directories:", end=" ")
# Last expression of the cell, so the notebook displays the listing.
os.listdir(CORPORA_ROOT)

### OpenAI Client

Get the OpenAI API key.

In [None]:
# Load the key file into the process environment, then read the key back from it.
dotenv.load_dotenv(OPENIA_API_KEYS_ENV)
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
# Never echo the key itself: cell output is saved inside the notebook file and
# would leak the secret to anyone who can read or clone the notebook.
if OPENAI_API_KEY:
    print("OPENAI_API_KEY loaded.")
else:
    print(f"OPENAI_API_KEY is not set; expected it in {OPENIA_API_KEYS_ENV}")

Verify that we can get a client.

In [None]:
# NOTE(review): OpenAI() is called with no arguments, so it presumably reads OPENAI_API_KEY from the environment — confirm.
client = OpenAI()
# _version is an underscore-prefixed (private) attribute of the client; it may not be stable across library releases.
print(f"Loaded OpenAI client: {client} with version: {client._version}")

Verify that the API is still working (it changes often enough to be worth re-checking) and that the account has credits.

In [None]:
# Send one small request through client.responses.create as an end-to-end check of the key and client.
response = client.responses.create(
    model="gpt-4o",
    input="Write a one-sentence bedtime story about a starship captain visiting a planet."
)

# Print today's date alongside the generated text so the saved output records when the check last ran.
print(f"Story generated on: {datetime.date.today()}:")
utils.pprint(response.output_text)

## Corpora-level Analysis

### Story Corpora

In [None]:
# Point at the checked-in corpora; assign DEV_CORPUS instead to use the command-line working corpora.
CORPORA_ROOT = GIT_CORPUS

# Load the corpora rooted there.
corpora = stories.Corpora(CORPORA_ROOT)

print("Loaded corpora:")
print(f" - root: {corpora.corpora_root}")
print(f" - corpora: {len(corpora.corpora)}")
print(f" - stories: {len(corpora.stories)}")
print()

# Show the first story as a concrete example of what was loaded.
print("Example story: corpora.stories[0]:")
example_story = corpora.stories[0]
print(f"Story type: {type(example_story)} with a body of {len(example_story.body)} characters.")
print(example_story)


### JSON Corpora Load

In [None]:
# Collect the story files found under CORPORA_ROOT.
json_stories = corpus_surveyor.find_corpus_stories(CORPORA_ROOT)
# A bare len(...) in the middle of a cell is evaluated and discarded, so print the count explicitly.
print("Stories found:", len(json_stories))
print(utils.sml(json_stories))
# Guard the index so an empty corpus reports cleanly instead of raising IndexError.
if json_stories:
    print("Type of path element:", type(json_stories[0]))
else:
    print("Type of path element: n/a (no stories found)")

## Corpora Cleaner

The following code should not be run automatically as it processes a whole corpus.

In [None]:
# Deliberate stop: halts "Run All" here so the corpus-wide cells below only run when executed by hand.
raise Exception("The following code should not be run automatically.")

In [None]:
# Acceptable basename: runs of lowercase letters/digits joined by single underscores.
_VALID_BASENAME_RE_TEMPLATE = r"^[a-z0-9]+(?:_[a-z0-9]+)*$"


@dataclass
class FileRecord:
    """Bookkeeping for one candidate file found during the scan."""

    rel_dir: str                        # posix directory relative to input_dir ("." at top level)
    old_basename: str                   # original stem, without extension
    old_ext: str                        # original suffix, case preserved (may be ".JSON")
    new_basename: Optional[str] = None  # set if repaired
    new_ext: Optional[str] = None       # lowercased old_ext, set if repaired
    reason: Optional[str] = None        # "invalid" when the name fails validation


@dataclass
class OperationLog:
    """Everything one call to rename_and_fix_json_files found, planned and did."""

    params: dict
    candidates: List[str]              # relative paths
    excluded: List[str]                # relative paths
    valid: List[str]                   # relative paths (kept as-is)
    invalid: List[str]                 # relative paths (need repair)
    repairs: List[dict]                # dict per repaired file (old -> new)
    errors: List[Tuple[str, Optional[str], str]]  # (orig_rel, rename_rel|None, reason)
    performed_writes: List[str]        # relative paths written (dry_run=False)
    performed_skips: List[str]         # relative paths of valid files copied unchanged
    # Note: path to CHANGES.tab is stored in params["changes_tsv"] if written.


def rename_and_fix_json_files(
    input_dir: Union[str, pathlib.Path],
    output_dir: Union[str, pathlib.Path],
    *,
    ext: str = ".json",                      # file extension to process (case-insensitive)
    dry_run: bool = True,                    # list-only by default
    max_basename_len: int = 72,              # max length excluding extension
    exclude_basenames: Optional[List[str]] = None,   # exact basenames to exclude (no dir)
    exclude_globs: Optional[List[str]] = None,       # globs against basename (e.g. "*.md")
    exclude_path_prefixes: Optional[List[str]] = None,  # path prefixes (posix, relative to input)
    metadata_name_includes_extension: bool = False,  # whether metadata["name"] includes extension
    copy_valid_when_writing: bool = True,    # copy valid files into output when dry_run=False
) -> OperationLog:
    """
    Plan, and optionally write, repaired copies of badly named JSON files.

    Scans input_dir recursively for files with the given extension, drops the
    excluded ones, validates each basename, plans a repaired name for every
    invalid one and detects collisions. With dry_run=False the repaired files
    are written under output_dir (mirroring the input subtree) with
    data["metadata"]["name"] set to the new name, valid files are optionally
    copied unchanged, and CHANGES.tab (TSV) is written in output_dir with
    columns: from_rel  to_rel  old_basename  new_basename  old_ext  new_ext

    Acceptable filename rules (basename only, no extension):
      - lowercase letters and digits only, separated by single underscores
      - no leading or trailing underscores
      - length <= max_basename_len
    The extension must also already be lowercase for a file to count as valid.

    Args:
        input_dir: Root of the tree to scan. It is only read, never modified.
        output_dir: Root under which copies are written when dry_run=False.
        ext: Extension to process, compared case-insensitively. A missing
            leading dot is added, so "json" and ".json" are equivalent.
        dry_run: When True (default) only print the plan; write nothing.
        max_basename_len: Maximum basename length; repaired names are truncated to it.
        exclude_basenames: Exact file names to skip. "LICENSE" is always added.
        exclude_globs: Globs matched case-insensitively against the file name.
            "*.md" is always added.
        exclude_path_prefixes: Posix path prefixes relative to input_dir; a
            trailing "/" is enforced. Defaults to ["cache"] when none are given.
        metadata_name_includes_extension: Whether the rewritten metadata name
            carries the new extension.
        copy_valid_when_writing: Whether valid files are copied to output_dir
            unchanged when writing.

    Returns:
        An OperationLog. If any repair is blank after cleaning or collides with
        another file, the log carries those errors and nothing is written.

    Raises:
        FileNotFoundError: input_dir does not exist.
        ValueError: a file to be repaired is not parseable JSON (dry_run=False only).
        KeyError: a file to be repaired lacks data["metadata"]["name"] (dry_run=False only).
        Both checks run for every planned repair before output_dir is touched,
        so a bad file can no longer leave a half-written output tree.
    """
    input_dir = pathlib.Path(input_dir).resolve()
    output_dir = pathlib.Path(output_dir).resolve()
    params_base = {
        "input_dir": str(input_dir),
        "output_dir": str(output_dir),
        "ext": ext,
        "dry_run": dry_run,
        "max_basename_len": max_basename_len,
        # sorted(): set iteration order varies between runs, and the log should not.
        "exclude_basenames": sorted({*(exclude_basenames or []), "LICENSE"}),
        "exclude_globs": sorted({*(exclude_globs or []), "*.md"}),
        "exclude_path_prefixes": [p.rstrip("/") + "/" for p in (exclude_path_prefixes or ["cache"])],
        "metadata_name_includes_extension": metadata_name_includes_extension,
        "copy_valid_when_writing": copy_valid_when_writing,
    }

    if not input_dir.exists():
        raise FileNotFoundError(f"Input directory not found: {input_dir}")

    excluded_names = set(params_base["exclude_basenames"])
    glob_patterns = [g.lower() for g in params_base["exclude_globs"]]
    path_prefixes = tuple(params_base["exclude_path_prefixes"])

    # Path.suffix always starts with ".", so accept "json" as a spelling of ".json".
    wanted_ext = ext.lower()
    if wanted_ext and not wanted_ext.startswith("."):
        wanted_ext = "." + wanted_ext

    valid_basename_re = re.compile(_VALID_BASENAME_RE_TEMPLATE)

    def is_excluded(rel_path: pathlib.Path) -> bool:
        """True when the relative path matches any prefix, exact-name or glob exclusion."""
        if rel_path.as_posix().startswith(path_prefixes):
            return True
        if rel_path.name in excluded_names:
            return True
        lowered = rel_path.name.lower()
        return any(fnmatch.fnmatch(lowered, pattern) for pattern in glob_patterns)

    def is_valid_basename(basename: str) -> bool:
        """True when the basename already satisfies the naming rules."""
        # fullmatch, not match: "$" also matches just before a trailing newline.
        return (
            0 < len(basename) <= max_basename_len
            and valid_basename_re.fullmatch(basename) is not None
        )

    def repair_basename(raw: str) -> str:
        """Lowercase, turn each run of non [a-z0-9] into "_", trim edge underscores, truncate."""
        cleaned = re.sub(r"[^a-z0-9]+", "_", raw.lower()).strip("_")
        if len(cleaned) > max_basename_len:
            # Truncation can expose an underscore at the end; trim it again.
            cleaned = cleaned[:max_basename_len].strip("_")
        return cleaned

    # Walk and collect candidates
    candidates: List[str] = []
    excluded: List[str] = []
    valid: List[str] = []
    invalid: List[str] = []
    records_by_rel: Dict[str, FileRecord] = {}
    repairs: List[dict] = []

    def build_log(errors, performed_writes=(), performed_skips=(), **extra_params) -> OperationLog:
        """Assemble the OperationLog from the lists collected so far."""
        return OperationLog(
            params={**params_base, **extra_params},
            candidates=candidates,
            excluded=excluded,
            valid=valid,
            invalid=invalid,
            repairs=repairs,
            errors=list(errors),
            performed_writes=list(performed_writes),
            performed_skips=list(performed_skips),
        )

    # sorted(): rglob yields in filesystem order, which would make both the log
    # and the choice of which colliding file "wins" vary between machines.
    for path in sorted(input_dir.rglob("*")):
        if not path.is_file() or path.suffix.lower() != wanted_ext:
            continue
        rel = path.relative_to(input_dir)
        rel_str = rel.as_posix()
        if is_excluded(rel):
            excluded.append(rel_str)
            continue
        candidates.append(rel_str)
        rec = FileRecord(rel_dir=rel.parent.as_posix(), old_basename=path.stem, old_ext=path.suffix)
        # valid only if basename meets rule and extension already lowercase
        if is_valid_basename(rec.old_basename) and rec.old_ext == rec.old_ext.lower():
            valid.append(rel_str)
        else:
            invalid.append(rel_str)
            rec.reason = "invalid"
        records_by_rel[rel_str] = rec

    print(f"[scan] candidates: {len(candidates)}, excluded: {len(excluded)}, "
          f"valid: {len(valid)}, invalid: {len(invalid)}")

    # Plan repairs for invalid files; detect collisions
    errors: List[Tuple[str, Optional[str], str]] = []
    planned_new_paths: Dict[str, str] = {}  # rel_new -> rel_old
    valid_set = set(valid)

    for rel_str in invalid:
        rec = records_by_rel[rel_str]
        new_base = repair_basename(rec.old_basename)
        new_ext = rec.old_ext.lower()  # normalize extension to lowercase

        if not new_base:
            errors.append((rel_str, None, "blank_after_repair"))
            continue

        new_name = f"{new_base}{new_ext}"
        new_rel = new_name if rec.rel_dir in ("", ".") else f"{rec.rel_dir}/{new_name}"

        # Collision with existing valid?
        if new_rel in valid_set and new_rel != rel_str:
            errors.append((rel_str, new_rel, "collision_with_existing_valid"))
            continue

        # Collision with another repair?
        if planned_new_paths.get(new_rel, rel_str) != rel_str:
            errors.append((rel_str, new_rel, "collision_with_another_repair"))
            continue

        rec.new_basename = new_base
        rec.new_ext = new_ext
        planned_new_paths[new_rel] = rel_str
        repairs.append({
            "from": rel_str,
            "to": new_rel,
            "old_basename": rec.old_basename,
            "new_basename": new_base,
            "old_ext": rec.old_ext,
            "new_ext": new_ext
        })

    # Abort on errors
    if errors:
        print(f"[abort] {len(errors)} error(s) detected; no files written.")
        for orig, newp, reason in errors:
            print(f"  - {reason}: {orig}" + (f" -> {newp}" if newp else ""))
        return build_log(errors)

    # Dry-run: print plan and return
    if dry_run:
        for r in repairs:
            print(f"[plan] {r['from']} -> {r['to']}")
        print(f"[dry-run] {len(repairs)} rename(s) would be performed; {len(valid)} already ok.")
        return build_log([])

    def load_patched(repair: dict) -> dict:
        """Parse one file to be repaired and return its data with metadata.name updated."""
        orig_rel = repair["from"]
        with open(input_dir / orig_rel, "r", encoding="utf-8") as f:
            try:
                data = json.load(f)
            except Exception as e:
                raise ValueError(f"Failed to parse JSON: {orig_rel}: {e}") from e

        # Expect data["metadata"]["name"]
        try:
            md = data["metadata"]
            _ = md["name"]
        except (KeyError, IndexError, TypeError) as e:
            raise KeyError(f'missing data["metadata"]["name"] in {orig_rel}') from e

        new_base, new_ext = repair["new_basename"], repair["new_ext"]
        md["name"] = f"{new_base}{new_ext}" if metadata_name_includes_extension else new_base
        return data

    # Validate every repair before touching output_dir, so a bad file fails the
    # run up front instead of leaving a partially written tree. The data is
    # parsed again when written, rather than holding every story in memory.
    for r in repairs:
        load_patched(r)

    # Perform writes
    performed_writes: List[str] = []
    performed_skips: List[str] = []

    output_dir.mkdir(parents=True, exist_ok=True)

    # Optionally copy valid files unchanged (to mirror the whole corpus)
    if copy_valid_when_writing:
        for rel_str in valid:
            dst = output_dir / rel_str
            dst.parent.mkdir(parents=True, exist_ok=True)
            dst.write_bytes((input_dir / rel_str).read_bytes())
            performed_skips.append(rel_str)

    # Write repaired files with updated metadata.name
    for r in repairs:
        orig_rel = r["from"]
        new_rel = r["to"]
        data = load_patched(r)
        dst = output_dir / new_rel
        dst.parent.mkdir(parents=True, exist_ok=True)

        with open(dst, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
            f.write("\n")

        print(f"[write] {orig_rel} -> {new_rel} (metadata.name={data['metadata']['name']})")
        performed_writes.append(new_rel)

    # Write CHANGES.tab (TSV) with planned repairs
    changes_path = output_dir / "CHANGES.tab"
    with open(changes_path, "w", encoding="utf-8", newline="") as f:
        f.write("from_rel\tto_rel\told_basename\tnew_basename\told_ext\tnew_ext\n")
        for r in repairs:
            f.write(
                f"{r['from']}\t{r['to']}\t"
                f"{r['old_basename']}\t{r['new_basename']}\t"
                f"{r['old_ext']}\t{r['new_ext']}\n"
            )
    print(f"[changes] wrote {changes_path}")

    print(f"[done] wrote {len(performed_writes)} file(s); "
          f"copied {len(performed_skips)} unchanged (copy_valid_when_writing={copy_valid_when_writing}).")

    return build_log([], performed_writes, performed_skips, changes_tsv=str(changes_path))


In [None]:
# Repaired copies are written under DEV_OUTPUT/cleaned_corpus; create it up front.
OUTPUT_ROOT = DEV_OUTPUT / "cleaned_corpus"
OUTPUT_ROOT.mkdir(parents=True, exist_ok=True)

print(f"Input dir: {CORPORA_ROOT}")
print(f"Output dir: {OUTPUT_ROOT}")

In [None]:
# Preview only: with dry_run=True the function prints the planned renames and writes nothing.
# NOTE(review): this preview passes metadata_name_includes_extension=True, while the
# write run later in this notebook passes False — confirm which one is intended.
operation_log = rename_and_fix_json_files(
    CORPORA_ROOT,
    OUTPUT_ROOT,
    dry_run=True,  # Set to False to perform writes
    metadata_name_includes_extension=True,
)

In [None]:
operation_log

In [None]:
# Real run: writes repaired copies and CHANGES.tab under OUTPUT_ROOT.
# NOTE(review): metadata_name_includes_extension is False here but True in the
# dry-run cell earlier in this notebook — confirm which one is intended.
operation_log = rename_and_fix_json_files(
    CORPORA_ROOT,
    OUTPUT_ROOT,
    dry_run=False,
    metadata_name_includes_extension=False,
)

## Story Classifier

Reload the corpus as we may have changed it from the earlier load.

In [None]:
# Collect the story files found under CORPORA_ROOT again.
json_stories = corpus_surveyor.find_corpus_stories(CORPORA_ROOT)
# A bare len(...) in the middle of a cell is evaluated and discarded, so print the count explicitly.
print("Stories found:", len(json_stories))
print(utils.sml(json_stories))
# Guard the index so an empty corpus reports cleanly instead of raising IndexError.
if json_stories:
    print("Type of path element:", type(json_stories[0]))
else:
    print("Type of path element: n/a (no stories found)")

In [None]:
# Take the first story file as a worked example and parse it.
sample_story = json_stories[0]
print("Sample story path:", sample_story)
sample_data = json.loads(pathlib.Path(sample_story).read_text(encoding="utf-8"))
print("Sample story metadata:", sample_data["metadata"])
# The story text is stored under the "body" key.
sample_body = sample_data["body"]
print(f"Sample story text length: {len(sample_body)} characters.")

In [None]:
print(sample_body)

In [None]:
story_classifier = story_analysis.make_doc_classification_extractor(client)

In [None]:
sample_output = story_classifier(sample_body)
sample_output

In [None]:
# Draw a reproducible sample: the fixed seed selects the same positions on every run
# (for the same json_stories order), and the size is capped so a corpus with fewer than
# SAMPLE_SIZE stories does not raise ValueError.
SAMPLE_SIZE = 100
SAMPLE_SEED = 42
random_sample = random.Random(SAMPLE_SEED).sample(json_stories, min(SAMPLE_SIZE, len(json_stories)))
random_sample

In [None]:
DEV_OUTPUT

In [None]:
# Run the story classifier over the random sample first, writing under DEV_OUTPUT.
summary = corpus_surveyor.process_files(
    random_sample,
    corpora_root=CORPORA_ROOT,
    output_root=DEV_OUTPUT,
    processor_function=story_classifier,
    job_label="story_classes",
    verbose=True,
)

In [None]:
# Run the story classifier over every story in json_stories.
# NOTE(review): this reuses job_label "story_classes" from the sample run — presumably
# process_files skips or overwrites existing results; confirm before running on the full corpus.
more_summary = corpus_surveyor.process_files(
    json_stories,
    corpora_root=CORPORA_ROOT,
    output_root=DEV_OUTPUT,
    processor_function=story_classifier,
    job_label="story_classes",
    verbose=True,
)

In [None]:
# Process two specific stories by path.
# NOTE(review): presumably these were absent from the earlier results (per the variable name) — confirm.
missing_stories = [
    CORPORA_ROOT / 'mass_quantities/george_walker_at_suez.json',
    CORPORA_ROOT / 'mass_quantities/give_back_a_world.json',
]

missing_summary = corpus_surveyor.process_files(
    missing_stories,
    corpora_root=CORPORA_ROOT,
    output_root=DEV_OUTPUT,
    processor_function=story_classifier,
    job_label="story_classes",
    verbose=True,
)

In [None]:
1+2

In [None]:
reloader()