In [1]:
import os
import re
import hashlib
import json
import logging
from datetime import datetime
from dotenv import load_dotenv
from typing import List, Dict, Any
from pathlib import Path

from crewai import Agent, Task, Crew, Process

# LangChain bits for RAG
from langchain_community.document_loaders import Docx2txtLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma

# Load variables from a local .env file into the process environment.
# NOTE(review): presumably this supplies the API key for the provider named in
# CONFIG["llm_model"] — confirm which variable name is expected.
load_dotenv()

# Configure logging: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
# Central defaults read by the functions below; the builder/retriever functions
# fall back to these values when their corresponding argument is not supplied.
CONFIG = {
    "doc_path": "./data/Ravi_Total.docx",  # source document loaded with Docx2txtLoader
    "persist_dir": "chroma_db",  # directory where the Chroma store is persisted / reloaded from
    "chunk_size": 700,  # passed to RecursiveCharacterTextSplitter(chunk_size=...)
    "chunk_overlap": 200,  # passed to RecursiveCharacterTextSplitter(chunk_overlap=...)
    "retrieve_k": 5,  # number of chunks requested from the retriever per query
    "llm_model": "groq/llama-3.3-70b-versatile",  # passed as `llm=` to both CrewAI agents
    "verbose": False,  # forwarded to the agents and the crew; False keeps their output quiet
}

In [2]:

# -----------------------------
# RAG: Build / Load Knowledge Base
# -----------------------------
def create_knowledge_base(doc_path: str = None,
                          persist_dir: str = None,
                          chunk_size: int = None,
                          chunk_overlap: int = None):
    """
    Load the persisted Chroma knowledge base, or build it from the source document.

    If `persist_dir` exists and is non-empty, the stored collection is opened and
    returned immediately; in that case `doc_path`, `chunk_size` and
    `chunk_overlap` are NOT used (no re-chunking happens). Otherwise the .docx
    at `doc_path` is loaded, split into chunks, embedded and persisted.

    Args:
        doc_path: Path to the .docx document. Defaults to CONFIG["doc_path"].
        persist_dir: Chroma persistence directory. Defaults to CONFIG["persist_dir"].
        chunk_size: Splitter chunk size. Defaults to CONFIG["chunk_size"].
        chunk_overlap: Splitter overlap between consecutive chunks. An explicit
            0 is honoured; only None falls back to CONFIG["chunk_overlap"].

    Returns:
        The Chroma vector store.

    Raises:
        FileNotFoundError: the store does not exist yet and `doc_path` is missing.
        ValueError: the document yielded no content.
        Exception: any other loader/embedding/store error is logged and re-raised.
    """
    doc_path = doc_path or CONFIG["doc_path"]
    persist_dir = persist_dir or CONFIG["persist_dir"]
    chunk_size = chunk_size or CONFIG["chunk_size"]
    # `or` would silently replace a deliberate overlap of 0 with the default,
    # so only a missing value (None) falls back to the configured overlap.
    if chunk_overlap is None:
        chunk_overlap = CONFIG["chunk_overlap"]

    try:
        embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

        # Reuse the persisted store when one is already on disk.
        if os.path.exists(persist_dir) and os.listdir(persist_dir):
            kb = Chroma(persist_directory=persist_dir, embedding_function=embeddings)
            logger.info("‚úÖ Loaded existing knowledge base")
            return kb

        if not os.path.exists(doc_path):
            raise FileNotFoundError(f"Document not found: {doc_path}")

        logger.info(f"Loading document from: {doc_path}")
        loader = Docx2txtLoader(doc_path)
        docs = loader.load()

        if not docs:
            raise ValueError(f"No content found in document: {doc_path}")

        logger.info(f"Splitting {len(docs)} document(s) into chunks...")
        splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        chunks = splitter.split_documents(docs)

        logger.info(f"Creating knowledge base with {len(chunks)} chunks...")
        kb = Chroma.from_documents(chunks, embeddings, persist_directory=persist_dir)
        logger.info("‚úÖ Knowledge base created & persisted")
        return kb

    except FileNotFoundError as e:
        # Missing document: the exception message is already self-explanatory.
        logger.error(f"‚ùå {str(e)}")
        raise
    except Exception as e:
        logger.error(f"‚ùå Error creating knowledge base: {str(e)}")
        raise


In [3]:


# -----------------------------
# Helpers for trace/formatting
# -----------------------------
def _short(text: str, n: int = 300) -> str:
    text = text.strip().replace("\n", " ")
    return (text[:n] + "‚Ä¶") if len(text) > n else text

def _doc_id(doc_text: str) -> str:
    return hashlib.md5(doc_text.encode("utf-8")).hexdigest()[:8]

def retrieve_context(kb: Chroma, query: str, k: int = None) -> List[Dict[str, Any]]:
    """
    Fetch the chunks the retriever considers most relevant to `query`.

    Args:
        kb: Vector store to search.
        query: User question.
        k: Number of chunks to request; falls back to CONFIG["retrieve_k"]
            when not given (or falsy).

    Returns:
        One dict per chunk with keys "id" (short content hash), "excerpt"
        (single-line text capped at 420 characters), "metadata" and "raw"
        (full chunk text). No similarity scores are included. An empty list is
        returned when nothing matches or when retrieval fails (the failure is
        logged, not raised).
    """
    k = k or CONFIG["retrieve_k"]
    try:
        hits = kb.as_retriever(search_kwargs={"k": k}).get_relevant_documents(query)

        if not hits:
            logger.warning(f"‚ö†Ô∏è No relevant documents found for query: {query}")
            return []

        return [
            {
                "id": _doc_id(hit.page_content),
                "excerpt": _short(hit.page_content, 420),
                "metadata": hit.metadata or {},
                "raw": hit.page_content,
            }
            for hit in hits
        ]
    except Exception as e:
        # Best-effort: callers treat a failed retrieval like "no context".
        logger.error(f"‚ùå Error retrieving context: {str(e)}")
        return []

def build_context_block(retrieved: List[Dict[str, Any]]) -> str:
    """Render retrieved chunks as prompt context, one "[id] excerpt" line each.

    Returns the placeholder "[No relevant context found]" when `retrieved`
    is empty.
    """
    if retrieved:
        return "\n".join(f"[{item['id']}] {item['excerpt']}" for item in retrieved)
    return "[No relevant context found]"

def parse_validator_output(text: str) -> Dict[str, Any]:
    """
    Parse the validator agent's reply into a structured result.

    The validator is asked to answer in this schema:

    VERDICT: PASS | REWRITE
    JUSTIFICATION:
    - bullet 1
    - bullet 2
    FINAL:
    <final hr-ready answer>

    Parsing rules:
      * Section labels are matched case-insensitively (e.g. "Final:" works).
      * The reply is split at the FIRST "FINAL:" marker: everything after it
        is the final answer, and justification bullets are only taken from
        the part before it, so "-" lines of the answer never become bullets.
      * If there is no "FINAL:" marker, the whole reply is the final answer
        and no bullets are reported.
      * Non-string input (for example a crew output object) is converted with
        str(); None is treated as an empty reply.

    Args:
        text: Raw validator output.

    Returns:
        Dict with "verdict" (text after "VERDICT:", or "UNKNOWN"),
        "final_answer" (str) and "bullets" (list of str without the leading
        dash). On an unexpected parsing error the verdict is "ERROR" and
        "final_answer" is the input unchanged; the function does not raise.
    """
    try:
        # Accept whatever the crew returned; the regexes below need a str.
        if text is None:
            text = ""
        elif not isinstance(text, str):
            text = str(text)

        verdict_match = re.search(r"^VERDICT:\s*(.+)$", text, flags=re.IGNORECASE | re.MULTILINE)
        verdict = verdict_match.group(1).strip() if verdict_match else "UNKNOWN"

        # Split once so the head (verdict + justification) and the answer are
        # cut at the same, case-insensitively matched marker.
        parts = re.split(r"\bFINAL:\s*", text, maxsplit=1, flags=re.IGNORECASE)
        if len(parts) == 2:
            head, final_answer = parts[0], parts[1].strip()
            just_match = re.search(r"JUSTIFICATION:\s*(.*)", head, flags=re.IGNORECASE | re.DOTALL)
            justification_block = just_match.group(1).strip() if just_match else ""
        else:
            final_answer = text.strip()
            justification_block = ""

        # Only dash-prefixed lines count as justification bullets.
        bullets = [
            line.strip()[1:].strip()
            for line in justification_block.splitlines()
            if line.strip().startswith("-")
        ]
        return {"verdict": verdict, "final_answer": final_answer, "bullets": bullets}
    except Exception as e:
        logger.error(f"‚ùå Error parsing validator output: {str(e)}")
        return {"verdict": "ERROR", "final_answer": text, "bullets": []}

def format_trace_for_display(trace: Dict[str, Any]) -> str:
    """Format a trace dictionary as readable multi-line text.

    Works for complete traces as well as partial ones (for example the trace
    of a failed run, which carries only "query", "error" and
    "retrieved_chunks"): missing keys fall back to neutral defaults instead
    of raising KeyError, and an "error" entry, when present, gets its own
    line. For a complete trace the output is unchanged.

    Args:
        trace: Dict that may contain "query", "retrieved_chunks" (list of
            dicts with "id" and "excerpt"), "validator_verdict",
            "validator_justification_bullets" and "error".

    Returns:
        The formatted text, lines joined with newlines.
    """
    chunks = trace.get("retrieved_chunks") or []
    bullets = trace.get("validator_justification_bullets") or []

    output = []
    output.append("\nüìä QUERY:")
    output.append(f"  {trace.get('query', '')}\n")
    output.append(f"üìö RETRIEVED CHUNKS: ({len(chunks)} chunks)")
    for chunk in chunks:
        output.append(f"  [{chunk['id']}] {chunk['excerpt']}")
    # Only failed runs carry an error message; surface it instead of hiding it.
    if trace.get("error"):
        output.append(f"\nERROR: {trace['error']}")
    output.append(f"\n‚úîÔ∏è  VALIDATOR VERDICT: {trace.get('validator_verdict', 'UNKNOWN')}")
    if bullets:
        output.append("üìù JUSTIFICATION:")
        for bullet in bullets:
            output.append(f"  ‚Ä¢ {bullet}")
    return "\n".join(output)


In [None]:

# -----------------------------
# Crew Agents & Tasks
# -----------------------------
def run_hr_rag_session(query: str) -> Dict[str, Any]:
    """
    One-run pipeline:
      1) Retrieve context from Chroma
      2) Answer Agent drafts answer (context-only)
      3) Validator Agent checks/optimizes answer
      4) Return final answer + compact reasoning trace (no chain-of-thought)

    Args:
        query: The user's question.

    Returns:
        Dict with "final_answer" (str) and "trace". The trace always contains
        "query", "retrieved_chunks", "validator_verdict" and
        "validator_justification_bullets"; a successful run adds
        "draft_answer_note", a failed run adds "error" and reports the
        verdict "ERROR". The function does not raise: any failure is logged
        and reported through the returned dict.
    """
    try:
        logger.info(f"üîç Processing query: {query}")
        # NOTE(review): the knowledge base (including the embedding model) is
        # re-created on every call; consider caching it if latency matters.
        kb = create_knowledge_base()
        retrieved = retrieve_context(kb, query)
        context_block = build_context_block(retrieved)

        # --- Agent 1: Answer Generator (uses only provided context) ---
        answer_agent = Agent(
            role="Answer Generator",
            goal="Generate accurate answers about Ravi using ONLY the provided context.",
            backstory=(
                "You specialize in answering questions about Ravi using retrieved chunks from a private knowledge base."
                " Do not invent facts. If the context is insufficient, say 'Insufficient context'."
            ),
            llm=CONFIG["llm_model"],
            verbose=CONFIG["verbose"]
        )

        task1 = Task(
            description=(
                "Use ONLY the following context to answer the user's question.\n\n"
                f"CONTEXT:\n{context_block}\n\n"
                f"QUESTION: {query}\n\n"
                "REQUIREMENTS:\n"
                "- Cite chunk IDs in-line like [id] where relevant.\n"
                "- If information is not present in the context, reply exactly: 'Insufficient context'.\n"
                "- Keep answer concise and factual."
            ),
            expected_output="A concise factual answer with inline citations to chunk IDs where used.",
            agent=answer_agent
        )

        # --- Agent 2: HR Validator/Optimizer ---
        validator_agent = Agent(
            role="HR Validator",
            goal=(
                "Check if the draft is accurate to the provided context and HR-appropriate. "
                "If needed, rewrite it to be HR-ready."
            ),
            backstory=(
                "You are a professional HR communication specialist. "
                "You ensure responses are accurate, concise, and suitable for HR discussions."
            ),
            llm=CONFIG["llm_model"],
            verbose=CONFIG["verbose"]
        )

        # NOTE(review): the "DRAFT:" line below is a plain (non-f) string, so the
        # doubled braces are part of the literal text. The draft itself is
        # presumably delivered to the validator through `context=[task1]` —
        # confirm against the CrewAI version in use.
        task2 = Task(
            description=(
                "You will receive a DRAFT answer and the CONTEXT used to create it.\n\n"
                f"CONTEXT:\n{context_block}\n\n"
                "DRAFT: {{output_of_Task_1}}\n\n"
                "Validate against the CONTEXT and produce output in EXACTLY this format:\n"
                "VERDICT: PASS or REWRITE\n"
                "JUSTIFICATION:\n"
                "- brief bullet explaining accuracy (no internal chain-of-thought)\n"
                "- brief bullet explaining HR-clarity/tone\n"
                "FINAL:\n"
                "- If PASS: return the DRAFT verbatim\n"
                "- If REWRITE: return an improved HR-friendly version (still grounded in CONTEXT)\n"
                "\nRULES:\n"
                "- Do NOT reveal chain-of-thought. Only provide concise bullets.\n"
                "- Do NOT add facts not present in CONTEXT.\n"
                "- Keep the final answer succinct and HR-ready."
            ),
            expected_output="A PASS/REWRITE verdict with brief justification bullets and the HR-ready final answer.",
            agent=validator_agent,
            context=[task1]
        )

        crew = Crew(
            agents=[answer_agent, validator_agent],
            tasks=[task1, task2],
            process=Process.sequential,
            verbose=CONFIG["verbose"]
        )

        logger.info("‚öôÔ∏è  Running crew workflow...")
        crew_output = crew.kickoff()

        # kickoff() may hand back a plain string or an output object depending
        # on the CrewAI version. Normalise to text so the regex-based parser
        # and the JSON history always receive a str.
        raw_text = getattr(crew_output, "raw", None)
        result_text = raw_text if isinstance(raw_text, str) else str(crew_output)

        # Parse validator output into a structured result
        parsed = parse_validator_output(result_text)

        # Build a clean reasoning trace (no chain-of-thought)
        trace = {
            "query": query,
            "retrieved_chunks": [
                {"id": r["id"], "excerpt": r["excerpt"], "metadata": r["metadata"]}
                for r in retrieved
            ],
            "draft_answer_note": "Draft generated from context-only by Answer Agent (not shown to avoid chain-of-thought).",
            "validator_verdict": parsed.get("verdict", "UNKNOWN"),
            "validator_justification_bullets": parsed.get("bullets", []),
        }

        logger.info("‚úÖ Query processed successfully")
        return {
            "final_answer": parsed.get("final_answer", result_text),
            "trace": trace
        }

    except Exception as e:
        logger.error(f"‚ùå Error in HR RAG session: {str(e)}")
        # Keep the same trace keys as the success path so display and history
        # code can read the trace of a failed run without special-casing it.
        return {
            "final_answer": f"Error processing query: {str(e)}",
            "trace": {
                "query": query,
                "error": str(e),
                "retrieved_chunks": [],
                "validator_verdict": "ERROR",
                "validator_justification_bullets": [],
            }
        }


: 

In [None]:

# -----------------------------
# Query Interface & History
# -----------------------------
class QueryHistory:
    """Keeps a list of answered queries and persists it to a JSON file.

    Each entry is a dict with "timestamp" (ISO 8601 string), "query",
    "final_answer" and "verdict". The file is loaded on construction and
    rewritten after every added query.
    """
    def __init__(self, history_file: str = "query_history.json"):
        """Remember the history file path and load any existing entries."""
        self.history_file = history_file
        self.queries = self._load_history()

    def _load_history(self) -> List[Dict[str, Any]]:
        """Read the history file.

        Returns an empty list when the file does not exist, cannot be read,
        is not valid JSON, or does not contain a JSON list. Entries that are
        not JSON objects are skipped.
        """
        if not os.path.exists(self.history_file):
            return []
        try:
            with open(self.history_file, 'r', encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and decoding errors.
            # Other exceptions (e.g. KeyboardInterrupt) are not swallowed.
            logging.getLogger(__name__).warning(
                f"Could not read query history from {self.history_file}: {e}"
            )
            return []
        if not isinstance(data, list):
            # add_query() appends to this value, so anything but a list is unusable.
            logging.getLogger(__name__).warning(
                f"Ignoring query history in {self.history_file}: expected a JSON list"
            )
            return []
        return [entry for entry in data if isinstance(entry, dict)]

    def save_history(self):
        """Write all entries to the history file as indented JSON."""
        with open(self.history_file, 'w', encoding="utf-8") as f:
            json.dump(self.queries, f, indent=2)

    def add_query(self, query: str, result: Dict[str, Any]):
        """Append one query/result pair and persist the history.

        Args:
            query: The question that was asked.
            result: Session result; "final_answer" and
                trace["validator_verdict"] are recorded when present.
        """
        answer = result.get("final_answer", "")
        verdict = (result.get("trace") or {}).get("validator_verdict", "")
        entry = {
            "timestamp": datetime.now().isoformat(),
            "query": query,
            # Stored as text so the entry is always JSON-serialisable and can
            # be shortened for display.
            "final_answer": "" if answer is None else str(answer),
            "verdict": "" if verdict is None else str(verdict)
        }
        self.queries.append(entry)
        self.save_history()

    def display_history(self):
        """Print the ten most recent entries (numbered 1..10), or a notice when empty."""
        if not self.queries:
            print("üìã No query history yet.")
            return
        print("\n" + "="*80)
        print("üìã QUERY HISTORY")
        print("="*80)
        for i, q in enumerate(self.queries[-10:], 1):  # Show last 10
            # .get() tolerates entries from an older or hand-edited history file.
            print(f"\n{i}. [{q.get('timestamp', '')}]")
            print(f"   Q: {q.get('query', '')}")
            print(f"   V: {q.get('verdict', '')}")
            print(f"   A: {_short(str(q.get('final_answer', '')), 100)}")
        print("\n" + "="*80)

def display_welcome():
    """Print the welcome banner with the list of interactive commands."""
    rule = "="*80
    banner = (
        "\n" + rule,
        "ü§ñ RAG HR ASSISTANT - Query Interface",
        rule,
        "Welcome to the HR Knowledge Base Query System!",
        "\nCommands:",
        "  ‚Ä¢ Type your question and press Enter",
        "  ‚Ä¢ Type 'history' to see query history",
        "  ‚Ä¢ Type 'help' for available commands",
        "  ‚Ä¢ Type 'exit' to quit",
        rule,
    )
    for line in banner:
        print(line)

def display_help():
    """Print the help screen: usage overview, commands and example questions."""
    rule = "="*80
    help_text = """
Usage:
  - Ask questions naturally about Ravi's background, experience, skills, etc.
  - Questions are processed through:
    1. Retrieval: Find relevant information from knowledge base
    2. Generation: Create an accurate answer based on retrieved context
    3. Validation: Ensure answer meets HR standards

Commands:
  history    - Show recent query history
  help       - Display this help message
  exit       - Exit the application

Examples:
  "What is Ravi's experience with Python?"
  "Summarize Ravi's work history"
  "What are Ravi's key technical skills?"
    """
    print("\n" + rule)
    print("üìö HELP - Available Commands")
    print(rule)
    print(help_text)
    print(rule)

def single_run_demo():
    """Run one fixed demo question through the pipeline and print the
    final answer followed by the formatted reasoning trace."""
    rule = "="*80
    query = "Summarize Ravi's Machine Learning experience for HR."
    print(f"\nüîç Demo Query: {query}\n")
    result = run_hr_rag_session(query)

    # Final answer section
    print("\n" + rule)
    print("‚úÖ FINAL HR-READY ANSWER")
    print(rule)
    print(result["final_answer"])

    # Reasoning trace section
    print("\n" + rule)
    print("üìä REASONING TRACE")
    print(rule)
    print(format_trace_for_display(result["trace"]))
    print(rule + "\n")

def interactive_loop():
    """Interactive query loop with history tracking.

    Reads questions from standard input until the user types 'exit', presses
    Ctrl-C, or the input stream ends. The commands 'exit', 'help' and
    'history' are matched case-insensitively; any other non-empty input is
    answered through run_hr_rag_session(), recorded in the query history and
    printed together with its reasoning trace.
    """
    display_welcome()
    history = QueryHistory()

    while True:
        try:
            print("\nüí¨ Enter your question (or 'help' for commands):")
            user_input = input("> ").strip()

            if not user_input:
                print("‚ö†Ô∏è  Please enter a valid question.")
                continue

            command = user_input.lower()

            if command == "exit":
                print("\nüëã Thank you for using RAG HR Assistant. Goodbye!")
                break

            if command == "help":
                display_help()
                continue

            if command == "history":
                history.display_history()
                continue

            # Process query
            result = run_hr_rag_session(user_input)
            try:
                history.add_query(user_input, result)
            except (OSError, TypeError, ValueError) as e:
                # Failing to persist history must not hide an answer that has
                # already been computed.
                logger.warning(f"Could not save query history: {str(e)}")

            print("\n" + "="*80)
            print("‚úÖ FINAL HR-READY ANSWER")
            print("="*80)
            print(result["final_answer"])
            print("\n" + "="*80)
            print("üìä REASONING TRACE")
            print("="*80)
            print(format_trace_for_display(result["trace"]))
            print("="*80)

        except (KeyboardInterrupt, EOFError):
            # EOFError: input() hit end-of-input (closed or piped stdin, Ctrl-D).
            # It would be raised again on every retry, so leave the loop
            # instead of reporting the same error forever.
            print("\n\nüëã Session interrupted. Goodbye!")
            break
        except Exception as e:
            logger.error(f"‚ùå Unexpected error: {str(e)}")
            print(f"‚ùå An error occurred: {str(e)}")
            print("Please try again with a different query.")

if __name__ == "__main__":
    # Entry point: start the interactive query interface. Any exception that
    # escapes it is logged and also printed for the user.
    try:
        # To run one fixed demo query, uncomment the call below.
        # single_run_demo()

        interactive_loop()
    except Exception as fatal:
        logger.error(f"‚ùå Fatal error: {str(fatal)}")
        print(f"‚ùå Fatal error: {str(fatal)}")



ü§ñ RAG HR ASSISTANT - Query Interface
Welcome to the HR Knowledge Base Query System!

Commands:
  ‚Ä¢ Type your question and press Enter
  ‚Ä¢ Type 'history' to see query history
  ‚Ä¢ Type 'help' for available commands
  ‚Ä¢ Type 'exit' to quit

üí¨ Enter your question (or 'help' for commands):


2026-01-20 12:09:01,022 - INFO - üîç Processing query: Tell me about you
  embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
