<a href="https://colab.research.google.com/github/sovandas3/AI-CODE/blob/main/AI_based_Summarizer_(PDF_CSV_TXT).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 1/10 - Install required libraries
!pip install -q chromadb sentence-transformers transformers accelerate gradio pdfplumber tiktoken

# **Imports, Chroma client init, load embedding + generation models (run)**

In [None]:
# 2/10 - Imports and initialization
import os, uuid, tempfile, traceback
from pathlib import Path
from typing import List, Optional, Any, Dict

# ML libs
import torch
from sentence_transformers import SentenceTransformer
from transformers import pipeline

# DB/UI
import chromadb
import pdfplumber
import pandas as pd
import gradio as gr

# Where to persist the Chroma DB in this Colab session
CHROMA_DIR = "/content/chroma_db"
os.makedirs(CHROMA_DIR, exist_ok=True)

# Initialize Chroma client (use PersistentClient if available; fallback to in-memory)
# `chroma_client` is the module-level handle every helper below reads.
chroma_client = None
try:
    chroma_client = chromadb.PersistentClient(path=CHROMA_DIR)
    print("Using chromadb.PersistentClient with path:", CHROMA_DIR)
except Exception as e:
    print("PersistentClient not available or failed:", e)
    try:
        # In-memory fallback: nothing is written under CHROMA_DIR in this mode.
        chroma_client = chromadb.Client()
        print("Using chromadb.Client() (in-memory).")
    except Exception as e2:
        # No usable client at all: report and re-raise so the cell fails visibly.
        print("Failed to create Chroma client:", e2)
        raise

# Embeddings model (SentenceTransformers)
# `sbert` is the module-level encoder used by the embedding helpers.
EMB_MODEL_NAME = "all-MiniLM-L6-v2"
print("Loading SentenceTransformer model (may take 20-60s)...")
sbert = SentenceTransformer(EMB_MODEL_NAME)
print("Loaded embedding model:", EMB_MODEL_NAME)

# Generation model (HuggingFace local). Choose an instruction-tuned model that's small-ish
# Default: google/flan-t5-small (text2text)
GEN_MODEL_NAME = "google/flan-t5-small"

# Device index handed to transformers.pipeline: 0 when CUDA is available, otherwise -1 (CPU).
device = 0 if torch.cuda.is_available() else -1
print("Using device:", ("GPU" if device == 0 else "CPU (no GPU)"))

print(f"Loading generation model '{GEN_MODEL_NAME}' into pipeline (this may download weights)...")
# `gen_pipe` is the module-level text2text pipeline used by the generation helpers.
gen_pipe = pipeline("text2text-generation", model=GEN_MODEL_NAME, tokenizer=GEN_MODEL_NAME, device=device)
print("Generation pipeline ready:", GEN_MODEL_NAME)


Using chromadb.PersistentClient with path: /content/chroma_db
Loading SentenceTransformer model (may take 20-60s)...
Loaded embedding model: all-MiniLM-L6-v2
Using device: CPU (no GPU)
Loading generation model 'google/flan-t5-small' into pipeline (this may download weights)...


config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/308M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json: 0.00B [00:00, ?B/s]

Device set to use cpu


Generation pipeline ready: google/flan-t5-small


# **Token-aware chunking helpers (uses tiktoken if available; fallback to char chunking)**

In [None]:
# 3/10 - Token-aware chunking (tiktoken optional)
# Probe for tiktoken once; `_has_tiktoken` / `enc` are read by the chunking helpers.
try:
    import tiktoken
    enc = tiktoken.encoding_for_model("gpt-3.5-turbo")
except Exception:
    # Import or encoder lookup failed: fall back to character-based chunking.
    _has_tiktoken, enc = False, None
    print("tiktoken NOT available: using character-based chunking")
else:
    _has_tiktoken = True
    print("tiktoken available: using token-aware chunking")

def chunk_text_token_aware(text: str, chunk_tokens: int = 512, overlap_tokens: int = 64) -> List[str]:
    """Split ``text`` into overlapping windows measured in tiktoken tokens.

    Falls back to ``chunk_text_char_based(text, chunk_size=1200, overlap=200)``
    when tiktoken could not be loaded (``_has_tiktoken`` is false).

    Args:
        text: Text to split.
        chunk_tokens: Maximum number of tokens per chunk; must be positive.
        overlap_tokens: Number of tokens shared between consecutive chunks.
            Values that would stop the window from advancing are clamped so
            every step moves forward by at least one token.

    Returns:
        The decoded, whitespace-stripped chunks in document order; chunks that
        are empty after stripping are omitted.

    Raises:
        ValueError: If ``chunk_tokens`` is not positive.
    """
    if not _has_tiktoken:
        return chunk_text_char_based(text, chunk_size=1200, overlap=200)
    if chunk_tokens <= 0:
        raise ValueError("chunk_tokens must be a positive integer")

    tokens = enc.encode(text)
    total = len(tokens)
    # Each new window starts `overlap_tokens` before the previous window's end.
    # The previous code used max(end - overlap, end), which is always `end`,
    # so consecutive chunks never overlapped.
    step = max(chunk_tokens - max(overlap_tokens, 0), 1)

    chunks: List[str] = []
    start = 0
    while start < total:
        end = min(start + chunk_tokens, total)
        chunk = enc.decode(tokens[start:end]).strip()
        if chunk:
            chunks.append(chunk)
        if end == total:
            break
        start += step
    return chunks

def chunk_text_char_based(text: str, chunk_size: int = 1200, overlap: int = 200) -> List[str]:
    """Split ``text`` into overlapping windows measured in characters.

    Args:
        text: Text to split.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared between consecutive chunks.
            Values that would stop the window from advancing are clamped so
            every step moves forward by at least one character.

    Returns:
        The whitespace-stripped chunks in document order; chunks that are
        empty after stripping are omitted.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    total = len(text)
    # Each new window starts `overlap` characters before the previous end.
    # The previous code used max(end - overlap, end), which is always `end`,
    # so the overlap was silently ignored.
    step = max(chunk_size - max(overlap, 0), 1)

    chunks: List[str] = []
    start = 0
    while start < total:
        end = min(start + chunk_size, total)
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end == total:
            break
        start += step
    return chunks

def chunk_text(text: str, use_token: bool = True, chunk_tokens: int = 512, overlap_tokens: int = 64, chunk_size:int=1200, overlap:int=200):
    """Dispatch to the token-aware or the character-based chunker.

    Token-aware chunking is used only when the caller asks for it
    (``use_token``) and tiktoken was loaded (``_has_tiktoken``); otherwise the
    character-based chunker runs with ``chunk_size`` / ``overlap``.
    """
    token_mode = use_token and _has_tiktoken
    if not token_mode:
        return chunk_text_char_based(text, chunk_size=chunk_size, overlap=overlap)
    return chunk_text_token_aware(text, chunk_tokens=chunk_tokens, overlap_tokens=overlap_tokens)


tiktoken available: using token-aware chunking


# **Text extraction helpers (PDF/TXT/CSV)**

In [None]:
# 4/10 - Extract text from file path
def extract_text_from_path(path: str) -> str:
    """Return the plain text of the file at ``path``, chosen by file extension.

    * ``.pdf``  - text of every page that yields any, joined with newlines.
    * ``.txt``  - file contents decoded as UTF-8, undecodable bytes ignored.
    * ``.csv``  - one line per row with cells joined by `` | ``. The first
      line is the header row so that column names are part of the indexed
      text (they were previously dropped, which made questions about the
      columns unanswerable).
    * anything else - best-effort UTF-8 read.

    Args:
        path: Path of the file to read (anything ``str()`` can turn into one).

    Returns:
        The extracted text. An empty string is returned when a CSV cannot be
        parsed or a file with an unknown extension cannot be read.
    """
    path = str(path)
    ext = Path(path).suffix.lower()

    if ext == ".pdf":
        with pdfplumber.open(path) as pdf:
            page_texts = [page.extract_text() for page in pdf.pages]
        # Pages without extractable text yield None/"" and are skipped.
        return "\n".join(t for t in page_texts if t)

    if ext == ".txt":
        return Path(path).read_text(encoding="utf-8", errors="ignore")

    if ext == ".csv":
        try:
            # fillna("") also covers short rows, whose missing trailing cells
            # are read as NaN.
            df = pd.read_csv(path, dtype=str, keep_default_na=False).fillna("")
            header = " | ".join(str(col) for col in df.columns)
            rows = [
                " | ".join(str(cell) for cell in row)
                for row in df.itertuples(index=False, name=None)
            ]
            return "\n".join([header, *rows])
        except Exception as e:
            print("CSV read error:", e)
            return ""

    # Unknown extension: try to read it as text, give up quietly otherwise.
    try:
        return Path(path).read_text(encoding="utf-8", errors="ignore")
    except Exception:
        return ""


# **Embeddings & Chroma safe wrappers (robust to Chroma API differences)**

In [None]:
# 5/10 - Robust embedding + Chroma helpers
import numpy as np
import traceback
from typing import Any, Dict, List

# Safe SBERT embedding -> always returns List[List[float]]
def embed_texts_sentence_transformer_safe(texts: List[str]) -> List[List[float]]:
    """Encode ``texts`` with the module-level ``sbert`` model as plain Python floats.

    Note: the RAG-answer cell further down defines a function with the same
    name; once that cell has run, it replaces this definition.

    Args:
        texts: Strings to embed. An empty or falsy value returns ``[]``
            without calling the model.

    Returns:
        One list of ``float`` per input text, in input order.

    Raises:
        RuntimeError: If the encoder output cannot be converted to lists of
            floats, or if the number of vectors differs from ``len(texts)``.
        Exception: Whatever ``sbert.encode`` raises is printed and re-raised
            unchanged.
    """
    if not texts:
        return []
    try:
        # Prefer numpy output, but handle other return types defensively
        try:
            embs = sbert.encode(texts, show_progress_bar=False, convert_to_numpy=True)
        except TypeError:
            # some SBERT versions don't accept convert_to_numpy
            embs = sbert.encode(texts, show_progress_bar=False)
    except Exception as e:
        print("sbert.encode failed:", e)
        traceback.print_exc()
        raise

    # If single vector returned as 1-D array, expand to 2-D
    if isinstance(embs, np.ndarray) and embs.ndim == 1:
        embs = np.expand_dims(embs, axis=0)

    out: List[List[float]] = []
    try:
        # try numpy pathway
        arr = np.asarray(embs)
        if arr.ndim == 2:
            # Regular (n_texts, dim) matrix: one row per text.
            for row in arr:
                out.append([float(x) for x in row.reshape(-1)])
        else:
            # fallback: iterate outer dimension
            # (each item is flattened on its own, whatever its shape)
            for v in embs:
                vec = [float(x) for x in np.asarray(v).reshape(-1)]
                out.append(vec)
    except Exception:
        # final fallback: try .tolist()
        # NOTE(review): `out` is not cleared here, so rows appended before the
        # failure stay in place; the length check below would then raise.
        try:
            for v in embs.tolist():
                out.append([float(x) for x in v])
        except Exception as e:
            print("Failed to coerce embeddings to list-of-floats:")
            traceback.print_exc()
            raise RuntimeError("Embedding conversion failed") from e

    # Guard against any pathway silently dropping or duplicating vectors.
    if len(out) != len(texts):
        raise RuntimeError(f"Embedding length mismatch: got {len(out)} embeddings for {len(texts)} texts")

    return out

# get/create collection (keeps original safe logic)
def get_or_create_collection_safe(name: str):
    """Return the Chroma collection for ``name``, creating it when lookup fails.

    The name is normalised first: spaces become underscores and the result is
    cut to 128 characters. Creation is tried with the keyword form and, if
    that raises ``TypeError``, with the positional form.
    """
    collection_id = name.replace(" ", "_")
    collection_id = collection_id[:128]

    try:
        existing = chroma_client.get_collection(name=collection_id)
    except Exception:
        # Any lookup failure is treated as "does not exist yet".
        pass
    else:
        return existing

    try:
        return chroma_client.create_collection(name=collection_id)
    except TypeError:
        return chroma_client.create_collection(collection_id)

# Add docs defensively — uses the safe embedding fn and validates types/shapes
def add_documents_to_collection_safe(collection_name: str, texts: List[str], metadatas: List[dict]):
    """Embed ``texts`` and add them, with metadata, to a Chroma collection.

    Args:
        collection_name: Target collection; resolved (and created if needed)
            through ``get_or_create_collection_safe``.
        texts: Documents to store. An empty list is a no-op.
        metadatas: One metadata dict per document. A shorter list is padded
            with fresh empty dicts, a longer one is truncated.

    Returns:
        Number of documents added (``0`` when ``texts`` is empty).

    Raises:
        TypeError: If the embedder does not return a list of lists.
        RuntimeError: If the number of embeddings differs from the number of
            texts, or if the positional ``add`` fallback fails.
        Exception: Any non-``TypeError`` raised by ``col.add`` is printed and
            re-raised unchanged.
    """
    texts = list(texts or [])
    if not texts:
        # Nothing to embed or store; avoid handing empty lists to col.add.
        return 0

    col = get_or_create_collection_safe(collection_name)

    # compute embeddings (safe)
    embs = embed_texts_sentence_transformer_safe(texts)

    # Debug info
    print(f"[DEBUG] Adding to collection '{collection_name}': {len(texts)} docs; embeddings: {len(embs)} vectors; vector-dim: {len(embs[0]) if embs else 0}")

    # ensure correct types: list of lists of floats
    if not isinstance(embs, list) or not all(isinstance(v, list) for v in embs):
        raise TypeError("Embeddings must be a list of lists of floats")

    embs = [[float(x) for x in vec] for vec in embs]

    if len(embs) != len(texts):
        raise RuntimeError(f"Number of embeddings ({len(embs)}) != number of texts ({len(texts)})")

    # A fresh uuid per document keeps ids unique across repeated ingests.
    ids = [f"{uuid.uuid4().hex}_{i}" for i in range(len(texts))]

    # Align metadata with texts. Padding uses a new dict per slot; `[{}] * n`
    # would make every padded entry the same shared object.
    metadatas = list(metadatas or [])[:len(texts)]
    metadatas.extend({} for _ in range(len(texts) - len(metadatas)))

    # call Chroma.add (handle named and positional signatures)
    try:
        col.add(ids=ids, documents=texts, metadatas=metadatas, embeddings=embs)
    except TypeError:
        try:
            # Positional order of Collection.add is
            # (ids, embeddings, metadatas, documents).
            col.add(ids, embs, metadatas, texts)
        except Exception as e:
            print("Failed to add documents using positional args; traceback:")
            traceback.print_exc()
            raise RuntimeError("Chroma .add failed (positional fallback)") from e
    except Exception:
        print("Chroma .add raised an exception; traceback:")
        traceback.print_exc()
        raise

    # persist if supported (best effort: not every client exposes persist())
    persist = getattr(chroma_client, "persist", None)
    if callable(persist):
        try:
            persist()
        except Exception as e:
            print("[WARN] chroma_client.persist() failed:", e)

    return len(texts)

# Normalizer and safe query (keep your previous robust code)
def _normalize_query_result(raw: Any) -> Dict[str, Any]:
    """Flatten a Chroma query result for a single query into three flat lists.

    Accepted shapes for ``raw``:
      * a dict with ``documents`` / ``metadatas`` / ``distances`` keys,
      * an object exposing those names as attributes,
      * a one-element list wrapping such a dict,
      * a list ``[documents, metadatas?, distances?]``.

    For every field, a list whose first element is itself a list is treated
    as "one result list per query" and only the first query's list is kept.
    Missing or ``None`` fields become ``[]`` so callers can always iterate or
    ``zip`` the values.

    Returns:
        ``{"documents": [...], "metadatas": [...], "distances": [...]}``;
        all three are empty lists when ``raw`` has an unrecognised shape.
    """
    def _first_batch(value):
        # None shows up for fields that were not requested via `include`.
        if value is None:
            return []
        if isinstance(value, list) and value and isinstance(value[0], list):
            return value[0]
        return value

    def _pack(docs, metas, dists):
        return {
            "documents": _first_batch(docs),
            "metadatas": _first_batch(metas),
            "distances": _first_batch(dists),
        }

    if isinstance(raw, dict):
        return _pack(raw.get("documents"), raw.get("metadatas"), raw.get("distances"))

    if hasattr(raw, "documents") or hasattr(raw, "metadatas"):
        return _pack(
            getattr(raw, "documents", None),
            getattr(raw, "metadatas", None),
            getattr(raw, "distances", None),
        )

    if isinstance(raw, list) and raw:
        if len(raw) == 1 and isinstance(raw[0], dict):
            return _normalize_query_result(raw[0])
        if isinstance(raw[0], list):
            # Positional layout: documents first, then optional metadatas/distances.
            fields = raw[:3]
            fields = fields + [None] * (3 - len(fields))
            return _pack(*fields)

    return {"documents": [], "metadatas": [], "distances": []}

def query_collection_safe(collection_name: str, query_embedding: List[float], n_results: int = 4, include: Optional[list] = None):
    """Query one collection with a single embedding and normalise the result.

    Args:
        collection_name: Name passed to ``chroma_client.get_collection``.
        query_embedding: The query vector.
        n_results: Maximum number of hits to request.
        include: Result fields to request; defaults to
            ``["documents", "metadatas", "distances"]``.

    Returns:
        The dict produced by ``_normalize_query_result``.

    Raises:
        RuntimeError: If the collection cannot be fetched (the original
            exception is attached as the cause).
    """
    # Built per call so the default list is never shared between calls.
    if include is None:
        include = ["documents", "metadatas", "distances"]

    try:
        col = chroma_client.get_collection(collection_name)
    except Exception as e:
        raise RuntimeError(f"Collection '{collection_name}' not found") from e

    try:
        raw = col.query(query_embeddings=[query_embedding], n_results=n_results, include=list(include))
    except TypeError:
        # Retry without `include` for clients that reject it. Keywords are
        # kept on purpose: the second positional slot of Collection.query is
        # `query_texts`, so passing n_results positionally would be wrong.
        raw = col.query(query_embeddings=[query_embedding], n_results=n_results)
    return _normalize_query_result(raw)


# **Save uploaded file robustly and ingest (handles Gradio shapes)**

In [None]:
# 6/10 - Save uploaded input robustly and ingest
def _save_gradio_input_to_path(file_input) -> Optional[str]:
    """Turn whatever the upload widget handed over into a path on disk.

    Supported inputs, checked in this order:
      1. a path (``str`` or ``os.PathLike``) that exists - returned resolved;
      2. a dict with a ``"name"`` key pointing at an existing file;
      3. a ``(filename, bytes)`` pair - written to ``uploads/<filename>``;
      4. a file-like object with ``read()`` - copied to ``uploads/``.

    Only the final path component of a supplied filename is used, so the
    copy always lands directly inside ``uploads/``.

    Returns:
        The absolute path as a string, or ``None`` when the input has an
        unsupported shape or saving fails (the error is printed).
    """
    def _upload_target(filename) -> Path:
        # Create uploads/ on demand and keep only the bare file name.
        out_dir = Path("uploads")
        out_dir.mkdir(parents=True, exist_ok=True)
        return out_dir / Path(str(filename)).name

    try:
        # path string / Path object
        if isinstance(file_input, (str, os.PathLike)) and Path(file_input).exists():
            return str(Path(file_input).resolve())

        # gradio dict {"name": "/tmp/..."}
        if isinstance(file_input, dict) and "name" in file_input:
            p = Path(file_input["name"])
            if p.exists():
                return str(p.resolve())

        # tuple (filename, bytes)
        if isinstance(file_input, (list, tuple)) and len(file_input) == 2 and isinstance(file_input[1], (bytes, bytearray)):
            out_path = _upload_target(file_input[0])
            out_path.write_bytes(file_input[1])
            return str(out_path.resolve())

        # file-like object
        if hasattr(file_input, "read"):
            fname = getattr(file_input, "name", None) or f"uploaded_{uuid.uuid4().hex}.bin"
            out_path = _upload_target(fname)
            try:
                file_input.seek(0)
            except Exception:
                # Not every stream is seekable; read from the current position.
                pass
            data = file_input.read()
            if isinstance(data, str):
                data = data.encode("utf-8", errors="ignore")
            with open(out_path, "wb") as f:
                f.write(data)
            return str(out_path.resolve())

        return None
    except Exception as e:
        print("Error saving upload:", e); traceback.print_exc()
        return None

def ingest_file_safe(file_input, collection_name: Optional[str] = None, use_token_chunking: bool = True):
    """Save an upload, extract its text, chunk it and index it in Chroma.

    Args:
        file_input: Anything ``_save_gradio_input_to_path`` understands.
        collection_name: Target collection. Blank or ``None`` means "use the
            file name without its extension".
        use_token_chunking: Forwarded to ``chunk_text`` as ``use_token``.

    Returns:
        On success ``{"status": "ok", "collection", "n_chunks", "saved_path"}``.
        On failure ``{"status": "error", "msg", ...}``; failures while adding
        to Chroma also carry a ``"traceback"`` string. This function reports
        errors through the returned dict instead of raising.
    """
    saved = _save_gradio_input_to_path(file_input)
    if not saved:
        return {"status":"error", "msg":"Could not save uploaded input. Received type: "+str(type(file_input))}
    saved_path = Path(saved)
    if not saved_path.exists():
        return {"status":"error", "msg":f"Saved path missing: {saved}"}

    text = extract_text_from_path(saved)
    if not text or not text.strip():
        return {"status":"error", "msg":"No text extracted (maybe scanned PDF)."}

    # Apply the same normalisation the collection helper uses (spaces ->
    # underscores, at most 128 characters) so the name reported back is the
    # name the collection is actually stored under and can be queried with.
    requested = (collection_name or "").strip() or saved_path.stem
    col_name = requested.replace(" ", "_")[:128]

    chunks = chunk_text(text, use_token=use_token_chunking)
    if not chunks:
        return {"status":"error", "msg":"Chunking produced zero chunks"}

    metas = [{"source": saved_path.name, "chunk_index": i} for i in range(len(chunks))]
    try:
        n = add_documents_to_collection_safe(col_name, chunks, metas)
    except Exception as e:
        tb = traceback.format_exc()
        return {"status":"error", "msg":"Failed to add to Chroma: "+str(e), "traceback": tb}
    return {"status":"ok", "collection": col_name, "n_chunks": n, "saved_path": saved}


# **RAG answer using local HF model (flan-t5-small)**

In [None]:
# 7/10 - RAG answer (defensive) + HF generation helper
import numpy as np
import traceback

# keep your HF generation helper (unchanged)
def hf_generate_answer(prompt: str, max_new_tokens: int = 256, temperature: float = 0.0):
    """Run ``prompt`` through the module-level ``gen_pipe`` and return its text.

    Args:
        prompt: Full prompt for the text2text pipeline.
        max_new_tokens: Upper bound on generated tokens.
        temperature: ``0`` (the default) or ``None`` selects deterministic
            greedy decoding. A positive value enables sampling at that
            temperature. Previously the value was forwarded while sampling
            stayed disabled, so it had no effect.

    Returns:
        The stripped ``generated_text`` of the first result, or ``str(out)``
        when the pipeline returns something other than a list of dicts.
    """
    sampling = bool(temperature and temperature > 0)
    gen_kwargs = {
        "max_new_tokens": max_new_tokens,
        "do_sample": sampling,
        "num_return_sequences": 1,
    }
    if sampling:
        # Temperature is only meaningful (and only accepted without warnings)
        # when sampling is on.
        gen_kwargs["temperature"] = temperature

    out = gen_pipe(prompt, **gen_kwargs)
    if isinstance(out, list) and len(out) and isinstance(out[0], dict):
        return out[0].get("generated_text", "").strip()
    return str(out)

# SAFE embedding: always returns List[List[float]] of native Python floats
def embed_texts_sentence_transformer_safe(texts):
    """Embed ``texts`` with ``sbert`` and return one list of floats per text.

    This definition shares its name with the helper in the Chroma-helpers
    cell and takes its place once this cell has run.

    Returns ``[]`` for empty input. Raises ``RuntimeError`` when encoding
    fails or when the number of vectors does not match ``len(texts)``.
    """
    if not texts:
        return []

    def _encode():
        # prefer numpy output; retry without the flag if it is rejected
        try:
            return sbert.encode(texts, show_progress_bar=False, convert_to_numpy=True)
        except TypeError:
            return sbert.encode(texts, show_progress_bar=False)

    try:
        raw = _encode()
    except Exception as e:
        raise RuntimeError(f"sbert.encode failed: {e}")

    matrix = np.asarray(raw)
    if matrix.ndim == 1:
        # a single vector came back: promote it to a one-row matrix
        matrix = matrix[np.newaxis, :]

    vectors = [
        list(map(float, np.asarray(row).reshape(-1).tolist()))
        for row in matrix
    ]

    if len(vectors) != len(texts):
        raise RuntimeError(f"Embedding count mismatch: {len(vectors)} embeddings for {len(texts)} texts")
    return vectors

# ensure vector(s) are Python floats (1D -> list, 2D -> list-of-lists)
def _ensure_vector_python_floats(vec):
    """Convert a 1-D or 2-D array-like into (nested) lists of Python floats.

    A 1-D input yields a flat list, a 2-D input yields a list of lists. Any
    other number of dimensions raises ``RuntimeError``.
    """
    arr = np.asarray(vec)
    rank = arr.ndim
    if rank not in (1, 2):
        raise RuntimeError(f"Unsupported embedding ndim: {rank}")
    if rank == 1:
        return list(map(float, arr.tolist()))
    return [list(map(float, row.tolist())) for row in arr]

# Defensive query wrapper for Chroma: coerces types and tries multiple query signatures
def query_collection_defensive(collection_name: str, q_emb, n_results: int = 4, include=None):
    """Query a collection, coercing the embedding and tolerating API variants.

    Args:
        collection_name: Name passed to ``chroma_client.get_collection``.
        q_emb: One embedding (1-D) or a batch of embeddings (2-D), as any
            array-like ``_ensure_vector_python_floats`` accepts.
        n_results: Maximum number of hits to request.
        include: Result fields to request; defaults to
            ``["documents", "metadatas", "distances"]``.

    Returns:
        The dict produced by ``_normalize_query_result``.

    Raises:
        RuntimeError: If the embedding is empty or cannot be coerced, the
            collection cannot be fetched, or every query attempt fails. The
            message of the last case contains the traceback of each attempt.
    """
    # Built per call so the default list is never shared between calls.
    if include is None:
        include = ["documents", "metadatas", "distances"]

    try:
        coerced = _ensure_vector_python_floats(q_emb)
    except Exception as e:
        raise RuntimeError(f"Failed to coerce query embedding: {e}") from e
    if not coerced:
        # Guard the coerced[0] probe below against an empty embedding.
        raise RuntimeError("Failed to coerce query embedding: embedding is empty")

    # ensure outer list for query_embeddings arg
    q_arg = coerced if isinstance(coerced[0], list) else [coerced]

    try:
        col = chroma_client.get_collection(collection_name)
    except Exception as e:
        raise RuntimeError(f"Collection '{collection_name}' not found: {e}") from e

    # Keyword arguments only: the second positional slot of Collection.query
    # is `query_texts`, so a positional n_results would land in the wrong
    # parameter. The second attempt drops `include` for clients that reject it.
    attempts = (
        {"query_embeddings": q_arg, "n_results": n_results, "include": list(include)},
        {"query_embeddings": q_arg, "n_results": n_results},
    )

    last_exc = None
    traces = []
    for kwargs in attempts:
        try:
            return _normalize_query_result(col.query(**kwargs))
        except Exception as e:
            last_exc = e
            # Capture the trace while the exception is still being handled;
            # format_exc() outside a handler returns nothing useful.
            traces.append(traceback.format_exc())

    tb = "\n".join(traces)
    raise RuntimeError(f"All col.query attempts failed. Last exception: {last_exc}\nTrace:\n{tb}") from last_exc

# RAG answer using safe embed + defensive query + HF generator
def rag_answer_hf_local_safe(collection_name: str, question: str, top_k: int = 4, max_tokens:int=256):
    """Answer ``question`` from the ``top_k`` most similar chunks of a collection.

    Steps: embed the question, retrieve chunks through
    ``query_collection_defensive``, build a context-grounded prompt and run
    it through ``gen_pipe`` with greedy decoding.

    Args:
        collection_name: Collection to search.
        question: User question.
        top_k: Number of chunks to retrieve.
        max_tokens: Upper bound on generated tokens.

    Returns:
        ``{"answer": str, "retrieved": [{"meta", "text"}, ...]}`` on success,
        or ``{"error": str}`` (plus ``"traceback"`` for generation failures).
        When nothing is retrieved the fixed "I don't know" answer is returned
        without calling the model.
    """
    # safe embed
    try:
        q_embs = embed_texts_sentence_transformer_safe([question])
        q_emb = q_embs[0]
    except Exception as e:
        return {"error": f"Failed to embed question: {e}"}

    # defensive retrieval
    try:
        res = query_collection_defensive(collection_name, q_emb, n_results=top_k)
    except Exception as e:
        return {"error": str(e)}

    docs = list(res.get("documents", []) or [])
    metas = list(res.get("metadatas", []) or [])

    if not docs:
        # No context at all: skip generation rather than let the model answer
        # from an empty CONTEXT section.
        return {"answer": "I don't know from the provided documents.", "retrieved": []}

    # Pair every document with a metadata entry. zip() stops at the shorter
    # input, so missing metadata would otherwise drop documents silently.
    metas = metas[:len(docs)] + [{} for _ in range(len(docs) - len(metas))]

    # build context
    context_pieces = []
    for d, m in zip(docs, metas):
        src = m.get("source", "unknown") if isinstance(m, dict) else str(m)
        idx = m.get("chunk_index") if isinstance(m, dict) else None
        header = f"[{src}{' | chunk '+str(idx) if idx is not None else ''}]"
        context_pieces.append(f"{header}\n{d}")
    context = "\n\n---\n\n".join(context_pieces)

    prompt = ("You are an expert assistant. Use ONLY the provided CONTEXT to answer the question. "
              "If the information is missing, say 'I don't know from the provided documents.'\n\n"
              f"CONTEXT:\n{context}\n\nQUESTION:\n{question}\n\nAnswer concisely and cite chunk indexes when you can.")

    # generate
    try:
        out = gen_pipe(prompt, max_new_tokens=max_tokens, do_sample=False, num_return_sequences=1)
        if isinstance(out, list) and len(out) and isinstance(out[0], dict):
            answer = out[0].get("generated_text","").strip()
        else:
            answer = str(out)
    except Exception as e:
        return {"error": f"Generation failed: {e}", "traceback": traceback.format_exc()}

    return {"answer": answer, "retrieved": [{"meta": m, "text": d} for m, d in zip(metas, docs)]}


# **Summarize a collection using HF local model**

In [None]:
# 8/10 - Summarize collection (defensive & safe)
import traceback

def summarize_collection_hf(collection_name: str, top_k: int = 6, max_tokens: int = 150):
    """
    Summarize a collection from a handful of retrieved chunks.

    Flow:
      1. check that the collection can be fetched;
      2. embed the fixed probe text "summarize this document";
      3. retrieve ``top_k`` chunks (``query_collection_defensive`` when it is
         defined, ``query_collection_safe`` otherwise);
      4. join the chunks and hand them to ``hf_generate_answer``.

    Returns ``{"summary", "n_chunks"}`` on success and ``{"error", ...}``
    on failure; nothing is raised to the caller for these steps.
    """
    # 1) the collection has to exist
    try:
        chroma_client.get_collection(collection_name)
    except Exception as lookup_err:
        return {"error": f"Collection not found: {lookup_err}"}

    # 2) probe embedding
    try:
        probe_vec = embed_texts_sentence_transformer_safe(["summarize this document"])[0]
    except Exception as embed_err:
        return {"error": f"Failed to embed summary-query: {embed_err}"}

    # 3) retrieval through whichever query helper is available
    try:
        if "query_collection_defensive" in globals():
            retrieve = query_collection_defensive
        else:
            retrieve = query_collection_safe
        hits = retrieve(collection_name, probe_vec, n_results=top_k)
    except Exception as query_err:
        return {"error": f"Retrieval failed: {query_err}", "traceback": traceback.format_exc()}

    chunks = hits.get("documents", []) or []
    if not chunks:
        return {"summary": "", "n_chunks": 0}

    # 4) one prompt containing every retrieved chunk (no truncation applied)
    separator = "\n\n---\n\n"
    prompt = (
        "Summarize the following content into 4–6 concise bullet points (~80–150 words):\n\n"
        + separator.join(chunks)
    )

    try:
        summary_text = hf_generate_answer(prompt, max_new_tokens=max_tokens, temperature=0.2)
    except Exception as gen_err:
        return {"error": f"Generation failed: {gen_err}", "traceback": traceback.format_exc()}

    return {"summary": summary_text, "n_chunks": len(chunks)}


In [None]:
import requests
from pathlib import Path

# Download the sample survey CSV into uploads/ so it can be ingested by path.
url = "https://raw.githubusercontent.com/sovandas4/Mental-Health/main/survey.csv"
out = Path("uploads/survey_github.csv")
out.parent.mkdir(parents=True, exist_ok=True)  # create uploads/ if it is missing
r = requests.get(url, timeout=30)
r.raise_for_status()  # stop on an HTTP error before anything is written
out.write_bytes(r.content)
print("Downloaded to", out)


Downloaded to uploads/survey_github.csv


In [None]:
# Ingest by file path (recommended if you saved in uploads/)
file_path = "uploads/survey_github.csv"   # change to your saved path or uploads/Invoice.txt
collection_name = "survey_github"         # or "" to default to filename
use_token_chunking = True                 # True if tiktoken available, else False

# If you have ingest_file_safe that accepts path/string, use it:
# (it reports most failures through a {"status": "error", ...} dict, so check
# the printed result rather than relying on an exception)
try:
    res = ingest_file_safe(file_path, collection_name=collection_name, use_token_chunking=use_token_chunking)
except TypeError:
    # ingest_file_safe might expect a file-like; use direct flow below
    # (extract -> chunk -> add, the same steps without the upload handling)
    text = extract_text_from_path(file_path)
    chunks = chunk_text(text, use_token=use_token_chunking)
    metas = [{"source": Path(file_path).name, "chunk_index": i} for i in range(len(chunks))]
    n = add_documents_to_collection_safe(collection_name or Path(file_path).stem, chunks, metas)
    res = {"status":"ok", "collection": collection_name or Path(file_path).stem, "n_chunks": n}
print(res)


[DEBUG] Adding to collection 'survey_github': 207 docs; embeddings: 207 vectors; vector-dim: 384
{'status': 'ok', 'collection': 'survey_github', 'n_chunks': 207, 'saved_path': '/content/uploads/survey_github.csv'}


In [None]:
# NOTE(review): list_collections_safe is not defined in any cell above this
# one; presumably it comes from a later or removed cell - confirm, otherwise
# this line raises NameError on a fresh "Run All".
print("Collections:", list_collections_safe())
# or simple:
cols = chroma_client.list_collections()
# Entries may be dicts, objects with a .name attribute, or something else;
# print a name for each shape.
print([ (c["name"] if isinstance(c, dict) and "name" in c else getattr(c,"name",str(c))) for c in cols ])


Collections: {'collections': ['invoice', 'cons_excp_details', 'survey_github', 'SOVAN_DAS', 'Portal_login', 'mental_health_survey', 'Invoice', 'test_sample']}
['invoice', 'cons_excp_details', 'survey_github', 'SOVAN_DAS', 'Portal_login', 'mental_health_survey', 'Invoice', 'test_sample']


In [None]:
collection = "survey_github"   # change to your collection
# Try direct get (some Chroma clients support .get)
# NOTE: this prints every stored document and metadata entry, which can be a
# very large output for big collections.
try:
    col = chroma_client.get_collection(collection)
    raw = col.get(include=["documents","metadatas"])
    print("col.get() raw:", raw)
except Exception:
    # fallback to a small query using a sample embedding
    q_emb = embed_texts_sentence_transformer_safe(["what is this document about?"])[0]
    norm = query_collection_defensive(collection, q_emb, n_results=10)
    # show the first 400 characters of each retrieved chunk
    for i, (m, d) in enumerate(zip(norm["metadatas"], norm["documents"])):
        print(f"--- chunk {i} meta={m} ---\n{d[:400]}\n")


col.get() raw: {'ids': ['450441ec986746e7a5eed7dea4695c1b_0', '748d4732340d4a0e9643b6f6f2071048_1', '4c2f4008da5847389c0434778cb1a54f_2', 'a8345ec17be842f3b7d682e594a09d1d_3', '746262d3ac9247e4a9cd1a3bcbeed0f2_4', '27068b91e1fc4556bccaa6d579d94e61_5', '547cb1e632204690a808649575cd8967_6', '66c4925cc1f442bbb84cd69b81f6f758_7', '3dca13f995b34f998713f64111fc3dc8_8', 'cc3ba83c6ed54b8ba1a75b964948f8e5_9', '580706bd0eb740b98d360092a00d9ab0_10', 'bcabf5fdd4f748f1b61a4a856d462c6e_11', '6dc8ea815063460b8ef92509fd62da18_12', '3fab2f74f9b642aea7e8796a69afc024_13', '0f46e94c3bf742a485fb53bb14bc68df_14', '9f2fe3f849834b9b958683352ea76f64_15', '12880fd224b84f0fa0bbf64e659ff9db_16', '42b428918713467cba2f0e71036817a6_17', '41fc826641c84ed08d59c8552bbb7f6f_18', '8fa05096bea845dfa524b6219071ffdd_19', '20239f686d63486c983556c31d9cd484_20', '38a9381b022a4a29859dc9c3deca2955_21', 'aec47119a1ba43bfb903613e6caecb88_22', 'ee0cf4b0bc7447728f11f59bfe8a5c39_23', 'ccb54f739675495b837a916b1f155d29_24', 'c0288bbc20

In [None]:
# Summarize the ingested survey from its 6 best-matching chunks; the result is
# a dict with "summary"/"n_chunks" on success or "error" on failure.
print(summarize_collection_hf("survey_github", top_k=6, max_tokens=150))


{'summary': "For example, if you're a woman, you're not a woman, you're not a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are a woman, you are", 'n_chunks': 6}


In [None]:
collection = "survey_github"   # replace with your collection name
# Smoke-test questions run against the collection one after another.
tests = [
  "Summarize the document in 5 bullet points.",
  "What columns are present in the survey?",
  "How many rows / respondents (approx)?",
  "List any question names that mention 'stress' or 'anxiety'."
]

for q in tests:
    print("QUESTION:", q)
    out = rag_answer_hf_local_safe(collection, q, top_k=6, max_tokens=200)
    # On failure there is no "answer" key, so the whole dict (with its
    # "error" entry) is printed instead; the same happens for an empty answer.
    print("ANSWER:", out.get("answer") or out)
    print("-"*80)


QUESTION: Summarize the document in 5 bullet points.
ANSWER: [survey_github.csv | chunk 62] --- [survey_github.csv | chunk 62] --- [survey_github.csv | chunk 62] --- [survey_github.csv | chunk 62] --- [survey_github.csv | chunk 62] --- [survey_github.csv | chunk 62] --- [survey_github.csv | chunk 62] --- [survey_github.csv | chunk 62] --- [survey_github.csv | chunk 62] --- [survey_github.csv | chunk 62] --- [survey_github.csv | chunk 62] --- [sur
--------------------------------------------------------------------------------
QUESTION: What columns are present in the survey?
ANSWER: [survey_github.csv | chunk 121] -08-28 11:05:00 | 44 | m | United States | MO | No | No | No | No | Some of them | No | No | No | No | Some of them | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | 

In [None]:
# Ask one question and show which chunks the answer was generated from.
res = rag_answer_hf_local_safe("survey_github", "What columns are present?", top_k=6)
# If the call failed, res only has an "error" key, so this prints None.
print("ANSWER:\n", res.get("answer"))
print("\nRETRIEVED CHUNKS:")
for item in res.get("retrieved", []):
    print(item["meta"])
    # first 600 characters of the chunk, flattened onto one line
    print(item["text"][:600].replace("\n"," ") + "...")
    print("-----")


ANSWER:
 [surveygithub.csv | chunk 118] 08-28 10:42:09 | 33 | Male | United Kingdom | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No | No

RETRIEVED CHUNKS:
{'source': 'survey_github.csv', 'chunk_index': 93}
NA | No | No | No | Never | 26-100 | Yes | Yes | No | Yes | No | No | Don't know | Don't know | No | No | No | No | No | Maybe | No | No | NA 2014-08-27 20:52:31 | 37 | Male | United States | PA | No | Yes | No | Never | 26-100 | No | Yes | Don't know | No | No | Don't know | Don'

# **Gradio UI (run — launches interactive UI)**

In [None]:
# 9/10 - Gradio UI (final) — dropdown, refresh, auto-fill ingest, show retrieved chunks
import gradio as gr
import traceback

# Helper to safely get collection names
def _get_collection_choices():
    try:
        res = list_collections_safe()
        if isinstance(res, dict) and "collections" in res:
            return res["collections"]
        # fallback if different shape
        if isinstance(res, list):
            return res
        return []
    except Exception:
        try:
            raw = chroma_client.list_collections()
            names = []
            for c in raw:
                if isinstance(c, dict) and "name" in c:
                    names.append(c["name"])
                elif hasattr(c, "name"):
                    names.append(getattr(c, "name"))
                else:
                    names.append(str(c))
            return names
        except Exception:
            return []

# UI
# Build the form-style UI. Components are created in display order; the names bound here
# are the ones the callbacks and event bindings further down refer to.
with gr.Blocks() as demo:
    gr.Markdown("## RAG + Chroma (Hugging Face local) — Gradio demo\nUpload a file → index → ask questions. Uses local HF generator and Chroma vector DB.")
    with gr.Row():
        # Wider column (scale=2): upload + ingest controls.
        with gr.Column(scale=2):
            file_in = gr.File(label="Upload file (PDF/TXT/CSV)")
            col_name_in = gr.Textbox(label="Collection name (optional)", placeholder="Default: filename without ext")
            # Default follows the module-level _has_tiktoken flag when it is defined, else False.
            token_chunk_checkbox = gr.Checkbox(label="Use token-aware chunking (tiktoken)", value=_has_tiktoken if '_has_tiktoken' in globals() else False)
            ingest_btn = gr.Button("Ingest file")
            ingest_output = gr.JSON(label="Ingest result")
            gr.Markdown("**Quick actions:** use List / Refresh to populate the dropdown, or ingest a file to auto-fill.")
        # Narrower column (scale=1): collection listing / refresh.
        with gr.Column(scale=1):
            list_btn = gr.Button("List collections")
            collections_out = gr.Textbox(label="Collections (comma-separated)")
            refresh_btn = gr.Button("Refresh dropdown")
            # Choices are evaluated once, at the moment this cell builds the UI.
            collections_dropdown = gr.Dropdown(choices=_get_collection_choices(), label="Select collection", interactive=True)

    gr.Markdown("---")
    with gr.Row():
        # Question answering panel.
        with gr.Column():
            ask_collection = gr.Dropdown(choices=_get_collection_choices(), label="Collection to query (select or ingest to auto-fill)")
            question = gr.Textbox(label="Question", placeholder="Ask something about the uploaded document...")
            ask_btn = gr.Button("Ask")
            answer_out = gr.Markdown("")  # human-readable answer (markdown)
            raw_json_out = gr.JSON(label="Answer (raw JSON, with retrieved chunks)")
        # Summarization panel.
        with gr.Column():
            summ_collection = gr.Dropdown(choices=_get_collection_choices(), label="Collection to summarize")
            summ_btn = gr.Button("Summarize")
            summ_out = gr.Textbox(label="Summary (text)")
    # ----- Callbacks -----
    def do_ingest_and_return_collection(file, col_name, use_token_chunking):
        """Ingest the uploaded file and refresh/select the query dropdown.

        Args:
            file: value of the upload component; ``None`` when nothing was uploaded.
            col_name: optional collection name typed by the user.
            use_token_chunking: checkbox value forwarded to ``ingest_file_safe``.

        Returns:
            tuple: ``(ingest_result, dropdown_update)``. The second item is a
            ``gr.update(...)`` for the ``ask_collection`` dropdown: it carries the
            freshly listed choices and selects the ingested collection only when
            that name is actually among them. On failure the dropdown is left
            untouched (``gr.update()`` with no arguments).
        """
        if file is None:
            return ({"status":"error","msg":"No file supplied"}, gr.update())
        try:
            res = ingest_file_safe(file, collection_name=col_name, use_token_chunking=use_token_chunking)
        except Exception as e:
            return ({"status":"error","msg": str(e), "traceback": traceback.format_exc()}, gr.update())
        # assumes ingest_file_safe returns a dict with a "collection" key — TODO confirm
        reported = res.get("collection") if isinstance(res, dict) else None
        collection_to_fill = reported or col_name or ""
        # The dropdown's choices were computed when the UI was built, so a collection
        # created just now is not among them; writing a bare string as the value would
        # select an entry the dropdown does not know about.
        choices = list(_get_collection_choices())
        if collection_to_fill and collection_to_fill in choices:
            return (res, gr.update(choices=choices, value=collection_to_fill))
        return (res, gr.update(choices=choices))

    def gr_list_collections():
        """Return the collection names as one comma-separated string.

        Yields "No collections" when the list is empty and an
        "Error listing collections: ..." message if the lookup raises.
        """
        try:
            found = _get_collection_choices()
            if found:
                return ", ".join(found)
            return "No collections"
        except Exception as err:
            return f"Error listing collections: {err}"

    def gr_refresh_dropdown():
        """Refresh the choices of the three collection dropdowns.

        Returns:
            tuple: three ``gr.update(choices=...)`` objects, in the order
            ask_collection, summ_collection, collections_dropdown. A bare list
            returned to a Dropdown output is interpreted as the component's
            *value*, so the choices have to be sent as an explicit update.
        """
        try:
            choices = list(_get_collection_choices())
        except Exception:
            # Keep the UI usable even if listing fails: show empty dropdowns.
            choices = []
        return (
            gr.update(choices=choices),
            gr.update(choices=choices),
            gr.update(choices=choices),
        )

    def do_ask_button(collection, q):
        """Answer ``q`` against ``collection`` and format the result.

        Returns:
            tuple: ``(markdown, raw)`` where ``markdown`` holds the answer followed
            by one fenced snippet per retrieved chunk (first 600 characters, newlines
            flattened) and ``raw`` is the helper's result or an error dict.
        """
        if not collection:
            return "⚠️ collection required", {"error":"collection required"}
        try:
            res = rag_answer_hf_local_safe(collection, q, top_k=6, max_tokens=256)
            if isinstance(res, dict) and "error" in res:
                return f"⚠️ Error: {res.get('error')}", res
            answer_text = res.get("answer", "")
            hits = res.get("retrieved", [])
            pieces = [f"### Answer\n\n{answer_text}\n\n---\n\n### Retrieved chunks (source | chunk_index)\n"]
            if not hits:
                pieces.append("No retrieved chunks.\n")
            else:
                for hit in hits:
                    meta = hit.get("meta", {})
                    body = hit.get("text", "")
                    # Metadata is normally a dict; anything else is shown via str().
                    if isinstance(meta, dict):
                        src = meta.get("source")
                        idx = meta.get("chunk_index")
                    else:
                        src = str(meta)
                        idx = None
                    header = f"- **{src}**"
                    if idx is not None:
                        header += f" (chunk {idx})"
                    snippet = body.replace("\n"," ")[:600]
                    pieces.append(f"{header}\n\n```\n{snippet}...\n```\n\n")
            return "".join(pieces), res
        except Exception as err:
            return f"⚠️ Unexpected error: {err}", {"error": str(err), "traceback": traceback.format_exc()}

    def do_summary_button(collection):
        """Summarize ``collection`` and return plain text for the summary textbox.

        Dict results yield their "summary" entry (or an "Error: ..." line when an
        "error" entry is set); any other result is converted with ``str``.
        Exceptions are reported as text together with the traceback.
        """
        if not collection:
            return "collection required"
        try:
            outcome = summarize_collection_hf(collection, top_k=6, max_tokens=150)
            if not isinstance(outcome, dict):
                return str(outcome)
            failure = outcome.get("error")
            if failure:
                return f"Error: {failure}"
            return outcome.get("summary", "")
        except Exception as err:
            return f"Error: {err}\n\n{traceback.format_exc()}"

    # ----- Bindings -----
    # Ingest: result JSON goes to ingest_output, second return value targets ask_collection.
    ingest_btn.click(
        fn=do_ingest_and_return_collection,
        inputs=[file_in, col_name_in, token_chunk_checkbox],
        outputs=[ingest_output, ask_collection]   # second output drives the ask_collection dropdown
    )

    # Listing writes a comma-separated string; refresh targets all three dropdowns
    # (the callback's return order must match this outputs list).
    list_btn.click(fn=gr_list_collections, inputs=None, outputs=collections_out)
    refresh_btn.click(fn=gr_refresh_dropdown, inputs=None, outputs=[ask_collection, summ_collection, collections_dropdown])

    # Q&A and summarization each read their own dropdown.
    ask_btn.click(fn=do_ask_button, inputs=[ask_collection, question], outputs=[answer_out, raw_json_out])
    summ_btn.click(fn=do_summary_button, inputs=[summ_collection], outputs=summ_out)

# Launch the UI. share=True publishes a temporary public *.gradio.live URL (see the cell
# output below); the output also notes that debug=True is needed to surface errors in Colab.
demo.launch(share=True, inbrowser=False)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://1cbe0bc80ba9093a73.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [None]:
# 9/10 - Chat-style Gradio UI (corrected: no .style() on Chatbot)
import gradio as gr
import traceback
from html import escape

# Helper: get collection choices robustly (reuse your existing list_collections_safe if present)
def _get_collection_choices():
    try:
        res = list_collections_safe()
        if isinstance(res, dict) and "collections" in res:
            return res["collections"]
        if isinstance(res, list):
            return res
        return []
    except Exception:
        try:
            raw = chroma_client.list_collections()
            names = []
            for c in raw:
                if isinstance(c, dict) and "name" in c:
                    names.append(c["name"])
                elif hasattr(c, "name"):
                    names.append(getattr(c, "name"))
                else:
                    names.append(str(c))
            return names
        except Exception:
            return []

# Preset questions as (button label, question text) pairs. One button is created per
# entry, and a click sends the question text through the normal chat path.
PRESETS = [
    ("Summarize (5 bullets)", "Summarize the document in 5 concise bullet points."),
    ("Key facts", "List the key facts and figures in the document."),
    ("Who & When", "Who is the author/vendor and what is the date?"),
    ("Total / Amount", "What is the total amount due?"),
    ("Columns (survey)", "What columns are present in the survey?")
]

# Build UI
# Build the chat-style UI: conversation on the left (scale=3), document/collection
# sidebar on the right (scale=1). Rebinds the module-level name `demo`.
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=3):
            gr.Markdown("## 📚 RAG Chat — Ask your uploaded documents")
            # NOTE(review): the cell output below echoes this line, which looks like a
            # warning raised by the Chatbot constructor — confirm against the installed Gradio.
            chat = gr.Chatbot(elem_id="chatbot", label="Conversation", height=580)
            with gr.Row():
                txt = gr.Textbox(show_label=False, placeholder="Type your question here (press Enter to send)")
                send = gr.Button("Send")
            with gr.Row():
                show_sources = gr.Checkbox(value=True, label="Show retrieved sources")
                topk_slider = gr.Slider(minimum=1, maximum=8, value=6, step=1, label="Top-k retrieved chunks")
        with gr.Column(scale=1):
            gr.Markdown("### ⚙️ Document & Collections")
            file_in = gr.File(label="Upload (PDF/TXT/CSV)")
            col_name_in = gr.Textbox(label="Collection name (optional)")
            # Default follows the module-level _has_tiktoken flag when it is defined, else False.
            token_chunk_checkbox = gr.Checkbox(label="Use token-aware chunking (tiktoken)", value=_has_tiktoken if '_has_tiktoken' in globals() else False)
            ingest_btn = gr.Button("Ingest and Index", variant="primary")
            ingest_result = gr.Textbox(label="Ingest result", interactive=False)
            gr.Markdown("---")
            # Choices are evaluated once, at the moment this cell builds the UI.
            collection_dropdown = gr.Dropdown(choices=_get_collection_choices(), label="Collection", interactive=True)
            refresh_btn = gr.Button("Refresh collections")
            list_out = gr.Textbox(label="Detected collections", interactive=False)
            gr.Markdown("---")
            gr.Markdown("### Presets")
            # One button per PRESETS entry, kept in list order so index i maps to PRESETS[i].
            preset_buttons = []
            for label, _ in PRESETS:
                preset_buttons.append(gr.Button(label))
            gr.Markdown("---")
            summ_btn = gr.Button("Summarize Collection")
            summ_out = gr.Textbox(label="Summary result", interactive=False)

    # hidden JSON output for debugging / programmatic uses
    raw_json = gr.JSON(visible=False)

    # helpers
    def refresh_collections():
        """Re-list the collections for the sidebar.

        Returns:
            tuple: ``(dropdown_update, text)``. The first item is
            ``gr.update(choices=...)`` because a bare list sent to a Dropdown
            output is treated as the component's value rather than its choices;
            the second is the comma-separated listing (or "No collections").
        """
        choices = list(_get_collection_choices())
        list_str = ", ".join(choices) if choices else "No collections"
        return gr.update(choices=choices), list_str

    def ingest_and_fill(file, col_name, use_token_chunking):
        """Ingest the uploaded file and report the outcome in the sidebar.

        Args:
            file: value of the upload component (falsy when nothing was uploaded).
            col_name: optional collection name typed by the user.
            use_token_chunking: checkbox value forwarded to ``ingest_file_safe``.

        Returns:
            tuple: ``(status_text, dropdown_update)``. The status is always a string
            because it is shown in a Textbox. The dropdown update carries the freshly
            listed choices and selects the ingested collection when it is among them;
            on failure the dropdown is left unchanged.
        """
        if not file:
            return "Status: error | No file provided", gr.update()
        try:
            result = ingest_file_safe(file, collection_name=col_name, use_token_chunking=use_token_chunking)
        except Exception as e:
            return f"Status: error | {e}\n\n{traceback.format_exc()}", gr.update()
        # assumes ingest_file_safe returns a dict with "status", "collection" and
        # "n_chunks" keys — TODO confirm
        collection_to_fill = result.get("collection") or col_name or ""
        msg = f"Status: {result.get('status')} | Collection: {collection_to_fill} | Chunks: {result.get('n_chunks')}"
        # The dropdown's choices date from UI construction, so refresh them before
        # selecting the new collection; a value outside the choices is not selectable.
        choices = list(_get_collection_choices())
        if collection_to_fill and collection_to_fill in choices:
            return msg, gr.update(choices=choices, value=collection_to_fill)
        return msg, gr.update(choices=choices)

    def _append_message(history, role, text, sources=None):
        if role == "user":
            return history + [[text, None]]
        else:
            md = text
            if sources and len(sources) > 0:
                src_md = "\n\n**Sources:**\n"
                for m in sources:
                    meta = m.get("meta", {}) if isinstance(m, dict) else {}
                    src_name = meta.get("source") if isinstance(meta, dict) else str(meta)
                    idx = meta.get("chunk_index") if isinstance(meta, dict) else None
                    snippet = m.get("text","")[:300].replace("\n"," ")
                    src_md += f"- {src_name}" + (f" (chunk {idx})" if idx is not None else "") + f": `{escape(snippet)}...`\n"
                md += "\n\n---\n" + src_md
            # find last empty assistant slot and fill
            for i in range(len(history)-1, -1, -1):
                user_msg, assistant_msg = history[i]
                if assistant_msg is None:
                    history[i][1] = md
                    return history
            return history + [["", md]]

    def chat_send(history, collection, question_text, show_sources_val, top_k):
        """Handle one chat turn: record the question, query the collection, record the reply.

        Returns:
            tuple: ``(history, status_message, raw_result)``. The status message is
            empty on success and holds a warning/error text otherwise.
        """
        if not collection:
            return history, "⚠️ Please select a collection from the sidebar.", {}
        if not (question_text and question_text.strip()):
            return history, "⚠️ Please type a question.", {}
        # Record the user's turn before querying.
        history = _append_message(history, "user", question_text)
        try:
            outcome = rag_answer_hf_local_safe(collection, question_text, top_k=int(top_k), max_tokens=256)
            if isinstance(outcome, dict) and outcome.get("error"):
                return history, f"⚠️ Error: {outcome.get('error')}", outcome
            reply = outcome.get("answer","")
            found = outcome.get("retrieved", [])
            if show_sources_val:
                shown = found
            else:
                shown = []
            history = _append_message(history, "assistant", reply, sources=shown)
            return history, "", outcome
        except Exception as err:
            return history, f"⚠️ Unexpected error: {err}", {"error": str(err), "traceback": traceback.format_exc()}

    def handle_preset_click(history, collection, show_sources_val, top_k, preset_index):
        """Send the question text of ``PRESETS[preset_index]`` through ``chat_send``.

        Returns the same ``(history, status_message, raw_result)`` triple as
        ``chat_send``; without a collection only a warning is returned.
        """
        if collection:
            _label, preset_q = PRESETS[preset_index]
            return chat_send(history, collection, preset_q, show_sources_val, top_k)
        return history, "⚠️ Please select a collection first.", {}

    def do_summarize(collection):
        """Summarize ``collection``.

        Returns:
            tuple: ``(text, raw)`` — the text for the summary box and the raw
            helper result (or an error dict) for the hidden JSON output.
        """
        if not collection:
            return "collection required", {}
        try:
            outcome = summarize_collection_hf(collection, top_k=6, max_tokens=150)
            if not isinstance(outcome, dict):
                return str(outcome), outcome
            failure = outcome.get("error")
            if failure:
                return f"Error: {failure}", outcome
            return outcome.get("summary"), outcome
        except Exception as err:
            return f"Error: {err}\n\n{traceback.format_exc()}", {"error": str(err)}

    # Bindings
    # Sidebar: refresh targets the dropdown and the listing box; ingest writes its status
    # text and targets the dropdown.
    refresh_btn.click(fn=refresh_collections, inputs=None, outputs=[collection_dropdown, list_out])
    ingest_btn.click(fn=ingest_and_fill, inputs=[file_in, col_name_in, token_chunk_checkbox], outputs=[ingest_result, collection_dropdown])
    # Send button and Enter key share one handler. Its status message (second output) is
    # routed to the "Ingest result" box, and the question textbox is not among the outputs.
    send.click(fn=chat_send, inputs=[chat, collection_dropdown, txt, show_sources, topk_slider], outputs=[chat, ingest_result, raw_json])
    txt.submit(fn=chat_send, inputs=[chat, collection_dropdown, txt, show_sources, topk_slider], outputs=[chat, ingest_result, raw_json])
    summ_btn.click(fn=do_summarize, inputs=[collection_dropdown], outputs=[summ_out, raw_json])

    # Bind preset buttons (button i corresponds to PRESETS[i])
    for i, btn in enumerate(preset_buttons):
        # idx=i freezes the current index as a default argument; a plain closure over i
        # would make every button see the last value of the loop variable.
        btn.click(fn=lambda history, c, s, k, idx=i: handle_preset_click(history, c, s, k, idx),
                  inputs=[chat, collection_dropdown, show_sources, topk_slider],
                  outputs=[chat, ingest_result, raw_json])

# Launch the app. share=True publishes a temporary public *.gradio.live URL (see the cell
# output below); the output also notes that debug=True is needed to surface errors in Colab.
demo.launch(share=True, inbrowser=False)


  chat = gr.Chatbot(elem_id="chatbot", label="Conversation", height=580)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://1c2859f3a040e1276d.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [None]:
# Card-Based Workspace Gradio UI (ready to paste as Step 9)
import gradio as gr
import traceback
import time
import json
from html import escape
from pathlib import Path
import tempfile

# Helper to get collection choices (tries list_collections_safe then fallback)
def _get_collection_choices():
    try:
        res = list_collections_safe()
        if isinstance(res, dict) and "collections" in res:
            return res["collections"]
        if isinstance(res, list):
            return res
    except Exception:
        pass
    try:
        raw = chroma_client.list_collections()
        names = []
        for c in raw:
            if isinstance(c, dict) and "name" in c:
                names.append(c["name"])
            elif hasattr(c, "name"):
                names.append(getattr(c, "name"))
            else:
                names.append(str(c))
        return names
    except Exception:
        return []

# Utility: render cards list (simple HTML)
def render_cards_html(cards):
    """Render Q/A card records as one HTML string, newest card first.

    Args:
        cards: list of dicts with "question", "answer", "collection", "ts" and
            "retrieved" entries; "retrieved" is a list of dicts carrying "meta"
            and "text". Missing or ``None`` entries are rendered as empty.

    Returns:
        str: a ``<style>`` block followed by a ``div.card-grid`` with one card per
        record. Answers are previewed up to 500 characters and source texts up to
        800 characters. All dynamic text is HTML-escaped.
    """
    html = """
    <style>
      .card-grid { display: grid; grid-template-columns: repeat(auto-fill, minmax(320px, 1fr)); gap: 12px; }
      .card { background: white; color:#0b1220; border-radius:10px; padding:12px; box-shadow:0 4px 12px rgba(0,0,0,0.08); }
      .card h4 { margin:0 0 8px 0; font-size:16px; }
      .card .meta { font-size:12px; color:#616874; margin-bottom:8px; }
      .snippet { font-size:14px; color:#111827; margin-bottom:8px; white-space:pre-wrap; }
      .sources pre { background:#f6f8fa; padding:8px; border-radius:6px; overflow:auto; max-height:140px; }
      .btn { display:inline-block; padding:6px 10px; border-radius:6px; background:#2563eb; color:white; text-decoration:none; margin-right:6px; font-size:13px; }
    </style>
    <div class="card-grid">
    """
    for c in reversed(cards):  # newest first
        q = escape(str(c.get("question") or ""))
        coll = escape(str(c.get("collection") or ""))
        ts = escape(str(c.get("ts") or ""))
        answer = str(c.get("answer") or "")
        # Truncate the raw text first and escape afterwards: slicing an already
        # escaped string can cut an entity such as "&lt;" in half.
        snippet = escape(answer[:500]) + ("..." if len(answer) > 500 else "")
        retrieved = c.get("retrieved") or []
        html += f'''
          <div class="card">
            <h4>Q: {q}</h4>
            <div class="meta">Collection: <strong>{coll}</strong> • {ts}</div>
            <div class="snippet">{snippet}</div>
            <details class="sources">
              <summary>Show retrieved sources ({len(retrieved)})</summary>
              <div>
        '''
        for src in retrieved:
            meta = src.get("meta", {})
            if not isinstance(meta, dict):
                # Non-dict metadata is shown as the source label via str().
                meta = {"source": meta}
            src_name = escape(str(meta.get("source", "unknown")))
            idx = meta.get("chunk_index", None)
            text = escape(str(src.get("text") or "")[:800])
            idx_part = f" (chunk {idx})" if idx is not None else ""
            html += f"<div style='margin:8px 0;'><strong>{src_name}{idx_part}</strong><pre>{text}...</pre></div>"
        html += """
              </div>
            </details>
          </div>
        """
    html += "</div>"
    return html

# Core UI
# Build the card-workspace UI: controls on the left (scale=1), question box and answer
# cards on the right (scale=2). Rebinds the module-level name `demo`.
with gr.Blocks() as demo:
    gr.Markdown("## 📦 Card Workspace — RAG Assistant\nUpload files, index them, and get answers displayed as cards you can inspect and export.")
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### Upload & Collections")
            file_in = gr.File(label="Upload file (PDF/TXT/CSV)", file_count="single")
            col_name_in = gr.Textbox(label="Collection name (optional)")
            # Default follows the module-level _has_tiktoken flag when it is defined, else False.
            token_chunk_checkbox = gr.Checkbox(label="Use token-aware chunking (tiktoken)", value=_has_tiktoken if '_has_tiktoken' in globals() else False)
            ingest_btn = gr.Button("Ingest & Index", variant="primary")
            ingest_out = gr.JSON(label="Ingest result")
            gr.Markdown("---")
            gr.Markdown("### Collections")
            # This dropdown feeds the Summarize button; choices are evaluated once, at build time.
            collections_dropdown = gr.Dropdown(choices=_get_collection_choices(), label="Select collection")
            refresh_btn = gr.Button("Refresh")
            collections_list_text = gr.Textbox(label="Collections (debug)", interactive=False)
            gr.Markdown("---")
            gr.Markdown("### Summarize")
            summ_btn = gr.Button("Summarize collection")
            summ_out = gr.Textbox(label="Summary", interactive=False)
            gr.Markdown("---")
            gr.Markdown("### Export")
            export_last_btn = gr.Button("Export last answer (.txt)")
            # Created hidden; it only becomes useful if a callback also makes it visible.
            export_file = gr.File(label="Download exported answer", visible=False)

        with gr.Column(scale=2):
            gr.Markdown("### Ask (or use presets)")
            # Separate dropdown used by Ask and the preset buttons; choices also fixed at build time.
            ask_collection = gr.Dropdown(choices=_get_collection_choices(), label="Query collection")
            question = gr.Textbox(label="Question", placeholder="Type a question about the selected collection...")
            ask_btn = gr.Button("Ask")
            gr.Markdown("### Presets")
            with gr.Row():
                preset_summ = gr.Button("Summarize (5 bullets)")
                preset_key = gr.Button("Key facts")
                preset_columns = gr.Button("Columns (survey)")
            gr.Markdown("---")
            gr.Markdown("### Cards")
            # Starts as an empty card grid (style block + empty container).
            cards_html = gr.HTML(render_cards_html([]))
            # invisible state to hold cards list
            cards_state = gr.State([])

    # Hidden json output for debug
    raw_json = gr.JSON(visible=False)

    # Callbacks
    def refresh_collections():
        """Re-list the collections for the left-hand panel.

        Returns:
            tuple: ``(dropdown_update, text)``. The first item is
            ``gr.update(choices=...)`` because a bare list sent to a Dropdown
            output is treated as the component's value rather than its choices;
            the second is the comma-separated listing (or "No collections").
        """
        choices = list(_get_collection_choices())
        listing = ", ".join(choices) if choices else "No collections"
        return gr.update(choices=choices), listing

    def ingest_and_return(file, col_name, use_token_chunking):
        """Ingest the uploaded file and refresh the collection dropdown.

        Args:
            file: value of the upload component (falsy when nothing was uploaded).
            col_name: optional collection name typed by the user.
            use_token_chunking: checkbox value forwarded to ``ingest_file_safe``.

        Returns:
            tuple: ``(ingest_result, dropdown_update, debug_info)``. The dropdown
            update carries the freshly listed choices and selects the ingested
            collection when it is among them. A bare list would have been taken as
            the dropdown's value, so an explicit ``gr.update`` is used. On failure
            the dropdown is left unchanged.
        """
        if not file:
            return {"status":"error","msg":"No file provided"}, gr.update(), {}
        try:
            res = ingest_file_safe(file, collection_name=col_name, use_token_chunking=use_token_chunking)
        except Exception as e:
            return {"status":"error","msg": str(e), "traceback": traceback.format_exc()}, gr.update(), {}
        # assumes ingest_file_safe returns a dict with "collection" and "n_chunks"
        # keys — TODO confirm
        coll = res.get("collection") or col_name or ""
        # after ingest, refresh collection choices
        choices = list(_get_collection_choices())
        if coll and coll in choices:
            dropdown = gr.update(choices=choices, value=coll)
        else:
            dropdown = gr.update(choices=choices)
        return res, dropdown, {"collection": coll, "n_chunks": res.get("n_chunks", 0)}

    def summarize_collection(collection):
        """Summarize ``collection``.

        Returns:
            tuple: ``(text, raw)`` — the text for the summary box and the raw
            helper result (or an error dict) for the hidden JSON output.
        """
        if not collection:
            return "collection required", {}
        try:
            outcome = summarize_collection_hf(collection, top_k=6, max_tokens=150)
            if not isinstance(outcome, dict):
                return str(outcome), outcome
            failure = outcome.get("error")
            if failure:
                return f"Error: {failure}", outcome
            return outcome.get("summary"), outcome
        except Exception as err:
            return f"Error: {err}", {"error": str(err)}

    def ask_and_add_card(cards, collection, q_text, top_k=6):
        """Answer ``q_text`` against ``collection`` and append the result as a card.

        Returns:
            tuple: ``(cards, html_or_message, raw)``. On success ``cards`` is a new
            list with the new card appended and the second item is the rendered card
            grid. On a validation problem or error the card list is returned as
            received and the second item is a plain message string.
        """
        if not collection:
            return cards, "Please select a collection (left)", {}
        if not (q_text and q_text.strip()):
            return cards, "Please type a question", {}
        try:
            outcome = rag_answer_hf_local_safe(collection, q_text, top_k=top_k, max_tokens=256)
            if isinstance(outcome, dict) and outcome.get("error"):
                return cards, f"Error: {outcome.get('error')}", outcome
            # Record everything needed to render and export this answer later.
            record = {
                "question": q_text,
                "answer": outcome.get("answer",""),
                "retrieved": outcome.get("retrieved", []),
                "collection": collection,
                "ts": time.strftime("%Y-%m-%d %H:%M:%S"),
            }
            cards = cards + [record]
            return cards, render_cards_html(cards), outcome
        except Exception as err:
            return cards, f"Unexpected error: {err}", {"error": str(err), "traceback": traceback.format_exc()}

    def export_last(cards):
        """Write the most recent card to a temporary .txt file and offer it for download.

        Args:
            cards: list of card dicts as produced by ``ask_and_add_card``.

        Returns:
            tuple: ``(file_update, status)``. With no cards the first item is ``None``
            and the status carries an error. Otherwise it is a ``gr.update`` that sets
            the file path *and* makes the download component visible, since that
            component is created with ``visible=False``.
        """
        if not cards:
            return None, {"error":"no cards to export"}
        last = cards[-1]
        parts = [f"Question: {last.get('question')}\n\nAnswer:\n{last.get('answer')}\n\nRetrieved sources:\n"]
        for r in last.get("retrieved") or []:
            meta = r.get("meta", {})
            source = meta.get("source") if isinstance(meta, dict) else str(meta)
            idx = meta.get("chunk_index") if isinstance(meta, dict) else None
            parts.append(f"- {source}" + (f" (chunk {idx})" if idx is not None else "") + "\n")
            parts.append((r.get("text") or "") + "\n\n")
        # delete=False keeps the file on disk after the handle is closed so it can be
        # served; the context manager closes the handle even if the write fails.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as tf:
            tf.write("".join(parts).encode("utf-8"))
        return gr.update(value=tf.name, visible=True), {"ok": True}

    # Bindings
    # Left panel: refresh, ingest and summarize all work on collections_dropdown.
    # NOTE(review): no binding in this cell lists ask_collection as an output, so the
    # query dropdown keeps the choices it was built with.
    refresh_btn.click(fn=refresh_collections, inputs=None, outputs=[collections_dropdown, collections_list_text])
    ingest_btn.click(fn=ingest_and_return, inputs=[file_in, col_name_in, token_chunk_checkbox], outputs=[ingest_out, collections_dropdown, raw_json])
    summ_btn.click(fn=summarize_collection, inputs=[collections_dropdown], outputs=[summ_out, raw_json])

    # Ask / presets: each handler receives the current card list and returns the updated
    # list, the HTML for the card area and the raw result. Presets reuse ask_and_add_card
    # with a fixed question.
    ask_btn.click(fn=ask_and_add_card, inputs=[cards_state, ask_collection, question], outputs=[cards_state, cards_html, raw_json])
    preset_summ.click(fn=lambda cards, coll: ask_and_add_card(cards, coll, "Summarize the document in 5 concise bullet points.", top_k=6), inputs=[cards_state, ask_collection], outputs=[cards_state, cards_html, raw_json])
    preset_key.click(fn=lambda cards, coll: ask_and_add_card(cards, coll, "List the key facts and figures in the document.", top_k=6), inputs=[cards_state, ask_collection], outputs=[cards_state, cards_html, raw_json])
    preset_columns.click(fn=lambda cards, coll: ask_and_add_card(cards, coll, "What columns are present in the survey?", top_k=6), inputs=[cards_state, ask_collection], outputs=[cards_state, cards_html, raw_json])

    # Export writes the newest card to a file and hands it to the download component.
    export_last_btn.click(fn=export_last, inputs=[cards_state], outputs=[export_file, raw_json])

# Launch. share=True publishes a temporary public *.gradio.live URL (see the cell output
# below); the output also notes that debug=True is needed to surface errors in Colab.
demo.launch(share=True, inbrowser=False)

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://f4efc683fd2f5bbe06.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


