# Dataset Pipeline Skeleton (KnowSLM-style)

This notebook gives a **skeleton pipeline** for:
- loading Delhi-food and poetry source documents,
- parsing raw files into clean text,
- chunking for RAG/indexing,
- generating synthetic conversation pairs (DSPy-style 2-step prompting),
- exporting training-ready JSONL.

Fill in the model-specific choices, then run the notebook cell by cell; each stage can be executed and checked on its own before moving on to the next.


In [None]:
# Optional installs (run once)
# %pip install -q pypdf beautifulsoup4 trafilatura litellm python-dotenv


In [3]:
from pathlib import Path
import os
import re
import json
from dataclasses import dataclass, asdict
from typing import List, Dict, Any

#from dotenv import load_dotenv
#load_dotenv()

# Anchor every path at the project root. When the working directory is the
# `notebooks/` folder the root is its parent; otherwise it is the working
# directory itself.
_cwd = Path.cwd()
if _cwd.name == 'notebooks':
    PROJECT_ROOT = _cwd.parent
else:
    PROJECT_ROOT = _cwd

# Input corpora and the working area for derived artifacts.
DATASET_ROOT = PROJECT_ROOT.joinpath('data', 'dataset')
DELHI_DIR = DATASET_ROOT.joinpath('delhi-food')
POETRY_DIR = DATASET_ROOT.joinpath('poetry')
WORK_DIR = DATASET_ROOT.joinpath('_processed')

# One output folder per pipeline stage.
PARSED_DIR = WORK_DIR.joinpath('parsed_text')
CHUNKS_DIR = WORK_DIR.joinpath('chunks')
EXPORT_DIR = WORK_DIR.joinpath('exports')

# Create the output folders (and any missing parents); existing ones are kept.
for d in (PARSED_DIR, CHUNKS_DIR, EXPORT_DIR):
    d.mkdir(parents=True, exist_ok=True)

# Quick sanity report: resolved root and whether both input corpora are present.
print('PROJECT_ROOT:', PROJECT_ROOT)
print('DELHI_DIR exists:', DELHI_DIR.exists())
print('POETRY_DIR exists:', POETRY_DIR.exists())


PROJECT_ROOT: c:\Users\Pranav\Desktop\proj\github_projects\knowsLM_implementation
DELHI_DIR exists: True
POETRY_DIR exists: True


In [4]:
# File inventory

def list_files(base: Path, exts=None):
    """Return a sorted list of files under *base* whose suffix is accepted.

    The search is recursive. Suffixes are lower-cased before the membership
    test, so entries in *exts* are expected in lower case with the leading
    dot. When *exts* is missing or empty, the default set
    (.pdf, .html, .htm, .txt, .md) is used.
    """
    if not exts:
        exts = ['.pdf', '.html', '.htm', '.txt', '.md']

    matches = []
    for candidate in base.rglob('*'):
        # Directories (and anything else that is not a regular file) are skipped.
        if not candidate.is_file():
            continue
        if candidate.suffix.lower() in exts:
            matches.append(candidate)

    matches.sort()
    return matches

# Enumerate the supported source files of each corpus (default extension set).
delhi_files = list_files(DELHI_DIR)
poetry_files = list_files(POETRY_DIR)

# Report corpus sizes and the first five Delhi file names as a quick sanity check.
print(f'Delhi files: {len(delhi_files)}')
print(f'Poetry files: {len(poetry_files)}')
print('Sample Delhi files:')
for p in delhi_files[:5]:
    print('-', p.name)


Delhi files: 21
Poetry files: 8
Sample Delhi files:
- 01_delhi-tourism_small-walk_brochure.pdf
- 02_delhi-tourism_heritage-walk_itinerary.pdf
- 03_holidify_delhi-street-food-guide.pdf
- 04_holidify_cafes-in-delhi.pdf
- 05_holidify_delhi_seasonal-travel-food-context.pdf


In [None]:
# Parsing utilities (loss-minimizing skeleton)

from bs4 import BeautifulSoup
from pypdf import PdfReader

try:
    import trafilatura
except Exception:
    trafilatura = None


def read_pdf(path: Path) -> str:
    """Extract the text of every page of the PDF at *path*.

    Each page's text is preceded by a ``[PAGE n]`` marker (numbered from 1).
    A page with no extractable text contributes only its marker. Leading and
    trailing whitespace of the combined result is stripped.
    """
    document = PdfReader(str(path))
    combined = ''.join(
        f"\n\n[PAGE {page_no}]\n" + (pdf_page.extract_text() or '')
        for page_no, pdf_page in enumerate(document.pages, start=1)
    )
    return combined.strip()


def read_html(path: Path) -> str:
    """Extract readable text from the HTML file at *path*.

    The file is decoded as UTF-8 with undecodable bytes dropped. When
    ``trafilatura`` was imported successfully and returns a non-empty
    extraction, that result is used. Otherwise the function falls back to
    BeautifulSoup: ``script``/``style``/``noscript`` elements are removed and
    the remaining text nodes are joined one per line.

    Returns:
        The extracted text with surrounding whitespace stripped.
    """
    raw = path.read_text(encoding='utf-8', errors='ignore')
    # `trafilatura` is None when its import failed; skip straight to the fallback.
    if trafilatura:
        extracted = trafilatura.extract(raw, include_comments=False, include_tables=True)
        if extracted:
            return extracted.strip()

    soup = BeautifulSoup(raw, 'html.parser')
    for tag in soup(['script', 'style', 'noscript']):
        tag.decompose()
    # Separate text nodes with a newline so adjacent elements (headings,
    # paragraphs, list items) do not run together into one word.
    text = soup.get_text(separator='\n')
    # Collapse runs of three or more newlines into a single blank line. The
    # previous pattern r'{3,}' had nothing to repeat and raised re.error.
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()


def read_text_file(path: Path) -> str:
    """Return the contents of a plain-text file with outer whitespace removed.

    The file is decoded as UTF-8; bytes that cannot be decoded are dropped.
    """
    with path.open('r', encoding='utf-8', errors='ignore') as handle:
        contents = handle.read()
    return contents.strip()


def parse_document(path: Path) -> Dict[str, Any]:
    """Parse one source file into a plain-text record.

    The reader is chosen from the lower-cased file suffix: PDF, HTML
    (.html/.htm) or plain text (.txt/.md). Any other suffix yields an empty
    text.

    Returns:
        A dict with the keys ``source_path``, ``source_name``, ``ext``,
        ``char_count`` and ``text``.
    """
    suffix = path.suffix.lower()

    # Unsupported extensions fall through with empty text.
    content = ''
    if suffix == '.pdf':
        content = read_pdf(path)
    elif suffix == '.html' or suffix == '.htm':
        content = read_html(path)
    elif suffix == '.txt' or suffix == '.md':
        content = read_text_file(path)

    record: Dict[str, Any] = {}
    record['source_path'] = str(path)
    record['source_name'] = path.name
    record['ext'] = suffix
    record['char_count'] = len(content)
    record['text'] = content
    return record


In [None]:
# Parse and persist plain text snapshots

def save_parsed(doc: Dict[str, Any], out_dir: Path):
    """Write a parsed document's text to ``<out_dir>/<source stem>.txt``.

    Only ``doc['source_name']`` and ``doc['text']`` are read. The file is
    written as UTF-8 and any existing file of the same name is overwritten.

    Returns:
        The path of the file that was written.
    """
    target = out_dir.joinpath(Path(doc['source_name']).stem + '.txt')
    target.write_text(doc['text'], encoding='utf-8')
    return target

# Parse every source file (Delhi files first, then poetry). Each parsed record
# is kept in memory in `all_docs` and its text is also written to PARSED_DIR.
all_docs = []
for p in delhi_files + poetry_files:
    parsed = parse_document(p)
    all_docs.append(parsed)
    save_parsed(parsed, PARSED_DIR)

print('Parsed docs:', len(all_docs))
# Flag documents whose extracted text is under 800 characters; these may be
# failed or partial extractions worth inspecting by hand.
short_docs = [d for d in all_docs if d['char_count'] < 800]
print('Potentially weak extractions (<800 chars):', len(short_docs))
# Show at most the first ten flagged documents with their character counts.
for d in short_docs[:10]:
    print('-', d['source_name'], d['char_count'])


In [None]:
# Chunking utilities (heading-agnostic baseline; replace with semantic splitter later)

@dataclass
class Chunk:
    """One contiguous slice of a parsed document's text, with its provenance."""

    # Identifier of the form "<source file stem>::chunk_<index>".
    chunk_id: str
    # File name of the source document, without its directory.
    source_name: str
    # Path of the source document as a string.
    source_path: str
    # Corpus label: 'delhi-food', 'poetry', or 'unknown' when neither matches.
    corpus: str  # delhi-food / poetry
    # Character offset in the document text where the slice begins.
    start_char: int
    # Character offset where the slice stops (exclusive slice bound).
    end_char: int
    # The slice itself.
    text: str


def simple_char_chunk(text: str, size: int = 1800, overlap: int = 300) -> List[Dict[str, Any]]:
    """Split *text* into fixed-size character windows that share an overlap.

    Consecutive windows start ``size - overlap`` characters apart, so each
    window repeats the last ``overlap`` characters of the previous one. The
    final window is cut at the end of the text and may be shorter than
    ``size``.

    Args:
        text: The text to split. An empty string yields an empty list.
        size: Maximum number of characters per window; must be positive.
        overlap: Number of characters shared by neighbouring windows; must be
            smaller than ``size``. A negative value leaves a gap of that many
            characters between windows.

    Returns:
        A list of dicts with ``start`` (inclusive offset), ``end`` (exclusive
        offset) and ``text`` (``text[start:end]``), in document order.

    Raises:
        ValueError: If ``size`` is not positive or ``overlap >= size``. Such
            settings cannot advance through the text; previously they made
            the function loop forever while appending chunks.
    """
    if size <= 0:
        raise ValueError(f"size must be positive, got {size}")
    if overlap >= size:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than size ({size})"
        )
    if not text:
        return []

    # Distance between the starts of consecutive windows; validated above to be > 0.
    step = size - overlap
    n = len(text)
    chunks = []
    for start in range(0, n, step):
        end = min(start + size, n)
        chunks.append({'start': start, 'end': end, 'text': text[start:end]})
        # Once a window reaches the end of the text, any further window would
        # only repeat part of its tail, so stop here.
        if end == n:
            break
    return chunks


def corpus_of(path: str) -> str:
    """Return the corpus label implied by the components of *path*.

    A label matches only when it is a whole path component. 'delhi-food' is
    checked before 'poetry'; when neither is present the result is 'unknown'.
    """
    segments = Path(path).parts
    for label in ('delhi-food', 'poetry'):
        if label in segments:
            return label
    return 'unknown'

# Chunk every parsed document and wrap each slice in a Chunk record that
# carries its provenance (source file, corpus, character offsets).
all_chunks: List[Chunk] = []
for doc in all_docs:
    parts = simple_char_chunk(doc['text'], size=1800, overlap=300)
    c = corpus_of(doc['source_path'])
    # The file stem prefixes every chunk id of this document.
    stem = Path(doc['source_name']).stem
    for idx, part in enumerate(parts):
        piece = Chunk(
            chunk_id=f"{stem}::chunk_{idx}",
            source_name=doc['source_name'],
            source_path=doc['source_path'],
            corpus=c,
            start_char=part['start'],
            end_char=part['end'],
            text=part['text'],
        )
        all_chunks.append(piece)

# Summary counts, overall and per corpus.
print('Total chunks:', len(all_chunks))
print('Delhi chunks:', sum(ch.corpus == 'delhi-food' for ch in all_chunks))
print('Poetry chunks:', sum(ch.corpus == 'poetry' for ch in all_chunks))


In [None]:
# Save chunk artifacts for LangChain/LlamaIndex ingestion

# Write every chunk as one JSON object per line (UTF-8, non-ASCII kept as-is).
chunks_jsonl = CHUNKS_DIR.joinpath('all_chunks.jsonl')
with open(chunks_jsonl, 'w', encoding='utf-8') as f:
    f.writelines(
        json.dumps(asdict(chunk), ensure_ascii=False) + '\n'
        for chunk in all_chunks
    )

print('Saved:', chunks_jsonl)


## DSPy-Style Synthetic Dialogue Skeleton

Paper-style usage:
1. Generate a conversation-starting user query from a knowledge chunk.
2. Generate a concise assistant answer + one follow-up question.

Below is a minimal implementation skeleton using LiteLLM.
You can later swap this with native DSPy signatures/modules.


In [None]:
from litellm import completion

# Model identifier used by default for generation; override it by setting the
# SYNTHETIC_GEN_MODEL environment variable.
LLM_MODEL = os.getenv('SYNTHETIC_GEN_MODEL', 'gemini/gemini-2.0-flash')

# Step 1 template: turn a knowledge chunk into one conversation opener.
# Filled with str.format; the only placeholder is {knowledge}.
QUESTION_PROMPT = (
    "Generate one conversation-initiating statement in English/Hinglish based on the knowledge. "
    "Use varied starts like why/when/where/how. Keep it natural and unique.\n\n"
    "KNOWLEDGE:\n{knowledge}"
)

# Step 2 template: answer the opener from the same knowledge and add one
# follow-up question. Placeholders: {knowledge} and {question}.
ANSWER_PROMPT = (
    "Give an informative response in English in 2 lines, grounded in the knowledge. "
    "Then ask one thoughtful follow-up question in English/Hinglish. "
    "Do not generate more turns.\n\n"
    "KNOWLEDGE:\n{knowledge}\n\n"
    "USER_QUESTION:\n{question}"
)


def llm_call(prompt: str, model: str = LLM_MODEL, temperature: float = 0.4) -> str:
    """Send *prompt* as a single user message and return the reply text.

    Args:
        prompt: Full text of the user message.
        model: Model identifier handed to ``completion``; defaults to
            ``LLM_MODEL``.
        temperature: Sampling temperature forwarded to ``completion``.

    Returns:
        The first choice's message content with surrounding whitespace
        stripped, or ``''`` when the response carries no text content.
        Callers that need a non-empty reply should check for this.
    """
    resp = completion(
        model=model,
        messages=[{'role': 'user', 'content': prompt}],
        temperature=temperature,
    )
    content = resp.choices[0].message.content
    # The content field can be None (no text returned); calling .strip() on it
    # would raise AttributeError, so normalise it to an empty string.
    return (content or '').strip()


def generate_synthetic_pair(knowledge_text: str) -> Dict[str, str]:
    """Produce one synthetic question/answer pair from a knowledge chunk.

    Two LLM calls are made. The first (temperature 0.7) generates a
    conversation opener from the knowledge; the second (temperature 0.4)
    answers that opener using the same knowledge. Only the first 4000
    characters of *knowledge_text* are placed in either prompt.

    Returns:
        A dict with the keys ``question`` and ``answer``.
    """
    grounding = knowledge_text[:4000]

    question_prompt = QUESTION_PROMPT.format(knowledge=grounding)
    question = llm_call(question_prompt, temperature=0.7)

    answer_prompt = ANSWER_PROMPT.format(knowledge=grounding, question=question)
    answer = llm_call(answer_prompt, temperature=0.4)

    return {'question': question, 'answer': answer}


In [None]:
# Dry run on a small sample (set RUN_SYNTHETIC=True when ready)

# Safety switch: generation calls the LLM, so it stays off until enabled by hand.
RUN_SYNTHETIC = False
# Upper bound on the number of chunks sent through generation in this dry run.
SAMPLE_N = 8

synthetic_rows = []
if RUN_SYNTHETIC:
    # First SAMPLE_N Delhi-food chunks, in all_chunks order; poetry is not sampled here.
    sample_chunks = [c for c in all_chunks if c.corpus == 'delhi-food'][:SAMPLE_N]
    for c in sample_chunks:
        pair = generate_synthetic_pair(c.text)
        # Keep the chunk's provenance next to the generated 'question'/'answer'.
        synthetic_rows.append({
            'chunk_id': c.chunk_id,
            'source_name': c.source_name,
            'corpus': c.corpus,
            **pair,
        })

# Report how many rows were produced and show the first one for inspection.
print('Synthetic rows:', len(synthetic_rows))
if synthetic_rows:
    print(synthetic_rows[0])


In [None]:
# Export training-ready JSONL skeleton

# Export the generated rows as JSON Lines; nothing is written when there are none.
out_jsonl = EXPORT_DIR.joinpath('synthetic_dialogues.jsonl')
if not synthetic_rows:
    print('No rows exported. Set RUN_SYNTHETIC=True and rerun generation cell.')
else:
    with open(out_jsonl, 'w', encoding='utf-8') as f:
        f.writelines(
            json.dumps(row, ensure_ascii=False) + '\n'
            for row in synthetic_rows
        )
    print('Saved:', out_jsonl)


## Next Steps

- Replace `simple_char_chunk` with heading-aware/semantic chunking.
- Add OCR fallback for scanned PDFs.
- Add deduplication + quality filters for generated dialogues.
- Add separate generation configs for `delhi-food` vs `poetry` styles.
- Add train/val/test split exports.
