##### This tutorial guides you through executing the Jupyter notebook step by step, forming a complete Retrieval-Augmented Generation (RAG) pipeline. The pipeline processes a PDF document ("Foundations of LLMs.pdf"), extracts and chunks text, embeds and indexes chunks for retrieval, generates answers using DSPy, evaluates performance with RAGAS metrics, optimizes prompts, and traces everything with Langfuse for observability.

In [2]:
import torch
import numpy as np
import os
import asyncio  # For potential async extensions
from sentence_transformers import SentenceTransformer
from typing import List, Tuple, Dict
from docling.document_converter import DocumentConverter
from docling.datamodel.base_models import InputFormat
from docling.datamodel.document import DoclingDocument
import dspy
from dspy import Retrieve, ChainOfThought, Predict
from dspy.teleprompt import BootstrapFewShot
from dspy.evaluate import answer_exact_match
# from ragatouille import RAGPretrainedModel
from rouge_score import rouge_scorer
import ragas
from ragas.testset import TestsetGenerator
from datasets import Dataset
from dspy.evaluate import Evaluate
import groq
import warnings

# To ignore all warnings
warnings.filterwarnings("ignore")


  from .autonotebook import tqdm as notebook_tqdm


In [3]:
from dotenv import load_dotenv
load_dotenv()

True

In [4]:
# Step 0: Langfuse Setup (Enhanced for Production)
from langfuse import Langfuse  # Use Langfuse class for client
from langfuse import observe  # For context updates
from langfuse import get_client  

langfuse = get_client()
# Env vars (as before)
langfuse_client = Langfuse()  # Auto-loads from env
if langfuse_client.auth_check():
    print("Langfuse connected!")
else:
    print("Langfuse auth failed—check env vars.")

Langfuse connected!


In [5]:
# Instrument DSPy
from openinference.instrumentation.dspy import DSPyInstrumentor
DSPyInstrumentor().instrument()

In [6]:
# Step 1: Docling + Late Chunking (enhanced @observe with user/session)
@observe(name="Extract Full Text")
def extract_full_text_from_docling(doc: DoclingDocument) -> str:
    """
    Extract concatenated full text from a DoclingDocument, preserving structure via simple markdown-like formatting.
    """
    with langfuse_client.start_as_current_span(name="Extract Full Text") as span:
        try:
            span.update_trace(  # Set user/session
                user_id="user567",
                session_id="rag_session"
            )
            text_parts = []
            # Extract from texts (all text items: paragraphs, headings, etc.)
            for item in doc.texts:
                if hasattr(item, 'text') and item.text:
                    text_parts.append(item.text)
            # Optional: Include table text (join cells for simple extraction)
            for table in doc.tables:
                if hasattr(table, 'cells'):
                    table_text = '\n'.join([cell.text for cell in table.cells if hasattr(cell, 'text') and cell.text])
                    text_parts.append(table_text)
            full_text = '\n\n'.join(text_parts)
            span.update(output={"length": len(full_text)})  # Partial output
            return full_text
        except Exception as e:
            span.update(level="ERROR", status_message=str(e))
            raise

In [7]:
# Parse + Chunk
source = "C:/Users/jayit/Downloads/finance_docs/Principles_for_the_Management_of_Credit_Risk.pdf"
converter = DocumentConverter()
result = converter.convert(source)
doc = result.document
full_text = extract_full_text_from_docling(doc) 

2025-10-05 17:12:56,084 - INFO - detected formats: [<InputFormat.PDF: 'pdf'>]
2025-10-05 17:12:56,264 - INFO - Going to convert document batch...
2025-10-05 17:12:56,266 - INFO - Initializing pipeline for StandardPdfPipeline with options hash e647edf348883bed75367b22fbe60347
2025-10-05 17:12:56,293 - INFO - Loading plugin 'docling_defaults'
2025-10-05 17:12:56,297 - INFO - Registered picture descriptions: ['vlm', 'api']
2025-10-05 17:12:56,321 - INFO - Loading plugin 'docling_defaults'
2025-10-05 17:12:56,331 - INFO - Registered ocr engines: ['easyocr', 'ocrmac', 'rapidocr', 'tesserocr', 'tesseract']
2025-10-05 17:12:56,641 - INFO - Accelerator device: 'cpu'
2025-10-05 17:12:58,586 - INFO - Accelerator device: 'cpu'
2025-10-05 17:13:00,039 - INFO - Accelerator device: 'cpu'
2025-10-05 17:13:00,994 - INFO - Processing document Principles_for_the_Management_of_Credit_Risk.pdf
2025-10-05 17:13:45,722 - INFO - Finished converting document Principles_for_the_Management_of_Credit_Risk.pdf in

In [8]:
def late_chunk_text(text: str, model: SentenceTransformer, chunk_token_size: int = 512, overlap_tokens: int = 50, max_sequence_length: int = 8192) -> List[str]:
    """
    Apply late chunking: Embed full text, then pool embeddings for overlapping chunks.
    Args:
        text: Full document text.
        model: Pre-loaded embedding model.
        chunk_token_size: Approximate tokens per chunk.
        overlap_tokens: Tokens of overlap between chunks for smoothness.
    Returns:
        List of (chunk_text, chunk_embedding) tuples.
    """
    chunks = []
    # Convert text to sentences first to preserve some natural boundaries
    sentences = text.split('.')
    current_chunk = []
    current_length = 0
    
    for sentence in sentences:
        # Clean the sentence
        sentence = sentence.strip()
        if not sentence:
            continue
            
        # Tokenize single sentence
        tokens = model.tokenizer.encode(
            sentence,
            add_special_tokens=False,
            truncation=True,
            max_length=max_sequence_length
        )
        
        sentence_length = len(tokens)
        
        # If adding this sentence would exceed chunk size, store current chunk and start new one
        if current_length + sentence_length > chunk_token_size and current_chunk:
            # Decode current chunk to text
            chunk_text = model.tokenizer.decode(current_chunk, skip_special_tokens=True)
            if chunk_text.strip():
                chunks.append(chunk_text.strip())
            # Start new chunk with overlap
            if overlap_tokens > 0 and current_chunk:
                current_chunk = current_chunk[-overlap_tokens:]
                current_length = len(current_chunk)
            else:
                current_chunk = []
                current_length = 0
                
        # Add tokens to current chunk
        current_chunk.extend(tokens)
        current_length += sentence_length
        
        # If current chunk exceeds max length, force split
        while current_length > chunk_token_size:
            chunk_tokens = current_chunk[:chunk_token_size]
            chunk_text = model.tokenizer.decode(chunk_tokens, skip_special_tokens=True)
            if chunk_text.strip():
                chunks.append(chunk_text.strip())
            # Keep overlap for next chunk
            current_chunk = current_chunk[chunk_token_size-overlap_tokens:]
            current_length = len(current_chunk)
    
    # Don't forget the last chunk
    if current_chunk:
        chunk_text = model.tokenizer.decode(current_chunk, skip_special_tokens=True)
        if chunk_text.strip() and chunk_text not in chunks:
            chunks.append(chunk_text.strip())
    
    return chunks

In [9]:
model = SentenceTransformer('jinaai/jina-embeddings-v2-base-en')
chunk_texts = late_chunk_text(
            full_text, 
            model, 
            chunk_token_size=256,  # Smaller chunks
            overlap_tokens=25,     # Proportionally smaller overlap
            max_sequence_length=8192
        )
print(f"Total chunks created: {len(chunk_texts)}")

2025-10-05 17:13:45,771 - INFO - Use pytorch device_name: cpu
2025-10-05 17:13:45,772 - INFO - Load pretrained SentenceTransformer: jinaai/jina-embeddings-v2-base-en
Some weights of BertModel were not initialized from the model checkpoint at jinaai/jina-embeddings-v2-base-en and are newly initialized: ['embeddings.position_embeddings.weight', 'encoder.layer.0.intermediate.dense.bias', 'encoder.layer.0.intermediate.dense.weight', 'encoder.layer.0.output.LayerNorm.bias', 'encoder.layer.0.output.LayerNorm.weight', 'encoder.layer.0.output.dense.bias', 'encoder.layer.0.output.dense.weight', 'encoder.layer.1.intermediate.dense.bias', 'encoder.layer.1.intermediate.dense.weight', 'encoder.layer.1.output.LayerNorm.bias', 'encoder.layer.1.output.LayerNorm.weight', 'encoder.layer.1.output.dense.bias', 'encoder.layer.1.output.dense.weight', 'encoder.layer.10.intermediate.dense.bias', 'encoder.layer.10.intermediate.dense.weight', 'encoder.layer.10.output.LayerNorm.bias', 'encoder.layer.10.output.La

Total chunks created: 66


In [10]:
from sentence_transformers import util

class SimpleRetriever(dspy.Retrieve):
    def __init__(self, chunk_texts: List[str], model: SentenceTransformer, k: int = 3):
        self.chunks = chunk_texts
        self.model = model
        self.k = k
        # Pre-compute embeddings
        self.chunk_embeddings = self.model.encode(chunk_texts, convert_to_tensor=True)
    
    def forward(self, query: str) -> dspy.Example:
        query_emb = self.model.encode([query], convert_to_tensor=True)
        scores = util.pytorch_cos_sim(query_emb, self.chunk_embeddings)[0]
        top_k = torch.topk(scores, k=self.k)
        passages_with_idx = [
            {'text': self.chunks[idx], 'index': idx.item()}
            for idx in top_k.indices
        ]
        return dspy.Example(passages=passages_with_idx).with_inputs('question')


In [11]:
retriever = SimpleRetriever(chunk_texts, model, k=3)
# Test retriever with a sample question
question = "What are the key principles of credit risk management?"
retrieved = retriever(question)

# Print the top 3 retrieved chunks with their scores
print("\nTop 3 Retrieved Chunks:")
print("-" * 80)
for i, passage in enumerate(retrieved.passages, 1):
    print(f"\nChunk #{i}:")
    print(f"Text: {passage['text'][:200]}...")  # Show first 200 chars
    print(f"Index: {passage['index']}")

Batches: 100%|██████████| 3/3 [00:15<00:00,  5.15s/it]
Batches: 100%|██████████| 1/1 [00:00<00:00, 24.05it/s]


Top 3 Retrieved Chunks:
--------------------------------------------------------------------------------

Chunk #1:
Text: to reduce dependency on a 11 see footnote 5 21 credit risk management particular sector of the economy or group of related borrowers banks must be careful not to enter into transactions with borrowers...
Index: 42

Chunk #2:
Text: they are able to make sound credit decisions consistent with their credit strategy and meet competitive time, pricing and structuring pressures 44 each credit proposal should be subject to careful ana...
Index: 29

Chunk #3:
Text: capable of conducting the activity to the highest standards and in compliance with the bank's policies and procedures 10 credit risk management iii operating under a sound credit granting process prin...
Index: 19





In [12]:
# Test retriever with a sample question
question = "What are the key factors to consider in credit risk assessment and monitoring?"
retrieved = retriever(question)

# Print the top 3 retrieved chunks with their scores
print("\nTop 3 Retrieved Chunks:")
print("-" * 80)
for i, passage in enumerate(retrieved.passages, 1):
    print(f"\nChunk #{i}:")
    print(f"Text: {passage['text']}...")  # Show first 200 chars
    print(f"Index: {passage['index']}")

Batches: 100%|██████████| 1/1 [00:00<00:00, 22.63it/s]


Top 3 Retrieved Chunks:
--------------------------------------------------------------------------------

Chunk #1:
Text: should be adequate checks and balances in place to promote sound credit decisions 8 credit risk management complexity of the bank's activities the policies should be designed and implemented within the context of internal and external factors such as the bank's market position, trade area, staff capabilities and technology policies and procedures that are properly developed and implemented enable the bank to : ( i ) maintain sound credit - granting standards ; ( ii ) monitor and control credit risk ; ( iii ) properly evaluate new business opportunities ; and ( iv ) identify and administer problem credits 19 as discussed further in paragraphs 30 and 37 through 41 below, banks should develop and implement policies and procedures to ensure that the credit portfolio is adequately diversified given the bank's target markets and overall credit strategy in particular, suc




In [13]:
class RAG(dspy.Module):
    def __init__(self, retriever: SimpleRetriever, num_passages=3):
        super().__init__()
        self.retrieve = Retrieve(k=num_passages)
        # Fix the signature format
        self.generate_answer = ChainOfThought("context, question -> answer")
        self.retriever = retriever
    
    def forward(self, question: str):
        retrieved = self.retriever(question)
        passages = retrieved.passages
        passage_texts = [p['text'] for p in passages]
        prediction = self.generate_answer(             
            passages=str(passage_texts),
            question=question,
        )
        prediction.retrieved_indices = [p['index'] for p in passages]
        return dspy.Prediction(answer=prediction.answer, retrieved_indices=prediction.retrieved_indices)

In [14]:
groq_api_key = os.getenv("LLM_API_KEY")
lm = dspy.LM('qwen/qwen3-32b', api_key=groq_api_key, api_base='https://api.groq.com/openai/v1')
dspy.configure(lm=lm)

In [15]:
rag = RAG(retriever)

In [16]:
# Test RAG pipeline with sample questions
test_questions = [
    "What are the key components of credit risk management?",
    "How should banks monitor and control credit risk?",
    "What are the best practices for credit portfolio management?"
]

print("\n=== Testing RAG Pipeline ===")
print("-" * 80)

for question in test_questions:
    print(f"\nQuestion: {question}")
    prediction = rag(question=question)
    print("\nAnswer:", prediction.answer)
    print("\nRetrieved Passage Indices:", prediction.retrieved_indices)
    print("-" * 80)


=== Testing RAG Pipeline ===
--------------------------------------------------------------------------------

Question: What are the key components of credit risk management?


Batches: 100%|██████████| 1/1 [00:00<00:00, 29.35it/s]



Answer: The key components of credit risk management are:  
1. **Credit Assessment**: Evaluating borrowers' creditworthiness using financial analysis, credit scores, and scoring models.  
2. **Credit Limits**: Establishing appropriate credit thresholds based on risk appetite and borrower capacity.  
3. **Monitoring & Review**: Continuously tracking borrower performance and financial health.  
4. **Risk Mitigation**: Using collateral, guarantees, or insurance to reduce potential losses.  
5. **Diversification**: Spreading credit exposure across industries, geographies, and borrower types.  
6. **Recovery Processes**: Implementing strategies for debt restructuring or legal recovery in case of default.  
7. **Governance & Compliance**: Ensuring adherence to internal policies and regulatory requirements.  
8. **Stress Testing**: Simulating adverse scenarios to assess portfolio resilience.

Retrieved Passage Indices: [53, 15, 57]
------------------------------------------------------------

Batches: 100%|██████████| 1/1 [00:00<00:00, 17.98it/s]




Retrieved Passage Indices: [29, 10, 24]
--------------------------------------------------------------------------------

Question: What are the best practices for credit portfolio management?


Batches: 100%|██████████| 1/1 [00:00<00:00, 29.33it/s]



Answer: The best practices for credit portfolio management include:  
1. **Diversification**: Spread risk across industries, geographies, and borrower types to avoid overexposure.  
2. **Credit Risk Assessment**: Use robust underwriting standards, credit scoring models, and financial ratio analysis to evaluate borrower creditworthiness.  
4. **Stress Testing**: Simulate adverse scenarios (e.g., recession, market crashes) to assess portfolio resilience.  
5. **Liquidity Management**: Maintain a balance between short-term and long-term assets to meet obligations during downturns.  
6. **Regulatory Compliance**: Adhere to capital adequacy, reporting, and lending regulations (e.g., Basel III, local laws).  
7. **Risk Mitigation Tools**: Utilize credit derivatives (e.g., CDS), collateral requirements, and loan covenants to reduce exposure.  
8. **Technology Integration**: Leverage AI/ML for predictive analytics, real-time monitoring, and fraud detection.  
9. **Governance Frameworks**: Est

In [17]:
# Step 4: Metrics (enhanced to store scores in Langfuse for analytics)
def mrr_metric(example: dspy.Example, pred: dspy.Prediction, trace=None) -> float:
    if not hasattr(example, 'gold_relevant_indices') or not example.gold_relevant_indices:
        return 0.0
    gold_set = set(example.gold_relevant_indices)
    relevant_ranks = [rank + 1 for rank, idx in enumerate(pred.retrieved_indices) if idx in gold_set]
    score = 1.0 / relevant_ranks[0] if relevant_ranks else 0.0
    # Store as score (requires trace_id from context; for demo, assume accessible via langfuse_context)
    if 'trace_id' in locals():  # Placeholder; in prod, pass trace_id
        langfuse_client.score(name="MRR", value=score, trace_id=trace_id)
    return score

def rouge_l_metric(example: dspy.Example, pred: dspy.Prediction, trace=None) -> float:
    if not hasattr(example, 'answer') or not hasattr(pred, 'answer'):
        return 0.0
    scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
    scores = scorer.score(example.answer, pred.answer)
    return scores['rougeL'].fmeasure

def faithfulness_metric(example: dspy.Example, pred: dspy.Prediction, trace=None) -> float:
    if not hasattr(pred, 'answer') or not hasattr(example, 'passages'):
        return 0.0
    passages_str = str([p['text'] for p in example.passages]) if hasattr(example, 'passages') else ""
    prompt = f"Passages: {passages_str}\nAnswer: {pred.answer}\nIs the answer fully grounded in the passages without hallucination? Answer 'yes' or 'no'."
    with dspy.context(lm=dspy.settings.lm):
        judgment = dspy.Predict("prompt -> judgment").forward(prompt=prompt).judgment.strip().lower()
    return 1.0 if 'yes' in judgment else 0.0

In [18]:
# Step 5: Real Dev Set with Ragas (wrap generation in @observe)
@observe()  # Traces QA generation as a span
def generate_ragas_devset(chunk_texts: List[str], embedding_model: SentenceTransformer, num_examples: int = 10) -> List[dspy.Example]:
    contexts = [{'context': chunk} for chunk in chunk_texts[:20]]
    dataset = Dataset.from_list(contexts)
    
    generator = TestsetGenerator(llm=lm, embedding_model=model)
    testset = generator.generate_with_langchain_docs(dataset, testset_size=num_examples)
    
    examples = []
    for row in testset.to_pandas().iterrows():
        question = row[1]['question']
        answer = row[1]['answer']
        
        q_emb = embedding_model.encode([question])
        chunk_embs = embedding_model.encode(chunk_texts)
        similarities = np.dot(chunk_embs, q_emb.T).flatten()
        top_indices = np.argsort(similarities)[-3:][::-1]
        
        ex = dspy.Example(question=question, answer=answer, 
                          gold_relevant_indices=list(top_indices)).with_inputs('question')
        examples.append(ex)
    
    return examples

In [19]:
# Step 6: Combined Metric (unchanged)
def combined_metric(example: dspy.Example, pred: dspy.Prediction, trace=None) -> float:
    return mrr_metric(example, pred) + rouge_l_metric(example, pred) + answer_exact_match(example, pred)

In [None]:
# devset = generate_ragas_devset(chunk_texts, model, num_examples=10)
        
# # Evaluations (store scores as above)
# evaluators = {
#             'MRR': Evaluate(devset=devset, metric=mrr_metric, num_threads=10),
#             'ROUGE-L': Evaluate(devset=devset, metric=rouge_l_metric, num_threads=10),
#             'Exact Match': Evaluate(devset=devset, metric=answer_exact_match, num_threads=10),
#             'Faithfulness': Evaluate(devset=devset, metric=faithfulness_metric, num_threads=10)
#         }
        
# baseline_scores = {name: eval_(rag) for name, eval_ in evaluators.items()}
# print("\n=== Baseline Scores ===")
# for name, score in baseline_scores.items():
#     print(f"{name}: {score:.3f}")

: 

In [None]:
import concurrent.futures
from typing import Dict, Any
import threading
from concurrent.futures import ThreadPoolExecutor

def evaluate_metric_thread(args: Dict[str, Any]) -> tuple:
    """
    Helper function to evaluate a single metric in a thread
    """
    name, evaluator, rag_model = args['name'], args['evaluator'], args['rag']
    try:
        score = evaluator(rag_model)
        return (name, score)
    except Exception as e:
        print(f"Error evaluating {name}: {str(e)}")
        return (name, 0.0)

def parallel_evaluate(evaluators: Dict, rag_model, max_workers: int = 4) -> Dict[str, float]:
    """
    Evaluate metrics in parallel using ThreadPoolExecutor
    """
    tasks = [
        {
            'name': name,
            'evaluator': evaluator,
            'rag': rag_model
        }
        for name, evaluator in evaluators.items()
    ]
    
    scores = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(evaluate_metric_thread, task) for task in tasks]
        for future in concurrent.futures.as_completed(futures):
            name, score = future.result()
            scores[name] = score
    
    return scores

# Generate dev set
print("Generating development set...")
devset = generate_ragas_devset(chunk_texts, model, num_examples=10)

# Setup evaluators
print("\nSetting up evaluators...")
evaluators = {
    # 'MRR': Evaluate(devset=devset, metric=mrr_metric, num_threads=10),
    # 'ROUGE-L': Evaluate(devset=devset, metric=rouge_l_metric, num_threads=10),
    'Exact Match': Evaluate(devset=devset, metric=answer_exact_match, num_threads=10),
    # 'Faithfulness': Evaluate(devset=devset, metric=faithfulness_metric, num_threads=10)
}

# Run parallel evaluation for baseline
print("\nEvaluating baseline scores in parallel...")
baseline_scores = parallel_evaluate(evaluators, rag, max_workers=4)

print("\n=== Baseline Scores ===")
for name, score in baseline_scores.items():
    print(f"{name}: {score:.3f}")

# # Run optimization
# print("\nOptimizing RAG model...")
# optimizer = BootstrapFewShot(metric=combined_metric)
# optimized_rag = optimizer.compile(rag, trainset=devset)

# # Run parallel evaluation for optimized model
# print("\nEvaluating optimized scores in parallel...")
# optimized_scores = parallel_evaluate(evaluators, optimized_rag, max_workers=4)

# print("\n=== Optimized Scores ===")
# for name, score in optimized_scores.items():
#     gain = optimized_scores[name] - baseline_scores[name]
#     print(f"{name}: {score:.3f} (gain: {gain:+.3f})")

Generating development set...


In [None]:
# Step 7: Analytics Export Example (post-eval)
def export_analytics(client: Langfuse, session_id: str = "rag_session"):
    """Query traces via SDK and compute custom analytics (e.g., avg MRR)."""
    # Fetch traces for session (per Metrics API patterns)
    traces = client.get_traces(session_id=session_id, limit=100)  # SDK query
    mrr_scores = []
    for trace in traces:
        # Assume scores attached; extract via trace.scores
        mrr_score = next((s.value for s in trace.scores if s.name == "MRR"), 0.0)
        mrr_scores.append(mrr_score)
    avg_mrr = np.mean(mrr_scores) if mrr_scores else 0.0
    print(f"Avg MRR across {len(traces)} traces: {avg_mrr:.3f}")
    # For advanced: Use Metrics API via HTTP (e.g., POST /api/public/projects/{project_id}/metrics/query)
    # with dimensions=["session_id"], metrics=["avg(MRR)"], filters={"session_id": session_id}

In [None]:
# # Optimization
# optimizer = BootstrapFewShot(metric=combined_metric)
# optimized_rag = optimizer.compile(rag, trainset=devset)
        
# optimized_scores = {name: eval_(optimized_rag) for name, eval_ in evaluators.items()}
# print("\n=== Optimized Scores ===")
# for name, score in optimized_scores.items():
#     gain = optimized_scores[name] - baseline_scores[name]
#     print(f"{name}: {score:.3f} (gain: {gain:+.3f})")

In [None]:
# Example Query
# question = "What is the main topic of the document?"
# prediction = optimized_rag(question=question)
# print(f"\nAnswer: {prediction.answer}")
        
# # Production Flush: Send batched data
langfuse_client.flush()
        
# # Analytics Export
export_analytics(langfuse_client, "rag_session")