In [73]:
%pip install openai faiss-cpu python-dotenv numpy pyyaml

Note: you may need to restart the kernel to use updated packages.


In [91]:
from __future__ import annotations
import os
import re
import json
import time
import hashlib
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, asdict

import numpy as np
import faiss
from dotenv import load_dotenv
from openai import OpenAI
from tqdm import tqdm

In [92]:
# ------------------ Config ------------------
# Load environment variables first (the error below points at env or .env),
# then fail fast when the API key is absent or empty.
load_dotenv()
API_KEY = os.environ.get("OPENAI_API_KEY")
if API_KEY is None or API_KEY == "":
    raise RuntimeError("OPENAI_API_KEY missing; set it in env or .env")

# Shared OpenAI client used by the embedding helpers
client = OpenAI(api_key=API_KEY)

# Input directory with markdown files, and the output directory (created if missing)
MARKDOWN_DIR = Path("markdown")
OUT = Path("out")
OUT.mkdir(parents=True, exist_ok=True)

# Artifacts written under OUT
CHUNKS_JSONL = OUT.joinpath("chunks.jsonl")
META_JSON = OUT.joinpath("meta.json")
EMB_PATH = OUT.joinpath("embeddings.npy")
IDS_PATH = OUT.joinpath("ids.npy")
FAISS_INDEX_PATH = OUT.joinpath("index.faiss")

# Embedding model, the vector width expected from cached embeddings, and request batch size
EMBED_MODEL = "text-embedding-3-small"
EMB_DIM_EXPECTED = 1536
BATCH_SIZE = 16

# Markdown image syntax ![alt](link) -- group 1 captures the link part
IMAGE_MD_RE = re.compile(r'!\[[^\]]*\]\(([^)]+)\)', re.IGNORECASE)
# HTML-comment page separators such as <!-- PAGE 3 --> (the number is optional)
PAGE_MARKER_RE = re.compile(r'<!--\s*PAGE\s*\d*\s*-->', re.IGNORECASE)

In [93]:
# ------------------ Helpers ------------------
def short_id(base: str, page: int) -> str:
    """Build a short deterministic chunk id from a base name and a page number.

    The id is ``chunk_`` followed by the first 8 hex characters of the SHA-1
    digest of the UTF-8 encoded string ``"<base>:<page>"``.
    """
    key = "{}:{}".format(base, page)
    digest = hashlib.sha1(key.encode("utf-8")).hexdigest()
    return "chunk_" + digest[:8]

def ensure_dir(p: Path) -> Path:
    """Make sure directory ``p`` exists (creating missing parents) and return ``p``."""
    if not p.is_dir():
        # Still raises FileExistsError when p exists but is not a directory
        p.mkdir(parents=True, exist_ok=True)
    return p


In [94]:
# ------------------ Data model ------------------
@dataclass
class Chunk:
    """One unit of text to embed, together with its provenance metadata."""

    # Short string identifier of the chunk
    id: str
    # Name/path of the document the chunk is attributed to
    source_file: str
    # Optional conversion timestamp
    converted_at: Optional[str]
    # Optional free-form notes
    notes: Optional[str]
    # File name the chunk was read from
    filename: str
    # Optional language tag
    lang: Optional[str]
    # Text that is sent to the embedding model
    text: str
    # Markdown the text was derived from
    md: str
    # Image records; each is a dict of string keys to string values
    images: List[Dict[str, str]]
    # Optional page number
    page: Optional[int]
    # Character offsets delimiting the chunk
    start_char: int
    end_char: int


In [95]:
# ------------------ Markdown parsing ------------------
def extract_frontmatter(md_text: str) -> Tuple[Dict[str, Any], str]:
    """Split a markdown document into its frontmatter mapping and the remaining body.

    Frontmatter is a block delimited by ``---`` at the very start of the
    document (leading whitespace / BOM allowed). Its content is parsed as JSON
    first and, if that fails, as YAML when PyYAML is importable.

    Returns:
        ``(frontmatter, body)``. ``frontmatter`` is always a dict; it is empty
        when there is no frontmatter, when it cannot be parsed, or when it
        parses to something other than a mapping. When no frontmatter block is
        found, ``body`` is the unmodified input.
    """
    m = re.match(r'^\s*---\s*\n(.*?)\n---\s*\n(.*)$', md_text, flags=re.S)
    if m:
        fm_raw = m.group(1).strip()
        remainder = m.group(2)
    else:
        # Looser fallback (e.g. closing '---' at end of file without a newline).
        parts = md_text.split('---', 2)
        if len(parts) < 3:
            return {}, md_text
        # Only accept it when the document really *starts* with '---'.
        # Otherwise two '---' rules/table separators inside the body would be
        # mistaken for frontmatter and everything before the first one lost.
        if parts[0].replace("\ufeff", "").strip():
            return {}, md_text
        fm_raw = parts[1].strip()
        remainder = parts[2]

    try:
        fm = json.loads(fm_raw)
    except Exception:
        try:
            # Optional dependency: import lazily so JSON frontmatter works without PyYAML
            import yaml
            fm = yaml.safe_load(fm_raw)
        except Exception:
            fm = None

    # Callers use fm.get(...): never hand back a scalar/list parsed from the block
    if not isinstance(fm, dict):
        fm = {}
    return fm, remainder

def split_pages(md_body: str) -> List[str]:
    """Split a markdown body into stripped page strings at PAGE_MARKER_RE matches.

    Without any marker the whole stripped body is returned as a single page.
    With markers, a blank segment before the first marker is discarded; all
    other segments (including empty ones) are kept, in order.
    """
    if PAGE_MARKER_RE.search(md_body) is None:
        return [md_body.strip()]

    segments = PAGE_MARKER_RE.split(md_body)
    if segments and segments[0].strip() == "":
        # nothing but whitespace precedes the first marker
        del segments[0]
    return [segment.strip() for segment in segments]

def parse_page_images_and_text(page_md: str) -> Tuple[str, List[Dict[str,str]]]:
    """Extract image links from one page and produce its text for embedding.

    Every IMAGE_MD_RE match is recorded as ``{"link": <stripped link>,
    "description": ""}`` and replaced in the text by an ``[image]``
    placeholder. All whitespace runs in the result are collapsed to single
    spaces.

    Returns:
        ``(text_for_embedding, images)`` with images in document order.
    """
    images: List[Dict[str,str]] = [
        {"link": match.group(1).strip(), "description": ""}
        for match in IMAGE_MD_RE.finditer(page_md)
    ]
    with_placeholders = IMAGE_MD_RE.sub(" [image] ", page_md)
    # pad every placeholder so it never sticks to a neighbouring word
    padded = with_placeholders.replace("[image]", " [image] ")
    collapsed = " ".join(padded.split())
    return collapsed, images

In [96]:
# ------------------ Chunk builders ------------------
def _iso_if_temporal(value: Any) -> Any:
    """Return ``value.isoformat()`` for date/time-like objects, else ``value`` unchanged."""
    iso = getattr(value, "isoformat", None)
    return iso() if callable(iso) else value


def build_chunks_from_markdown_file(md_path: Path, doc_prefix: Optional[str] = None) -> List[Chunk]:
    """Turn one markdown file into a list of per-page ``Chunk`` objects.

    Args:
        md_path: Markdown file to read (UTF-8).
        doc_prefix: Optional path prefix for ``source_file``; it is prepended
            (without trailing ``/``) unless the name already starts with it.

    Returns:
        One chunk per page returned by ``split_pages`` (pages are numbered
        from 1), or an empty list when the file does not exist.
    """
    if not md_path.exists():
        print(f"[build_chunks] missing file: {md_path}")
        return []
    raw = md_path.read_text(encoding="utf-8")
    fm, body = extract_frontmatter(raw)
    if not isinstance(fm, dict):
        # frontmatter that is not a mapping carries no usable keys
        fm = {}

    # YAML parsers turn timestamps into date/datetime objects, which
    # json.dumps cannot serialize later on -- store them as ISO strings.
    converted_at = _iso_if_temporal(fm.get("converted_at"))
    notes = _iso_if_temporal(fm.get("notes"))
    lang = _iso_if_temporal(fm.get("lang") or None)

    # Prefer the frontmatter source_file, fall back to the markdown file name
    fm_source = fm.get("source_file")
    source_file = str(fm_source) if fm_source else md_path.name
    if doc_prefix:
        prefix = doc_prefix.rstrip('/')
        if not source_file.startswith(f"{prefix}/"):
            source_file = f"{prefix}/{source_file}"

    pages = split_pages(body)
    chunks: List[Chunk] = []
    for idx, page_md in enumerate(pages, start=1):
        text_for_emb, images = parse_page_images_and_text(page_md)
        chunks.append(
            Chunk(
                id=short_id(source_file, idx),
                source_file=source_file,
                converted_at=converted_at,
                notes=notes,
                filename=md_path.name,
                lang=lang,
                text=text_for_emb,
                md=page_md.strip(),
                images=images,
                page=idx,
                # offsets refer to the whole page text: one chunk per page
                start_char=0,
                end_char=len(text_for_emb),
            )
        )
    print(f"[build_chunks] {md_path.name}: pages={len(pages)} chunks={len(chunks)}")
    return chunks

def build_all_chunks(md_dir: Path = MARKDOWN_DIR, doc_prefix: Optional[str] = None) -> List[Chunk]:
    """Build chunks for every ``*.md`` file in ``md_dir`` and persist them.

    Files are processed in sorted order. Two artifacts are written:
    CHUNKS_JSONL (one JSON object per chunk per line) and META_JSON (a mapping
    from the numeric chunk id, as a string, to the chunk fields). The numeric
    id is the first 15 hex digits of SHA-1(chunk.id) read as an integer.

    Returns:
        All chunks, or an empty list (nothing written) when no markdown file
        is found.
    """
    md_files = sorted(md_dir.glob("*.md"))
    if len(md_files) == 0:
        print(f"[warn] no markdown files in {md_dir.resolve()}")
        return []

    all_chunks: List[Chunk] = [
        chunk
        for md_path in md_files
        for chunk in build_chunks_from_markdown_file(md_path, doc_prefix=doc_prefix)
    ]
    records = [asdict(chunk) for chunk in all_chunks]

    # chunks.jsonl: one record per line
    with CHUNKS_JSONL.open("w", encoding="utf-8") as fh:
        fh.writelines(json.dumps(record, ensure_ascii=False) + "\n" for record in records)

    # meta.json: records keyed by numeric id
    meta = {}
    for chunk, record in zip(all_chunks, records):
        digest = hashlib.sha1(chunk.id.encode()).hexdigest()
        meta[str(int(digest[:15], 16))] = record
    META_JSON.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")

    print(f"[out] wrote {len(all_chunks)} chunks -> {CHUNKS_JSONL}, meta -> {META_JSON}")
    return all_chunks

In [97]:
# ------------------ Embeddings ------------------
def embed_batch_safe(texts: List[str], model: str = EMBED_MODEL, max_retries:int = 6, backoff_base: float = 0.5):
    """Embed a batch of texts, retrying failed requests with exponential backoff.

    Args:
        texts: Strings to embed; an empty list returns ``[]`` without a request.
        model: Embedding model name passed to the API.
        max_retries: Number of retries after the first failed attempt.
        backoff_base: Sleep before the first retry, in seconds; doubled on
            every further retry.

    Returns:
        A list with one embedding per input text.

    Raises:
        The last exception once ``max_retries`` retries are exhausted
        (including the RuntimeError raised for an unparsable or incomplete
        response).
    """
    if not texts:
        # Nothing to embed: skip the request (and the whole retry cycle it would trigger)
        return []
    tries = 0
    while True:
        try:
            resp = client.embeddings.create(model=model, input=texts)
            out = []
            for item in resp.data:
                # Accept both attribute-style and dict-style response items
                emb = getattr(item, "embedding", None) or (item.get("embedding") if isinstance(item, dict) else None)
                if emb is None:
                    raise RuntimeError("embedding parse error")
                out.append(emb)
            if len(out) != len(texts):
                # Callers index the result by input position; a short or long reply would misalign it
                raise RuntimeError(f"embedding count mismatch: sent {len(texts)} texts, got {len(out)} embeddings")
            return out
        except Exception as e:
            tries += 1
            if tries > max_retries:
                raise
            wait = backoff_base * (2 ** (tries-1))
            print(f"[emb] retry {tries}/{max_retries}, sleeping {wait:.1f}s: {e}")
            time.sleep(wait)

def build_or_resume_embeddings(chunks: List[Chunk], batch_size:int = BATCH_SIZE) -> Tuple[np.ndarray, np.ndarray]:
    """Return embeddings and numeric ids for ``chunks``, reusing cached vectors.

    Cached vectors are read from EMB_PATH / IDS_PATH and reused for every chunk
    whose numeric id is present there; the rest are computed in batches via
    ``embed_batch_safe``. On success both files are overwritten so that they
    match the order of ``chunks``. If computing fails or is interrupted after
    at least one batch succeeded, the vectors known so far are saved before the
    exception propagates, so the next call only computes what is still missing.

    NOTE(review): the cache key is derived from ``chunk.id`` only. A chunk
    whose text changed but whose id did not will reuse its old vector; delete
    the cache files to force recomputation.

    Args:
        chunks: Chunks to embed (must be non-empty).
        batch_size: Number of texts per embedding request.

    Returns:
        ``(embeddings, ids)``: float32 array with one row per chunk, and the
        int64 numeric ids in the same order.

    Raises:
        RuntimeError: no chunks given, a batch returned the wrong number of
            vectors, or some chunk ended up without an embedding.
    """
    if not chunks:
        raise RuntimeError("No chunks to embed.")
    # Numeric id = first 15 hex digits (60 bits) of SHA-1(chunk.id); fits in int64
    numeric_ids = [int(hashlib.sha1(c.id.encode()).hexdigest()[:15], 16) for c in chunks]
    existing_map: Dict[int, np.ndarray] = {}
    if EMB_PATH.exists() and IDS_PATH.exists():
        try:
            arr = np.load(EMB_PATH)
            ids = np.load(IDS_PATH).astype(np.int64)
            if arr.ndim == 2 and arr.shape[1] == EMB_DIM_EXPECTED and ids.shape[0] == arr.shape[0]:
                for i, nid in enumerate(ids):
                    existing_map[int(nid)] = arr[i].astype(np.float32)
                print(f"[emb] loaded existing embeddings into map: {len(existing_map)} entries")
            else:
                print("[emb] existing embeddings file shape mismatch — ignoring")
        except Exception as e:
            # A broken cache is not fatal: everything is simply recomputed
            print("[emb] failed to load existing embeddings — ignoring:", e)
            existing_map = {}

    final_vecs: List[Optional[np.ndarray]] = [None] * len(numeric_ids)
    to_compute_indices: List[int] = []
    for i, nid in enumerate(numeric_ids):
        v = existing_map.get(nid)
        if v is not None:
            final_vecs[i] = v
        else:
            to_compute_indices.append(i)

    print(f"[emb] total_chunks={len(chunks)} reuse={len(chunks)-len(to_compute_indices)} compute={len(to_compute_indices)}")

    computed = 0
    try:
        for i in range(0, len(to_compute_indices), batch_size):
            batch_idx = to_compute_indices[i:i+batch_size]
            # Blank chunks are sent as a single space instead of an empty string
            batch_texts = [chunks[j].text if chunks[j].text.strip() else " " for j in batch_idx]
            print(f"[emb] computing batch {i//batch_size+1}: size {len(batch_texts)}")
            out = embed_batch_safe(batch_texts)
            if len(out) != len(batch_idx):
                raise RuntimeError(f"[emb] expected {len(batch_idx)} embeddings, got {len(out)}")
            for k, pos in enumerate(batch_idx):
                final_vecs[pos] = np.array(out[k], dtype=np.float32)
            computed += len(batch_idx)
            time.sleep(0.05)
    except BaseException:
        # Keep what was already computed so a re-run resumes instead of starting over
        if computed:
            done = [i for i, v in enumerate(final_vecs) if v is not None]
            try:
                np.save(EMB_PATH, np.vstack([final_vecs[i].reshape(1, -1) for i in done]).astype(np.float32))
                np.save(IDS_PATH, np.array([numeric_ids[i] for i in done], dtype=np.int64))
                print(f"[emb] interrupted — saved partial progress: {len(done)}/{len(chunks)} embeddings")
            except Exception as save_err:
                print("[emb] could not save partial progress:", save_err)
        raise

    missing = [i for i, v in enumerate(final_vecs) if v is None]
    if missing:
        raise RuntimeError(f"[emb] missing embeddings for indices: {missing}")

    arr = np.vstack([v.reshape(1, -1) for v in final_vecs]).astype(np.float32)
    ids_arr = np.array(numeric_ids, dtype=np.int64)

    # overwrite saved embeddings/ids so they match current chunks order
    np.save(EMB_PATH, arr)
    np.save(IDS_PATH, ids_arr)
    print(f"[emb] saved embeddings -> {EMB_PATH} shape={arr.shape}, ids -> {IDS_PATH} count={ids_arr.shape[0]}")

    return arr, ids_arr

In [98]:
# ------------------ FAISS ------------------
def build_and_save_faiss(emb_arr: np.ndarray, emb_ids: np.ndarray, index_path: Path = FAISS_INDEX_PATH):
    """Build an id-mapped inner-product FAISS index and write it to ``index_path``.

    The vectors are L2-normalized on a private float32 copy (the caller's
    array is left untouched) and added under the given ids cast to int64.

    Returns:
        The ``faiss.IndexIDMap`` that was written.

    Raises:
        RuntimeError: ``emb_arr`` is None or has no rows.
    """
    nothing_to_index = emb_arr is None or emb_arr.shape[0] == 0
    if nothing_to_index:
        raise RuntimeError("No embeddings to index")

    # normalize_L2 is applied for its in-place effect, hence the explicit copy
    unit_vecs = emb_arr.astype(np.float32, copy=True)
    faiss.normalize_L2(unit_vecs)

    flat_index = faiss.IndexFlatIP(unit_vecs.shape[1])
    id_index = faiss.IndexIDMap(flat_index)
    id_index.add_with_ids(unit_vecs, emb_ids.astype(np.int64))

    faiss.write_index(id_index, str(index_path))
    print(f"[faiss] saved index -> {index_path} (ntotal={id_index.ntotal})")
    return id_index

In [99]:
# ------------------ Search ------------------
def load_meta(out_dir: Path = OUT) -> Dict[str,Any]:
    """Read ``meta.json`` from ``out_dir``; return an empty dict if the file is absent."""
    meta_path = out_dir / "meta.json"
    if meta_path.exists():
        return json.loads(meta_path.read_text(encoding="utf-8"))
    return {}

def embed_query(q: str, model: str = EMBED_MODEL) -> np.ndarray:
    """Embed a single query string and return the vector as a float32 array.

    Raises:
        RuntimeError: the response item carries no usable embedding.
    """
    first = client.embeddings.create(model=model, input=[q]).data[0]
    vector = getattr(first, "embedding", None)
    if not vector:
        # fall back to dict-style access when the attribute is missing or empty
        vector = first.get("embedding") if isinstance(first, dict) else None
    if vector is None:
        raise RuntimeError("Can't parse embedding response")
    return np.array(vector, dtype=np.float32)

def search_topk(query: str, top_k: int = 5, index: faiss.Index = None, meta: Dict[str,Any] = None):
    """Return up to ``top_k`` chunks most similar to ``query``.

    Args:
        query: Text to search for; it is embedded with ``embed_query``.
        top_k: Maximum number of results. Zero or negative yields ``[]``
            without loading the index or calling the embedding API.
        index: FAISS index to search; read from FAISS_INDEX_PATH when None.
        meta: Mapping from numeric id (as string) to chunk metadata; loaded
            with ``load_meta(OUT)`` when None.

    Returns:
        A list of ``{"score": float, "meta": dict}`` in the order returned by
        the index. Hits without a metadata entry are skipped with a warning.

    Raises:
        RuntimeError: no index was passed and FAISS_INDEX_PATH does not exist.
    """
    if top_k <= 0:
        # The loop below only checks the limit after appending, so without
        # this guard a non-positive top_k would still produce one result.
        return []
    if index is None:
        if not FAISS_INDEX_PATH.exists():
            raise RuntimeError("FAISS index not found")
        idx = faiss.read_index(str(FAISS_INDEX_PATH))
    else:
        idx = index
    if meta is None:
        meta = load_meta(OUT)
    if idx.ntotal == 0:
        print("[search] empty index")
        return []
    q = embed_query(query)
    q_arr = np.array([q], dtype=np.float32)
    # Normalize the query the same way the indexed vectors were normalized
    faiss.normalize_L2(q_arr)
    # Over-fetch (at least 64, capped by index size) so hits with missing
    # metadata can be skipped and top_k results still be returned
    search_k = min(max(64, top_k * 8), max(1, int(idx.ntotal)))
    print(f"[search] idx.ntotal={idx.ntotal} top_k={top_k} search_k={search_k}")
    D, I = idx.search(q_arr, search_k)
    results = []
    for sc, nid in zip(D[0].tolist(), I[0].tolist()):
        if nid == -1:
            # -1 ids are skipped (no hit in this slot)
            continue
        key = str(int(nid))
        m = meta.get(key)
        if not m:
            # still warn (should not happen if emb/ids/meta aligned)
            print(f"[search] warn: missing meta for id {key}")
            continue
        results.append({"score": float(sc), "meta": m})
        if len(results) >= top_k:
            break
    return results

In [100]:
# ------------------ Main ------------------
def main():
    """Run the whole pipeline: chunk markdown, embed, index, then print smoke-test searches.

    Raises:
        RuntimeError: no chunks were produced from MARKDOWN_DIR.
    """
    print("RAG pipeline start. MARKDOWN_DIR:", MARKDOWN_DIR.resolve(), "OUT:", OUT.resolve())

    chunks = build_all_chunks(MARKDOWN_DIR)
    if not chunks:
        raise RuntimeError("No chunks produced. Check markdown files and MARKDOWN_DIR.")

    # batch size is capped at 32 texts per request
    emb_arr, emb_ids = build_or_resume_embeddings(chunks, batch_size=min(BATCH_SIZE, 32))
    emb_shape = getattr(emb_arr, "shape", None)
    ids_shape = getattr(emb_ids, "shape", None)
    print("[main] embeddings shape:", emb_shape, "ids shape:", ids_shape)

    index = build_and_save_faiss(emb_arr, emb_ids)
    meta = load_meta(OUT)

    # quick smoke queries (modify these or remove)
    smoke_queries = (
        "Как подать на ВНЖ?",
        "Где купить Marca da Bollo?",
        "Как подать в университет?",
        "How do I apply to university?",
    )
    print("\n" + "-"*80)
    for question in smoke_queries:
        print("\nQuery:", question)
        hits = search_topk(question, top_k=5, index=index, meta=meta)
        if hits:
            for rank, hit in enumerate(hits, start=1):
                info = hit["meta"]
                # prefer the frontmatter source_file (it may include a path like 'italy/application_ru.pdf')
                shown_file = info.get("source_file") or info.get("filename")
                shown_page = info.get("page", "n/a")
                raw_text = info.get("text") or info.get("md") or ""
                snippet = raw_text[:400].replace("\n", " ").strip()
                ellipsis = "..." if len(snippet) > 220 else ""
                print(f"{rank:02d}) score={hit['score']:.4f}  file={shown_file}  page={shown_page}")
                print("    snippet:", snippet[:220] + ellipsis)
        else:
            print(" -> No results")
    print("\n[done]")

if __name__ == "__main__":
    main()

RAG pipeline start. MARKDOWN_DIR: /Users/rakymzhan/Documents/projects/src/distributed-agent/python/RAG/markdown OUT: /Users/rakymzhan/Documents/projects/src/distributed-agent/python/RAG/out
[build_chunks] application_ru.md: pages=10 chunks=10
[build_chunks] residence_permit_ru.md: pages=10 chunks=10
[out] wrote 20 chunks -> out/chunks.jsonl, meta -> out/meta.json
[emb] loaded existing embeddings into map: 20 entries
[emb] total_chunks=20 reuse=20 compute=0
[emb] saved embeddings -> out/embeddings.npy shape=(20, 1536), ids -> out/ids.npy count=20
[main] embeddings shape: (20, 1536) ids shape: (20,)
[faiss] saved index -> out/index.faiss (ntotal=20)

--------------------------------------------------------------------------------

Query: Как подать на ВНЖ?
[search] idx.ntotal=20 top_k=5 search_k=20
01) score=0.5205  file=italy/residence_permit_ru.pdf  page=10
    snippet: Этап 3: Забираете свой ВНЖ с полицейского участка [image] Permesso di Soggiorno пример Приносите свой паспорт и все б