In [None]:
import os
import logging
import shutil
import getpass

In [None]:
# Langchain core
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough, RunnableBranch
from langchain_core.messages import HumanMessage, SystemMessage

In [None]:
# Langchain community
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_nomic.embeddings import NomicEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_community.llms import Ollama
from langchain_community.tools.tavily_search import TavilySearchResults

In [None]:
# Streamlit
import streamlit as st

In [None]:
# Logging: timestamped, level-tagged records at INFO and above.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

In [None]:
# --- Environment setup ---
def _set_env(var: str):
    """Sets an environment variable if not already set, prompting the user interactively."""
    if not os.environ.get(var):
        logger.warning(f"Environment variable '{var}' not found.")
        try:
            os.environ[var] = getpass.getpass(f"Please enter your {var}: ")
        except Exception as e:
            logger.error(f"Could not get input for {var}: {e}. Functionality requiring this key may fail.")
            os.environ[var] = ""

In [None]:
# Pin the tokenizer parallelism flag for this process, then log the value actually stored.
os.environ.update(TOKENIZERS_PARALLELISM="true")
logger.info(f"Set TOKENIZERS_PARALLELISM to {os.environ['TOKENIZERS_PARALLELISM']}")

In [None]:
# --- Knowledge Base and Web Search configuration ---
# Pages that are fetched and indexed into the local knowledge base.
URLS = [
    "https://developers.google.com/machine-learning/guides/rules-of-ml/",
    "https://peps.python.org/pep-0008/",
    "https://google.github.io/styleguide/pyguide.html",
]

# Where the Chroma store lives on disk, and which models are used.
PERSIST_DIRECTORY = "./chroma_db_nomic_v1_notebook_final"
EMBEDDING_MODEL = "nomic-embed-text-v1.5"
LLM_MODEL = "gemma3"

# Text splitting parameters (passed to the splitter as chunk_size / chunk_overlap).
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

# --- Environment Variables ---
# Prompt for the key if it is missing, then read back whatever ended up stored.
_set_env("TAVILY_API_KEY")

TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY")

if not TAVILY_API_KEY:
    logger.warning("TAVILY_API_KEY could not be obtained or is empty. Web search will be disabled.")
else:
    logger.info("TAVILY_API_KEY is set for this notebook session.")

In [None]:
# The build function
def build_vector_store(force_rebuild=False):
    """Load the source pages, split them, embed the chunks, and persist them to Chroma.

    Args:
        force_rebuild: When True, anything already at PERSIST_DIRECTORY is
            deleted and the store is rebuilt. When False, an existing
            PERSIST_DIRECTORY is reused as-is and nothing is loaded.

    Returns:
        True if a store already existed (and was kept) or was built
        successfully; False if any stage failed. Stage failures are logged
        rather than raised.
    """
    logger.info("Starting the vector store build process...")

    if os.path.exists(PERSIST_DIRECTORY):
        if force_rebuild:
            logger.warning(f"Force rebuild requested. Removing existing vector store at '{PERSIST_DIRECTORY}'.")
            try:
                shutil.rmtree(PERSIST_DIRECTORY)
                logger.info(f"Removed existing directory: {PERSIST_DIRECTORY}")
            except OSError as e:
                logger.error(f"Error removing directory {PERSIST_DIRECTORY}: {e}")
                return False
        else:
            logger.info(f"Vector store already exists at '{PERSIST_DIRECTORY}'. Set force_rebuild=True to overwrite.")
            return True

    # load documents
    logger.info(f"Loading documents from {len(URLS)} URLs...")
    try:
        loader = WebBaseLoader(URLS, continue_on_failure=True)
        docs = loader.load()
        if not docs:
            logger.error("No documents were successfully loaded. Check URLs and network connection.")
            return False
        logger.info(f"Successfully loaded {len(docs)} base documents.")
    except Exception as e:
        logger.error(f"Failed during document loading: {e}")
        return False

    # split documents
    logger.info(f"Splitting documents into chunks (size={CHUNK_SIZE}, overlap={CHUNK_OVERLAP})...")
    try:
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP
        )
        doc_splits = text_splitter.split_documents(docs)
        logger.info(f"Documents split into {len(doc_splits)} chunks.")
        if not doc_splits:
            logger.error("No document chunks were created after splitting.")
            return False
    except Exception as e:
        logger.error(f"Failed to split documents: {e}")
        return False

    # init embeddings
    logger.info(f"Initializing Nomic embeddings ('{EMBEDDING_MODEL}' with local inference)...")
    logger.warning("This might download the embedding model if not already cached (~0.5 GB).")
    try:
        embeddings = NomicEmbeddings(model=EMBEDDING_MODEL, inference_mode="local")
        logger.info("Nomic embeddings initialized.")
    except Exception as e:
        logger.error(f"Failed to initialize embeddings: {e}")
        return False

    # create | persist to Vector Database
    logger.info(f"Creating and persisting ChromaDB vector store at '{PERSIST_DIRECTORY}'...")
    try:
        Chroma.from_documents(
            documents=doc_splits,
            embedding=embeddings,
            persist_directory=PERSIST_DIRECTORY
        )
        logger.info("Vector store created and persisted successfully.")
    except Exception as e:
        logger.error(f"Failed to create/persist vector store: {e}")
        # Nothing existed at PERSIST_DIRECTORY when this step began (it was
        # either absent or removed above), so whatever is there now is a
        # partial write. Remove it; otherwise the next call would find the
        # directory, assume a finished store, and return True without rebuilding.
        shutil.rmtree(PERSIST_DIRECTORY, ignore_errors=True)
        return False

    logger.info("Vector store build process finished.")
    return True

In [None]:
# --- build process ---
# Start from "failed" so the flag is defined for later cells even if the build raises.
build_successful = False
try:
    build_successful = build_vector_store(force_rebuild=False)
    print(
        f"Vector store build/check completed successfully. Ready at {PERSIST_DIRECTORY}"
        if build_successful
        else "Vector store build failed."
    )
except Exception as e:
    # Report to the cell output and keep the full traceback in the log.
    print(f"An unexpected error occurred during the build execution block: {e}")
    logger.exception("Error in build execution block")

In [None]:
# components for runtime queries
# Placeholders only; the initialization cell assigns the real objects.
components_loaded = False
llm = retriever = web_search_tool = router_llm = embeddings = None

In [None]:
# --- build validation phase ---
# Initializes embeddings, retriever, LLM and (optionally) web search, but only
# when the build cell ran and reported success. `components_loaded` records
# the outcome for the cells that follow.
if 'build_successful' not in locals() and 'build_successful' not in globals():
    logger.error("The variable 'build_successful' is not defined.")
    logger.error("run the cell in 'indexing phase' before running this cell.")
elif build_successful:
    logger.info("Build successful, proceeding to initialize runtime components...")
    try:
        logger.info(f"Initializing Nomic embeddings ({EMBEDDING_MODEL}, local)...")
        embeddings = NomicEmbeddings(model=EMBEDDING_MODEL, inference_mode="local")

        logger.info(f"Loading Chroma vector store from {PERSIST_DIRECTORY}...")
        if not os.path.exists(PERSIST_DIRECTORY):
            logger.error(f"Vector store directory not found at {PERSIST_DIRECTORY}, though build was marked successful?")
            raise FileNotFoundError(f"Vector store not found at {PERSIST_DIRECTORY}")
        vectorstore = Chroma(persist_directory=PERSIST_DIRECTORY, embedding_function=embeddings)
        retriever = vectorstore.as_retriever(search_kwargs={"k": 3})  # top 3 chunks
        logger.info("Vector store and retriever loaded.")

        logger.info(f"Initializing Ollama LLM ({LLM_MODEL})...")
        llm = Ollama(model=LLM_MODEL, temperature=0)
        # One throwaway call so an unreachable server fails here, not at first query.
        llm.invoke("Respond briefly: OK")
        logger.info("Ollama LLM connection verified.")
        # The router shares the same model instance as the answering chains.
        router_llm = llm

        # init Tavily - web search
        if TAVILY_API_KEY:
            # `max_results` is the tool's result-count field; `k` is not a
            # field of TavilySearchResults, so it never limited the results.
            web_search_tool = TavilySearchResults(max_results=3)
            logger.info("Tavily Search tool initialized.")
        else:
            web_search_tool = None
            logger.warning("web search NOT initialized (API key not available).")

        components_loaded = True
        logger.info("All available runtime components initialized successfully.")

    except Exception as e:
        logger.exception(f"Error initializing runtime components: {e}")
        components_loaded = False
else:
    components_loaded = False
    logger.error("Cannot initialize runtime components because the vector store build failed.")

In [None]:
# One-line status report for the initialization cell.
print(
    "Runtime components are loaded and ready."
    if components_loaded
    else "Runtime components failed to load."
)

In [None]:
# Router, RAG chain, web search, and branching logic
def format_tavily_results(results):
    """Render Tavily search hits as one text block of URL/content pairs.

    Args:
        results: Expected to be a list of dicts; the 'url' and 'content' keys
            are read from each dict, with 'N/A' used for a missing key.
            Non-dict items in the list are skipped.

    Returns:
        The hits joined by blank lines, or a fixed explanatory sentence when
        the input is empty, is not a list, contains no dict items, or
        formatting raises.
    """
    if not results:
        return "No relevant information found via web search."

    try:
        if not isinstance(results, list):
            return "Received non-list results from Tavily."

        entries = []
        for hit in results:
            if not isinstance(hit, dict):
                continue
            entries.append(f"URL: {hit.get('url', 'N/A')}\nContent: {hit.get('content', 'N/A')}")

        rendered = "\n\n".join(entries)
        if rendered:
            return rendered
        return "Found results, but could not extract content."
    except Exception as e:
        logger.error(f"Error formatting Tavily results: {e}")
        return "Error processing search results."

In [None]:
# --- Chains ---
# Builds the router, the RAG chain, the web-search chain, and `full_chain`,
# which routes each question to one of the two. `full_chain` stays None when
# components are missing or chain construction fails.
full_chain = None

if components_loaded:
    logger.info("Defining Langchain chains...")
    try:
        # full_chain is invoked with a mapping such as {"question": "..."}.
        # Sub-chains that need the bare question text start with this step.
        extract_question = RunnableLambda(lambda x: x["question"])

        # Router Chain
        # Input: the question as a plain string. Output: the cleaned decision word.
        router_prompt_template = """Given the user query, determine if it is best answered using internal knowledge about Python Style Guides (PEP8, Google Style Guide) and ML best practices (Google's Rules of ML), or if it requires a real-time web search for general programming topics, specific code examples not related to style/ML rules, current events, or very recent information.
Respond only with the word 'vectorstore' or 'web_search'.

User Query: {question}
Decision:"""
        router_prompt = PromptTemplate.from_template(router_prompt_template)
        router = (
            {"question": RunnablePassthrough()}
            | router_prompt
            | router_llm
            | StrOutputParser()
            | RunnableLambda(lambda x: x.strip().lower().replace("'", "").replace('"', '')) # Clean output
        )

        # RAG Chain
        rag_prompt_template = """You are coding assistant specializing in Python style (PEP8, Google Style Guide) and ML best practices (Google's Rules of ML). Answer the user's question based *only* on the following provided context. If the context doesn't contain the answer, state that the specific information isn't available in the provided knowledge base. Do not use external knowledge.

Context:
{context}

Question: {question}

Answer:"""
        rag_prompt = ChatPromptTemplate.from_template(rag_prompt_template)
        rag_chain = (
            # Add a "context" key: retrieve for the question and join the chunk texts.
            RunnablePassthrough.assign(
                context=(
                    extract_question
                    | retriever
                    | (lambda docs: "\n\n".join(doc.page_content for doc in docs))
                )
            )
            | rag_prompt
            | llm
            | StrOutputParser()
        )


        # Web Search Chain
        if web_search_tool:
            web_search_prompt_template = """You are a helpful Python coding assistant. Analyze the user's question and the provided web search results.

1.  **Determine Intent:** Is the user asking for an explanation, or for code implementation (e.g., "How to write...", "Implement...", "Code for...")?
2.  **Synthesize Answer:**
    *   If the user wants an explanation, provide a clear summary based on the context.
    *   If the user wants code: Generate clean, functional Python code based on the information in the web search results and the user's request.
3.  **Apply Style:** When generating Python code, make a best effort to adhere to the Google Python Style Guide (e.g., clear variable names, docstrings for functions/classes, reasonable line lengths, comments where necessary).
4.  **Handle Insufficient Info:** If the search results are irrelevant or insufficient to fulfill the request (either explanation or code), clearly state that.

Web Search Results:
{context}

Question: {question}

Answer:"""
            web_search_prompt = ChatPromptTemplate.from_template(web_search_prompt_template)
            web_chain = (
                # Add a "context" key: search the web for the question and format the hits.
                RunnablePassthrough.assign(
                    context=(
                        extract_question
                        | web_search_tool
                        | RunnableLambda(format_tavily_results)
                    )
                )
                | web_search_prompt
                | llm
                | StrOutputParser()
            )
            logger.info("Web search chain defined.")
        else:
            web_chain = RunnableLambda(lambda x: "Web search is not available - missing Tavily API key.")
            logger.warning("Web search chain is disabled (no API key).")

        # Branching
        def decide_chain(route_info):
            """Pick the chain to run from the router's decision.

            Args:
                route_info: Mapping with the user's "question" and the router's
                    "route_decision" string.

            Returns:
                rag_chain when the decision contains "vectorstore"; otherwise
                web_chain if a web search tool exists, else a runnable that
                yields a fixed "disabled" message. The returned runnable is
                then run by the enclosing RunnableLambda.
            """
            question = route_info.get("question", "N/A")
            route_decision = route_info.get("route_decision", "").lower()
            logger.debug(f"Deciding chain for question '{question[:50]}...'. Router decision: '{route_decision}'")

            if "vectorstore" in route_decision:
                logger.info(f"Routing to RAG chain for question: {question[:50]}...")
                return rag_chain
            elif web_search_tool:
                # Any decision other than "vectorstore" falls through to web search.
                logger.info(f"Routing to Web Search chain for question: {question[:50]}...")
                return web_chain
            else:
                logger.warning(f"Routing defaulted away from web search (disabled) for question: {question[:50]}...")
                return RunnableLambda(lambda x: "Web search was intended but is disabled.")


        # original question and the routing decision
        # The router wraps its own input as {"question": ...}, so it must be
        # given the question string. Passing the whole input mapping (wrapped
        # once more) rendered a nested dict repr into the router prompt.
        routing_chain = RunnablePassthrough.assign(
            route_decision=extract_question | router
        )

        full_chain = routing_chain | RunnableLambda(decide_chain)

        logger.info("Langchain chains defined successfully.")
        print("Langchain chains are defined and ready for testing.")

    except Exception as e:
        logger.exception("Error defining Langchain chains.")
        print(f"Error defining Langchain chains: {e}")
        full_chain = None
else:
    logger.error("Skipping chain definition as components failed to load.")
    print("Skipping chain definition as components failed to load.")

In [None]:
# validation of chains
# Smoke test: push each sample question through full_chain and print the answer.
if not full_chain:
    print("\nSkipping testing as chains could not be defined.")
else:
    print("\n--- Running tests ---")
    test_queries = {
        "Merge Sort": "How to write Merge Sort Algorithm?",
        "ML Rule": "How to implement Regression Machine Learning Model?",
    }

    for name, query in test_queries.items():
        print(f"\n--- Testing Query ({name}): '{query}' ---")
        try:
            response = full_chain.invoke({"question": query})
            print(f"Full Chain Response:\n{response}\n")
        except Exception as e:
            # A failing query is reported and logged; the remaining queries still run.
            print(f"!!! Error testing query '{query}': {e}")
            logger.exception(f"Error during interactive test for query: {query}")

    print("--- Chain tests completed ---")