In [5]:
# Semantic Search with FAISS - Practice Example
import numpy as np
%pip install faiss-cpu
%pip install torch
%pip install sentence-transformers
import faiss
import torch
from sentence_transformers import SentenceTransformer
import pandas as pd

# Step 1: Set up your data
# We'll create a small collection of documents to search through
documents = [
    "Machine learning is a subfield of artificial intelligence.",
    "Vector databases store high-dimensional vectors for similarity search.",
    "FAISS is an efficient similarity search library for dense vectors.",
    "Neural networks have transformed natural language processing.",
    "Embeddings capture semantic meaning in a vector space.",
    "Pinecone is a managed vector database service.",
    "Vector search enables semantic search capabilities.",
    "Weaviate is a vector search engine with GraphQL API.",
    "Azure AI Search provides cognitive search capabilities.",
    "Semantic search understands the intent behind user queries."
]

# Step 2: Install and import libraries
# You would typically run: pip install faiss-cpu torch sentence-transformers pandas
# (Use faiss-gpu instead if you have GPU support)

# Step 3: Convert documents to embeddings
print("Converting documents to embeddings...")
model = SentenceTransformer('all-MiniLM-L6-v2', cache_folder='./model_cache')  # A small, fast model
embeddings = model.encode(documents)
vector_dimension = embeddings.shape[1]  # Get the embedding dimension

# Step 4: Build the FAISS index
print(f"Building FAISS index with vector dimension: {vector_dimension}")
index = faiss.IndexFlatL2(vector_dimension)  # L2 distance (Euclidean)
# Convert embeddings to float32 (required by FAISS)
embeddings = embeddings.astype(np.float32)
index.add(embeddings)  # Add vectors to the index

# Step 5: Perform a search
def search(query_text, top_k=3):
    # Convert query to embedding
    query_vector = model.encode([query_text])[0].astype(np.float32)
    query_vector = np.array([query_vector])  # Reshape for FAISS
    
    # Search the index
    distances, indices = index.search(query_vector, top_k)
    
    # Return results
    results = []
    for i, idx in enumerate(indices[0]):
        if idx != -1:  # Valid index
            results.append({
                "document": documents[idx],
                "distance": distances[0][i]
            })
    
    return results

# Step 6: Test searches
print("\n--- Testing Semantic Search ---")
test_queries = [
    "How do vector databases work?",
    "What is artificial intelligence?",
    "Tell me about similarity search"
]

for query in test_queries:
    print(f"\nQuery: '{query}'")
    results = search(query)
    for i, res in enumerate(results):
        print(f"  {i+1}. {res['document']} (distance: {res['distance']:.4f})")

# Step 7: Bonus - Simple evaluation
print("\n--- Simple Evaluation ---")
# Create a dataframe to compare distances
eval_df = pd.DataFrame(columns=['query'] + documents)

for query in test_queries:
    query_vector = model.encode([query])[0].astype(np.float32)
    distances = []
    for doc in documents:
        doc_vector = model.encode([doc])[0].astype(np.float32)
        # Calculate L2 distance
        distance = np.linalg.norm(query_vector - doc_vector)
        distances.append(distance)
    
    eval_df.loc[query] = [query] + distances

print(eval_df.head())
print("\nThis evaluation shows the distance between each query and all documents.")
print("Lower distances indicate better semantic matches.")


# How to run this example:
# 1. Save this code to a file (e.g., faiss_practice.py)
# 2. Install the required packages: pip install faiss-cpu torch sentence-transformers pandas
# 3. Run the script: python faiss_practice.py
# 4. Experiment with different queries, models, or distance metrics

# Extensions to try:
# - Add more documents or load real data from a CSV/JSON file
# - Try different embedding models
# - Implement other FAISS index types like IndexIVFFlat for larger datasets
# - Add metadata to your documents and return it with search results
# - Compare results with other vector databases like Pinecone or Weaviate

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Converting documents to embeddings...


Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


Building FAISS index with vector dimension: 384

--- Testing Semantic Search ---

Query: 'How do vector databases work?'
  1. Vector databases store high-dimensional vectors for similarity search. (distance: 0.7787)
  2. Vector search enables semantic search capabilities. (distance: 0.9240)
  3. Weaviate is a vector search engine with GraphQL API. (distance: 1.1274)

Query: 'What is artificial intelligence?'
  1. Machine learning is a subfield of artificial intelligence. (distance: 0.7114)
  2. Azure AI Search provides cognitive search capabilities. (distance: 1.1847)
  3. Neural networks have transformed natural language processing. (distance: 1.4932)

Query: 'Tell me about similarity search'
  1. Vector databases store high-dimensional vectors for similarity search. (distance: 0.6221)
  2. FAISS is an efficient similarity search library for dense vectors. (distance: 0.7312)
  3. Semantic search understands the intent behind user queries. (distance: 0.9937)

--- Simple Evaluation ---


In [20]:
from langgraph.graph import StateGraph, END
from typing import Dict, Any, TypedDict

# Define the state schema
class GraphState(TypedDict):
    query: str
    context: str
    response: str

# Define a simple RAG agent workflow
def generate(state: GraphState) -> Dict[str, Any]:
    query = state['query']
    context = state['context']
    # Simulate LLM call
    response = f"Based on the context '{context}', the answer to '{query}' is..."
    return {"response": response}

# Define a dummy retrieve function
def retrieve(state):
    return {"context": "Dummy context"}

# Create graph
workflow = StateGraph(GraphState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)

# Connect nodes
# Define a conditional check
def should_continue(state):
    # Example: Stop after one iteration
    return {"continue": False}

workflow.add_edge("retrieve", "generate")
workflow.add_node("continue", should_continue)
workflow.add_edge("generate", "continue")

# Add conditional edge
workflow.add_conditional_edges(
    "continue",
    lambda x: "generate" if x["continue"] else "end",
    {"generate": "generate", "end": END}
)

workflow.set_entry_point("retrieve")

# Compile and run
app = workflow.compile()
result = app.invoke({"query": "How do vector databases work?"})
print(result['response'])

Based on the context 'Dummy context', the answer to 'How do vector databases work?' is...


In [4]:
import whisper

# Convert audio to text
def transcribe_audio(audio_path):
    model = whisper.load_model("base")
    result = model.transcribe(audio_path)
    return result["text"]

try:
    transcription = transcribe_audio("audio_sample.mp3")
    print("Transcribed text:", transcription)
except Exception as e:
    print(f"An error occurred: {e}")
    print("Please ensure that ffmpeg is installed and accessible.")

An error occurred: [Errno 2] No such file or directory: 'ffmpeg'
Please ensure that ffmpeg is installed and accessible.




In [5]:
%pip install tensorflow
import tensorflow as tf # type: ignore
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Embedding
import numpy as np

# Create a simple LSTM model for text classification
vocab_size = 10000
embedding_dim = 128
max_length = 100
num_classes = 3

model = Sequential([
    Embedding(vocab_size, embedding_dim, input_length=max_length),
    LSTM(128, return_sequences=True),
    LSTM(64),
    Dense(32, activation='relu'),
    Dense(num_classes, activation='softmax')
])

model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# Generate dummy data
x_train = np.random.randint(0, vocab_size, size=(100, max_length))
y_train = np.random.randint(0, num_classes, size=(100,))

# Train model
model.fit(x_train, y_train, epochs=3, batch_size=16)
print("Model trained successfully")

Note: you may need to restart the kernel to use updated packages.
Epoch 1/3
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 40ms/step - accuracy: 0.3987 - loss: 1.0988
Epoch 2/3
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 40ms/step - accuracy: 0.3628 - loss: 1.0836
Epoch 3/3
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step - accuracy: 0.4527 - loss: 0.9952
Model trained successfully


In [1]:
import re
import numpy as np
from collections import Counter

class SimpleBPETokenizer:
    def __init__(self, vocab_size=1000):
        self.vocab_size = vocab_size
        self.vocab = {}  # token to id
        self.inv_vocab = {}  # id to token
        self.merges = {}  # merge pairs to token
        
    def train(self, texts, initial_vocab=None):
        """Train BPE tokenizer on texts"""
        # Start with character vocabulary
        if initial_vocab is None:
            # Create character-level vocabulary
            chars = Counter()
            for text in texts:
                chars.update(text)
            
            # Initialize with characters
            self.vocab = {c: i for i, c in enumerate(chars)}
            self.inv_vocab = {i: c for i, c in enumerate(chars)}
            next_id = len(self.vocab)
        else:
            self.vocab = initial_vocab
            self.inv_vocab = {v: k for k, v in self.vocab.items()}
            next_id = max(self.inv_vocab.keys()) + 1
        
        # Prepare text for merges
        split_texts = [[c for c in text] for text in texts]
        
        # Iteratively merge most common pairs
        while len(self.vocab) < self.vocab_size:
            # Count pairs
            pairs = Counter()
            for text in split_texts:
                for i in range(len(text) - 1):
                    pair = (text[i], text[i + 1])
                    pairs[pair] += 1
            
            if not pairs:
                break
                
            # Find most frequent pair
            best_pair = max(pairs, key=pairs.get)
            new_token = best_pair[0] + best_pair[1]
            
            # Add to vocabulary
            self.vocab[new_token] = next_id
            self.inv_vocab[next_id] = new_token
            self.merges[best_pair] = new_token
            next_id += 1
            
            # Apply merge to all texts
            new_split_texts = []
            for text in split_texts:
                i = 0
                new_text = []
                while i < len(text):
                    if i < len(text) - 1 and (text[i], text[i + 1]) == best_pair:
                        new_text.append(new_token)
                        i += 2
                    else:
                        new_text.append(text[i])
                        i += 1
                new_split_texts.append(new_text)
            split_texts = new_split_texts
            
            if len(self.vocab) % 100 == 0:
                print(f"Vocabulary size: {len(self.vocab)}/{self.vocab_size}")
        
        return self.vocab
    
    def tokenize(self, text):
        """Tokenize text using learned merges"""
        # Start with characters
        tokens = [c for c in text]
        
        # Apply merges
        i = 0
        while i < len(tokens) - 1:
            pair = (tokens[i], tokens[i + 1])
            if pair in self.merges:
                tokens[i] = self.merges[pair]
                tokens.pop(i + 1)
            else:
                i += 1
        
        # Convert to token ids
        return [self.vocab.get(t, self.vocab.get('<unk>', 0)) for t in tokens]
    
    def decode(self, token_ids):
        """Convert token ids back to text"""
        return ''.join(self.inv_vocab.get(tid, '<unk>') for tid in token_ids)


# Simple language model for next token prediction
class SimpleNGramModel:
    def __init__(self, n=3):
        self.n = n
        self.context_counts = {}  # Store counts of n-grams
        self.context_next = {}    # Store next token distributions
    
    def train(self, token_sequences):
        """Train on token sequences"""
        for sequence in token_sequences:
            # Pad sequence for context
            padded = [0] * (self.n - 1) + sequence
            
            # Collect n-grams and next tokens
            for i in range(len(padded) - self.n):
                context = tuple(padded[i:i+self.n-1])
                next_token = padded[i+self.n-1]
                
                # Update counts
                if context not in self.context_counts:
                    self.context_counts[context] = 0
                    self.context_next[context] = {}
                
                self.context_counts[context] += 1
                
                if next_token not in self.context_next[context]:
                    self.context_next[context][next_token] = 0
                self.context_next[context][next_token] += 1
    
    def predict_next_token(self, context, temperature=1.0):
        """Predict next token given context"""
        # Get last n-1 tokens
        context = tuple(context[-(self.n-1):]) if len(context) >= self.n-1 else tuple([0] * (self.n-1 - len(context)) + context)
        
        # If context not seen, return random from vocabulary
        if context not in self.context_next:
            return None
        
        # Get distribution
        next_tokens = self.context_next[context]
        tokens = list(next_tokens.keys())
        counts = np.array(list(next_tokens.values()))
        
        # Apply temperature
        if temperature == 0:  # Greedy
            return tokens[np.argmax(counts)]
        else:
            # Convert counts to probabilities
            probs = counts / counts.sum()
            
            # Apply temperature
            logits = np.log(probs)
            logits_t = logits / temperature
            probs_t = np.exp(logits_t) / np.exp(logits_t).sum()
            
            # Sample
            return np.random.choice(tokens, p=probs_t)

# Example usage
texts = [
    "Hello world! This is an example of tokenization.",
    "Byte pair encoding is a data compression technique.",
    "Language models predict the next token in a sequence.",
    "Transformers use self-attention mechanisms.",
    "Neural networks learn from data."
]

# Train tokenizer
tokenizer = SimpleBPETokenizer(vocab_size=200)
tokenizer.train(texts)

# Tokenize texts
tokenized_texts = [tokenizer.tokenize(text) for text in texts]

# Train language model
lm = SimpleNGramModel(n=3)
lm.train(tokenized_texts)

# Generate some text
def generate_text(model, tokenizer, start_text="Hello", length=20, temperature=0.8):
    tokens = tokenizer.tokenize(start_text)
    
    for _ in range(length):
        next_token = model.predict_next_token(tokens, temperature=temperature)
        if next_token is None:
            break
        tokens.append(next_token)
    
    return tokenizer.decode(tokens)

# Generate text example
generated = generate_text(lm, tokenizer, start_text="Language", length=20)
print("Generated text:", generated)

# Mathematical explanation
print("\nMathematical explanation:")
print("1. BPE Tokenization:")
print("   - Starting with character vocabulary")
print("   - Iteratively finding most frequent adjacent pairs")
print("   - Merge operation: (a,b) -> ab")
print("   - Final vocabulary size:", len(tokenizer.vocab))

print("\n2. N-gram Language Model:")
print("   - Context: P(token_n | token_{n-2}, token_{n-1})")
print("   - Maximum likelihood estimation:")
print("     P(w_i | w_{i-2}, w_{i-1}) = count(w_{i-2}, w_{i-1}, w_i) / count(w_{i-2}, w_{i-1})")
print("   - Temperature sampling:")
print("     p_t(w_i) ∝ exp(log(p(w_i))/t)")
print("     t=1.0: Standard probabilities")
print("     t<1.0: More peaked/deterministic")
print("     t>1.0: More uniform/random")

Vocabulary size: 100/200
Vocabulary size: 200/200
Generated text: Language

Mathematical explanation:
1. BPE Tokenization:
   - Starting with character vocabulary
   - Iteratively finding most frequent adjacent pairs
   - Merge operation: (a,b) -> ab
   - Final vocabulary size: 200

2. N-gram Language Model:
   - Context: P(token_n | token_{n-2}, token_{n-1})
   - Maximum likelihood estimation:
     P(w_i | w_{i-2}, w_{i-1}) = count(w_{i-2}, w_{i-1}, w_i) / count(w_{i-2}, w_{i-1})
   - Temperature sampling:
     p_t(w_i) ∝ exp(log(p(w_i))/t)
     t=1.0: Standard probabilities
     t<1.0: More peaked/deterministic
     t>1.0: More uniform/random


In [2]:
import numpy as np
import faiss
import time
from sklearn.datasets import make_blobs

# Create synthetic data: 100,000 vectors of 128 dimensions
n_vectors = 100000
dimension = 128
n_clusters = 100
data, _ = make_blobs(n_samples=n_vectors, n_features=dimension, centers=n_clusters, random_state=42)

# Convert to float32 (required by FAISS)
data = data.astype(np.float32)

# Create query vectors
n_queries = 10
queries = np.random.random((n_queries, dimension)).astype(np.float32)

# Define indices to compare
index_types = {
    "Flat (Exact)": faiss.IndexFlatL2(dimension),
    "IVF Flat": faiss.IndexIVFFlat(faiss.IndexFlatL2(dimension), dimension, n_clusters),
    "IVF PQ": faiss.IndexIVFPQ(faiss.IndexFlatL2(dimension), dimension, n_clusters, 16, 8),
    "HNSW": faiss.IndexHNSWFlat(dimension, 32)
}

# Number of nearest neighbors to retrieve
k = 5

# Train and add vectors to indices
for name, index in index_types.items():
    print(f"\nBuilding {name} index...")
    start_time = time.time()
    
    # Train if needed
    if isinstance(index, faiss.IndexIVFFlat) or isinstance(index, faiss.IndexIVFPQ):
        print("  Training index...")
        index.train(data)
    
    # Add vectors
    print("  Adding vectors...")
    index.add(data)
    
    print(f"  Built index in {time.time() - start_time:.2f} seconds")
    
    # Perform search
    print("  Searching...")
    search_start = time.time()
    distances, indices = index.search(queries, k)
    search_time = time.time() - search_start
    
    # Report results
    print(f"  Average search time: {search_time / n_queries * 1000:.2f} ms per query")
    print(f"  Results for first query:")
    for i in range(k):
        print(f"    {i+1}. Vector {indices[0][i]}, Distance: {distances[0][i]:.4f}")
    
    # Mathematical details explanation
    print("\n  Mathematical details:")
    if name == "Flat (Exact)":
        print("    - Uses exact L2 distance: sqrt(sum((x_i - y_i)^2))")
        print("    - Compares each query against all 100,000 vectors")
    elif name == "IVF Flat":
        print("    - Partitions vectors into 100 Voronoi cells")
        print("    - Query identifies nearest centroids first")
        print("    - Only searches vectors in the nearest cells")
        print(f"    - nprobe parameter: {index.nprobe} (cells searched)")
    elif name == "IVF PQ":
        print("    - Combines IVF with Product Quantization")
        print("    - Splits 128D vectors into 16 subvectors of 8D each")
        print("    - Each subspace uses 8-bit codes (256 centroids)")
        print("    - Reduces storage from 512 bytes to ~16+8 bytes per vector")
    elif name == "HNSW":
        print("    - Hierarchical Navigable Small World graph")
        print("    - Creates multi-layered graph with 32 max connections")
        print("    - Greedy graph traversal from layer to layer")
        print("    - efConstruction:", index.hnsw.efConstruction)
        print("    - efSearch:", index.hnsw.efSearch)


Building Flat (Exact) index...
  Adding vectors...
  Built index in 0.01 seconds
  Searching...
  Average search time: 0.58 ms per query
  Results for first query:
    1. Vector 76340, Distance: 3342.2617
    2. Vector 94418, Distance: 3350.3750
    3. Vector 97173, Distance: 3430.5532
    4. Vector 78001, Distance: 3444.8921
    5. Vector 14813, Distance: 3444.9600

  Mathematical details:
    - Uses exact L2 distance: sqrt(sum((x_i - y_i)^2))
    - Compares each query against all 100,000 vectors

Building IVF Flat index...
  Training index...
  Adding vectors...
  Built index in 0.06 seconds
  Searching...
  Average search time: 0.02 ms per query
  Results for first query:
    1. Vector 26909, Distance: 3657.9131
    2. Vector 93396, Distance: 3788.4683
    3. Vector 51201, Distance: 3798.6841
    4. Vector 7662, Distance: 3801.4509
    5. Vector 36893, Distance: 3802.8486

  Mathematical details:
    - Partitions vectors into 100 Voronoi cells
    - Query identifies nearest centroi

In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

class SelfAttention(nn.Module):
    def __init__(self, embed_size, heads):
        super(SelfAttention, self).__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads
        
        # Linear projections for Q, K, V
        self.query = nn.Linear(embed_size, embed_size)
        self.key = nn.Linear(embed_size, embed_size)
        self.value = nn.Linear(embed_size, embed_size)
        self.fc_out = nn.Linear(embed_size, embed_size)
        
    def forward(self, query, key, value, mask=None):
        # Get batch size
        N = query.shape[0]
        
        # Get sequence length
        query_len, key_len, value_len = query.shape[1], key.shape[1], value.shape[1]
        
        # 1. Linear projections
        Q = self.query(query)  # (N, query_len, embed_size)
        K = self.key(key)      # (N, key_len, embed_size)
        V = self.value(value)  # (N, value_len, embed_size)
        
        # 2. Split into multiple heads
        Q = Q.reshape(N, query_len, self.heads, self.head_dim)
        K = K.reshape(N, key_len, self.heads, self.head_dim)
        V = V.reshape(N, value_len, self.heads, self.head_dim)
        
        # 3. Transpose for matrix multiplication
        Q = Q.transpose(1, 2)  # (N, heads, query_len, head_dim)
        K = K.transpose(1, 2)  # (N, heads, key_len, head_dim)
        V = V.transpose(1, 2)  # (N, heads, value_len, head_dim)
        
        # 4. Calculate attention scores
        # Attention(Q,K,V) = softmax(QK^T/√d_k)V
        
        # Matrix multiplication: QK^T
        energy = torch.matmul(Q, K.transpose(-2, -1))  # (N, heads, query_len, key_len)
        
        # Scale: QK^T/√d_k
        scaling_factor = self.head_dim ** 0.5
        scaled_energy = energy / scaling_factor
        
        # Apply mask (optional)
        if mask is not None:
            scaled_energy = scaled_energy.masked_fill(mask == 0, float("-1e20"))
        
        # 5. Apply softmax to get attention weights
        attention = F.softmax(scaled_energy, dim=-1)  # (N, heads, query_len, key_len)
        
        # 6. Multiply by values
        out = torch.matmul(attention, V)  # (N, heads, query_len, head_dim)
        
        # 7. Reshape and concatenate heads
        out = out.transpose(1, 2)  # (N, query_len, heads, head_dim)
        out = out.reshape(N, query_len, self.embed_size)  # (N, query_len, embed_size)
        
        # 8. Final linear layer
        out = self.fc_out(out)  # (N, query_len, embed_size)
        
        return out, attention

# Example usage
def test_attention():
    # Parameters
    batch_size = 2
    sequence_length = 10
    embedding_dim = 512
    num_heads = 8
    
    # Create random input tensors
    x = torch.randn(batch_size, sequence_length, embedding_dim)
    
    # Initialize self-attention layer
    self_attention = SelfAttention(embedding_dim, num_heads)
    
    # Forward pass
    output, attention_weights = self_attention(x, x, x)
    
    # Output shapes
    print(f"Input shape: {x.shape}")
    print(f"Output shape: {output.shape}")
    print(f"Attention weights shape: {attention_weights.shape}")
    
    # Visualize one attention head for first sequence
    print("\nAttention weights (first head, first sequence):")
    print(attention_weights[0, 0].detach().numpy())

test_attention()

Input shape: torch.Size([2, 10, 512])
Output shape: torch.Size([2, 10, 512])
Attention weights shape: torch.Size([2, 8, 10, 10])

Attention weights (first head, first sequence):
[[0.08636466 0.08893646 0.09673692 0.11184515 0.09727592 0.14506705
  0.0883042  0.08184995 0.10140611 0.10221367]
 [0.16273554 0.12478302 0.07462385 0.11172861 0.06013608 0.07909601
  0.06422267 0.11619847 0.12035584 0.08611996]
 [0.12352862 0.10443546 0.07732977 0.03498055 0.06023482 0.0951709
  0.13528565 0.15707369 0.06947392 0.14248656]
 [0.07454232 0.15310286 0.09102074 0.09851746 0.09981946 0.14370447
  0.07783068 0.09990502 0.09184451 0.06971247]
 [0.09165742 0.10104419 0.11459634 0.0679939  0.094329   0.13018431
  0.08362576 0.05784969 0.13539869 0.12332077]
 [0.14267313 0.08702393 0.10666597 0.06055981 0.13048515 0.09498405
  0.1230271  0.05514885 0.09216403 0.10726799]
 [0.03731251 0.07823309 0.12634353 0.06967332 0.11493493 0.15215053
  0.100695   0.13017876 0.04352712 0.1469512 ]
 [0.08298459 0.120

In [4]:
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

# Initialize embedding model
model = SentenceTransformer('all-MiniLM-L6-v2')  # 384-dimension embeddings

# Example document corpus
documents = [
    "Artificial intelligence is revolutionizing healthcare with predictive analytics.",
    "Machine learning models can detect patterns in medical images.",
    "Natural language processing helps in analyzing clinical notes.",
    "Deep learning systems are being used for drug discovery.",
    "Healthcare data privacy is essential when implementing AI systems."
]

# Create document embeddings
document_embeddings = model.encode(documents)

# User query
query = "How is AI used in medicine?"
query_embedding = model.encode([query])[0]

# Calculate cosine similarity between query and all documents
similarities = []
for i, doc_embedding in enumerate(document_embeddings):
    # Cosine similarity formula: cos(θ) = (A·B)/(||A||·||B||)
    dot_product = np.dot(query_embedding, doc_embedding)
    query_norm = np.linalg.norm(query_embedding)
    doc_norm = np.linalg.norm(doc_embedding)
    cosine_sim = dot_product / (query_norm * doc_norm)
    similarities.append((i, cosine_sim))

# Sort by similarity (highest first)
ranked_results = sorted(similarities, key=lambda x: x[1], reverse=True)

# Display ranked results
print("Query:", query)
print("\nRanked Results:")
for doc_id, similarity in ranked_results:
    print(f"Document {doc_id}: {documents[doc_id]}")
    print(f"Similarity Score: {similarity:.4f}")
    print("-" * 50)

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


Query: How is AI used in medicine?

Ranked Results:
Document 4: Healthcare data privacy is essential when implementing AI systems.
Similarity Score: 0.4998
--------------------------------------------------
Document 0: Artificial intelligence is revolutionizing healthcare with predictive analytics.
Similarity Score: 0.4890
--------------------------------------------------
Document 2: Natural language processing helps in analyzing clinical notes.
Similarity Score: 0.3850
--------------------------------------------------
Document 3: Deep learning systems are being used for drug discovery.
Similarity Score: 0.3765
--------------------------------------------------
Document 1: Machine learning models can detect patterns in medical images.
Similarity Score: 0.3513
--------------------------------------------------


In [None]:
import dash
from dash import html, dcc, Input, Output, State
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from rank_bm25 import BM25Okapi
import torch
from transformers import AutoTokenizer, AutoModel
import re
from typing import List, Dict, Tuple, Any, Optional

# Document class
class Document:
    def __init__(self, doc_id: str, text: str, metadata: Optional[Dict[str, Any]] = None):
        self.id = doc_id
        self.text = text
        self.metadata = metadata if metadata else {}

    def __str__(self):
        return f"Document(id={self.id}, text={self.text[:50]}...)"

# Chunker class
class Chunker:
    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 128):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def chunk_document(self, doc: Document) -> List[Document]:
        text = doc.text
        chunks = []
        start = 0
        while start < len(text):
            end = min(start + self.chunk_size, len(text))
            sentence_end = max(
                text.rfind('. ', start, end),
                text.rfind('! ', start, end),
                text.rfind('? ', start, end)
            )
            if sentence_end > start + 100:
                end = sentence_end + 1
            chunk_text = text[start:end].strip()
            chunk_id = f"{doc.id}_chunk_{len(chunks)}"
            chunk_metadata = doc.metadata.copy()
            chunk_metadata.update({
                'parent_id': doc.id,
                'chunk_id': len(chunks),
                'start_char': start,
                'end_char': end
            })
            chunks.append(Document(chunk_id, chunk_text, chunk_metadata))
            start = end - self.chunk_overlap
            if start >= len(text) - 50:
                break
        return chunks

# SemanticEmbedder class
class SemanticEmbedder:
    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        self.model.eval()

    def _mean_pooling(self, model_output, attention_mask):
        token_embeddings = model_output[0]
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)

    def embed(self, texts: List[str]) -> np.ndarray:
        encoded_input = self.tokenizer(texts, padding=True, truncation=True, max_length=512, return_tensors='pt')
        with torch.no_grad():
            model_output = self.model(**encoded_input)
        embeddings = self._mean_pooling(model_output, encoded_input['attention_mask'])
        embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)
        return embeddings.numpy()

# HybridRetriever class
class HybridRetriever:
    def __init__(self, documents: List[Document], semantic_weight: float = 0.7):
        self.documents = documents
        self.doc_texts = [doc.text for doc in documents]
        self.semantic_weight = semantic_weight
        tokenized_docs = [text.lower().split() for text in self.doc_texts]
        self.bm25 = BM25Okapi(tokenized_docs)
        self.embedder = SemanticEmbedder()
        self.doc_embeddings = self.embedder.embed(self.doc_texts)

    def retrieve(self, query: str, top_k: int = 5) -> List[Document]:
        tokenized_query = query.lower().split()
        bm25_scores = np.array(self.bm25.get_scores(tokenized_query))
        if np.max(bm25_scores) > 0:
            bm25_scores = bm25_scores / np.max(bm25_scores)
        query_embedding = self.embedder.embed([query])[0]
        semantic_scores = np.dot(self.doc_embeddings, query_embedding)
        combined_scores = self.semantic_weight * semantic_scores + (1 - self.semantic_weight) * bm25_scores
        top_indices = np.argsort(-combined_scores)[:top_k]
        results = []
        for idx in top_indices:
            doc = self.documents[idx]
            doc.metadata['retrieval_scores'] = {
                'combined_score': float(combined_scores[idx]),
                'semantic_score': float(semantic_scores[idx]),
                'bm25_score': float(bm25_scores[idx])
            }
            results.append(doc)
        return results

# Reranker class
class Reranker:
    def __init__(self):
        self.tfidf = TfidfVectorizer()

    def fit(self, documents: List[Document]):
        self.tfidf.fit([doc.text for doc in documents])
        return self

    def rerank(self, query: str, docs: List[Document], top_k: int = None) -> List[Document]:
        if not docs:
            return []
        doc_texts = [doc.text for doc in docs]
        doc_vectors = self.tfidf.transform(doc_texts)
        query_vector = self.tfidf.transform([query])
        similarities = np.dot(doc_vectors, query_vector.T).toarray().flatten()
        scored_docs = list(zip(docs, similarities))
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        if top_k:
            scored_docs = scored_docs[:top_k]
        result_docs = []
        for doc, score in scored_docs:
            doc.metadata['reranking_score'] = float(score)
            result_docs.append(doc)
        return result_docs

# RAGSystem class
class RAGSystem:
    def __init__(self, documents: List[Document]):
        self.chunker = Chunker(chunk_size=512, chunk_overlap=128)
        self.chunks = []
        for doc in documents:
            self.chunks.extend(self.chunker.chunk_document(doc))
        self.retriever = HybridRetriever(self.chunks)
        self.reranker = Reranker().fit(self.chunks)

    def answer_query(self, query: str, num_chunks: int = 5) -> Dict[str, Any]:
        retrieved_chunks = self.retriever.retrieve(query, top_k=num_chunks * 2)
        reranked_chunks = self.reranker.rerank(query, retrieved_chunks, top_k=num_chunks)
        context = "\n\n".join([f"[Document {i+1}]: {chunk.text}" for i, chunk in enumerate(reranked_chunks)])
        answer = self._mock_generate_answer(query, context)
        return {
            "query": query,
            "answer": answer,
            "context": context,
            "sources": [chunk.metadata for chunk in reranked_chunks]
        }

    def _mock_generate_answer(self, query: str, context: str) -> str:
        return f"This is a mock answer to your query: '{query}'\n\nContext:\n{context[:500]}..."

# Sample documents
docs = [
    Document("doc1", "Artificial intelligence is a branch of computer science that focuses on building smart machines."),
    Document("doc2", "Machine learning is a subset of AI that gives computers the ability to learn from data."),
    Document("doc3", "Natural language processing is a field of AI that enables machines to understand human language."),
]

# Initialize RAG system
rag_system = RAGSystem(docs)

# Dash app setup
app = dash.Dash(__name__)
app.title = "RAG Demo"

app.layout = html.Div([
    html.H2("Retrieval-Augmented Generation (RAG) System"),
    dcc.Textarea(id="query-input", placeholder="Enter your query here...", style={"width": "100%", "height": 100}),
    html.Button("Submit", id="submit-btn", n_clicks=0),
    html.Hr(),
    html.Div(id="answer-output", style={"whiteSpace": "pre-wrap", "marginTop": 20}),
])

@app.callback(
    Output("answer-output", "children"),
    Input("submit-btn", "n_clicks"),
    State("query-input", "value")
)
def handle_query(n_clicks, query):
    if n_clicks > 0 and query:
        result = rag_system.answer_query(query)
        return f"Answer:\n{result['answer']}\n\nSources:\n" + "\n".join(
            f"- {src['parent_id']} (score: {src.get('reranking_score', 0):.4f})"
            for src in result['sources']
        )
    return ""

if __name__ == "__main__":
    app.run_server(debug=True)


Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x107e93800>>
Traceback (most recent call last):
  File "/opt/anaconda3/envs/agent-env/lib/python3.12/site-packages/ipykernel/ipkernel.py", line 775, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(

KeyboardInterrupt: 


In [None]:
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from rank_bm25 import BM25Okapi
import torch
from transformers import AutoTokenizer, AutoModel
import re
from typing import List, Dict, Tuple, Any, Optional
from sklearn.metrics.pairwise import cosine_similarity

class Document:
    """Represents a document with metadata."""
    def __init__(self, doc_id: str, text: str, metadata: Optional[Dict[str, Any]] = None):
        self.id = doc_id
        self.text = text
        self.metadata = metadata if metadata else {}
        
    def __str__(self):
        return f"Document(id={self.id}, text={self.text[:50]}...)"

class Chunker:
    """Chunks documents into smaller pieces."""
    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 128):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        
    def chunk_document(self, doc: Document) -> List[Document]:
        """Split document into chunks with overlap."""
        text = doc.text
        chunks = []
        
        # Simple character-based chunking
        start = 0
        while start < len(text):
            end = min(start + self.chunk_size, len(text))
            
            # Try to end at a sentence boundary
            if end < len(text):
                # Look for sentence boundary (., !, ?)
                sentence_end = max(
                    text.rfind('. ', start, end),
                    text.rfind('! ', start, end),
                    text.rfind('? ', start, end)
                )
                
                if sentence_end > start + 100:  # Only use if not too short
                    end = sentence_end + 1
            
            # Create chunk document
            chunk_text = text[start:end].strip()
            chunk_id = f"{doc.id}_chunk_{len(chunks)}"
            chunk_metadata = doc.metadata.copy()
            chunk_metadata['parent_id'] = doc.id
            chunk_metadata['chunk_id'] = len(chunks)
            chunk_metadata['start_char'] = start
            chunk_metadata['end_char'] = end
            
            chunks.append(Document(chunk_id, chunk_text, chunk_metadata))
            
            # Move start position for next chunk (with overlap)
            start = end - self.chunk_overlap
            if start >= len(text) - 50:  # Avoid tiny chunks at the end
                break
                
        return chunks

class SemanticEmbedder:
    """Creates embeddings using a transformer model."""
    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        self.model.eval()
        
    def _mean_pooling(self, model_output, attention_mask):
        """Mean pooling to get sentence embeddings."""
        token_embeddings = model_output[0]
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)
    
    def embed(self, texts: List[str]) -> np.ndarray:
        """Create embeddings for a list of texts."""
        # Tokenize
        encoded_input = self.tokenizer(
            texts, 
            padding=True, 
            truncation=True, 
            max_length=512, 
            return_tensors='pt'
        )
        
        # Compute token embeddings
        with torch.no_grad():
            model_output = self.model(**encoded_input)
        
        # Mean pooling
        embeddings = self._mean_pooling(model_output, encoded_input['attention_mask'])
        
        # Normalize
        embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)
        
        return embeddings.numpy()

class HybridRetriever:
    """Retrieves documents using both lexical and semantic search."""
    def __init__(self, documents: List[Document], semantic_weight: float = 0.7, 
                 precomputed_embeddings: Optional[np.ndarray] = None):
        self.documents = documents
        self.doc_texts = [doc.text for doc in documents]
        self.semantic_weight = semantic_weight
        
        # Lexical search with BM25
        tokenized_docs = [text.lower().split() for text in self.doc_texts]
        self.bm25 = BM25Okapi(tokenized_docs)
        

        # Semantic search - use precomputed embeddings if provided
        self.embedder = SemanticEmbedder()
        if precomputed_embeddings is not None:
            self.doc_embeddings = precomputed_embeddings
        else:
            self.doc_embeddings = self.embedder.embed(self.doc_texts)
class Reranker:
    """Reranks retrieved documents based on query relevance."""
    def __init__(self, cross_encoder_name: str = None):
        # In a real implementation, would use a cross-encoder model
        # Here we'll implement a simple reranker based on term overlap
        self.tfidf = TfidfVectorizer()
        
    def fit(self, documents: List[Document]):
        """Fit the reranker on documents."""
        self.tfidf.fit([doc.text for doc in documents])
        return self
        
    def rerank(self, query: str, docs: List[Document], top_k: int = None) -> List[Document]:
        """Rerank documents based on query relevance."""
        if not docs:
            return []
            
        # Create TF-IDF vectors
        doc_texts = [doc.text for doc in docs]
        doc_vectors = self.tfidf.transform(doc_texts)
        query_vector = self.tfidf.transform([query])
        
        # Calculate similarity scores
        similarities = np.dot(doc_vectors, query_vector.T).toarray().flatten()
        
        # Sort documents by similarity
        scored_docs = list(zip(docs, similarities))
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        
        # Take top_k if specified
        if top_k:
            scored_docs = scored_docs[:top_k]
        
        # Update metadata with reranking score
        result_docs = []
        for doc, score in scored_docs:
            doc.metadata['reranking_score'] = float(score)
            result_docs.append(doc)
            
        return result_docs

class RAGSystem:
    """Complete RAG system for retrieval and generation."""
    def __init__(self, documents: List[Document]):
        # Process documents
        self.chunker = Chunker(chunk_size=512, chunk_overlap=128)
        self.chunks = []
        for doc in documents:
            self.chunks.extend(self.chunker.chunk_document(doc))
        
        print(f"Created {len(self.chunks)} chunks from {len(documents)} documents")
        
        # Initialize retriever and reranker
        self.retriever = HybridRetriever(self.chunks, semantic_weight=0.7)
        self.reranker = Reranker().fit(self.chunks)
        
        # For a real system, would initialize an LLM here
        
    def answer_query(self, query: str, num_chunks: int = 5) -> Dict[str, Any]:
        """Answer a query using RAG."""
        # 1. Retrieve relevant chunks
        retrieved_chunks = self.retriever.retrieve(query, top_k=num_chunks*2)
        
        # 2. Rerank chunks
        reranked_chunks = self.reranker.rerank(query, retrieved_chunks, top_k=num_chunks)
        
        # 3. Create context
        context = "\n\n".join([f"[Document {i+1}]: {chunk.text}" 
                              for i, chunk in enumerate(reranked_chunks)])
        
        # 4. Generate answer (in a real system, this would callclass RAGSystem:
    """Complete RAG system for retrieval and generation."""
    def __init__(self, documents: List[Document]):
        # Process documents
        self.chunker = Chunker(chunk_size=512, chunk_overlap=128)
        self.chunks = []
        for doc in documents:
            self.chunks.extend(self.chunker.chunk_document(doc))
        
        print(f"Created {len(self.chunks)} chunks from {len(documents)} documents")
        
        # Initialize retriever and reranker
        self.retriever = HybridRetriever(self.chunks, semantic_weight=0.7)
        self.reranker = Reranker().fit(self.chunks)
        
        # For a real system, would initialize an LLM here
        
    def answer_query(self, query: str, num_chunks: int = 5) -> Dict[str, Any]:
        """Answer a query using RAG."""
        # 1. Retrieve relevant chunks
        retrieved_chunks = self.retriever.retrieve(query, top_k=num_chunks*2)
        
        # 2. Rerank chunks
        reranked_chunks = self.reranker.rerank(query, retrieved_chunks, top_k=num_chunks)
        
        # 3. Create context
        context = "\n\n".join([f"[Document {i+1}]: {chunk.text}" 
                              for i, chunk in enumerate(reranked_chunks)])
        
        # 4. Generate answer (in a real system, this would call an LLM API)
        # Here we'll simulate a simple response generation
        answer = self._generate_answer(query, reranked_chunks)
        
        # 5. Return complete response with metadata
        return {
            "query": query,
            "answer": answer,
            "context": context,
            "source_chunks": reranked_chunks,
        }
    
    def _generate_answer(self, query: str, chunks: List[Document]) -> str:
        """Simulate generating an answer from chunks."""
        # In a real implementation, this would prompt an LLM with the query and chunks
        # For this example, we'll create a simple extractive answer
        
        if not chunks:
            return "I couldn't find relevant information to answer your question."
        
        # Simple extractive approach: find sentences most similar to query
        all_text = " ".join([chunk.text for chunk in chunks])
        sentences = re.split(r'(?<=[.!?])\s+', all_text)
        
        # Calculate TF-IDF similarity between query and each sentence
        tfidf = TfidfVectorizer().fit([query] + sentences)
        query_vec = tfidf.transform([query])
        sentence_vecs = tfidf.transform(sentences)
        
        # Get similarity scores
        similarities = np.dot(sentence_vecs, query_vec.T).toarray().flatten()
        
        # Get top 3 most relevant sentences
        top_indices = np.argsort(-similarities)[:3]
        top_sentences = [sentences[idx] for idx in top_indices if similarities[idx] > 0]
        
        if not top_sentences:
            return "I found some information but couldn't generate a specific answer to your question."
        
        # Combine sentences into an answer
        answer = " ".join(top_sentences)
        
        # Add citation
        sources = ", ".join([f"Document {chunk.metadata['parent_id']}" for chunk in chunks[:3]])
        answer += f"\n\nSources: {sources}"
        
        return answer


class RAGEvaluator:
    """Evaluates RAG system performance."""
    
    def __init__(self, rag_system: RAGSystem):
        self.rag_system = rag_system
        
    def evaluate_retrieval(self, queries: List[str], relevant_doc_ids: List[List[str]]) -> Dict[str, float]:
        """Evaluate retrieval performance using precision, recall, and F1."""
        precision_scores = []
        recall_scores = []
        f1_scores = []
        
        for query, rel_ids in zip(queries, relevant_doc_ids):
            # Get system results
            result = self.rag_system.answer_query(query)
            retrieved_chunks = result["source_chunks"]
            retrieved_parent_ids = set([chunk.metadata['parent_id'] for chunk in retrieved_chunks])
            
            rel_ids_set = set(rel_ids)
            
            # Calculate metrics
            if retrieved_parent_ids:
                precision = len(rel_ids_set.intersection(retrieved_parent_ids)) / len(retrieved_parent_ids)
            else:
                precision = 0.0
                
            if rel_ids:
                recall = len(rel_ids_set.intersection(retrieved_parent_ids)) / len(rel_ids_set)
            else:
                recall = 1.0 if not retrieved_parent_ids else 0.0
                
            f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
            
            precision_scores.append(precision)
            recall_scores.append(recall)
            f1_scores.append(f1)
            
        # Calculate averages
        avg_precision = np.mean(precision_scores)
        avg_recall = np.mean(recall_scores)
        avg_f1 = np.mean(f1_scores)
        
        return {
            "precision": avg_precision,
            "recall": avg_recall,
            "f1": avg_f1
        }
    
    def evaluate_answer_relevance(self, queries: List[str], expert_answers: List[str]) -> Dict[str, float]:
        """Evaluate answer relevance and correctness.
        
        In a real implementation, this would use methods like:
        1. BLEU or ROUGE scores
        2. BERTScore or other embedding similarity
        3. LLM-based evaluation
        
        Here we'll implement a simple TF-IDF cosine similarity.
        """
        similarities = []
        
        for query, expert_answer in zip(queries, expert_answers):
            # Get system answer
            result = self.rag_system.answer_query(query)
            system_answer = result["answer"]
            
            # Calculate similarity using TF-IDF
            tfidf = TfidfVectorizer().fit([expert_answer, system_answer])
            expert_vec = tfidf.transform([expert_answer])
            system_vec = tfidf.transform([system_answer])
            
            similarity = cosine_similarity(expert_vec, system_vec)[0][0]
            similarities.append(similarity)
        
        return {
            "answer_similarity": np.mean(similarities)
        }


# Example usage
def demo_rag_system():
    # Create sample documents
    documents = [
        Document("doc1", "Neural networks are computing systems vaguely inspired by the biological neural networks that constitute animal brains. An ANN is based on a collection of connected units or nodes called artificial neurons, which loosely model the neurons in a biological brain. Each connection, like the synapses in a biological brain, can transmit a signal to other neurons. An artificial neuron receives signals then processes them and can signal neurons connected to it."),
        Document("doc2", "Generative AI refers to artificial intelligence systems capable of generating new content, such as text, images, audio, and synthetic data. These systems learn patterns from existing data and use this knowledge to create new, similar but unique outputs. Popular examples include GPT (text), DALL-E (images), and WaveNet (audio)."),
        #Document("doc3", "Retrieval-Augmented Generation (RAG) combines retrieval-based and generation-based approaches for natural language processing tasks. Instead of relying solely on parametric knowledge stored in the model weights, RAG retrieves relevant documents or passages from an external knowledge source to provide context for generation. This approach helps ground the model's outputs in factual information and reduces hallucinations."),
        #Document("doc4", "Vector databases are specialized database systems designed to store, index, and query high-dimensional vectors efficiently. These vectors typically represent embeddings of text, images, or other data types in a semantic space. Popular vector databases include FAISS (Facebook AI Similarity Search), Pinecone, Weaviate, and Milvus. They support operations like nearest neighbor search using various distance metrics."),
        #Document("doc5", "Attention mechanisms in neural networks allow models to focus on specific parts of the input when producing output. The transformer architecture, introduced in the paper 'Attention is All You Need,' uses self-attention to weigh the importance of different parts of the input data. This has been revolutionary for natural language processing and other sequence modeling tasks.")
    ]
    
    # Initialize RAG system
    rag = RAGSystem(documents)
    
    # Example query
    query = "How do vector databases work with RAG systems?"
    result = rag.answer_query(query)
    
    print(f"Query: {query}")
    print(f"\nAnswer: {result['answer']}")
    print(f"\nRetrieved chunks:")
    for i, chunk in enumerate(result['source_chunks']):
        scores = chunk.metadata.get('retrieval_scores', {})
        print(f"\n[{i+1}] {chunk.id} (Score: {scores.get('combined_score', 0):.4f})")
        print(f"    - Semantic: {scores.get('semantic_score', 0):.4f}, BM25: {scores.get('bm25_score', 0):.4f}")
        print(f"    - Text: {chunk.text[:100]}...")
    
    # Evaluate system (with mock data)
    evaluator = RAGEvaluator(rag)
    eval_queries = [
        "What are neural networks?",
        "How does generative AI work?",
        "Explain RAG systems"
    ]
    relevant_docs = [
        ["doc1", "doc2"]
        #["doc2"],
        #["doc3", "doc4"]
    ]
    expert_answers = [
        "Neural networks are computing systems inspired by biological neural networks in animal brains.",
        "Generative AI systems learn patterns from existing data to create new, similar but unique outputs like text, images and audio.",
        "Retrieval-Augmented Generation (RAG) combines retrieval and generation approaches by accessing external knowledge sources to provide context for generation."
    ]
    
    retrieval_metrics = evaluator.evaluate_retrieval(eval_queries, relevant_docs)
    relevance_metrics = evaluator.evaluate_answer_relevance(eval_queries, expert_answers)
    
    print("\n=== Evaluation Results ===")
    print(f"Retrieval Metrics: Precision={retrieval_metrics['precision']:.2f}, " +
          f"Recall={retrieval_metrics['recall']:.2f}, F1={retrieval_metrics['f1']:.2f}")
    print(f"Answer Relevance: {relevance_metrics['answer_similarity']:.2f}")


if __name__ == "__main__":
    demo_rag_system()

In [29]:
import nltk
import re
import string
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
import time

# NLTK Setup
nltk.download('punkt', download_dir='/Users/saviz/nltk_data')
nltk.data.path.append('/Users/saviz/nltk_data')

class SimpleRAG:
    def __init__(self, top_k=5, chunk_size=100, overlap=20):
        self.top_k = top_k
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.vectorizer = TfidfVectorizer()
        self.embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
        self.external_knowledge = {
            "capital of iran": "Tehran",
            "tehran": "Capital of Iran",
        }

    def _clean_text(self, text):
        return re.sub(f"[{re.escape(string.punctuation)}]", "", text.lower())

    def _split_text(self, text):
        sentences = nltk.sent_tokenize(text)
        chunks = []
        current_chunk = []
        current_length = 0

        for sentence in sentences:
            words = nltk.word_tokenize(sentence)
            if current_length + len(words) <= self.chunk_size:
                current_chunk.extend(words)
                current_length += len(words)
            else:
                chunks.append(" ".join(current_chunk))
                current_chunk = words[-self.overlap:]
                current_length = len(current_chunk)

        if current_chunk:
            chunks.append(" ".join(current_chunk))
        return chunks

    def fit(self, documents):
        print("\n⏱️ Starting fit process...")
        start_time = time.time()

        print("📄 Splitting documents...")
        self.chunks = []
        self.doc_ids = []
        for i, doc in enumerate(documents):
            cleaned = self._clean_text(doc)
            chunks = self._split_text(cleaned)
            self.chunks.extend(chunks)
            self.doc_ids.extend([i] * len(chunks))

        print("📚 Building TF-IDF index...")
        self.bm25_matrix = self.vectorizer.fit_transform(self.chunks)

        print("🧠 Generating embeddings...")
        self.embeddings = self.embedding_model.encode(self.chunks, show_progress_bar=True)
        
        print(f"✅ Fit completed in {time.time() - start_time:.2f}s\n")

    def retrieve(self, query):
        cleaned_query = self._clean_text(query)
        
        # Check external knowledge first
        if cleaned_query in self.external_knowledge:
            return [self.external_knowledge[cleaned_query]], [0]

        # Semantic similarity
        query_embedding = self.embedding_model.encode([cleaned_query])[0]
        semantic_scores = cosine_similarity([query_embedding], self.embeddings)[0]
        
        # TF-IDF similarity
        query_vec = self.vectorizer.transform([cleaned_query])
        bm25_scores = (self.bm25_matrix @ query_vec.T).toarray().flatten()
        
        # Normalized hybrid scoring
        norm_semantic = (semantic_scores - semantic_scores.min()) / (semantic_scores.max() - semantic_scores.min() + 1e-9)
        norm_bm25 = (bm25_scores - bm25_scores.min()) / (bm25_scores.max() - bm25_scores.min() + 1e-9)
        combined_scores = norm_semantic + norm_bm25
        
        top_indices = combined_scores.argsort()[-self.top_k:][::-1]
        return [self.chunks[i] for i in top_indices], top_indices

    def rerank(self, query, chunks):
        cleaned_query = self._clean_text(query)
        texts = [cleaned_query] + chunks
        tfidf = TfidfVectorizer().fit_transform(texts)
        similarities = cosine_similarity(tfidf[0:1], tfidf[1:]).flatten()
        return [chunks[i] for i in similarities.argsort()[::-1]]

    def answer(self, query):
        print(f"\n🧠 Query: \"{query}\"")
        start_time = time.time()
        
        # Direct knowledge check
        cleaned_query = self._clean_text(query)
        if "capital of " in cleaned_query:
            print(f"✅ Completed in {time.time() - start_time:.2f}s")
            return ""

        retrieved_chunks, _ = self.retrieve(query)
        reranked_chunks = self.rerank(query, retrieved_chunks)
        
        final_answer = reranked_chunks[0] if reranked_chunks else "No relevant information found"
        print(f"✅ Completed in {time.time() - start_time:.2f}s")
        return final_answer

# Usage
if __name__ == "__main__":
    documents = [
        "Paris is the capital of France. It is known for the Eiffel Tower.",
        "Berlin is the capital of Germany. It has a rich history and vibrant culture.",
        "Rome is the capital of Italy. It is famous for the Colosseum and Roman architecture."
    ]

    rag = SimpleRAG(top_k=3)
    rag.fit(documents)
    
    result = rag.answer("What is the capital of India?")
    print("\n📢 Answer:", result)


[nltk_data] Downloading package punkt to /Users/saviz/nltk_data...
[nltk_data]   Package punkt is already up-to-date!



⏱️ Starting fit process...
📄 Splitting documents...
📚 Building TF-IDF index...
🧠 Generating embeddings...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Fit completed in 0.01s


🧠 Query: "What is the capital of India?"
✅ Completed in 0.00s

📢 Answer: 
