# ðŸ“˜ Notebook: Robust Tiny LLM System

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/your-repo/series1-coding-exercises/blob/main/exercises/blog-12/exercise-00.ipynb)

**Colab-ready, minimal hardcoding**

This notebook demonstrates a production-ready LLM system architecture with:
- RAG (Retrieval-Augmented Generation)
- Intent classification
- Entity extraction
- Deterministic business logic tools
- Hybrid LLM routing (local + API)
- Observability and logging
- Evaluation harness
- Drift detection

**Key principle:** Business logic and data live outside the model. The orchestrator coordinates deterministic tools, RAG retrieval, and LLM calls.

## 0) Install Dependencies

Run this cell once to install all required packages.

In [None]:
%pip install -q transformers sentence-transformers faiss-cpu openai datasets sqlalchemy prettytable scikit-learn dateparser

## 1) Imports + Config

This cell sets up the environment. We avoid hardcoding model backends â€” `OPENAI_KEY` is optional. In production you'd manage keys via secret managers and rotate them.

In [None]:
# == Imports ==
import os, time, json, sqlite3, re
from datetime import datetime
from typing import List, Dict, Any

import numpy as np
import pandas as pd
import dateparser

import torch
import torch.nn.functional as F
from transformers import AutoModelForCausalLM, AutoTokenizer

from sentence_transformers import SentenceTransformer
import faiss

# Small ML utilities for a lightweight intent classifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression

# Optional OpenAI (ChatGPT) client (function-calling style possible)
import openai

# == Config ==
OPENAI_KEY = os.environ.get("OPENAI_API_KEY")  # set in Colab if you want live ChatGPT
if OPENAI_KEY:
    openai.api_key = OPENAI_KEY

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", DEVICE)

## 2) Tiny "Real-ish" Knowledge Base

Simulate a tiny knowledge base as JSON-like objects (easy to extend or load from files/db).

**Why load like this?** Real systems keep KBs externally (CMS, DB, knowledge store). The orchestrator fetches and indexes them; you never bake business facts into code.

In [None]:
# Simulate a tiny knowledge base as JSON-like objects (easy to extend or load from files/db)
kb = [
    {"id":"policy_standard","title":"Refund policy - standard","text":"Refunds: standard plans eligible within 30 days of purchase. Proof required."},
    {"id":"policy_premium","title":"Refund policy - premium","text":"Premium plans are non-refundable except hardware failures, reviewed case-by-case."},
    {"id":"faq_refund","title":"How to request refund","text":"To request a refund, email billing@company.com with order id and reason. Processing: up to 7 business days."},
    {"id":"faq_support","title":"Support hours","text":"Technical support: Mon-Fri, 9:00-18:00 local time."}
]

# You can persist / update this KB externally (S3, DB) and re-index when it changes.

## 3) Build RAG Index

Choose embedding model (cheap & effective). In prod, consider larger encoders or a managed vector DB (e.g., Pinecone, Weaviate).

**Real-world note:** you'll likely move to a vector DB (Pinecone/Weaviate/FAISS-on-cloud) for scale, persistence, filtering, and metadata scoring.

In [None]:
# Choose embedding model (cheap & effective). In prod, consider larger encoders or a managed vector DB (e.g., Pinecone, Weaviate).
EMBED_MODEL_NAME = "all-MiniLM-L6-v2"
embedder = SentenceTransformer(EMBED_MODEL_NAME)

# Index creation helper
def build_rag_index(documents: List[Dict[str,str]]):
    texts = [d["text"] for d in documents]
    embeddings = embedder.encode(texts, convert_to_numpy=True)
    dim = embeddings.shape[1]
    index = faiss.IndexFlatL2(dim)
    index.add(embeddings)
    id_map = {i: documents[i] for i in range(len(documents))}
    return index, id_map

index, id_map = build_rag_index(kb)

def retrieve(query: str, k=3):
    q_emb = embedder.encode([query], convert_to_numpy=True)
    D, I = index.search(q_emb, k)
    return [id_map[int(i)] for i in I[0]]

# quick check (non-hardcoded query)
print([d['title'] for d in retrieve("How do I get a refund?")])

## 4) Intent Classification

We use a tiny TFâ€“IDF â†’ logistic regression model. This demonstrates how production systems often start with small classifiers and then graduate to model-based intent detection.

**Real-world:** intent classifier can be improved by:
- gathering labeled logs and retraining
- using transfer learning (fine-tune small transformer)
- or zero-shot classification via an LLM for cold start (e.g., OpenAI zero-shot labeler)

In [None]:
# Small labeled training set (expandable). Keep this outside code in a config or labeled dataset.
intent_samples = [
    ("I want a refund for my $120 purchase", "refund"),
    ("How can I get my money back?", "refund"),
    ("When is support available?", "support"),
    ("Who do I contact for invoices?", "billing"),
    ("Is premium refundable?", "policy_query"),
    ("What's the refund policy?", "policy_query"),
    ("Calculate refund for $50 bought 20 days ago", "refund_calc"),
    ("Get refund amount for $99 in 40 days", "refund_calc")
]

texts, labels = zip(*intent_samples)
vectorizer = TfidfVectorizer(ngram_range=(1,2), min_df=1)
X = vectorizer.fit_transform(texts)
clf = LogisticRegression(max_iter=500).fit(X, labels)

# helper
def predict_intent(query: str, threshold=0.5):
    x = vectorizer.transform([query])
    probs = clf.predict_proba(x)[0]
    pred = clf.classes_[probs.argmax()]
    return pred, probs.max()

# test
for q in [
    "I paid $200, what is my refund?",
    "What time is tech support open?",
    "calculate refund $40"
]:
    print(q, "->", predict_intent(q))

## 5) Robust Entity Extraction

**Strategy:**
- Try fast deterministic parsers (regex for dollar amounts)
- Try dateparser for natural-language dates
- If missing, fallback to an LLM function call to extract structured fields

**Real-world options:**
- Duckling (by Rasa) for dates, amounts, durations â€” robust and fast
- spaCy NER or HuggingFace token-classifier for custom entities
- LLM function-calling as a fallback for messy, ambiguous queries (structured JSON returns)

In [None]:
# amount regex
AMOUNT_RE = re.compile(r"\$(\d+(?:\.\d{1,2})?)")

def extract_amount(query: str):
    m = AMOUNT_RE.search(query)
    return float(m.group(1)) if m else None

def extract_days(query: str):
    # look for "X days" or natural phrases like "30 days after"
    m = re.search(r"(\d+)\s+days", query)
    if m:
        return int(m.group(1))
    # fallback to natural date parsing: try to parse "bought on 2025-01-01" etc.
    # dateparser lets us compute deltas if needed (left as an exercise)
    return None

def extract_entities(query: str):
    amount = extract_amount(query)
    days = extract_days(query)
    return {"amount": amount, "days": days}

# test
print(extract_entities("I paid $200 twenty days ago, do I get a refund?"))
print(extract_entities("Refund $45"))

## 6) Deterministic Tool Implementations

Parameterized deterministic business logic. In production this belongs to a service with robust tests.

**Real-world:** business rules live in services/feature flags, versioned, tested, and audited â€” not inline scripts. Edge cases logged and escalated.

In [None]:
def calculate_refund(amount: float, days_since_purchase: int, plan_tier: str = "standard"):
    """
    Parameterized deterministic business logic. In production this belongs to a service with robust tests.
    """
    if amount is None:
        return {"error":"amount_missing"}
    if plan_tier.lower() == "premium":
        return {"refund": 0.0, "reason": "premium_non_refundable"}
    if days_since_purchase is None:
        # If days unknown, be conservative: require more info
        return {"error":"days_missing"}
    refund = amount if days_since_purchase <= 30 else 0.0
    return {"refund": refund}

## 7) Local Small LM Fallback + Optional OpenAI Call

Local small LM for low-risk completions / demo: distilgpt2

**Real-world:** many shops use a hybrid approach:
- local small models for cheap generation, filtering, and previews
- API models (OpenAI, Anthropic) for higher-quality responses where cost justified
- function-calling / tool-enabled chat for structured outputs

In [None]:
# Local small LM for low-risk completions / demo: distilgpt2
LOCAL_MODEL_NAME = "distilgpt2"
local_tokenizer = AutoTokenizer.from_pretrained(LOCAL_MODEL_NAME)
local_model = AutoModelForCausalLM.from_pretrained(LOCAL_MODEL_NAME).to(DEVICE)
local_model.eval()

def call_local_llm(prompt: str, max_new_tokens=80, temperature=0.7):
    inputs = local_tokenizer(prompt, return_tensors="pt").to(DEVICE)
    with torch.no_grad():
        out = local_model.generate(**inputs, max_new_tokens=max_new_tokens, do_sample=True, temperature=temperature, top_k=50)
    return local_tokenizer.decode(out[0], skip_special_tokens=True)

def call_openai_completion(prompt: str, max_tokens=150, temperature=0.2):
    # Minimal wrapper (user can swap to chat api + function calling)
    resp = openai.Completion.create(engine="text-davinci-003", prompt=prompt, max_tokens=max_tokens, temperature=temperature)
    return resp.choices[0].text.strip() if resp and resp.choices else ""

## 8) Orchestrator: Decision Logic

The orchestrator uses the classifier to choose path; entity extraction is layered; deterministic tool handles finance; RAG grounds policy queries; OpenAI used if available; everything logged for observability.

In [None]:
# Logging DB (sqlite) - still fine for prototyping
conn = sqlite3.connect("llm_system_logs_v2.db", check_same_thread=False)
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS logs(
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 ts TEXT, query TEXT, intent TEXT, intent_conf REAL,
 retrieved TEXT, tool_called TEXT, model_used TEXT, response TEXT, latency REAL
)
""")
conn.commit()

def log_event(row: dict):
    cur.execute(
        "INSERT INTO logs(ts, query, intent, intent_conf, retrieved, tool_called, model_used, response, latency) VALUES(?,?,?,?,?,?,?,?,?)",
        (row.get("ts"), row.get("query"), row.get("intent"), row.get("intent_conf"),
         json.dumps(row.get("retrieved")), row.get("tool_called"), row.get("model_used"),
         row.get("response"), row.get("latency"))
    )
    conn.commit()

def orchestrate(query: str, use_openai=True):
    start = time.time()
    # 1) intent
    intent, conf = predict_intent(query)
    tool_called = None
    retrieved = []
    response = ""
    model_used = None

    # 2) If intent suggests refund calculation, extract entities and call deterministic tool:
    if intent in ("refund_calc", "refund"):
        ents = extract_entities(query)
        # If critical entity missing, try LLM-assisted extraction via function-call pattern (simple fallback)
        if ents["amount"] is None or ents["days"] is None:
            # Try using LLM to extract structured JSON - here we use local LLM as simple fallback; in prod use ChatGPT function-calling
            prompt = f"Extract amount and days from this query as JSON: \"{query}\". Return {{\"amount\":..., \"days\":...}}"
            model_used = "local_extractor"
            raw = call_local_llm(prompt, max_new_tokens=40, temperature=0.0)
            # naive parse for numbers in returned text
            a = AMOUNT_RE.search(raw)
            d = re.search(r"(\d+)\s+days", raw)
            ents["amount"] = ents["amount"] or (float(a.group(1)) if a else None)
            ents["days"] = ents["days"] or (int(d.group(1)) if d else None)

        # call deterministic business logic
        tool_called = f"calculate_refund(amount={ents['amount']}, days={ents['days']})"
        res = calculate_refund(ents["amount"], ents["days"])
        response = f"Refund result: {res}"
        model_used = model_used or "deterministic_tool"
    else:
        # 3) RAG-grounded answer: retrieve context and call an LLM (openai preferred if configured)
        retrieved = retrieve(query, k=3)
        context = "\n".join([r["text"] for r in retrieved])
        prompt = f"Use only the context to answer. Context:\n{context}\nQuestion: {query}\nIf not answerable, say 'Not enough information.'"
        if use_openai and OPENAI_KEY:
            model_used = "openai_api"
            try:
                response = call_openai_completion(prompt, temperature=0.2)
            except Exception as e:
                model_used = "local_fallback"
                response = call_local_llm(prompt, temperature=0.3)
        else:
            model_used = "local_model"
            response = call_local_llm(prompt, temperature=0.3)

    latency = time.time() - start
    log_event({
        "ts": datetime.utcnow().isoformat(),
        "query": query,
        "intent": intent,
        "intent_conf": float(conf),
        "retrieved": [r["id"] for r in retrieved],
        "tool_called": tool_called,
        "model_used": model_used,
        "response": response,
        "latency": latency
    })
    return response

# quick demo
queries = [
    "I paid $120 thirty days ago. How much will be refunded?",
    "How do I request a refund?",
    "Are premium subscriptions refundable?"
]

for q in queries:
    print("Q:", q)
    print("A:", orchestrate(q))
    print("-"*60)

## 9) Evaluation Harness

Test set stored as data (expandable); not baked into orchestration.

**Real-world evaluation** uses curated question sets, continuous A/B, and human review for edge cases. Store tests in files and run nightly.

In [None]:
# Test set stored as data (expandable); not baked into orchestration.
tests = [
    {"query":"What is the refund window?", "expected_substr":"30"},
    {"query":"Who do I email for a refund?", "expected_substr":"billing@company.com"},
    {"query":"If I paid $150 and it's been 20 days, what's my refund?", "expected_substr":"150"}
]

def evaluate_system(test_cases):
    results = []
    for t in test_cases:
        ans = orchestrate(t["query"])
        ok = t["expected_substr"].lower() in ans.lower()
        results.append({"query":t["query"], "answer":ans, "expected":t["expected_substr"], "ok":ok})
    return pd.DataFrame(results)

df_results = evaluate_system(tests)
print(df_results)

## 10) Drift Simulation

Simulate KB change (drift) by appending a new policy version.

**Ops note:** In production the KB update should trigger reindexing and a policy that notifies product/ops teams (CI/CD style). Observability surfaces drift (sudden changes in answer distributions).

In [None]:
# Simulate KB change (drift) by appending a new policy version
new_doc = {"id":"policy_update_2025","title":"Refund policy update","text":"As of 2025-01-01 refunds are allowed within 14 days for standard plans."}
kb.append(new_doc)

# Rebuild index (in production you'd update index incrementally)
index, id_map = build_rag_index(kb)

# Query
print("After policy change:", orchestrate("What is the refund window?"))

# Inspect last logs
logs_df = pd.read_sql_query("SELECT * FROM logs ORDER BY id DESC LIMIT 10", conn)
logs_df[['ts','query','intent','intent_conf','retrieved','tool_called','model_used','latency']].head(10)

## 11) Observability & Alert Ideas

Example: compute percent of "Not enough information" answers or "fallback to local" per day.

**Set alerts when:**
- `openai_api` calls spike (cost)
- `local_fallback` increases (quality drop)
- fraction of "Not enough information" grows (coverage gap)

In [None]:
# Example: compute percent of "Not enough information" answers or "fallback to local" per day
logs_df = pd.read_sql_query("SELECT * FROM logs", conn)
logs_df['date'] = pd.to_datetime(logs_df['ts']).dt.date
summary = logs_df.groupby(['date','model_used']).size().unstack(fill_value=0)
print(summary.tail())

## Final Notes: Why These Choices, and What Real Systems Use

### Intent Detection
- **Notebook:** tiny TFâ€“IDF + logistic regression (cheap, fast, transparent)
- **Real world:** start small, then move to a fine-tuned transformer or zero-shot LLM classifier (for dozens of intents). Use labeled logs to retrain.

### Entity Parsing
- **Notebook:** regex + dateparser + local LLM fallback
- **Real world:** Duckling (dates/amounts), spaCy/HuggingFace NER for custom entities, or function-calling LLM for complex extractions. Always prefer deterministic parsers for safety-critical fields (money, dates, identifiers).

### RAG / Retrieval
- **Notebook:** Sentence-Transformers + FAISS
- **Real world:** managed vector DBs (Pinecone, Weaviate) for persistence, metadata filters, scalable nearest-neighbor, and multi-vector per doc support.

### LLM Choices
- **Notebook:** local small LM for cheap demos + optional ChatGPT for quality
- **Real world:** hybrid â€” local models for first pass + high-quality APIs for critical answers; caching, rate limits, and cost-aware routing.

### Tools / Deterministic Services
- **Notebook:** `calculate_refund()` as a deterministic service
- **Real world:** push business logic into microservices with tests, audit logs, and access control. LLMs should never be the source of truth for actions that change state without deterministic verification.

### Observability & Evaluation
- **Notebook:** SQLite logs + simple metrics
- **Real world:** structured logs, dashboards, alerting, drift detection, automated regression tests, and periodic human review panels.

### Safety & Guardrails
- Always validate extracted critical entities before acting
- Rate-limiting, authentication, and policy enforcement live outside the model