<a href="https://colab.research.google.com/github/swago72/cx-knowledge-assistant/blob/main/cx_knowledge_assistant_week1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the notebook's third-party dependencies (-q keeps pip's output quiet).
# lxml is the parser backend handed to BeautifulSoup when the HTML is loaded.
!pip install langchain chromadb sentence-transformers beautifulsoup4 lxml -q

In [None]:
# Install the package behind the `langchain_text_splitters` import used for chunking.
!pip install langchain-text-splitters -q

In [None]:
from google.colab import files

# Ask for the raw HTML pages; the upload result is later consumed as a
# mapping of filename -> file bytes by load_html_files().
print("Select all 13 HTML files from your raw folder")
uploaded = files.upload()

# Echo what was received so a missing file is easy to spot.
print(f"\n✓ {len(uploaded)} files uploaded:")
for uploaded_name in uploaded:
    print(f"  - {uploaded_name}")

In [None]:
from bs4 import BeautifulSoup
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
import chromadb

# ── STEP 1: Load + clean HTML ───────────────────────────────────

def load_html_files(uploaded_files):
    """Convert uploaded HTML pages into cleaned plain-text document records.

    Args:
        uploaded_files: Mapping of filename to the file's raw bytes.

    Returns:
        A list of dicts with the keys ``content`` (cleaned text),
        ``source`` (original filename), ``short_name`` (filename without
        the " - Google Play Help.html" suffix, used for citations) and
        ``chars`` (length of the cleaned text). Pages whose cleaned text
        is shorter than 100 characters are reported and left out.
    """
    noise_tags = ["script", "style", "nav", "header",
                  "footer", "aside", "noscript", "iframe"]
    title_suffix = " - Google Play Help.html"
    min_chars = 100

    loaded = []
    for filename, payload in uploaded_files.items():
        # Undecodable bytes are dropped rather than aborting the whole load.
        markup = payload.decode("utf-8", errors="ignore")
        parsed = BeautifulSoup(markup, "lxml")

        # Remove page chrome and non-content elements before extracting text.
        for element in parsed.find_all(noise_tags):
            element.decompose()

        # One text fragment per line, with blank lines squeezed out.
        extracted = parsed.get_text(separator="\n", strip=True)
        clean_text = "\n".join(
            line for line in extracted.splitlines() if line.strip()
        )
        char_count = len(clean_text)

        if char_count < min_chars:
            print(f"  ⚠ Skipping {filename} — too short")
            continue

        short_name = filename.replace(title_suffix, "")
        record = {
            "content":    clean_text,
            "source":     filename,
            "short_name": short_name,
            "chars":      char_count,
        }
        loaded.append(record)
        print(f"  ✓ {short_name} ({char_count:,} chars)")

    print(f"\nTotal documents loaded: {len(loaded)}")
    return loaded


# ── STEP 2: Chunk ───────────────────────────────────────────────

def chunk_documents(documents):
    """Split each document into overlapping text chunks with unique IDs.

    Args:
        documents: Iterable of document dicts carrying at least the keys
            ``content``, ``source`` and ``short_name``.

    Returns:
        A list of chunk dicts with the keys ``text``, ``source``,
        ``short_name``, ``chunk_id`` (``"<safe_id>_chunk_<n>"``),
        ``chunk_num`` (0-based index within its document) and ``total``
        (number of chunks produced from that document).
    """
    import hashlib
    from collections import Counter

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50,
        separators=["\n\n", "\n", ".", " "]
    )

    documents = list(documents)

    # Build one ID prefix per document (not per chunk): spaces, dashes and
    # dots become underscores and the result is capped at 80 characters.
    id_table = str.maketrans(" -.", "___")
    base_ids = [doc["source"].translate(id_table)[:80] for doc in documents]
    id_counts = Counter(base_ids)

    chunks = []
    for doc, safe_id in zip(documents, base_ids):
        # Sanitising/truncating can map different filenames to the same
        # prefix (e.g. "a-b.html" vs "a_b.html"), which would yield duplicate
        # chunk IDs. Only in that case, append a short digest of the full
        # filename; non-colliding IDs keep their original form.
        if id_counts[safe_id] > 1:
            digest = hashlib.sha256(doc["source"].encode("utf-8")).hexdigest()
            safe_id = f"{safe_id}_{digest[:8]}"

        texts = splitter.split_text(doc["content"])
        total = len(texts)
        chunks.extend(
            {
                "text":       text,
                "source":     doc["source"],
                "short_name": doc["short_name"],
                "chunk_id":   f"{safe_id}_chunk_{i}",
                "chunk_num":  i,
                "total":      total,
            }
            for i, text in enumerate(texts)
        )

    print(f"Total chunks created: {len(chunks)}")
    return chunks


# ── STEP 3: Embed + store ───────────────────────────────────────

def embed_and_store(chunks, collection_name="cx_knowledge_base", *,
                    model_name="all-MiniLM-L6-v2",
                    db_path="./chroma_db",
                    batch_size=50):
    """Embed chunks and add the ones not yet stored to a ChromaDB collection.

    Args:
        chunks: List of chunk dicts with the keys ``text``, ``source``,
            ``short_name``, ``chunk_num`` and ``chunk_id``.
        collection_name: Name of the collection to get or create.
        model_name: SentenceTransformer model used to embed the text.
        db_path: Directory of the persistent ChromaDB store.
        batch_size: Number of chunks embedded and added per call.

    Returns:
        A ``(collection, model)`` tuple so callers can reuse the same
        embedding model when querying the collection.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    print("Loading HuggingFace model (downloads once ~80MB)...")
    model = SentenceTransformer(model_name)
    print("✓ Model ready\n")

    client = chromadb.PersistentClient(path=db_path)
    collection = client.get_or_create_collection(
        name=collection_name,
        metadata={"hnsw:space": "cosine"}
    )

    # Only the IDs are needed for de-duplication, so skip fetching the
    # stored documents and metadata.
    existing_ids = set(collection.get(include=[])["ids"])
    new_chunks = [c for c in chunks
                  if c["chunk_id"] not in existing_ids]

    if not new_chunks:
        print("Already embedded. Nothing to add.")
        return collection, model

    print(f"Embedding {len(new_chunks)} chunks...")

    for i in range(0, len(new_chunks), batch_size):
        batch  = new_chunks[i:i + batch_size]
        texts  = [c["text"] for c in batch]
        embeds = model.encode(texts, show_progress_bar=False).tolist()

        collection.add(
            embeddings=embeds,
            documents=texts,
            metadatas=[{
                "source":     c["source"],
                "short_name": c["short_name"],
                "chunk_num":  c["chunk_num"]
            } for c in batch],
            ids=[c["chunk_id"] for c in batch]
        )
        done = min(i + batch_size, len(new_chunks))
        print(f"  Stored {done}/{len(new_chunks)}")

    print(f"\n✓ ChromaDB contains {collection.count()} total chunks")
    return collection, model


# ── RUN ─────────────────────────────────────────────────────────

# Banner: rule, title, rule, then one blank line.
print(
    "=" * 55,
    "  CX Knowledge Assistant — Ingestion Pipeline",
    "=" * 55 + "\n",
    sep="\n",
)

# Stage 1 — turn the uploaded HTML into cleaned text documents.
print("STEP 1: Loading HTML files...")
documents = load_html_files(uploaded)

# Stage 2 — split every document into overlapping chunks.
print("\nSTEP 2: Chunking...")
chunks = chunk_documents(documents)

# Stage 3 — embed the chunks and persist them; both returned objects are
# kept as notebook globals because the query cell reuses them.
print("\nSTEP 3: Embedding and storing in ChromaDB...")
collection, embed_model = embed_and_store(chunks)

print("\n✅ Done. Knowledge base is ready for Week 2.")

In [None]:
def test_query(question, n=3):
    """Print the top matches in the knowledge base for a question.

    Uses the notebook-level ``embed_model`` and ``collection`` objects, so
    the ingestion cell must have been run first.

    Args:
        question: Natural-language query text.
        n: Number of results to request from the collection.
    """
    preview_chars = 180

    embedding = embed_model.encode(question).tolist()
    results = collection.query(
        query_embeddings=[embedding],
        n_results=n,
        include=["documents", "metadatas", "distances"]
    )

    print(f" ? '{question}'\n")
    # One query was sent, so the hits live in the first entry of each field.
    hits = zip(
        results["documents"][0],
        results["metadatas"][0],
        results["distances"][0]
    )
    for rank, (doc, meta, dist) in enumerate(hits, start=1):
        # Shown as a percentage of (1 - distance); embed_and_store creates
        # the collection with cosine space, so this reads as similarity.
        score = round((1 - dist) * 100, 1)
        print(f"  Result {rank} — {meta['short_name']} "
              f"({score}% match)")
        # Add the ellipsis only when the chunk really was cut short.
        if len(doc) > preview_chars:
            preview = f"{doc[:preview_chars]}..."
        else:
            preview = doc
        print(f"  {preview}")
        print()

# Smoke-test retrieval with five sample questions — one per issue type.
sample_questions = (
    "How do I request a refund for an app?",
    "My subscription was charged after I cancelled it",
    "App is stuck on pending and won't download",
    "There's a charge on my account I don't recognize",
    "I was charged for an in-app purchase I didn't make",
)
for sample_question in sample_questions:
    test_query(sample_question)