In [1]:
import os
import pandas as pd
import numpy as np
from pypdf import PdfReader, PdfWriter
from transformers import AutoTokenizer, pipeline
from sentence_transformers import SentenceTransformer
from pathlib import Path
from dataclasses import dataclass
import json
import faiss
from docx import Document
from openpyxl import load_workbook

## Configuration
Define the immutable config dataclass for all pipeline parameters.

In [2]:
@dataclass(frozen=True)
class config:
    """Immutable bundle of settings passed to every pipeline stage."""
    # Input location. The single-file example passes this value to create_corpus
    # as the file to read; the multi-document example sets it to "./" and lists
    # the files separately.
    files_dir: Path
    # Directory that receives every generated artifact (corpus, chunks,
    # embeddings, metadata, FAISS index); the path helpers create it on demand.
    output_dir: Path
    # Upper bound on the number of tokens in one chunk (read by create_chunks).
    max_tokens: int
    # Number of tokens shared by consecutive chunks (read by create_chunks).
    token_overlap: int
    # Name handed to AutoTokenizer.from_pretrained when chunking.
    tokenizer_name: str
    # Name handed to SentenceTransformer for embedding chunks and queries.
    embedding_model: str
    # NOTE(review): not read by any stage visible in this notebook; presumably
    # reserved for a later generation step - confirm.
    llm_model: str

## Path Helpers
Output file path generators with automatic directory creation.

In [3]:
def corpus_path(output_dir: Path) -> Path:
    """Return where the corpus file lives, creating ``output_dir`` if needed.

    Args:
        output_dir: Directory for pipeline artifacts; it and any missing
            parents are created when absent.

    Returns:
        ``output_dir / "corpus.jsonl"``.
    """
    destination = output_dir.joinpath("corpus.jsonl")
    output_dir.mkdir(parents=True, exist_ok=True)
    return destination


def chunks_path(output_dir: Path) -> Path:
    """Return where the chunk file lives, creating ``output_dir`` if needed.

    Args:
        output_dir: Directory for pipeline artifacts; it and any missing
            parents are created when absent.

    Returns:
        ``output_dir / "chunks.jsonl"``.
    """
    destination = output_dir.joinpath("chunks.jsonl")
    output_dir.mkdir(parents=True, exist_ok=True)
    return destination


def embeddings_path(output_dir: Path) -> Path:
    """Return where the embedding matrix lives, creating ``output_dir`` if needed.

    Args:
        output_dir: Directory for pipeline artifacts; it and any missing
            parents are created when absent.

    Returns:
        ``output_dir / "embeddings.npy"``.
    """
    destination = output_dir.joinpath("embeddings.npy")
    output_dir.mkdir(parents=True, exist_ok=True)
    return destination


def metadata_path(output_dir: Path) -> Path:
    """Return where the embedding metadata lives, creating ``output_dir`` if needed.

    Args:
        output_dir: Directory for pipeline artifacts; it and any missing
            parents are created when absent.

    Returns:
        ``output_dir / "metadata.jsonl"``.
    """
    destination = output_dir.joinpath("metadata.jsonl")
    output_dir.mkdir(parents=True, exist_ok=True)
    return destination


def faiss_index_path(output_dir: Path) -> Path:
    """Return where the FAISS index lives, creating ``output_dir`` if needed.

    Args:
        output_dir: Directory for pipeline artifacts; it and any missing
            parents are created when absent.

    Returns:
        ``output_dir / "faiss_index.index"``.
    """
    destination = output_dir.joinpath("faiss_index.index")
    output_dir.mkdir(parents=True, exist_ok=True)
    return destination

## Stage 1: Text Extraction
Extract text from PDF, DOCX, and Excel files; unsupported file types raise a `ValueError`.

In [None]:
def extract_text_from_pdfs(files_dir: Path) -> str:
    """Extract the text of every page of a single PDF file.

    Args:
        files_dir: Path of the PDF file to read (a file, despite the
            parameter name).

    Returns:
        The page texts joined with newlines. A page whose extraction yields
        nothing contributes an empty string, so page boundaries are kept.
    """
    reader = PdfReader(str(files_dir))
    # The reader exposes its pages through the ``pages`` sequence; it has no
    # ``page`` attribute, so the previous spelling raised AttributeError.
    return "\n".join(page.extract_text() or "" for page in reader.pages)


def extract_text_from_docx(files_dir: Path) -> str:
    """Extract the text of a single DOCX file.

    Args:
        files_dir: Path of the DOCX file to read (a file, despite the
            parameter name).

    Returns:
        The text of every body paragraph, followed by the text of every table
        cell (table by table, row by row), all joined with newlines.
    """
    document = Document(str(files_dir))
    body_lines = [paragraph.text for paragraph in document.paragraphs]
    # Table content is not part of ``paragraphs``, so it is appended afterwards.
    table_lines = [
        cell.text
        for table in document.tables
        for row in table.rows
        for cell in row.cells
    ]
    return "\n".join(body_lines + table_lines)


def extract_text_from_excel(files_dir: Path) -> str:
    """Extract the cell contents of every sheet of a single Excel workbook.

    Args:
        files_dir: Path of the workbook to read (a file, despite the
            parameter name).

    Returns:
        For each sheet, a ``=== Sheet: <name> ===`` banner followed by one
        line per row with the cells separated by tabs; empty cells become
        empty strings. All lines are joined with newlines.
    """
    workbook = load_workbook(str(files_dir))
    lines = []

    for sheet_name in workbook.sheetnames:
        sheet = workbook[sheet_name]
        lines.append(f"\n=== Sheet: {sheet_name} ===\n")
        lines.extend(
            "\t".join("" if value is None else str(value) for value in row)
            for row in sheet.iter_rows(values_only=True)
        )

    return "\n".join(lines)


def extract_text_by_filetype(file_path: Path) -> str:
    """Pick the extractor that matches the file's extension and run it.

    The extension is compared case-insensitively.

    Args:
        file_path: File to extract text from.

    Returns:
        The text produced by the PDF, DOCX or Excel extractor.

    Raises:
        ValueError: If the extension is not .pdf, .docx, .xlsx or .xls.
    """
    suffix = file_path.suffix.lower()

    # Reject unknown extensions up front so the dispatch below stays flat.
    if suffix not in {".pdf", ".docx", ".xlsx", ".xls"}:
        raise ValueError(f"Unsupported file type: {suffix}. Supported: .pdf, .docx, .xlsx, .xls")

    if suffix == ".pdf":
        return extract_text_from_pdfs(file_path)
    if suffix == ".docx":
        return extract_text_from_docx(file_path)
    # NOTE(review): legacy .xls files share the .xlsx reader - confirm that
    # the workbook loader used there can actually open them.
    return extract_text_from_excel(file_path)

## Stage 2: Corpus Creation
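Extract text from one or more files, concatenate it with `[SOURCE: <file name>]` markers, and save it together with per-file metadata to `corpus.jsonl` (written as a single JSON document).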

In [5]:
def create_corpus(cfg: config, pdf_file: Path = None, input_files: list = None) -> str:
    """Build one text corpus from one or several files and save it to disk.

    Args:
        cfg: Pipeline settings; only ``output_dir`` is read here.
        pdf_file: A single file to process. Used only when ``input_files`` is
            empty or missing.
        input_files: Files to process. Takes precedence over ``pdf_file``; a
            non-list value is treated as a single entry.

    Returns:
        The concatenated text, where each file's content is preceded by a
        ``[SOURCE: <file name>]`` marker.

    Raises:
        ValueError: If neither ``pdf_file`` nor ``input_files`` is given, or
            if a file has an unsupported extension.
    """
    if not input_files and not pdf_file:
        raise ValueError("Either pdf_file or input_files must be provided")

    # input_files wins whenever it is non-empty; pdf_file is the fallback.
    if input_files:
        files_to_process = input_files if isinstance(input_files, list) else [input_files]
    else:
        files_to_process = [pdf_file]

    sections = []
    corpus_metadata = []
    for entry in files_to_process:
        file_path = Path(entry)
        print(f"Processing {file_path.name}...")

        text = extract_text_by_filetype(file_path)
        sections.append(f"\n\n[SOURCE: {file_path.name}]\n{text}")
        corpus_metadata.append({
            "source": file_path.name,
            "file_type": file_path.suffix.lower(),
            "length": len(text)
        })

    corpus_text = "".join(sections)

    # The whole corpus is written as one indented JSON document.
    corpus_file = corpus_path(cfg.output_dir)
    payload = {
        "files": corpus_metadata,
        "text": corpus_text,
        "total_length": len(corpus_text)
    }
    with open(corpus_file, 'w') as f:
        json.dump(payload, f, indent=2)

    print(f"Corpus saved to {corpus_file} ({len(corpus_text)} characters from {len(files_to_process)} file(s))")
    return corpus_text

## Stage 3: Tokenization & Chunking
Split corpus into overlapping chunks with token-level precision.

In [6]:
def create_chunks(cfg: config, corpus_text: str, source_file: str, page: int = 0) -> list:
    """Split the corpus into overlapping token windows and save them as JSONL.

    Consecutive chunks start ``max_tokens - token_overlap`` tokens apart, so
    each chunk shares ``token_overlap`` tokens with its predecessor. A
    non-positive ``token_overlap`` means no overlap.

    Args:
        cfg: Pipeline settings; ``tokenizer_name``, ``max_tokens``,
            ``token_overlap`` and ``output_dir`` are read.
        corpus_text: Text to split.
        source_file: Label stored in every chunk's ``source_file`` field.
        page: Value stored in every chunk's ``page`` field.

    Returns:
        A list of dicts with the keys ``chunk_id``, ``text``, ``source_file``,
        ``page``, ``token_count``, ``start_token`` and ``end_token``. The list
        is empty when the corpus tokenizes to nothing.

    Raises:
        ValueError: If ``max_tokens`` is not positive or ``token_overlap`` is
            not smaller than ``max_tokens``; either would keep the window from
            ever advancing.
    """
    overlap = max(cfg.token_overlap, 0)
    if cfg.max_tokens <= 0:
        raise ValueError(f"max_tokens must be positive, got {cfg.max_tokens}")
    if overlap >= cfg.max_tokens:
        raise ValueError(
            f"token_overlap ({cfg.token_overlap}) must be smaller than max_tokens ({cfg.max_tokens})"
        )

    tokenizer = AutoTokenizer.from_pretrained(cfg.tokenizer_name)

    # Tokenize full corpus
    tokens = tokenizer.encode(corpus_text)
    total_tokens = len(tokens)
    print(f"Total tokens: {total_tokens}")

    stride = cfg.max_tokens - overlap
    chunks = []
    for chunk_id, start_token in enumerate(range(0, total_tokens, stride)):
        end_token = min(start_token + cfg.max_tokens, total_tokens)
        chunk_tokens = tokens[start_token:end_token]

        chunks.append({
            "chunk_id": chunk_id,
            "text": tokenizer.decode(chunk_tokens),
            "source_file": source_file,
            "page": page,
            "token_count": len(chunk_tokens),
            "start_token": start_token,
            "end_token": end_token
        })

        # Stop once a chunk reaches the end of the corpus. Stepping back by the
        # overlap from here would re-emit the same tail forever.
        if end_token == total_tokens:
            break

    # Save chunks
    chunks_file = chunks_path(cfg.output_dir)
    with open(chunks_file, 'w') as f:
        for chunk in chunks:
            f.write(json.dumps(chunk) + '\n')

    print(f"Created {len(chunks)} chunks, saved to {chunks_file}")
    return chunks

## Stage 4: Embedding Generation
Convert chunks to semantic vectors using SentenceTransformer.

In [7]:
def create_embeddings(cfg: config, chunks: list) -> np.ndarray:
    """Embed every chunk's text and save the vectors plus a metadata file.

    Args:
        cfg: Pipeline settings; ``embedding_model`` and ``output_dir`` are read.
        chunks: Chunk dicts; each must provide ``text`` and ``chunk_id``.

    Returns:
        The array returned by the embedding model, one row per chunk in input
        order.
    """
    encoder = SentenceTransformer(cfg.embedding_model)
    embeddings = encoder.encode(
        [chunk["text"] for chunk in chunks],
        convert_to_numpy=True,
    )
    print(f"Embeddings shape: {embeddings.shape}")

    # Persist the raw vectors.
    embeddings_file = embeddings_path(cfg.output_dir)
    np.save(embeddings_file, embeddings)
    print(f"Embeddings saved to {embeddings_file}")

    # One metadata line per chunk links its chunk_id to its row in the matrix.
    metadata_file = metadata_path(cfg.output_dir)
    with open(metadata_file, 'w') as f:
        f.writelines(
            json.dumps({
                "chunk_id": chunk["chunk_id"],
                "embedding_index": row,
                "embedding_dim": embeddings.shape[1],
                "model": cfg.embedding_model
            }) + '\n'
            for row, chunk in enumerate(chunks)
        )
    print(f"Metadata saved to {metadata_file}")

    return embeddings

## Stage 5: FAISS Indexing
Build vector similarity index for fast retrieval.

In [8]:
def create_faiss_index(cfg: config, embeddings: np.ndarray) -> faiss.Index:
    """Build a flat L2 FAISS index over the embeddings and write it to disk.

    Args:
        cfg: Pipeline settings; only ``output_dir`` is read here.
        embeddings: 2-D array with one vector per row; it is cast to float32
            before being added to the index.

    Returns:
        The populated index.
    """
    # The vector width comes from the second axis of the embedding matrix.
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings.astype(np.float32))
    print(f"FAISS index created with {index.ntotal} vectors")

    # Persist the index next to the other pipeline artifacts.
    index_file = faiss_index_path(cfg.output_dir)
    faiss.write_index(index, str(index_file))
    print(f"FAISS index saved to {index_file}")

    return index

## Stage 6: Similarity Search & Retrieval
Query the index to retrieve relevant chunks.

In [9]:
def search_similar_chunks(cfg: config, query: str, k: int = 5) -> list:
    """Return the chunks whose embeddings are closest to the query.

    The saved FAISS index and chunk file are read from ``cfg.output_dir``.

    Args:
        cfg: Pipeline settings; ``embedding_model`` and ``output_dir`` are read.
        query: Text to search for.
        k: Maximum number of results. It is capped at the number of indexed
            vectors.

    Returns:
        A list of dicts with ``rank`` (1-based), ``distance`` and ``chunk``,
        best match first. The list is empty when the index holds no vectors or
        ``k`` is not positive.
    """
    # Load model and index
    model = SentenceTransformer(cfg.embedding_model)
    index = faiss.read_index(str(faiss_index_path(cfg.output_dir)))

    # Asking for more neighbours than the index holds makes FAISS pad the
    # labels with -1, which as a list index would silently select the last
    # chunk. Cap k so that cannot happen.
    top_k = min(k, index.ntotal)
    if top_k <= 0:
        return []

    # Encode query as a single-row float32 matrix
    query_embedding = model.encode([query], convert_to_numpy=True)[0]
    query_embedding = query_embedding.reshape(1, -1).astype(np.float32)

    # Search
    distances, indices = index.search(query_embedding, top_k)

    # Load chunks; line order matches the order in which vectors were indexed
    with open(chunks_path(cfg.output_dir), 'r') as f:
        chunks_list = [json.loads(line) for line in f]

    results = []
    for position, (distance, idx) in enumerate(zip(distances[0], indices[0])):
        if idx < 0:
            # Padding label: FAISS found no neighbour for this slot.
            continue
        results.append({
            "rank": position + 1,
            "distance": float(distance),
            "chunk": chunks_list[idx]
        })

    return results

## Example Usage
Run the complete RAG pipeline.

In [None]:
# Example 1: Single file (PDF, DOCX, or XLSX)
# cfg = config(
#     files_dir=Path("./Attention_is_all_you_need.pdf"),
#     output_dir=Path("./output"),
#     max_tokens=512,
#     token_overlap=50,
#     tokenizer_name="gpt2",
#     embedding_model="all-MiniLM-L6-v2",
#     llm_model="gpt2"
# )

# corpus_text = create_corpus(cfg, pdf_file=cfg.files_dir)

# ============================================================

# Example 2: Multiple files of different types
# cfg = config(
#     files_dir=Path("./documents"),
#     output_dir=Path("./output_multi"),
#     max_tokens=512,
#     token_overlap=50,
#     tokenizer_name="gpt2",
#     embedding_model="all-MiniLM-L6-v2",
#     llm_model="gpt2"
# )

# input_files = [
#     Path("./Attention_is_all_you_need.pdf"),
#     Path("./EU AI Act Doc.docx"),
#     Path("./Inflation Calculator.xlsx")
# ]

# # Pipeline execution with multiple files
# corpus_text = create_corpus(cfg, input_files=input_files)
# chunks = create_chunks(cfg, corpus_text, source_file="multi-document", page=0)
# embeddings = create_embeddings(cfg, chunks)
# index = create_faiss_index(cfg, embeddings)

# # Search example
# results = search_similar_chunks(cfg, "What is inflation?", k=3)
# for result in results:
#     print(f"\nRank {result['rank']} (distance: {result['distance']:.4f})")
#     print(result['chunk']['text'][:200] + "...")

## Practical Example - Ready to Run
Complete working example using the actual files in this directory.

In [10]:
from pathlib import Path

# Step 1: Configure the pipeline
cfg = config(
    files_dir=Path("./Attention_is_all_you_need (1) (3) (1).pdf"),
    output_dir=Path("./rag_output"),
    max_tokens=512,
    token_overlap=50,
    tokenizer_name="gpt2",
    embedding_model="all-MiniLM-L6-v2",
    llm_model="gpt2"
)

# Echo the settings that shape the run, one per line.
print(
    "✓ Configuration ready",
    f"  Output directory: {cfg.output_dir}",
    f"  Max tokens per chunk: {cfg.max_tokens}",
    f"  Token overlap: {cfg.token_overlap}",
    f"  Embedding model: {cfg.embedding_model}",
    sep="\n",
)


✓ Configuration ready
  Output directory: rag_output
  Max tokens per chunk: 512
  Token overlap: 50
  Embedding model: all-MiniLM-L6-v2


### Stage 1: Extract & Create Corpus

In [11]:
# Extract the text of the single file named in cfg.files_dir and save the
# corpus under cfg.output_dir.
corpus_text = create_corpus(cfg, pdf_file=cfg.files_dir)

Processing Attention_is_all_you_need (1) (3) (1).pdf...
Corpus saved to rag_output/corpus.jsonl (39683 characters from 1 file(s))


### Stage 2: Create Chunks

In [None]:
# Split the corpus into token windows; every chunk is labelled with the input
# file's name as its source_file.
chunks = create_chunks(cfg, corpus_text, source_file=cfg.files_dir.name)

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

Token indices sequence length is longer than the specified maximum sequence length for this model (10599 > 1024). Running this sequence through the model will result in indexing errors


Total tokens: 10599


### Stage 3: Generate Embeddings

In [None]:
# Embed every chunk's text; the vectors and their metadata are also written
# under cfg.output_dir.
embeddings = create_embeddings(cfg, chunks)

### Stage 4: Build FAISS Index

In [None]:
# Build the L2 FAISS index over the embeddings and save it under cfg.output_dir.
index = create_faiss_index(cfg, embeddings)

### Stage 5: Query the Index

In [None]:
query = "What is the attention mechanism?"
results = search_similar_chunks(cfg, query, k=3)

# Banner: a blank line, a 70-character rule, the query, then a rule and a blank line.
print(
    f"\n{'='*70}",
    f"QUERY: {query}",
    f"{'='*70}\n",
    sep="\n",
)

# One entry per hit: header line, a 300-character preview, then a separator.
for result in results:
    rank, distance = result['rank'], result['distance']
    chunk_text = result['chunk']['text'][:300]
    source = result['chunk']['source_file']

    print(
        f"[Rank {rank}] Distance: {distance:.4f} | Source: {source}",
        f"{chunk_text}...",
        "-" * 70 + "\n",
        sep="\n",
    )

## Multi-Document RAG Example
Process both Attention & Deepseek PDFs together for unified semantic search.

In [None]:
# Multi-document run: artifacts go to their own directory, separate from the
# single-file example's output.
cfg_multi = config(
    files_dir=Path("./"),
    output_dir=Path("./rag_output_multi"),
    max_tokens=512,
    token_overlap=50,
    tokenizer_name="gpt2",
    embedding_model="all-MiniLM-L6-v2",
    llm_model="gpt2"
)

# The two PDFs that are merged into one corpus
input_files = [
    Path("./Attention_is_all_you_need (1) (3) (1).pdf"),
    Path("./Deepseek-r1 (1).pdf")
]

print(
    "✓ Multi-document configuration ready",
    f"  Processing {len(input_files)} PDFs:",
    sep="\n",
)
for f in input_files:
    print(f"    - {f.name}")

In [None]:
# Extract every file in input_files and merge the text into one corpus, saved
# under cfg_multi.output_dir.
corpus_text_multi = create_corpus(cfg_multi, input_files=input_files)

In [None]:
# Create chunks, embeddings, and index. Each step consumes the previous step's
# result, so the order matters. Every chunk is labelled "multi-pdf" because the
# corpus is chunked as one merged text.
chunks_multi = create_chunks(cfg_multi, corpus_text_multi, source_file="multi-pdf", page=0)
embeddings_multi = create_embeddings(cfg_multi, chunks_multi)
index_multi = create_faiss_index(cfg_multi, embeddings_multi)

In [None]:
# Query the index that was built from both documents
query = "What is reasoning and model architecture?"
results_multi = search_similar_chunks(cfg_multi, query, k=5)

# Banner: a blank line, a 70-character rule, the query and scope, then a rule and a blank line.
print(
    f"\n{'='*70}",
    f"MULTI-DOCUMENT QUERY: {query}",
    f"Searching across {len(input_files)} PDFs",
    f"{'='*70}\n",
    sep="\n",
)

# One entry per hit: header line, a 250-character preview, then a separator.
for result in results_multi:
    rank, distance = result['rank'], result['distance']
    chunk_text = result['chunk']['text'][:250]
    source = result['chunk']['source_file']

    print(
        f"[Rank {rank}] Distance: {distance:.4f} | From: {source}",
        f"{chunk_text}...",
        "-" * 70 + "\n",
        sep="\n",
    )