# README: SFT Data Generation (supervised_fine_tuning)

**Purpose:** Generate high-quality SFT training examples from PDF text using a teacher + (optional) auditor pipeline. This notebook supports caching, batching, and configurable concurrency so you can balance speed vs. strict auditing. 

### Quick start ✅
- Install dependencies (local, no Docker required):

```bash
pip install -r requirements.txt
```

- Copy `.env.example` to `.env` and edit values (do NOT commit your `.env`):

```bash
cp .env.example .env
# edit .env to match your environment
```

- Prompts and templates are loaded at runtime. By default the notebook reads `config/prompts.json`, but you can override this using the environment variable `SFT_PROMPTS_PATH` (for example: `SFT_PROMPTS_PATH=config/prompt.local.json` in your local `.env`). Do **not** commit your private prompts to the repository; use `config/prompts.example.json` as the editable, generic illustration.

- `.env` recommendations:

```text
OLLAMA_URL=http://localhost:11434
TEACHER_MODEL=qwen2.5:72b-instruct
AUDITOR_MODEL=deepseek-r1:70b
# Optional: run Redis for shared cache
REDIS_URL=redis://localhost:6379/0
MAX_LLM_CONCURRENCY=8
USE_SINGLE_CALL=1         # recommended for speed
USE_BATCHING=0            # optional: 0/1
BATCH_SIZE=4
AUDIT_SAMPLE_RATE=0.05    # sample strict audits when using single-call
# Optional: override prompts file for local development
# SFT_PROMPTS_PATH=config/prompt.local.json
```
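
The boolean flags and the audit sample rate above can be read roughly as follows. This is a minimal sketch, not the notebook's exact parsing: `env_flag` and `should_strict_audit` are hypothetical helper names, and the environment values are set inline only so the example runs on its own.

```python
import os
import random

def env_flag(name: str, default: str = "0") -> bool:
    """Hypothetical helper: treat 1/true/yes (any case) as enabled."""
    return os.getenv(name, default).strip().lower() in {"1", "true", "yes"}

def should_strict_audit(rate: float, rng: random.Random) -> bool:
    """Return True for roughly `rate` of the calls (0.05 is about 1 in 20)."""
    return rng.random() < rate

# Set here only for the demo; normally these come from your .env file
os.environ["USE_SINGLE_CALL"] = "1"
os.environ["USE_BATCHING"] = "0"

use_single_call = env_flag("USE_SINGLE_CALL", "1")
use_batching = env_flag("USE_BATCHING")

rng = random.Random(0)  # seeded only to make the illustration repeatable
audited = sum(should_strict_audit(0.05, rng) for _ in range(10_000))
print(use_single_call, use_batching, audited)
```

With `AUDIT_SAMPLE_RATE=0.05`, about 5% of single-call results are re-run through the stricter two-step path; a rate of `0` disables sampling and `1.0` audits everything.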

### Prompts & Templates ✅
- Location: `config/prompts.json` by default (override with `SFT_PROMPTS_PATH`).
- What it contains: system prompts and small templates used by the pipeline (keys include `system_gen`, `gen_prompt_template`, `audit_system`, `audit_prompt_template`, `single_call_system`, `single_call_prompt_template`, `batch_system`, `batch_block_template`).
- How it works: the notebook first tries to load the file at `SFT_PROMPTS_PATH` (if set), otherwise `config/prompts.json`. If that file is missing or unreadable, it loads `config/prompts.example.json` (the editable example), and finally falls back to well-tested built-in defaults.
- Editing tips:
  - Copy `config/prompts.example.json` to `config/prompts.json` or create a `config/prompt.local.json` for local changes and set `SFT_PROMPTS_PATH` to point to it.
  - **Do not** commit private prompt files; add them to `.gitignore` (this repo already ignores `config/prompts.json` and `config/prompt.local.json`).
  - Templates use `{chunk}` and `{generated}` placeholders for prompt composition (these are substituted when the notebook runs).
  - After editing, re-run the top configuration cells (or restart the kernel and run top cells) to pick up changes.
- Example: modify the `system_gen` value to shift the teacher's style or constraints, or adjust `audit_prompt_template` to change strictness.
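
The lookup order and placeholder substitution described above can be sketched as follows. This is an illustration under assumptions: `load_prompts` and `BUILTIN` are hypothetical names, the files live in a temporary directory rather than `config/`, and the error handling is simplified compared with the notebook's loader.

```python
import json
import tempfile
from pathlib import Path

BUILTIN = {"gen_prompt_template": "Source:\n\n{chunk}"}  # stand-in for built-in defaults

def load_prompts(candidates: list[Path], builtin: dict) -> tuple[dict, str]:
    """Return the first readable JSON prompts file, else the built-in defaults."""
    for path in candidates:
        if path.is_file():
            try:
                return json.loads(path.read_text(encoding="utf-8")), str(path)
            except (OSError, json.JSONDecodeError):
                continue  # unreadable or invalid: try the next candidate
    return builtin, "<built-in>"

with tempfile.TemporaryDirectory() as tmp:
    cfg = Path(tmp)
    example = cfg / "prompts.example.json"
    template = "Source Text:\n{chunk}\n\nGenerated Pair:\n{generated}"
    example.write_text(json.dumps({"audit_prompt_template": template}), encoding="utf-8")
    # prompts.json does not exist here, so the example file is picked up
    prompts, source = load_prompts([cfg / "prompts.json", example], BUILTIN)

# {chunk} and {generated} are filled in with str.format at call time
audit_prompt = prompts["audit_prompt_template"].format(
    chunk="Water boils at 100 C at sea level.",
    generated='{"instruction": "...", "output": "..."}',
)
print(source)
print(audit_prompt)
```

Because substitution uses `str.format`, any literal `{` or `}` you add to a template that is formatted (for example a JSON sample inside `gen_prompt_template`) needs to be doubled as `{{` and `}}`.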


## Install dependencies (helper) ⚙️

**Purpose:** Install required Python packages for this project. Run this cell when setting up the environment or when `requirements.txt` changes.

**Usage:** `!pip install -r requirements.txt`. Run once per environment. Avoid running in CI; prefer reproducible environments or lockfiles.

In [None]:
# Install dependencies (one-off helper cell)
# Run this in the notebook when you need to install packages for this project
# Preferably use a virtual environment. Create one and register it as a kernel with:
# python3 -m venv .venv
# source .venv/bin/activate
# python -m ipykernel install --user --name .venv --display-name "SFT Data Gen (.venv)"

#%pip install -r requirements.txt

## Imports & Configuration (overview) 🔧

**Purpose:** Load environment variables, set default configuration values, and establish paths used throughout the notebook (e.g., `OLLAMA_URL`, cache settings, chunking constants and data directories).

**Usage:** Run this cell first after starting the kernel so all subsequent helper functions have access to these constants.

In [None]:
# Imports & Configuration (canonical, single cell)
import os, sys, json, time, re, random
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor
load_dotenv()

# Core endpoints & models
OLLAMA_URL = os.getenv("OLLAMA_URL")
if OLLAMA_URL is None:
    raise RuntimeError("Error: OLLAMA_URL is not set in the environment variables. Please add OLLAMA_URL to your .env file")
OLLAMA_URL = OLLAMA_URL.rstrip("/")
TEACHER_MODEL = os.getenv("TEACHER_MODEL", "qwen2.5:72b-instruct")
AUDITOR_MODEL = os.getenv("AUDITOR_MODEL", "deepseek-r1:70b")

# Cache & concurrency defaults
REDIS_URL = os.getenv("REDIS_URL") or None
CACHE_DIR = os.getenv("CACHE_DIR", str(Path.cwd() / "cache"))
CHUNK_TTL = int(os.getenv("CHUNK_TTL", 60 * 60 * 24))
SFT_TTL = int(os.getenv("SFT_TTL", 60 * 60 * 24 * 7))
MAX_LLM_CONCURRENCY = int(os.getenv("MAX_LLM_CONCURRENCY", 8))
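# os.getenv always returns a string, so compare case-insensitively against string values only
USE_SINGLE_CALL = os.getenv("USE_SINGLE_CALL", "1").strip().lower() in {"1", "true", "yes"}
AUDIT_SAMPLE_RATE = float(os.getenv("AUDIT_SAMPLE_RATE", "0.05"))
USE_BATCHING = os.getenv("USE_BATCHING", "0").strip().lower() in {"1", "true", "yes"}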
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "4"))
BATCH_CONCURRENCY = int(os.getenv("BATCH_CONCURRENCY", "2"))
MAX_BATCH_CHARS = int(os.getenv("MAX_BATCH_CHARS", "20000"))
# Chunking defaults (chars)
CHUNK_SIZE = int(os.getenv("CHUNK_SIZE", "2000"))
CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", "200"))
# Optional override to force cache backend: 'redis' or 'disk'
CACHE_BACKEND = os.getenv("CACHE_BACKEND", "").lower()  # set to 'disk' to force DiskCache

# Paths
try:
    SCRIPT_DIR = Path(__file__).parent.resolve()
except NameError:
    SCRIPT_DIR = Path.cwd().resolve()
RAW_DATA_DIR = SCRIPT_DIR / "data" / "raw" / "in-progress"
RAW_DATA_DIR.mkdir(parents=True, exist_ok=True)

TIMESTAMP = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

PROCESSED_DIR = SCRIPT_DIR / "data" / "processed" / TIMESTAMP
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

## Prompts loader & defaults 🧾


**Purpose:** The prompts file defines the objective of the dataset: it tells the teacher model how to generate examples and the auditor how to check them.

**Action:** Create a prompt template file, `prompt.local.json`, in the `config/` folder. An illustrative example is provided in `prompts.example.json`.


In [None]:
# Prompts (load or defaults)
# Respect SFT_PROMPTS_PATH if set and non-empty; otherwise use config/prompts.json
_env_path = os.environ.get("SFT_PROMPTS_PATH")
PROMPTS_PATH = Path(_env_path) if _env_path else Path("config") / "prompts.json"
# If an environment override is provided (non-empty), surface it so users know where prompts are coming from
if _env_path:
    print(f"SFT_PROMPTS_PATH set -> loading prompts from {PROMPTS_PATH}")

PROMPTS = None
# Only load when PROMPTS_PATH is a file (guards against empty env -> current dir)
if PROMPTS_PATH.is_file():
    try:
        with PROMPTS_PATH.open("r", encoding="utf-8") as f:
            PROMPTS = json.load(f)
    except Exception as e:
        print(f"Failed to read SFT prompts file {PROMPTS_PATH}: {e}. Falling back to example/defaults.")

if PROMPTS is None:
    # Prefer an example prompts file if present (editable and safe to commit)
    EXAMPLE_PATH = Path("config") / "prompts.example.json"
    if EXAMPLE_PATH.is_file():
        with EXAMPLE_PATH.open("r", encoding="utf-8") as f:
            PROMPTS = json.load(f)
        print(f"Loaded prompts from example: {EXAMPLE_PATH}")
    else:
        # Built-in safe defaults (generic templates); edit `config/prompts.example.json` to customize
        PROMPTS = {
            "system_gen": "You are an expert assistant. Create ONE high-quality supervised fine-tuning example based only on the provided source text. Output ONLY valid JSON in this exact format: {\"instruction\": \"A clear task\", \"output\": \"A precise response\"}",
            # Note: "\n" (not "\\n") so the templates contain real newlines rather than a literal backslash-n
            "gen_prompt_template": "Using only the following source text, create one high-quality training example for fine-tuning:\n\n{chunk}",
            "audit_system": "You are a meticulous auditor. Verify that the generated instruction-output pair is factually accurate and faithful to the source text. Correct any errors and return only the final valid JSON.",
            "audit_prompt_template": "Source Text:\n{chunk}\n\nGenerated Pair:\n{generated}\n\nVerify factual accuracy against the source and return only the corrected JSON.",
            "single_call_system": "You are an expert assistant and auditor. Create and self-audit one high-quality example; output only the final JSON.",
            "single_call_prompt_template": "Source Text:\n{chunk}\n\nCreate the training example and self-audit it; return only the final JSON.",
            "batch_system": "For each SOURCE block, create one high-quality example and ensure it is fact-checked. Return a JSON array or newline-separated JSON objects.",
            "batch_block_template": "--- SOURCE {i} ---\n{chunk}\n"
        }
        print("Loaded built-in prompt defaults")


## Cache backend initialization 💾

**Purpose:** Initialize the caching client. Prefers Redis when available and healthy, otherwise falls back to DiskCache. Also provides `_using_redis()` for backend checks.

**Notes:** Install `redis` or `diskcache` packages in your environment if you want the respective backends.

In [None]:
# Cache backend init + helper to detect Redis usage
try:
    import redis
except Exception:
    redis = None
try:
    from diskcache import Cache as DiskCache
except Exception:
    DiskCache = None

_cache_client = None

def init_cache():
    """Initialize a cache client. Prefer Redis (when available and healthy),
    otherwise fall back to DiskCache. Honours CACHE_BACKEND env override.
    """
    global _cache_client
    if _cache_client is not None:
        return
    _info = globals().get("log", print)

    # Force disk backend when requested
    if CACHE_BACKEND == "disk":
        _info("CACHE_BACKEND=disk -> forcing DiskCache backend")
        if DiskCache is None:
            _info("DiskCache not available. Install with `pip install diskcache`")
            raise RuntimeError("No cache backend available")
        _cache_client = DiskCache(CACHE_DIR)
        _info(f"Using DiskCache at {CACHE_DIR} (forced by CACHE_BACKEND)")
        return

    # Try Redis when configured
    if REDIS_URL and redis is not None:
        client = None  # stays None if from_url itself raises
        try:
            client = redis.from_url(REDIS_URL, socket_connect_timeout=2, socket_timeout=2, decode_responses=True)
            # require a ping to verify health
            client.ping()
            _cache_client = client
            _info(f"Using Redis cache at {REDIS_URL}")
            return
        except Exception as e:
            _info(f"Could not use Redis at {REDIS_URL} ({type(e).__name__}: {e}). Falling back to DiskCache.")
            if client is not None:
                try:
                    client.close()
                except Exception:
                    pass
            _cache_client = None

    # Fall back to DiskCache
    if DiskCache is None:
        _info("DiskCache not available. Install with `pip install diskcache` or set REDIS_URL to a running Redis server.")
        raise RuntimeError("No cache backend available")

    _cache_client = DiskCache(CACHE_DIR)
    _info(f"Using DiskCache at {CACHE_DIR}")


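def _using_redis():
    """Return True when the active cache client is Redis-backed."""
    if not REDIS_URL or redis is None or _cache_client is None:
        return False
    # DiskCache is None when the package is missing; isinstance(x, None) would raise TypeError
    return DiskCache is None or not isinstance(_cache_client, DiskCache)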

## Hashing, caching helpers & chunking 🧩

**Purpose:** Provide utilities to compute PDF and chunk hashes (`pdf_sha256`, `chunk_sha256`), store and retrieve cached chunks/SFT pairs, and split long text into reasonable chunks using paragraph-aware logic.

**Usage:** These helpers are used by the processing pipeline to avoid reprocessing work and ensure chunk stability across runs.
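
To illustrate why content hashes give stable cache keys and how paragraph-aware packing behaves, here is a small self-contained sketch. `pack_paragraphs` is a hypothetical, simplified stand-in: unlike the notebook's `chunk_text_to_chunks`, it keeps an oversized paragraph whole instead of slicing it with overlap.

```python
import hashlib
import re

def chunk_sha256(chunk: str) -> str:
    """Hash the chunk text; identical text always yields the same key."""
    return hashlib.sha256(chunk.encode("utf-8")).hexdigest()

def pack_paragraphs(text: str, chunk_size: int) -> list[str]:
    """Greedily pack blank-line-separated paragraphs into chunks of at most chunk_size chars."""
    paragraphs = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]
    chunks, current, current_len = [], [], 0
    for p in paragraphs:
        sep = 2 if current else 0  # the "\n\n" joiner only counts between paragraphs
        if current and current_len + sep + len(p) > chunk_size:
            chunks.append("\n\n".join(current))
            current, current_len, sep = [], 0, 0
        current.append(p)
        current_len += sep + len(p)
    if current:
        chunks.append("\n\n".join(current))
    return chunks

chunks = pack_paragraphs("aaaa\n\nbbbb\n\ncccc", chunk_size=10)
keys = [f"sft:{chunk_sha256(c)}" for c in chunks]  # same key format the cache helpers use
print(chunks)
print(keys[0])
```

Because the key depends only on the chunk text, re-running the notebook on an unchanged PDF finds the same `sft:<hash>` entries, while any change to chunk size or overlap produces different chunks and therefore different keys.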

In [None]:
# Hashing & cache helpers (robust to Redis bytes/str)
import hashlib

def pdf_sha256(path: Path) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()

def chunk_sha256(chunk: str) -> str:
    return hashlib.sha256(chunk.encode("utf-8")).hexdigest()

def cache_chunks(pdf_hash: str, chunks, ttl=CHUNK_TTL):
    global _cache_client
    init_cache()
    key = f"chunks:{pdf_hash}"
    if _using_redis():
        try:
            _cache_client.set(key, json.dumps(chunks, ensure_ascii=False), ex=ttl)
            return
        except Exception as e:
            # on redis failure, fall back to diskcache
            print(f"Redis cache_chunks error: {e}; falling back to DiskCache")
            try:
                _cache_client.close()
            except Exception:
                pass
            _cache_client = None
            init_cache()
    # DiskCache path
    _cache_client.set(key, chunks, expire=ttl)

def get_cached_chunks(pdf_hash: str):
    global _cache_client
    init_cache()
    key = f"chunks:{pdf_hash}"
    if _using_redis():
        try:
            v = _cache_client.get(key)
        except Exception as e:
            print(f"Redis get_cached_chunks error: {e}; falling back to DiskCache")
            try:
                _cache_client.close()
            except Exception:
                pass
            _cache_client = None
            init_cache()
            return _cache_client.get(key)
        if not v:
            return None
        try:
            return json.loads(v)
        except Exception:
            return None
    else:
        return _cache_client.get(key)

def cache_sft_pair(chunk_hash: str, pair, ttl=SFT_TTL):
    global _cache_client
    init_cache()
    key = f"sft:{chunk_hash}"
    if _using_redis():
        try:
            _cache_client.set(key, json.dumps(pair, ensure_ascii=False), ex=ttl)
            return
        except Exception as e:
            print(f"Redis cache_sft_pair error: {e}; falling back to DiskCache")
            try:
                _cache_client.close()
            except Exception:
                pass
            _cache_client = None
            init_cache()
    _cache_client.set(key, pair, expire=ttl)

def get_cached_sft_pair(chunk_hash: str):
    global _cache_client
    init_cache()
    key = f"sft:{chunk_hash}"
    if _using_redis():
        try:
            v = _cache_client.get(key)
        except Exception as e:
            print(f"Redis get_cached_sft_pair error: {e}; falling back to DiskCache")
            try:
                _cache_client.close()
            except Exception:
                pass
            _cache_client = None
            init_cache()
            return _cache_client.get(key)
        if not v:
            return None
        try:
            return json.loads(v)
        except Exception:
            return None
    else:
        return _cache_client.get(key)

# Text chunking helper

def chunk_text_to_chunks(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
    """Split `text` into chunks of approximately `chunk_size` characters with `overlap`.

    Strategy:
    - Split text into paragraphs on two newlines
    - Accumulate paragraphs until adding would exceed chunk_size
    - If a single paragraph is larger than chunk_size, split it into slices with overlap
    - Return list of chunk strings (stripped)
    """
    if not text:
        return []

    paragraphs = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]
    chunks = []
    current = []
    current_len = 0

    def flush_current():
        nonlocal current, current_len
        if current:
            chunk = "\n\n".join(current).strip()
            if chunk:
                chunks.append(chunk)
        current = []
        current_len = 0

    for p in paragraphs:
        p_len = len(p)
        # Compute the "\n\n" joiner cost before appending, so the first paragraph is not charged for it
        sep = 2 if current else 0
        if current_len + p_len + sep <= chunk_size:
            current.append(p)
            current_len += p_len + sep
        else:
            flush_current()
            if p_len <= chunk_size:
                current.append(p)
                current_len = p_len
            else:
                # paragraph itself is larger than chunk_size; split it
                start = 0
                while start < p_len:
                    end = min(start + chunk_size, p_len)
                    slice_ = p[start:end].strip()
                    if slice_:
                        chunks.append(slice_)
                    if end >= p_len:
                        break
                    # Always advance, so overlap >= chunk_size cannot loop forever
                    start = max(start + 1, end - overlap)
    # flush remaining
    flush_current()
    return chunks

## HTTP session pooling & METRICS 📡

**Purpose:** Set up a shared `requests.Session` with retries and connection pooling to reuse HTTP connections for Ollama calls. Also provides `record_call` and `summarise_metrics` for basic instrumentation and debugging.

**Usage:** Reconfigure session concurrency via `_reconfigure_session_for_concurrency()` when benchmarking or autotuning.
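
As an illustration of the kind of per-model summary `summarise_metrics` produces, the sketch below groups call records and reports count, successes, mean and a simple index-based p95. The `summarise` function and the sample records are hypothetical; the index here uses integer arithmetic, which is an assumption of this sketch rather than the notebook's exact formula.

```python
import statistics
from collections import defaultdict

def summarise(calls: list[dict]) -> dict[str, dict]:
    """Group call records by model and report count, successes, mean and p95."""
    by_model = defaultdict(list)
    for call in calls:
        by_model[call["model"]].append(call)
    summary = {}
    for model, records in by_model.items():
        # Calls recorded without a duration are excluded from the timing stats
        durations = sorted(r["duration"] for r in records if r["duration"] is not None)
        # (n * 95) // 100 is always a valid index for n >= 1
        p95_index = (len(durations) * 95) // 100
        summary[model] = {
            "calls": len(records),
            "success": sum(1 for r in records if r["success"]),
            "mean": statistics.mean(durations) if durations else 0.0,
            "p95": durations[p95_index] if durations else 0.0,
        }
    return summary

# Twenty successful calls taking 1..20 seconds, plus one failure with no duration
calls = [{"model": "teacher", "duration": float(d), "success": True} for d in range(1, 21)]
calls.append({"model": "teacher", "duration": None, "success": False})
report = summarise(calls)
print(report["teacher"])
```

With small samples the p95 value is simply one of the slowest observed calls, so treat it as a rough indicator until a run has accumulated a few hundred calls.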

In [None]:
# HTTP session (pooling & retries) and METRICS
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import requests

SESSION = requests.Session()
OLLAMA_API_KEY = os.getenv("OLLAMA_API_KEY")
if OLLAMA_API_KEY:
    SESSION.headers.update({"Authorization": f"Bearer {OLLAMA_API_KEY}"})
retries = Retry(total=3, backoff_factor=0.6, status_forcelist=[429, 500, 502, 503, 504])
adapter = HTTPAdapter(pool_connections=MAX_LLM_CONCURRENCY*2, pool_maxsize=MAX_LLM_CONCURRENCY*2, max_retries=retries)
SESSION.mount("http://", adapter)
SESSION.mount("https://", adapter)

METRICS = {"calls": []}
def record_call(model: str, duration: float, success: bool, error: str | None = None):
    METRICS["calls"].append({"model": model, "duration": duration, "success": bool(success), "error": str(error) if error else None})
def summarise_metrics():
    import statistics
    by_model = {}
    for c in METRICS["calls"]:
        m = c["model"]
        by_model.setdefault(m, []).append(c)
    lines = []
    for m, calls in by_model.items():
        durations = [c["duration"] for c in calls if c["duration"] is not None]
        successes = sum(1 for c in calls if c["success"])
        total = len(calls)
        mean = statistics.mean(durations) if durations else 0
        p95 = sorted(durations)[int(len(durations) * 0.95)] if durations else 0
        lines.append(f"{m}: calls={total} success={successes} mean={mean:.2f}s p95={p95:.2f}s")
    return "\n".join(lines)

## LLM call wrapper & JSON parsing 🤖

**Purpose:** `call_ollama` performs robust calls to the Ollama API (with retries and timeout), strips surrounding markdown fences, and attempts to parse JSON. `_parse_json_array_or_objects` extracts JSON objects from potentially messy outputs.

**Notes:** The helper records metrics and returns `None` on repeated failures; callers should handle `None` results accordingly.
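
Model replies often wrap JSON in a markdown fence or surround it with chatter. The following sketch shows one way to handle both cases using only the standard library; `strip_fence` and `extract_objects` are hypothetical names, and the sample reply is made up for illustration. It relies on `json.JSONDecoder.raw_decode`, which parses one JSON value starting at a given index and returns the value together with the position where it ended.

```python
import json
import re

def strip_fence(text: str) -> str:
    """Remove a leading ``` or ```json line and a trailing ``` if present."""
    text = text.strip()
    text = re.sub(r"^```[a-zA-Z]*\s*", "", text)
    text = re.sub(r"\s*```$", "", text)
    return text.strip()

def extract_objects(text: str) -> list[dict]:
    """Collect every top-level JSON object found anywhere in the text."""
    decoder = json.JSONDecoder()
    found = []
    pos = text.find("{")
    while pos != -1:
        try:
            obj, end = decoder.raw_decode(text, pos)
        except json.JSONDecodeError:
            pos = text.find("{", pos + 1)  # not valid JSON here: try the next brace
            continue
        if isinstance(obj, dict):
            found.append(obj)
        pos = text.find("{", end)  # resume scanning after the parsed object
    return found

reply = 'Sure!\n```json\n{"instruction": "Define X", "output": "X is {braces} ok"}\n```'
pairs = extract_objects(strip_fence(reply))
print(pairs)
```

Letting the JSON decoder find the end of each object avoids brace-counting mistakes when braces appear inside string values, which a plain regular expression cannot distinguish.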

In [None]:
# LLM call + parsing helpers
def call_ollama(model, prompt, system_prompt="", session=None):
    session = session or SESSION
    url = f"{OLLAMA_URL}/api/chat"
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt} if system_prompt else None,
            {"role": "user", "content": prompt}
        ],
        "stream": False,
        "options": {"temperature": 0.1, "num_predict": 1024}
    }
    payload["messages"] = [m for m in payload["messages"] if m is not None]
    for attempt in range(3):
        start = time.time()
        try:
            response = session.post(url, json=payload, timeout=300)
            response.raise_for_status()
            duration = time.time() - start
            data = response.json()
            raw_content = data["message"]["content"]
            print("call_ollama: raw response (truncated):", raw_content[:2000])
            content = raw_content.strip()
            # Strip a surrounding markdown fence, with or without a language tag
            if content.startswith("```"):
                content = re.sub(r"^```[a-zA-Z]*\s*", "", content)
            if content.endswith("```"):
                content = content[:-3]
            content = content.strip()
            try:
                parsed = json.loads(content)
            except json.JSONDecodeError:
                match = re.search(r'\{.*\}', content, re.DOTALL)
                if match:
                    parsed = json.loads(match.group(0))
                else:
                    record_call(model, duration, False, error="invalid-json")
                    return None
            record_call(model, duration, True)
            return parsed
        except Exception as e:
            duration = time.time() - start
            record_call(model, duration, False, error=str(e))
            time.sleep(1)
    record_call(model, None, False, error="all attempts failed")
    return None

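def _parse_json_array_or_objects(text: str):
    """Parse a JSON array/object, or salvage top-level JSON objects from messy text."""
    text = text.strip()
    try:
        parsed = json.loads(text)
        if isinstance(parsed, dict):
            return [parsed]
        if isinstance(parsed, list):
            return parsed
    except Exception:
        # Python's `re` module has no recursive patterns such as (?R), so
        # locate balanced objects with the JSON decoder instead.
        decoder = json.JSONDecoder()
        results = []
        pos = text.find("{")
        while pos != -1:
            try:
                obj, end = decoder.raw_decode(text, pos)
            except json.JSONDecodeError:
                pos = text.find("{", pos + 1)
                continue
            if isinstance(obj, dict):
                results.append(obj)
            pos = text.find("{", end)
        if results:
            return results
    return None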

## Generation & Auditing Pipelines 🧪

**Purpose:** Provide three generation modes:
- `generate_and_audit`: two-step teacher → auditor (strict accuracy)
- `generate_and_audit_single`: single-call with self-audit (faster)
- `generate_and_audit_batch`: batch multiple chunks into one request

**Also includes:** PDF extraction helpers with fallbacks (`docling`, `fitz`, `pdfminer`) to ensure robust text extraction.
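
The extractor fallback order can be pictured as a chain that returns the first result with enough text. This is a sketch only: `first_usable_text` and the three stub functions are hypothetical stand-ins for the real docling, fitz and pdfminer calls, and the 50-character threshold mirrors the check used in the cell that follows.

```python
from typing import Callable, Optional

MIN_CHARS = 50  # results shorter than this are treated as unusable

def first_usable_text(
    extractors: list[tuple[str, Callable[[], Optional[str]]]]
) -> tuple[str, str]:
    """Try each extractor in order; return (name, text) of the first usable result."""
    for name, extract in extractors:
        try:
            text = extract()
        except Exception as exc:
            print(f"{name} failed: {exc}")
            continue
        if text and len(text.strip()) > MIN_CHARS:
            return name, text
    return "", ""  # nothing usable: the caller should skip this PDF

# Stand-ins that simulate a failing, an insufficient and a working extractor
def docling_stub() -> str:
    raise RuntimeError("converter unavailable")

def fitz_stub() -> str:
    return "  too short  "

def pdfminer_stub() -> str:
    return "A paragraph of extracted text that is comfortably longer than fifty characters."

name, text = first_usable_text(
    [("docling", docling_stub), ("fitz", fitz_stub), ("pdfminer", pdfminer_stub)]
)
print(name)
```

Ordering the extractors from richest output to plainest means the simpler libraries are only used when the preferred converter fails or returns almost nothing, which is typical for scanned PDFs without a text layer.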

In [None]:
# Generate & audit pipelines

# PDF text extraction fallbacks (docling -> fitz -> pdfminer)

def extract_text_with_fitz(pdf_path: Path):
    try:
        import fitz
    except Exception:
        return None
    try:
        doc = fitz.open(str(pdf_path))
        texts = []
        for p in doc:
            texts.append(p.get_text("text"))
        return "\n\n".join(t.strip() for t in texts if t and t.strip())
    except Exception as e:
        print("fitz extraction failed:", e)
        return None


def extract_text_with_pdfminer(pdf_path: Path):
    try:
        from pdfminer.high_level import extract_text
    except Exception:
        return None
    try:
        return extract_text(str(pdf_path))
    except Exception as e:
        print("pdfminer extraction failed:", e)
        return None


def get_markdown_for_pdf(pdf_path: Path, converter=None):
    """Return markdown text for a PDF by trying docling conversion first, then fallbacks.

    Returns empty string if no usable text is found.
    """
    # try docling conversion (imported lazily so the fallbacks still work without docling)
    try:
        if converter is None:
            from docling.document_converter import DocumentConverter
            converter = DocumentConverter()
        result = converter.convert(pdf_path)
        md = result.document.export_to_markdown()
        if md and len(md.strip()) > 50:
            return md
        print("Docling conversion produced insufficient text; trying fallbacks.")
    except Exception as e:
        print("Docling conversion failed:", e)

    # try PyMuPDF (fitz)
    md = extract_text_with_fitz(pdf_path)
    if md and len(md.strip()) > 50:
        return md

    # try pdfminer
    md = extract_text_with_pdfminer(pdf_path)
    if md and len(md.strip()) > 50:
        return md

    print("Fallback extractors returned no usable text. Check OCR engines or install fitz/pdfminer.")
    return ""


def generate_and_audit(chunk):
    start = time.time()
    system_gen = PROMPTS.get("system_gen")
    gen_prompt = PROMPTS.get("gen_prompt_template").format(chunk=chunk)
    raw_pair = call_ollama(TEACHER_MODEL, gen_prompt, system_gen)
    if not raw_pair:
        return None
    audit_system = PROMPTS.get("audit_system")
    audit_prompt = PROMPTS.get("audit_prompt_template").format(chunk=chunk, generated=json.dumps(raw_pair, indent=2))
    final_pair = call_ollama(AUDITOR_MODEL, audit_prompt, audit_system)
    record_call("pipeline:generate_and_audit", time.time() - start, True if final_pair else False)
    return final_pair

def generate_and_audit_single(chunk):
    system_prompt = PROMPTS.get("single_call_system")
    prompt = PROMPTS.get("single_call_prompt_template").format(chunk=chunk)
    start = time.time()
    out = call_ollama(TEACHER_MODEL, prompt, system_prompt)
    record_call("pipeline:single_generate_and_audit", time.time() - start, True if out else False)
    return out

def generate_and_audit_batch(chunks: list[str]):
    total_chars = sum(len(c) for c in chunks)
    if total_chars > MAX_BATCH_CHARS:
        print("Batch too large")
        return None
    system_prompt = PROMPTS.get("batch_system")
    block_template = PROMPTS.get("batch_block_template")
    parts = [block_template.format(i=i, chunk=c) for i,c in enumerate(chunks, start=1)]
    prompt = "\n".join(parts)
    start = time.time()
    raw = call_ollama(TEACHER_MODEL, prompt, system_prompt)
    record_call("pipeline:batch_generate_and_audit", time.time() - start, True if raw else False)
    if not raw:
        raise RuntimeError("generate_and_audit_batch: call_ollama returned no result")
    if isinstance(raw, (list, dict)):
        return raw if isinstance(raw, list) else [raw]
    parsed = _parse_json_array_or_objects(str(raw))

    if parsed is None:
        # Log truncated raw content for debugging and raise so callers see a traceback
        print("generate_and_audit_batch: could not parse LLM response. Raw response (truncated):", repr(raw)[:2000])
        raise ValueError(f"generate_and_audit_batch: could not parse LLM response: {repr(raw)[:2000]}")
    return parsed

## Processing orchestration & helpers ⚙️

**Purpose:** Orchestrate end-to-end processing across PDFs: `process_chunk`, `process_pdfs` (single-call or two-step), and `process_pdfs_with_batching`. Manages concurrency, caching lookups, and writes results in deterministic order.

**Usage:** Use `process_pdfs(max_chunks_per_pdf)` for regular runs or the batching variant when `USE_BATCHING` is enabled.
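
Two ideas carry most of the orchestration: grouping chunk indices into batches bounded by both count and total characters, and writing results back by index so output order does not depend on which request finishes first. The sketch below illustrates both with hypothetical helpers (`group_into_batches`, `run_in_order`) and toy inputs; it is not the notebook's implementation.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def group_into_batches(lengths: list[int], batch_size: int, max_chars: int) -> list[list[int]]:
    """Group item indices so each batch has <= batch_size items and <= max_chars total."""
    batches, current, current_chars = [], [], 0
    for idx, n in enumerate(lengths):
        # Only flush a non-empty batch, so an oversized item never yields an empty one
        if current and (len(current) >= batch_size or current_chars + n > max_chars):
            batches.append(current)
            current, current_chars = [], 0
        current.append(idx)
        current_chars += n
    if current:
        batches.append(current)
    return batches

def run_in_order(items: list, worker, max_workers: int = 4) -> list:
    """Run worker over items concurrently, returning results in input order."""
    buffer = [None] * len(items)
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futures = {ex.submit(worker, item): i for i, item in enumerate(items)}
        for fut in as_completed(futures):  # completion order is arbitrary
            buffer[futures[fut]] = fut.result()  # the index restores input order
    return buffer

batches = group_into_batches([60, 60, 10], batch_size=4, max_chars=100)
ordered = run_in_order(["a", "b", "c"], str.upper)
print(batches, ordered)
```

Pre-allocating the buffer and filling it by index is what keeps the output `.jsonl` line order stable across runs, regardless of `MAX_LLM_CONCURRENCY` or `BATCH_CONCURRENCY`.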

In [None]:
# Processing orchestration (single-call and batching)
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import shutil

def process_chunk(chunk: str, idx: int, semaphore: threading.BoundedSemaphore):
    chunk_hash = chunk_sha256(chunk)
    cached_pair = get_cached_sft_pair(chunk_hash)
    if cached_pair:
        return idx, json.dumps(cached_pair, ensure_ascii=False) + "\n"
    with semaphore:
        if USE_BATCHING:
            raise RuntimeError("process_chunk shouldn't be used in BATCHING mode")
        if USE_SINGLE_CALL:
            sft_pair = generate_and_audit_single(chunk)
            if sft_pair and random.random() < AUDIT_SAMPLE_RATE:
                strict_pair = generate_and_audit(chunk)
                if strict_pair:
                    sft_pair = strict_pair
        else:
            sft_pair = generate_and_audit(chunk)
    if sft_pair and isinstance(sft_pair, dict) and "instruction" in sft_pair:
        cache_sft_pair(chunk_hash, sft_pair)
        return idx, json.dumps(sft_pair, ensure_ascii=False) + "\n"
    return idx, None

def process_pdfs(max_chunks_per_pdf: int | None = None):
    from docling.document_converter import DocumentConverter
    converter = DocumentConverter()
    pdf_files = list(RAW_DATA_DIR.glob("*.pdf"))
    if not pdf_files:
        print("No PDFs found")
        return
    for pdf_path in pdf_files:
        pdf_stem = pdf_path.stem
        output_file = PROCESSED_DIR / f"{pdf_stem}.train.jsonl"
        md_content = get_markdown_for_pdf(pdf_path, converter)
        if not md_content:
            print("Conversion failed or produced no text for", pdf_path)
            continue
        pdf_hash = pdf_sha256(pdf_path)
        chunks = get_cached_chunks(pdf_hash) or chunk_text_to_chunks(md_content)
        cache_chunks(pdf_hash, chunks)
        process_count = len(chunks) if max_chunks_per_pdf is None else min(len(chunks), max_chunks_per_pdf)
        uncached_indices = [i for i in range(process_count) if get_cached_sft_pair(chunk_sha256(chunks[i])) is None]
        results_buffer = [None] * process_count
        if uncached_indices:
            semaphore = threading.BoundedSemaphore(MAX_LLM_CONCURRENCY)
            with ThreadPoolExecutor(max_workers=MAX_LLM_CONCURRENCY) as ex:
                futures = {ex.submit(process_chunk, chunks[i], i, semaphore): i for i in uncached_indices}
                processed = 0
                total = len(uncached_indices)
                PRINT_EVERY = 10 
                TIME_INTERVAL = 30 
                last_print = time.time()
                for future in as_completed(futures):
                    idx, line = future.result()
                    if line:
                        results_buffer[idx] = line
                    processed += 1
                    if processed % PRINT_EVERY == 0 or (time.time() - last_print) >= TIME_INTERVAL:
                        print(f"Progress: processed {processed}/{total} chunks")
                        last_print = time.time()
        for i in range(process_count):
            if results_buffer[i] is None:
                cached_pair = get_cached_sft_pair(chunk_sha256(chunks[i]))
                if cached_pair:
                    results_buffer[i] = json.dumps(cached_pair, ensure_ascii=False) + "\n"
        # Ensure processed directory exists before writing output
        PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
        with open(output_file, "w", encoding="utf-8") as f:
            for line in results_buffer:
                if line:
                    f.write(line)
        print(f"Saved entries → {output_file}")

        try:
            destination = PROCESSED_DIR / pdf_path.name
            shutil.move(str(pdf_path), destination)
            print(f"Moved PDF: {pdf_path.name} → {PROCESSED_DIR}")
        except Exception as e:
            print(f"Failed to move PDF {pdf_path.name}: {e}")

def process_pdfs_with_batching(max_chunks_per_pdf: int | None = None):
    from docling.document_converter import DocumentConverter
    converter = DocumentConverter()
    pdf_files = list(RAW_DATA_DIR.glob("*.pdf"))
    if not pdf_files:
        print("No PDFs found")
        return
    for pdf_path in pdf_files:
        pdf_stem = pdf_path.stem
        output_file = PROCESSED_DIR / f"{pdf_stem}.train.jsonl"
        md_content = get_markdown_for_pdf(pdf_path, converter)
        if not md_content:
            print("Conversion failed or produced no text for", pdf_path)
            continue
        pdf_hash = pdf_sha256(pdf_path)
        chunks = get_cached_chunks(pdf_hash) or chunk_text_to_chunks(md_content)
        cache_chunks(pdf_hash, chunks)
        process_count = len(chunks) if max_chunks_per_pdf is None else min(len(chunks), max_chunks_per_pdf)
        uncached_indices = [i for i in range(process_count) if get_cached_sft_pair(chunk_sha256(chunks[i])) is None]
        batches = []
        current = []
        current_chars = 0
        for idx in uncached_indices:
            c = chunks[idx]
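            # Flush only a non-empty batch, so a chunk larger than MAX_BATCH_CHARS cannot create an empty one
            if current and (len(current) >= BATCH_SIZE or (current_chars + len(c)) > MAX_BATCH_CHARS):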
                batches.append(current)
                current = []
                current_chars = 0
            current.append(idx)
            current_chars += len(c)
        if current:
            batches.append(current)
        results_buffer = [None] * process_count
        if batches:
            processed_batches = 0
            total_batches = len(batches)
            PRINT_EVERY_BATCH = 1      # print every N batches
            TIME_INTERVAL_BATCH = 30   # or every T seconds
            last_print_batch = time.time()
            with ThreadPoolExecutor(max_workers=BATCH_CONCURRENCY) as executor:
                future_to_batch = {}
                for batch_idxs in batches:
                    batch_chunks = [chunks[i] for i in batch_idxs]
                    future = executor.submit(generate_and_audit_batch, batch_chunks)
                    future_to_batch[future] = batch_idxs
                for future in as_completed(future_to_batch):
                    batch_idxs = future_to_batch[future]
                    # Let exceptions propagate so you get a full traceback when a batch fails
                    out_list = future.result()
                    if out_list and isinstance(out_list, list):
                        for idx_in_batch, obj in enumerate(out_list):
                            target_idx = batch_idxs[idx_in_batch] if idx_in_batch < len(batch_idxs) else None
                            if target_idx is not None and isinstance(obj, dict) and "instruction" in obj:
                                cache_sft_pair(chunk_sha256(chunks[target_idx]), obj)
                                results_buffer[target_idx] = json.dumps(obj, ensure_ascii=False) + "\n"
                    else:
                        print("Batch returned invalid output")
                    processed_batches += 1
                    if processed_batches % PRINT_EVERY_BATCH == 0 or (time.time() - last_print_batch) >= TIME_INTERVAL_BATCH:
                        print(f"Progress: processed {processed_batches}/{total_batches} batches")
                        last_print_batch = time.time()
        for i in range(process_count):
            if results_buffer[i] is None:
                cached_pair = get_cached_sft_pair(chunk_sha256(chunks[i]))
                if cached_pair:
                    results_buffer[i] = json.dumps(cached_pair, ensure_ascii=False) + "\n"
        # Ensure processed directory exists before writing output
        PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
        with open(output_file, "w", encoding="utf-8") as f:
            for line in results_buffer:
                if line:
                    f.write(line)
        print(f"Saved entries → {output_file}")

        try:
            destination = PROCESSED_DIR / pdf_path.name
            shutil.move(str(pdf_path), destination)
            print(f"Moved PDF: {pdf_path.name} → {PROCESSED_DIR}")
        except Exception as e:
            print(f"Failed to move PDF {pdf_path.name}: {e}")
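
The greedy grouping used by `process_pdfs_with_batching` can be illustrated in isolation. The following is a minimal sketch, not the notebook's own code: `build_batches` and its parameter names are hypothetical, and the limits are passed in explicitly instead of being read from `BATCH_SIZE` and `MAX_BATCH_CHARS`. A batch is closed when the next chunk would exceed the item limit or the character budget, and the `current and` guard keeps an oversized chunk from producing an empty batch.

```python
from typing import List, Sequence


def build_batches(chunks: Sequence[str], indices: Sequence[int],
                  batch_size: int, max_batch_chars: int) -> List[List[int]]:
    """Greedily group chunk indices into batches (hypothetical helper).

    A batch is closed when adding the next chunk would exceed either the
    item limit or the character budget. An oversized chunk still gets a
    batch of its own, and no empty batch is ever emitted.
    """
    batches: List[List[int]] = []
    current: List[int] = []
    current_chars = 0
    for idx in indices:
        size = len(chunks[idx])
        over_limit = len(current) >= batch_size or current_chars + size > max_batch_chars
        if current and over_limit:
            batches.append(current)
            current, current_chars = [], 0
        current.append(idx)
        current_chars += size
    if current:
        batches.append(current)
    return batches


demo_chunks = ["a" * 10, "b" * 10, "c" * 50, "d" * 5]
demo_batches = build_batches(demo_chunks, range(len(demo_chunks)),
                             batch_size=2, max_batch_chars=30)
print(demo_batches)  # [[0, 1], [2], [3]]
```

Returning indices instead of chunk text keeps the mapping back to `results_buffer` positions explicit, which is what the batching pipeline relies on when it caches each result.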

## Autotuner & Benchmarks ⚖️

**Purpose:** Micro-benchmarks for the single-call and batching pipelines and a helper to reconfigure `SESSION` concurrency. Use these to measure throughput and pick sensible concurrency/batching settings for your environment.

**Tip:** Run small probes with realistic chunks to get meaningful recommendations before full-scale runs.
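
One way to act on this tip is to sweep a few concurrency levels over a small sample and keep the level with the best throughput. The sketch below is illustrative only: `sweep_concurrency` and `fake_benchmark` are hypothetical names, and the stand-in benchmark simply mimics the result dictionary shape returned by the benchmark functions in this section so the example runs without a model server.

```python
from typing import Callable, Dict, Iterable, List


def sweep_concurrency(benchmark: Callable[..., Dict], chunks: List[str],
                      levels: Iterable[int]) -> Dict:
    """Run `benchmark` once per concurrency level and return the best result."""
    results = [benchmark(chunks, concurrency=level) for level in levels]
    # max() keeps the first maximal item, so with ascending levels a tie
    # resolves to the lowest concurrency that reaches the top throughput.
    return max(results, key=lambda r: r["throughput"])


def fake_benchmark(chunks, concurrency=1, repeat=1):
    # Stand-in for a real benchmark: throughput saturates at 4 workers.
    throughput = float(min(concurrency, 4))
    return {"mode": "single_call", "concurrency": concurrency,
            "throughput": throughput, "total_processed": len(chunks),
            "total_time": len(chunks) / throughput}


best = sweep_concurrency(fake_benchmark, ["chunk"] * 8, levels=[1, 2, 4, 8])
print(best["concurrency"], best["throughput"])  # 4 4.0
```

In the notebook, `benchmark_single_call` could be passed in place of `fake_benchmark`, ideally with a handful of chunks taken from a real PDF so the measured latencies are representative.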

In [None]:
# Autotuner & benchmarks (single, batch)
def _reconfigure_session_for_concurrency(concurrency: int):
    global SESSION
    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry
    SESSION = requests.Session()
    OLLAMA_API_KEY = os.getenv("OLLAMA_API_KEY")
    if OLLAMA_API_KEY:
        SESSION.headers.update({"Authorization": f"Bearer {OLLAMA_API_KEY}"})
    retries = Retry(total=3, backoff_factor=0.6, status_forcelist=[429, 500, 502, 503, 504])
    adapter = HTTPAdapter(pool_connections=concurrency*2, pool_maxsize=concurrency*2, max_retries=retries)
    SESSION.mount("http://", adapter)
    SESSION.mount("https://", adapter)

def benchmark_single_call(chunks, concurrency=1, repeat=1):
    if not chunks:
        return {"mode":"single_call","concurrency":concurrency,"throughput":0.0,"total_processed":0,"total_time":0.0}
    _reconfigure_session_for_concurrency(concurrency)
    METRICS["calls"].clear()
    def _run_once():
        t0 = time.perf_counter()
        with ThreadPoolExecutor(max_workers=concurrency) as ex:
            futures = [ex.submit(generate_and_audit_single, c) for c in chunks]
            results = [f.result() for f in futures]
        t1 = time.perf_counter()
        duration = t1 - t0
        success = sum(1 for r in results if r and isinstance(r, dict) and "instruction" in r)
        return duration, success
    runs = [_run_once() for _ in range(repeat)]
    total_processed = sum(s for _, s in runs)
    total_time = sum(d for d, _ in runs)
    throughput = total_processed / total_time if total_time > 0 else 0
    return {"mode": "single_call", "concurrency": concurrency, "throughput": throughput, "total_processed": total_processed, "total_time": total_time}

def benchmark_batch(chunks, batch_size=4, batch_concurrency=1, repeat=1):
    """Benchmark batching pipeline using generate_and_audit_batch and return throughput."""
    if not chunks:
        return {"mode":"batch","batch_size":batch_size,"batch_concurrency":batch_concurrency,"throughput":0.0,"total_processed":0,"total_time":0.0}

    # Build batches respecting batch_size and MAX_BATCH_CHARS
    batches = []
    current = []
    current_chars = 0
    for c in chunks:
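        # Only close a batch that has items; an oversized first chunk must not emit an empty batch
        if current and (len(current) >= batch_size or (current_chars + len(c)) > MAX_BATCH_CHARS):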
            batches.append(current)
            current = []
            current_chars = 0
        current.append(c)
        current_chars += len(c)
    if current:
        batches.append(current)

    _reconfigure_session_for_concurrency(batch_concurrency)
    METRICS["calls"].clear()

    def _run_once():
        t0 = time.perf_counter()
        with ThreadPoolExecutor(max_workers=batch_concurrency) as ex:
            futures = [ex.submit(generate_and_audit_batch, b) for b in batches]
            results = [f.result() for f in futures]
        t1 = time.perf_counter()
        duration = t1 - t0
        processed = 0
        for res in results:
            if isinstance(res, list):
                processed += len(res)
            elif isinstance(res, dict):
                processed += 1
        return duration, processed

    runs = [_run_once() for _ in range(repeat)]
    total_processed = sum(p for _, p in runs)
    total_time = sum(d for d, _ in runs)
    throughput = total_processed / total_time if total_time > 0 else 0
    return {"mode":"batch","batch_size":batch_size,"batch_concurrency":batch_concurrency,"throughput":throughput,"total_processed":total_processed,"total_time":total_time}

## Diagnostics 🔍

**Purpose:** Quick checks to validate the cache client and session health (Redis/DiskCache availability). Useful to run after starting the kernel or when the pipeline behaves unexpectedly.

**Usage:** Run this cell after the imports/configuration cell to confirm environment readiness.
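
Beyond printing the client type, a write/read round trip gives a more direct signal that the cache is usable. The sketch below assumes only that the cache client exposes `set(key, value)` and `get(key)`, as Redis and DiskCache clients do. `cache_roundtrip_ok` and `DictCache` are hypothetical names for illustration and are not part of the notebook.

```python
import uuid


def cache_roundtrip_ok(cache) -> bool:
    """Write a throwaway key, read it back, and report whether the value survived."""
    key = f"diagnostics:{uuid.uuid4().hex}"
    try:
        cache.set(key, "ok")
        value = cache.get(key)
    except Exception as exc:
        print("Cache round-trip failed:", exc)
        return False
    if isinstance(value, bytes):
        # Redis clients return bytes unless created with decode_responses=True
        value = value.decode("utf-8")
    return value == "ok"


class DictCache:
    """In-memory stand-in exposing the same set/get calls."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)


print(cache_roundtrip_ok(DictCache()))  # True
```

The probe key is left in the cache afterwards; because it is namespaced under `diagnostics:`, it cannot collide with hashed chunk keys.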

In [None]:
# Diagnostics cell (run immediately after imports if anything seems off)
print("Diagnostics:")
print("ThreadPoolExecutor:", ThreadPoolExecutor)
init_cache()
print("Using Redis:", _using_redis())
print("Cache client type:", type(_cache_client))

## Dry-run helper 🧪

**Purpose:** Processes every chunk of the first PDF found in `RAW_DATA_DIR` concurrently and writes the results to `<pdf stem>.dryrun.jsonl` in `PROCESSED_DIR`. The source PDF is left in place.

**Usage:** Run it on a small sample PDF before a full run to confirm that conversion, chunking, and generation work end to end.

In [None]:
# Dry-run helper (process entire first PDF concurrently; writes .dryrun.jsonl to processed dir)
# Takes no arguments: the chunks, their indices, and the semaphore are all created inside the body.
def dry_run_process_chunk():
    try:
        from docling.document_converter import DocumentConverter
        from concurrent.futures import ThreadPoolExecutor, as_completed
        from tqdm import tqdm
        import threading

        converter = DocumentConverter()
        pdf_files = list(RAW_DATA_DIR.glob("*.pdf"))
        if not pdf_files:
            print("No PDFs found for dry-run.")
        else:
            pdf_path = pdf_files[0]
            print("Dry-run using:", pdf_path)
            md = get_markdown_for_pdf(pdf_path, converter)
            if not md:
                print("Dry-run: no text extracted for", pdf_path)
            else:
                chunks = chunk_text_to_chunks(md)
                n_chunks = len(chunks)
                print(f"PDF produced {n_chunks} chunks")
                output_file = PROCESSED_DIR / f"{pdf_path.stem}.dryrun.jsonl"

                # concurrency bounded by MAX_LLM_CONCURRENCY and number of chunks
                concurrency = min(MAX_LLM_CONCURRENCY, max(1, n_chunks))
                semaphore = threading.BoundedSemaphore(concurrency)
                results_buffer = [None] * n_chunks
                processed = 0
                skipped = 0

                with ThreadPoolExecutor(max_workers=concurrency) as ex:
                    future_to_idx = {ex.submit(process_chunk, chunks[i], i, semaphore): i for i in range(n_chunks)}
                    for fut in tqdm(as_completed(future_to_idx), total=n_chunks, desc="Dry-run chunks"):
                        try:
                            idx, line = fut.result()
                        except Exception as e:
                            print("Chunk job failed:", e)
                            continue
                        if line:
                            results_buffer[idx] = line
                            processed += 1
                        else:
                            cached = get_cached_sft_pair(chunk_sha256(chunks[idx]))
                            if cached:
                                results_buffer[idx] = json.dumps(cached, ensure_ascii=False) + "\n"
                                skipped += 1

                # Write results in order
                # Ensure processed directory exists before writing output
                PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
                with open(output_file, "w", encoding="utf-8") as out_f:
                    for line in results_buffer:
                        if line:
                            out_f.write(line)

                print(f"Dry-run complete: processed={processed} skipped_cached={skipped} written→{output_file}")
    except Exception as e:
        print("Dry-run skipped:", e)

## Main runner & CLI helper ▶️

**Purpose:** `run_all()` selects the appropriate pipeline (batching or single-call) and starts processing. This wrapper is convenient for notebook and CLI usage.

**Usage:** Call `run_all()` with optional `max_chunks_per_pdf` to limit processing for testing.
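
The CLI entry point parses its flag with `argparse`'s `parse_known_args()`, which returns a `(namespace, leftover_args)` tuple instead of raising on unrecognized arguments. That matters when the code runs inside a Jupyter kernel, where `sys.argv` typically carries extra entries. A minimal, self-contained illustration with a simulated argument list (the `-f kernel.json` values are made up for the example):

```python
import argparse

parser = argparse.ArgumentParser(description="Generate SFT training data from PDFs")
parser.add_argument("--max-chunks-per-pdf", type=int, default=None)

# Simulated argv: one flag the parser knows, plus a kernel-style flag it does not.
known, unknown = parser.parse_known_args(
    ["--max-chunks-per-pdf", "3", "-f", "kernel.json"]
)
print(known.max_chunks_per_pdf)  # 3
print(unknown)                   # ['-f', 'kernel.json']
```

With plain `parse_args()`, the same argument list would exit with an "unrecognized arguments" error, which is why the tolerant variant suits code shared between a notebook and the command line.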

In [14]:
# Main run helper and dry-run example
def run_all(max_chunks_per_pdf: int = None):
    if USE_BATCHING:
        process_pdfs_with_batching(max_chunks_per_pdf)
    else:
        process_pdfs(max_chunks_per_pdf)

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Generate SFT training data from PDFs")
    parser.add_argument(
        "--max-chunks-per-pdf",
        type=int,
        default=None,
        help="Limit number of chunks processed per PDF (for testing)"
    )
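    # parse_known_args returns (namespace, leftover args) and tolerates extra argv entries,
    # such as those passed to a Jupyter kernel
    args, _unknown = parser.parse_known_args()
    run_all(args.max_chunks_per_pdf)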

2026-01-07 14:40:51,831 - INFO - detected formats: [<InputFormat.PDF: 'pdf'>]
2026-01-07 14:40:51,852 - INFO - Going to convert document batch...
2026-01-07 14:40:51,856 - INFO - Initializing pipeline for StandardPdfPipeline with options hash e15bc6f248154cc62f8db15ef18a8ab7
2026-01-07 14:40:51,864 - INFO - Accelerator device: 'cuda:0'
[INFO] 2026-01-07 14:40:52,004 [RapidOCR] base.py:22: Using engine_name: onnxruntime
[INFO] 2026-01-07 14:40:52,034 [RapidOCR] download_file.py:60: File exists and is valid: /home/rahul/dev/sft-data-gen/.sftEnv/lib/python3.12/site-packages/rapidocr/models/ch_PP-OCRv4_det_infer.onnx
[INFO] 2026-01-07 14:40:52,035 [RapidOCR] main.py:53: Using /home/rahul/dev/sft-data-gen/.sftEnv/lib/python3.12/site-packages/rapidocr/models/ch_PP-OCRv4_det_infer.onnx
[INFO] 2026-01-07 14:40:52,221 [RapidOCR] base.py:22: Using engine_name: onnxruntime
[INFO] 2026-01-07 14:40:52,235 [RapidOCR] download_file.py:60: File exists and is va

call_ollama: raw response (truncated): {"instruction": "Based on the provided regulatory text, what is the legislative authority that establishes the Prudential Standard?", "output": "The Prudential Standard is made under subsection 230A(1) of the Life Insurance Act 1995."}
call_ollama: raw response (truncated): {"instruction": "According to the provided regulatory text, what are the two specific conditions that define a Referable Reinsurance Arrangement under Attachment B, and how is the scope of such arrangements described in terms of contract structure?", "output": "Referable Reinsurance Arrangements are defined by two conditions: (a) they typically do not involve significant transfer of insurance risk; and/or (b) they involve significant financing elements. The scope includes arrangements that may consist of a single contract or a combination of two or more individual contracts, as well as side letters or other forms of agreement."}
call_ollama: raw response (truncated): {"instruct

2026-01-07 14:41:05,460 - INFO - detected formats: [<InputFormat.PDF: 'pdf'>]
2026-01-07 14:41:05,500 - INFO - Going to convert document batch...
2026-01-07 14:41:05,504 - INFO - Processing document EziCoverZurichLifeInsurance.pdf


call_ollama: raw response (truncated): {"instruction": "According to the regulatory text, what must a life company do if clause 5(a) is not available, and what specific elements must be included in the submission to APRA?", "output": "The life company must submit to APRA a comprehensive description of the proposed arrangement, including details of any risk transfer and financing elements, prepared with regard to reporting standards made under the Financial Sector (Collection of Data) Act 2001, and must ensure that the arrangement will not have an inappropriate adverse effect on the company's balance sheet and capital position in any one period or over the entire term, will not adversely affect the interests of policy owners, has been reviewed within the context of its overall risk management and control systems and its Internal Capital Adequacy Assessment Process, and will not overall adversely affect the interests of policy owners."}
Progress: processed 10/11 chunks
call_ollama: raw r

2026-01-07 14:41:26,972 - INFO - Finished converting document EziCoverZurichLifeInsurance.pdf in 21.51 sec.


call_ollama: raw response (truncated): {"instruction": "Based on the provided regulatory text, what are the key eligibility requirements and application methods for Ezicover Life Insurance?", "output": "You must be eligible to apply. You can apply by phone or online."}
call_ollama: raw response (truncated): {"instruction": "Based on the provided regulatory excerpt, what are the specific additional features of Ezicover Life Insurance that are listed after the initial benefit amounts?", "output": "Inflation protection, Future insurability, Cover suspension"}
call_ollama: raw response (truncated): {"instruction": "What is the issue date of the Ezicover ® Life Insurance Product Disclosure Statement and Policy Document?", "output": "6 October 2023"}
call_ollama: raw response (truncated): {"instruction": "Based on the provided regulatory text, explain the key information that must be disclosed to a customer about Ezicover Life Insurance before they apply, focusing on the claims process and 