# Jerusalem RAG Explorer - Ingestion Pipeline

**By Yotam Nachtomy-Katz** | ID: 211718366 | Haifa University

This notebook demonstrates the ingestion pipeline: chunking documents, translating non-English text, generating embeddings, and building the FAISS index.

## 1. Setup and Imports

In [None]:
import os
import re
import json
import hashlib
import time
from pathlib import Path
from typing import Optional

import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
from tqdm.notebook import tqdm
from dotenv import load_dotenv
from langdetect import detect, LangDetectException

# Read settings from the .env file one directory above the working directory
load_dotenv("../.env")

# Report whether the Gemini key is available; the translation cell checks
# the same variable before calling the API
print(
    "GEMINI_API_KEY loaded successfully."
    if os.getenv("GEMINI_API_KEY")
    else "WARNING: GEMINI_API_KEY not found. Translation will be skipped."
)

## 2. Configuration

In [None]:
# Paths (relative to the notebook's working directory)
DATA_DIR = Path("../data/raw")  # searched recursively for *.txt source documents
INDEX_DIR = Path("../data/index_v2")  # receives faiss.index, chunks.json and config.json
TRANSLATION_CACHE_DIR = Path("../data/translations")  # on-disk translation cache

# Chunking parameters (both measured in characters, not tokens)
CHUNK_SIZE = 2000  # characters per chunk
OVERLAP = 300      # overlap between chunks

# Embedding model (name passed to SentenceTransformer)
MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"

# Language mappings: language code -> display name used in chunk metadata
# and in the translation prompt
LANGUAGE_NAMES = {
    "en": "English",
    "la": "Latin",
    "ar": "Arabic",
    "el": "Greek",
    "fr": "French",
    "hy": "Armenian",
}

def get_language_name(code: str) -> str:
    """Return the display name for a language code.

    Codes that are not listed in ``LANGUAGE_NAMES`` fall back to the
    upper-cased code itself.
    """
    fallback = code.upper()
    return LANGUAGE_NAMES.get(code, fallback)

## 3. Text Chunking

Documents are split into overlapping segments to preserve context across chunk boundaries.

In [None]:
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = OVERLAP) -> list[str]:
    """Split text into overlapping chunks.

    A window of ``chunk_size`` characters slides over ``text`` in steps of
    ``chunk_size - overlap``. Each window is stripped of surrounding
    whitespace, and windows that are empty after stripping are dropped.

    Args:
        text: The text to split.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared by consecutive windows; must
            satisfy ``0 <= overlap < chunk_size``.

    Returns:
        The non-empty chunks in document order (empty list for empty text).

    Raises:
        ValueError: If ``chunk_size`` is not positive, or ``overlap`` is
            negative or not smaller than ``chunk_size``. A non-positive step
            would otherwise never advance, and a negative overlap would
            skip text between windows.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if not 0 <= overlap < chunk_size:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < chunk_size, got {overlap}"
        )

    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        part = text[start : start + chunk_size].strip()
        if part:
            chunks.append(part)
        # This window already reaches the end of the text; any further window
        # would only repeat the tail of this one.
        if start + chunk_size >= len(text):
            break
    return chunks

# Demonstrate chunking on a synthetic 5000-character string.
# With the defaults (CHUNK_SIZE=2000, OVERLAP=300) the window advances 1700
# characters at a time, so three chunks of 2000, 2000 and 1600 characters
# are expected.
sample_text = "A" * 5000
sample_chunks = chunk_text(sample_text)
print(f"Sample: {len(sample_text)} chars -> {len(sample_chunks)} chunks")
print(f"Chunk sizes: {[len(c) for c in sample_chunks]}")

## 4. Metadata Extraction

We extract metadata from the document headers that were added during fetching.

In [None]:
def extract_metadata_from_header(text: str) -> dict:
    """Parse ``key: value`` header lines into a metadata dict.

    Only the first 2000 characters of ``text`` are scanned, and scanning
    stops at the first line that starts with ``---``. Keys are matched
    case-insensitively. Recognised keys are ``title``, ``author``,
    ``language``, ``url`` (stored as ``source_url``) and ``source`` (stored
    lower-cased as ``source_repository``). A repeated key overwrites the
    earlier value; unrecognised keys are ignored.
    """
    # Header key -> metadata field, for values that are stored verbatim
    verbatim_fields = {
        "title": "title",
        "author": "author",
        "language": "language",
        "url": "source_url",
    }
    found = {}

    for raw_line in text[:2000].split("\n"):
        if raw_line.startswith("---"):
            break
        # Split on the first colon only, so values such as URLs stay intact
        name, separator, rest = raw_line.partition(":")
        if not separator:
            continue
        name = name.strip().lower()
        rest = rest.strip()
        if name in verbatim_fields:
            found[verbatim_fields[name]] = rest
        elif name == "source":
            found["source_repository"] = rest.lower()

    return found


def detect_repository(filepath: Path) -> str:
    """Guess the source repository from the file path.

    The lower-cased path is searched for ``gallica``, ``archive`` and
    ``wiki``, in that order; the first marker found is returned, or
    ``"unknown"`` when none of them occurs.
    """
    lowered = str(filepath).lower()
    return next(
        (marker for marker in ("gallica", "archive", "wiki") if marker in lowered),
        "unknown",
    )

## 5. Language Detection

In [None]:
def is_english(text: str) -> bool:
    """Return True when langdetect classifies a sample of ``text`` as English.

    The sample is up to 2000 characters long and starts a quarter of the way
    into the text, but no later than offset 500 (presumably to step past the
    metadata header — confirm). When langdetect raises
    ``LangDetectException`` the text is treated as English.
    """
    offset = min(500, len(text) // 4)
    window = text[offset : offset + 2000]
    try:
        language = detect(window)
    except LangDetectException:
        return True
    return language == "en"


def detect_language(text: str) -> str:
    """Detect language of text. Returns ISO 639-1 code.

    Detection happens in two stages:

    1. The first 500 characters are searched for a ``language: <value>``
       marker. The whole declared value must be a known code or English
       language name; the first recognised declaration wins. Matching the
       whole value (rather than a prefix) keeps ``language: armenian`` from
       being read as ``ar`` (Arabic).
    2. Otherwise langdetect runs on a sample of up to 2000 characters that
       starts a quarter of the way in (no later than offset 500). A result
       of ``hr`` or ``sr`` is remapped to ``la`` when the text contains
       common Latin word stems.

    Args:
        text: Full document text, including any metadata header.

    Returns:
        The declared or detected language code, or ``"en"`` when langdetect
        raises ``LangDetectException``.
    """
    # Declared value (code or English name) -> ISO 639-1 code
    declared_aliases = {
        "la": "la", "lat": "la", "latin": "la",
        "ar": "ar", "ara": "ar", "arabic": "ar",
        "el": "el", "ell": "el", "gre": "el", "greek": "el",
        "fr": "fr", "fra": "fr", "fre": "fr", "french": "fr",
        "hy": "hy", "hye": "hy", "arm": "hy", "armenian": "hy",
    }

    # Check for explicit language markers in header
    for marker in re.finditer(r"language: *([a-z]+)", text[:500].lower()):
        declared = declared_aliases.get(marker.group(1))
        if declared is not None:
            return declared

    # Use langdetect for automatic detection
    try:
        sample_start = min(500, len(text) // 4)
        sample = text[sample_start : sample_start + 2000]
        detected = detect(sample)
    except LangDetectException:
        return "en"

    # Map common misdetections: Latin is often reported as Croatian/Serbian.
    # Substring matching is deliberate so inflected forms (e.g. "ecclesiae")
    # still count.
    if detected in ("hr", "sr"):
        lowered = text.lower()  # lower-case once, not once per keyword
        if any(stem in lowered for stem in ("rex", "deus", "anno", "ecclesia")):
            return "la"
    return detected

## 6. Translation Pipeline

Non-English chunks are translated using the Gemini API with caching to avoid redundant API calls.

In [None]:
class TranslationCache:
    """Simple file-based cache for translations.

    Each translation is stored in ``<cache_dir>/<key>.txt`` where ``key`` is
    ``<source_lang>_<first 16 hex characters of the SHA-256 of the source
    text>``. ``index.json`` records the known keys together with basic size
    information. A lookup is a hit only when the key is in the index and its
    text file exists.
    """

    def __init__(self, cache_dir: Path):
        """Create ``cache_dir`` if needed and load the existing index."""
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.index_file = self.cache_dir / "index.json"
        self.index = self._load_index()

    def _load_index(self) -> dict:
        """Return the stored index, or an empty dict if it is missing or unusable.

        An unreadable index (for example one truncated by an interrupted
        write) is reported and ignored rather than aborting the pipeline;
        the affected entries are simply treated as cache misses.
        """
        if not self.index_file.exists():
            return {}
        try:
            loaded = json.loads(self.index_file.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            print(f"WARNING: ignoring unreadable translation cache index: {exc}")
            return {}
        if not isinstance(loaded, dict):
            print("WARNING: ignoring translation cache index with unexpected structure")
            return {}
        return loaded

    def _save_index(self):
        """Write the index to disk, replacing the previous file in one step."""
        # Write to a sibling file first so an interruption cannot leave a
        # half-written index.json behind.
        tmp_file = self.index_file.with_name(self.index_file.name + ".tmp")
        tmp_file.write_text(
            json.dumps(self.index, ensure_ascii=False, indent=2), encoding="utf-8"
        )
        tmp_file.replace(self.index_file)

    def _hash_text(self, text: str) -> str:
        """Return the first 16 hex characters of the SHA-256 of ``text``."""
        return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]

    def _key(self, text: str, source_lang: str) -> str:
        """Build the cache key shared by ``get`` and ``put``."""
        return f"{source_lang}_{self._hash_text(text)}"

    def get(self, text: str, source_lang: str) -> Optional[str]:
        """Return the cached translation of ``text``, or None on a miss."""
        key = self._key(text, source_lang)
        if key in self.index:
            cache_file = self.cache_dir / f"{key}.txt"
            if cache_file.exists():
                return cache_file.read_text(encoding="utf-8")
        return None

    def put(self, text: str, source_lang: str, translation: str):
        """Store ``translation`` for ``text`` and persist the updated index."""
        key = self._key(text, source_lang)
        cache_file = self.cache_dir / f"{key}.txt"
        # Write the text file before the index so the index never lists an
        # entry whose file has not been written yet.
        cache_file.write_text(translation, encoding="utf-8")
        self.index[key] = {
            "source_lang": source_lang,
            "original_len": len(text),
            "translated_len": len(translation),
        }
        self._save_index()


# Initialize cache: creates TRANSLATION_CACHE_DIR if it is missing and loads
# index.json when one already exists, so earlier translations are reused
translation_cache = TranslationCache(TRANSLATION_CACHE_DIR)
print(f"Translation cache has {len(translation_cache.index)} entries")

In [None]:
# Translation function using Gemini API
from google import genai
from google.genai import types

def translate_text(text: str, source_lang: str, cache: TranslationCache) -> str:
    """Translate ``text`` into English with the Gemini API, using ``cache``.

    Args:
        text: Source-language text to translate.
        source_lang: Language code of ``text``; also part of the cache key.
        cache: Cache consulted before, and updated after, the API call.

    Returns:
        The cached or freshly generated translation, or ``""`` when the API
        call raises or returns no text. Empty results are not cached.
    """
    previous = cache.get(text, source_lang)
    if previous:
        return previous

    # Fixed pause before every API request: 15 s apart is 4 requests per minute
    time.sleep(15)

    lang_name = get_language_name(source_lang)
    prompt = f"""You are a scholarly translator specializing in medieval texts.
Translate the following {lang_name} text to English.

IMPORTANT:
- Preserve proper nouns (names of people, places) in their common English forms
- Keep historical terms with brief clarification if needed
- Maintain the tone and style of medieval chronicles
- If text contains OCR errors, do your best to interpret the intended meaning

TEXT TO TRANSLATE:
{text}

ENGLISH TRANSLATION:"""

    try:
        gemini = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
        generation_config = types.GenerateContentConfig(
            temperature=0.1,
            max_output_tokens=4096,
        )
        reply = gemini.models.generate_content(
            model="gemini-3-flash-preview",
            contents=prompt,
            config=generation_config,
        )
        translation = reply.text or ""

        # Only non-empty translations are worth remembering
        if translation:
            cache.put(text, source_lang, translation)
    except Exception as e:
        # Best effort: report the failure and let the caller keep the original text
        print(f"Translation error: {e}")
        return ""
    return translation

## 7. Load and Process Documents

In [None]:
def load_file(filepath: Path) -> list[dict]:
    """Read one text file and return its chunks as metadata dicts.

    The file is decoded as UTF-8 with undecodable bytes dropped. The language
    comes from the header's ``language`` field when it names Latin, Arabic,
    Greek or French (code or English name); otherwise the text is checked
    with ``is_english`` and, failing that, ``detect_language``.

    Every chunk dict carries the chunk text, a ``chunk_id`` built from the
    sanitised file stem and the chunk position, the language fields, empty
    translation fields, and the header/repository metadata.

    NOTE(review): chunk_id uses only the file stem, so two files with the
    same stem in different directories would presumably share IDs — confirm
    whether that can happen in the data set.
    """
    text = filepath.read_text(encoding="utf-8", errors="ignore")

    header = extract_metadata_from_header(text)
    repository = detect_repository(filepath)

    # Declared header value -> language code
    declared_aliases = {
        "la": "la", "latin": "la",
        "ar": "ar", "arabic": "ar",
        "el": "el", "greek": "el",
        "fr": "fr", "french": "fr",
    }
    lang = declared_aliases.get(header.get("language", "").lower())
    if lang is None:
        # No usable declaration: fall back to automatic detection
        lang = "en" if is_english(text) else detect_language(text)

    # Replace anything that is not a word character or hyphen in the stem
    prefix = re.sub(r"[^\w\-]", "_", filepath.stem)

    return [
        {
            "chunk_id": f"{prefix}_chunk_{i:03d}",
            "source": str(filepath),
            "text": segment,
            "language": lang,
            "language_name": get_language_name(lang),
            "is_translation": False,
            "original_language": None,
            "original_text": None,
            "author": header.get("author"),
            "title": header.get("title"),
            "source_url": header.get("source_url"),
            "source_repository": repository,
        }
        for i, segment in enumerate(chunk_text(text))
    ]

In [None]:
# Load all text files.
# The paths are sorted because rglob does not promise any particular order;
# a fixed order keeps chunk positions in the index, and the subset picked by
# MAX_TRANSLATE, the same from run to run.
files = sorted(DATA_DIR.rglob("*.txt"))
print(f"Found {len(files)} text files")

all_chunks = []
for filepath in tqdm(files, desc="Loading files"):
    file_chunks = load_file(filepath)
    all_chunks.extend(file_chunks)

print(f"Created {len(all_chunks)} chunks")

# Count by language
lang_counts = {}
for c in all_chunks:
    lang = c.get("language", "en")
    lang_counts[lang] = lang_counts.get(lang, 0) + 1

# Most frequent language first
print("\nChunks by language:")
for lang, count in sorted(lang_counts.items(), key=lambda x: -x[1]):
    print(f"  {get_language_name(lang)}: {count}")

## 8. Translate Non-English Chunks

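**Note:** This step may take a long time due to API rate limits: every chunk that is not already in the translation cache waits 15 seconds before its API request. You can limit the number of translations with `MAX_TRANSLATE`, or skip this step entirely by setting `TRANSLATE = False`.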

In [None]:
# Configuration for translation
TRANSLATE = True  # Set to False to skip translation
MAX_TRANSLATE = 15  # Limit translations (set to None for unlimited)

if TRANSLATE and os.getenv("GEMINI_API_KEY"):
    non_english = [c for c in all_chunks if c.get("language", "en") != "en"]
    print(f"Non-English chunks to translate: {len(non_english)}")

    # Compare against None explicitly so that MAX_TRANSLATE = 0 means
    # "translate nothing" instead of being treated as unlimited.
    if MAX_TRANSLATE is not None and MAX_TRANSLATE < len(non_english):
        print(f"Limiting to {MAX_TRANSLATE} chunks")
        non_english = non_english[:MAX_TRANSLATE]

    translated_count = 0
    for chunk in tqdm(non_english, desc="Translating"):
        lang = chunk.get("language", "en")
        original_text = chunk["text"]

        translated = translate_text(original_text, lang, translation_cache)

        # An empty result means the translation failed; the chunk then keeps
        # its original-language text.
        if translated:
            # The chunk dicts are shared with all_chunks, so this updates the
            # data that is embedded and saved later.
            chunk["original_text"] = original_text
            chunk["text"] = translated
            chunk["is_translation"] = True
            chunk["original_language"] = lang
            translated_count += 1

    print(f"\nTranslated {translated_count} chunks")
else:
    print("Skipping translation (disabled or no API key)")

## 9. Generate Embeddings

In [None]:
# Load embedding model
print(f"Loading embedding model: {MODEL_NAME}")
model = SentenceTransformer(MODEL_NAME)

# Get text from each chunk: the English translation where one was produced,
# otherwise the chunk's original text
texts = [c["text"] for c in all_chunks]
print(f"Embedding {len(texts)} chunks...")

# Generate embeddings.
# normalize_embeddings=True requests unit-length vectors, which is what makes
# an inner-product index behave like cosine similarity.
# NOTE(review): the all-MiniLM-L6-v2 model card says input beyond 256 word
# pieces is truncated; with 2000-character chunks the tail of a chunk may not
# influence its embedding — confirm and consider a smaller CHUNK_SIZE.
embeddings = model.encode(
    texts,
    normalize_embeddings=True,
    show_progress_bar=True,
)
# Convert to a float32 array before handing the vectors to FAISS
embeddings = np.array(embeddings, dtype="float32")

print(f"Embeddings shape: {embeddings.shape}")

## 10. Build FAISS Index

In [None]:
# Build FAISS index
# NOTE(review): assumes `embeddings` is 2-D (one row per chunk); presumably
# this line fails when no chunks were loaded — confirm.
dim = embeddings.shape[1]
print(f"Building FAISS index with {dim} dimensions...")

# Vectors are added in all_chunks order, so a position returned by a search
# is also an index into all_chunks.
index = faiss.IndexFlatIP(dim)  # Inner product for cosine similarity
index.add(embeddings)

print(f"FAISS index built with {index.ntotal} vectors")

## 11. Save Index and Metadata

In [None]:
# Make sure the output directory exists before anything is written into it
INDEX_DIR.mkdir(parents=True, exist_ok=True)

# 1) The FAISS index itself
faiss.write_index(index, str(INDEX_DIR / "faiss.index"))
print(f"Saved FAISS index to {INDEX_DIR / 'faiss.index'}")

# 2) Every chunk with its full metadata, in the same order as the index rows
(INDEX_DIR / "chunks.json").write_text(
    json.dumps(all_chunks, ensure_ascii=False, indent=2), encoding="utf-8"
)
print(f"Saved {len(all_chunks)} chunks to {INDEX_DIR / 'chunks.json'}")

# 3) The parameters this index was built with
config = {
    "chunk_size": CHUNK_SIZE,
    "overlap": OVERLAP,
    "embedding_model": MODEL_NAME,
    "embedding_dim": int(dim),
    "total_chunks": len(all_chunks),
}
(INDEX_DIR / "config.json").write_text(json.dumps(config, indent=2), encoding="utf-8")
print(f"Saved config to {INDEX_DIR / 'config.json'}")

## 12. Test the Index

In [None]:
# Test query, embedded the same way as the chunks (normalised, float32)
test_query = "Battle of Hattin"
q_embedding = model.encode([test_query], normalize_embeddings=True)
q_embedding = np.array(q_embedding, dtype="float32")

# Search for the three nearest chunks
scores, indices = index.search(q_embedding, 3)

print(f"Test query: '{test_query}'\n")
for i, (score, idx) in enumerate(zip(scores[0], indices[0])):
    # FAISS fills missing results with -1 when the index holds fewer than
    # three vectors; all_chunks[-1] would silently show the last chunk.
    if idx < 0:
        continue
    chunk = all_chunks[idx]
    print(f"Result {i+1} (score: {score:.3f})")
    print(f"  Chunk ID: {chunk['chunk_id']}")
    print(f"  Language: {chunk['language_name']}")
    print(f"  Preview: {chunk['text'][:200]}...")
    print()

## Next Steps

The index is now ready! Proceed to:
- **03_retrieval.ipynb** - Query the index and generate answers with Gemini