User Question
    │
    ├─▶ Retriever (BM25 by default; FAISS+embeddings optional)
    │        ▲
    │        │    Synthetic Docs (manual pages)
    │        └──  Ingestion → Split → Index
    │
    ├─▶ Context Composer (combine top-k docs)
    │
    ├─▶ PromptTemplate (system + context + question + chat history)
    │
    ├─▶ LLM (offline Mock LLM or OpenAI if available)
    │
    └─▶ Output Parser + Source Attributions


Cell 1 — Dependency Install (if not prebuilt)

Install core libs and BM25:

langchain langchain-core langchain-community rank_bm25

Optional: faiss-cpu (vector index demo) and langchain-openai.

Operational note: Pin versions in enterprise repos (e.g., langchain==0.x.y, langchain-core==0.x.y) to avoid API drift (e.g., get_relevant_documents → .invoke() change).

In [7]:
# Cell 1
%pip -q install langchain langchain-core langchain-community rank_bm25 faiss-cpu --upgrade

# Optional online pieces (safe to skip if no API key / internet):
# %pip -q install langchain-openai sentence-transformers


Note: you may need to restart the kernel to use updated packages.




Cell 2 — Imports, Feature Flags, Constants

Imports:

Document, RecursiveCharacterTextSplitter

BM25Retriever

LCEL: RunnableLambda, RunnableParallel, RunnablePassthrough, ChatPromptTemplate, StrOutputParser

Memory: ChatMessageHistory, RunnableWithMessageHistory

Flags:

USE_OPENAI: bool (route to OpenAI or a deterministic mock)

USE_EMBEDDINGS: bool (kept False to avoid downloads)

TOP_K: int (retrieval fan-in; default 4)

Rationale: Feature flags make the graph stable; model providers change without touching chain topology.

In [8]:
# Cell 2
import os
from typing import List, Dict, Any

# Core LangChain
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough, RunnableLambda
from langchain_core.output_parsers import StrOutputParser
from langchain_core.documents import Document

# Community bits (retrievers, text splitters, vectorstores)
from langchain_community.retrievers import BM25Retriever
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import TextLoader
from langchain_community.callbacks.manager import get_openai_callback  # harmless if OpenAI not used
from langchain.text_splitter import RecursiveCharacterTextSplitter

# LLMs
from langchain_community.llms import FakeListLLM  # offline demo
# Optional: from langchain_openai import ChatOpenAI
# Optional: from langchain_openai import OpenAIEmbeddings
# Optional: from langchain_community.embeddings import HuggingFaceEmbeddings

# Conversation memory wrappers
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory

# ---- DEMO TOGGLES ----
USE_OPENAI = False  # flip to True if you have OPENAI_API_KEY set
USE_EMBEDDINGS = False  # BM25 only by default to stay offline
TOP_K = 4


Cell 4 — Corpus Construction (Synthetic Manual)

Build 6 Documents with metadata={"section": "<name>"}:

intro, setup, grind, descale, troubleshoot, warranty

Text is domain-shaped and concise to avoid excessive chunking.

Contract: docs: List[Document]

In [9]:
# Cell 4
manual_sections = [
    ("intro", """
NovaPress 3000 Overview:
- 19-bar pump, 1.8L reservoir, PID temperature control.
- Modes: Espresso, Americano, Steam.
- Default grind range: 1 (fine) to 15 (coarse), factory at 7.
"""),
    ("setup", """
Setup Steps:
1) Rinse reservoir, fill with filtered water.
2) Run 'Prime' cycle without coffee.
3) Heat up until READY light.
4) Pull a blank shot to purge air.
"""),
    ("grind", """
Grind Guide:
- Espresso: 3–6; lighter roasts → finer; darker → coarser.
- Channeling fix: even tamp ~30 lbs, pre-infuse 3s.
- If shot <25s: grind finer; if >35s: grind coarser.
"""),
    ("descale", """
Descaling Procedure:
- Use citric acid solution 20g/L.
- Run 'Clean' cycle: 2x brew, 1x steam.
- Rinse thoroughly: 3 tanks of clean water.
- Frequency: every 8 weeks or 200 shots.
"""),
    ("troubleshoot", """
Troubleshooting:
- Bitter taste: too fine grind, too hot, over-extracted (>40s).
- Sour taste: too coarse, low temperature, under-extracted (<20s).
- Low pressure: clogged basket or pump prime needed.
"""),
    ("warranty", """
Warranty:
- 2 years parts & labor with proof of purchase.
- Excludes damage from unfiltered hard water or misuse.
- Support: support@novapress.example
"""),
]

docs = [
    Document(page_content=txt.strip(), metadata={"section": sec})
    for sec, txt in manual_sections
]

# Split for better retrieval granularity
splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=40)
doc_chunks: List[Document] = splitter.split_documents(docs)
len(doc_chunks), doc_chunks[0].metadata


(6, {'section': 'intro'})

Cell 5 — Split & Index

Splitter: RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=40)

Produces smaller, overlapping chunks to increase retrieval recall near boundaries.

Index: BM25Retriever.from_documents(doc_chunks)

Set bm25.k = TOP_K.

Common tuning:

Too small chunk_size → fragmented context; too large → prompt bloat.

chunk_overlap should cover sentence bleed (20–60 chars typical).

In [10]:
# Cell 5
# BM25 (keyword) retriever – works fully offline
bm25 = BM25Retriever.from_documents(doc_chunks)
bm25.k = TOP_K

# Optional: vector index (only if you enable embeddings)
vector_retriever = None
if USE_EMBEDDINGS:
    # Pick one: OpenAI or local HF embeddings
    # embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
    # OR offline-capable (requires model download first run):
    # embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    raise NotImplementedError("Flip USE_EMBEDDINGS and choose an embedding class above, then remove this line.")


In [11]:
# ✅ Cell 6 — robust retriever (handles str or dict) + modern API

def _to_query(x):
    # Accept either {"question": "..."} or just "..."
    return x["question"] if isinstance(x, dict) else x

def retrieve(x) -> List[Document]:
    query = _to_query(x)
    return bm25.invoke(query)  # new API; returns List[Document]

# Runnable receives whatever upstream sends (str or dict) and normalizes it
retriever_runnable = RunnableLambda(lambda x: retrieve(x))


We’ll condense the retrieved docs to a single context string that flows into the prompt.

In [12]:
# Cell 8
def docs_to_context(docs: List[Document]) -> str:
    lines = []
    for i, d in enumerate(docs, 1):
        src = d.metadata.get("section", "unknown")
        lines.append(f"[{i}] ({src}) {d.page_content.strip()}")
    return "\n".join(lines)

context_builder = RunnableLambda(lambda docs: {"context": docs_to_context(docs), "sources": docs})


Cell 9 — Prompt Template

ChatPromptTemplate.from_messages([("system", SYSTEM), ("placeholder", "{chat_history}"), ("human", "Question: {question}\n\nRelevant context:\n{context}\n\nAnswer:")])

SYSTEM should define:

directive to be procedural,

citation convention [n(section)],

uncertainty policy (“say you don’t know if context insufficient”).

Input keys required by prompt: question, context, chat_history.

In [13]:
# Cell 9
SYSTEM = """You are NovaPress Support Assistant.
Answer precisely with steps. If unsure, say so.
Cite sources as [n(section)] at the end of sentences where relevant.
"""

prompt = ChatPromptTemplate.from_messages([
    ("system", SYSTEM),
    ("placeholder", "{chat_history}"),
    ("human", "Question: {question}\n\nRelevant context:\n{context}\n\nAnswer:")
])


In [None]:
import os
os.environ["OPENAI_API_KEY"] = 


Cell 10 — LLM Binding (Provider Switch)

In [15]:
# Cell 10 — choose OpenAI or mock

import os
from langchain_core.runnables import RunnableLambda

USE_OPENAI = True   # flip False if you want the offline mock
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

def mock_answer(inputs):
    q = inputs["question"].lower()
    ctx = inputs["context"]
    if "descal" in q:
        return "To descale: 20g/L citric acid, run Brew x2 + Steam x1, rinse 3 tanks. Every ~8wks/200 shots. [descale]"
    if "grind" in q:
        return "Espresso grind: 3–6. <25s → finer; >35s → coarser. Tamp ~30 lbs, pre-infuse 3s. [grind]"
    if "warranty" in q:
        return "2-year parts & labor warranty, excludes hard water damage. [warranty]"
    return "From context: " + ctx.splitlines()[0]

if USE_OPENAI:
    if not OPENAI_API_KEY:
        raise RuntimeError("Set OPENAI_API_KEY first!")
    from langchain_openai import ChatOpenAI
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.2)
else:
    llm = RunnableLambda(lambda x: mock_answer(x))


Cell 11 — Parser + Result Packer

For OpenAI path, normalize to string via StrOutputParser().

Pack with sources for downstream UIs / audits:

In [16]:
# Cell 11
parser = StrOutputParser()

def pack_result(answer: str, sources: List[Document]) -> Dict[str, Any]:
    return {
        "answer": answer,
        "sources": [{"section": d.metadata.get("section"), "snippet": d.page_content[:160].strip()} for d in sources]
    }

pack_runnable = RunnableLambda(lambda d: pack_result(d["answer"], d["sources"]))


Cell 12 — Chain Assembly (LCEL graph)

In [17]:
# Cell 12 — assemble rag_inputs and then base_chain

# First: rag_inputs builds {"question", "context", "sources"}
rag_inputs = RunnableParallel({
    "question": RunnablePassthrough(),
    "retrieved_docs": retriever_runnable
}) | RunnableParallel({
    "question": lambda d: d["question"],
    "context_and_sources": lambda d: context_builder.invoke(d["retrieved_docs"])
}) | RunnableLambda(lambda d: {
    "question": d["question"],
    "context": d["context_and_sources"]["context"],
    "sources": d["context_and_sources"]["sources"]
})

# Then: branch base_chain depending on USE_OPENAI
if USE_OPENAI:
    base_chain = (
        RunnableParallel({
            "question": RunnablePassthrough(),
            "context": (rag_inputs | (lambda x: x["context"]))
        })
        | prompt
        | llm
        | StrOutputParser()
    )
else:
    base_chain = (
        RunnableParallel({
            "question": RunnablePassthrough(),
            "context": (rag_inputs | (lambda x: x["context"]))
        })
        | RunnableLambda(mock_answer)
    )

# Finally: wrap with sources for full_chain
full_chain = RunnableParallel({
    "answer": base_chain,
    "sources": (rag_inputs | (lambda x: x["sources"]))
}) | RunnableLambda(lambda d: {
    "answer": d["answer"],
    "sources": [{"section": s.metadata.get("section"),
                 "snippet": s.page_content[:160].strip()} for s in d["sources"]]
})


Cell 13 — Conversational Memory

In [18]:
# Cell 13
# Simple chat history in-memory store (keyed per session id)
store: Dict[str, BaseChatMessageHistory] = {}

def get_history(session_id: str) -> BaseChatMessageHistory:
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]

conversational = RunnableWithMessageHistory(
    full_chain,
    get_history,
    input_messages_key="question",      # what's considered user input
    history_messages_key="chat_history" # injected into the prompt placeholder
)

SESSION_ID = "novapress-demo"


Cell 14 — First Invocation (Smoke Test)

In [19]:
# Cell 14
query = "How do I descale this machine safely, and how often should I do it?"
result = conversational.invoke(
    {"question": query},
    config={"configurable": {"session_id": SESSION_ID}}
)
result


Error in RootListenersTracer.on_chain_end callback: KeyError('output')


{'answer': "To descale your machine safely, follow these steps:\n\n1. **Prepare Descaling Solution**: Mix a citric acid solution at a concentration of 20g per liter of water.\n2. **Run Clean Cycle**: Execute the 'Clean' cycle on your machine, which involves:\n   - Brewing 2 cycles of the descaling solution.\n   - Running 1 cycle of steam.\n3. **Rinse Thoroughly**: After descaling, rinse the machine by running 3 tanks of clean water through it to ensure all descaling solution is removed.\n\n**Frequency**: You should descale your machine every 8 weeks or after 200 shots, whichever comes first [3].",
 'sources': [{'section': 'warranty',
   'snippet': 'Warranty:\n- 2 years parts & labor with proof of purchase.\n- Excludes damage from unfiltered hard water or misuse.\n- Support: support@novapress.example'},
  {'section': 'troubleshoot',
   'snippet': 'Troubleshooting:\n- Bitter taste: too fine grind, too hot, over-extracted (>40s).\n- Sour taste: too coarse, low temperature, under-extracted

Cell 15 — Follow-up (Memory Verification)

In [20]:
# Cell 15
follow_up = "Cool—if my shots run fast afterward, what grind changes should I make?"
result2 = conversational.invoke(
    {"question": follow_up},
    config={"configurable": {"session_id": SESSION_ID}}
)
result2


Error in RootListenersTracer.on_chain_end callback: KeyError('output')


{'answer': 'If your shots are running fast (less than 25 seconds), you should make the following grind changes:\n\n1. **Grind Finer**: Adjust your grinder to a finer setting. This will increase the resistance against the water, slowing down the extraction time.\n\n2. **Check Your Dose**: Ensure you are using the correct amount of coffee. A standard dose is typically around 18-20 grams for a double shot.\n\n3. **Tamp Evenly**: Make sure you are tamping evenly with about 30 lbs of pressure to avoid channeling, which can lead to fast shots.\n\n4. **Consider Pre-Infusion**: If your machine allows, use a pre-infusion setting for about 3 seconds to help with even extraction.\n\nBy following these steps, you should be able to achieve a more balanced extraction time for your espresso shots [1].',
 'sources': [{'section': 'grind',
   'snippet': 'Grind Guide:\n- Espresso: 3–6; lighter roasts → finer; darker → coarser.\n- Channeling fix: even tamp ~30 lbs, pre-infuse 3s.\n- If shot <25s: grind fi

Cell 16 — Retrieval Inspection (Operational Visibility)

In [21]:
# Cell 16
def preview_retrieval(q: str, k: int = TOP_K):
    hits = bm25.invoke(q)[:k]
    for i, d in enumerate(hits, 1):
        print(f"[{i}] section={d.metadata.get('section')}\n{d.page_content[:220].strip()}\n---")

preview_retrieval("prime the pump and setup steps?")


[1] section=troubleshoot
Troubleshooting:
- Bitter taste: too fine grind, too hot, over-extracted (>40s).
- Sour taste: too coarse, low temperature, under-extracted (<20s).
- Low pressure: clogged basket or pump prime needed.
---
[2] section=warranty
Warranty:
- 2 years parts & labor with proof of purchase.
- Excludes damage from unfiltered hard water or misuse.
- Support: support@novapress.example
---
[3] section=descale
Descaling Procedure:
- Use citric acid solution 20g/L.
- Run 'Clean' cycle: 2x brew, 1x steam.
- Rinse thoroughly: 3 tanks of clean water.
- Frequency: every 8 weeks or 200 shots.
---
[4] section=grind
Grind Guide:
- Espresso: 3–6; lighter roasts → finer; darker → coarser.
- Channeling fix: even tamp ~30 lbs, pre-infuse 3s.
- If shot <25s: grind finer; if >35s: grind coarser.
---


Cell 17 — ask() Helper (Guard + Pretty Output)

In [22]:
# Cell 17
def ask(question: str, session_id: str = SESSION_ID):
    # quick guard: if nothing retrieved, don’t waste tokens
    hits = bm25.invoke(question)
    if not hits:
        return {
            "answer": "I don’t have enough relevant context to answer. Try rephrasing or expanding the docs.",
            "sources": []
        }
    out = conversational.invoke(
        {"question": question},
        config={"configurable": {"session_id": session_id}}
    )
    # pretty print
    print("Answer:\n", out["answer"], "\n")
    if out.get("sources"):
        print("Sources:")
        for i, s in enumerate(out["sources"], 1):
            print(f" [{i}] ({s['section']}) {s['snippet']}")
    return out

# Try a few:
ask("How do I descale this machine safely, and how often?")
ask("My shot took 15 seconds—what should I change?")
ask("How do I prime the pump on first use?")


Error in RootListenersTracer.on_chain_end callback: KeyError('output')


Answer:
 To descale your machine safely, follow these steps:

1. **Prepare the Descaling Solution**: Mix a citric acid solution at a concentration of 20g per liter of water.

2. **Run the Clean Cycle**:
   - Start by running the brew cycle twice.
   - Then, run the steam cycle once.

3. **Rinse the Machine**: After descaling, thoroughly rinse the machine by running three tanks of clean water through it.

**Frequency of Descaling**: You should descale your machine every 8 weeks or after 200 shots, whichever comes first [3]. 

Sources:
 [1] (warranty) Warranty:
- 2 years parts & labor with proof of purchase.
- Excludes damage from unfiltered hard water or misuse.
- Support: support@novapress.example
 [2] (troubleshoot) Troubleshooting:
- Bitter taste: too fine grind, too hot, over-extracted (>40s).
- Sour taste: too coarse, low temperature, under-extracted (<20s).
- Low pressu
 [3] (descale) Descaling Procedure:
- Use citric acid solution 20g/L.
- Run 'Clean' cycle: 2x brew, 1x steam.
- 

Error in RootListenersTracer.on_chain_end callback: KeyError('output')


Answer:
 To adjust your shot time of 15 seconds, follow these steps:

1. **Check Grind Size**: Since your shot is under the ideal range (25-35 seconds), you should grind your coffee finer. The recommended grind size for espresso is between 3-6, so consider adjusting it closer to 3.

2. **Tamp Pressure**: Ensure you are tamping evenly with about 30 lbs of pressure. An uneven tamp can lead to channeling, which affects extraction time.

3. **Pre-Infusion**: If you're not already doing so, consider using a pre-infusion time of about 3 seconds. This can help with even extraction.

4. **Test Again**: After making these adjustments, pull another shot and time it. Aim for a duration between 25-35 seconds for optimal extraction.

By making these changes, you should see an improvement in your shot time. If you continue to have issues, consider reaching out to support for further assistance [1(section), 3(section)]. 

Sources:
 [1] (intro) NovaPress 3000 Overview:
- 19-bar pump, 1.8L reservoir, P

Error in RootListenersTracer.on_chain_end callback: KeyError('output')


Answer:
 To prime the pump on first use, follow these steps:

1. **Fill the Water Tank**: Ensure the water tank is filled with fresh, clean water.

2. **Turn On the Machine**: Power on the machine and wait for it to reach the appropriate temperature.

3. **Activate the Brew Cycle**: Start a brew cycle without coffee in the basket. This will help to push water through the system and prime the pump.

4. **Observe Water Flow**: You should see water flowing from the group head. If no water flows, repeat the brew cycle a few times until the pump is primed.

5. **Check for Air Bubbles**: If you notice air bubbles in the water flow, continue running the brew cycle until the flow is steady.

6. **Ready for Use**: Once the pump is primed and water flows smoothly, you can proceed to brew your coffee.

If you encounter any issues, refer to the troubleshooting section for low pressure or contact support at support@novapress.example for assistance [1]. 

Sources:
 [1] (troubleshoot) Troubleshooting

{'answer': 'To prime the pump on first use, follow these steps:\n\n1. **Fill the Water Tank**: Ensure the water tank is filled with fresh, clean water.\n\n2. **Turn On the Machine**: Power on the machine and wait for it to reach the appropriate temperature.\n\n3. **Activate the Brew Cycle**: Start a brew cycle without coffee in the basket. This will help to push water through the system and prime the pump.\n\n4. **Observe Water Flow**: You should see water flowing from the group head. If no water flows, repeat the brew cycle a few times until the pump is primed.\n\n5. **Check for Air Bubbles**: If you notice air bubbles in the water flow, continue running the brew cycle until the flow is steady.\n\n6. **Ready for Use**: Once the pump is primed and water flows smoothly, you can proceed to brew your coffee.\n\nIf you encounter any issues, refer to the troubleshooting section for low pressure or contact support at support@novapress.example for assistance [1].',
 'sources': [{'section': 't