<a href="https://colab.research.google.com/github/waqasalam/EosSdk/blob/master/RAG_Pipeline_MVP_for_Bug_Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# First, ensure you have the necessary libraries installed:
# pip install langchain langchain-openai langchain-community "langgraph[hub]" "arize-phoenix[all]" chromadb

import os
from datetime import datetime
from typing import List, Literal, TypedDict

# LangChain/LangGraph imports
from langchain_community.document_loaders import TextLoader
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph

# Arize Phoenix imports
import phoenix as px
from phoenix.trace import LangChainSpanEvaluator, SpanEvaluator

# Start the local Phoenix application used for tracing.
# The UI it serves is typically reachable at http://localhost:6006; the
# returned session is kept so its URL can be printed once the examples ran.
session = px.launch_app()

# --- Configuration ---
# IMPORTANT: OPENAI_API_KEY must be provided through the environment.
# Illustration only (never hardcode a real key in production):
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
if os.environ.get("OPENAI_API_KEY") is None:
    # The MVP only warns here and carries on; anything that needs the key
    # will fail later when it is actually used.
    print("WARNING: OPENAI_API_KEY environment variable not set. Please set it to run the LLM.")

# --- 1. Data Collection & Preprocessing (Mock Data for MVP) ---

# Mock Datadog Logs
# Stand-in for records that a real application would fetch via Datadog's API.
# Every row below is (timestamp, service, level, message, log_id); the rows
# are zipped with the field names to produce one dict per log entry.
_LOG_FIELDS = ("timestamp", "service", "level", "message", "log_id")
_LOG_ROWS = (
    (
        "2023-05-15T10:00:00Z",
        "authentication-service",
        "ERROR",
        "Failed to authenticate user 'john.doe': Invalid credentials.",
        "log-001",
    ),
    (
        "2023-05-15T10:05:30Z",
        "order-processing-service",
        "WARN",
        "High latency detected for database query 'get_orders_by_user'. Duration: 2500ms.",
        "log-002",
    ),
    (
        "2023-05-15T10:10:15Z",
        "payment-gateway",
        "ERROR",
        "Stripe API call failed: Payment declined. Transaction ID: XYZ123.",
        "log-003",
    ),
    (
        "2023-05-15T10:12:00Z",
        "authentication-service",
        "ERROR",
        "Rate limit exceeded for login attempts from IP: 192.168.1.10. User: 'jane.doe'.",
        "log-004",
    ),
    (
        "2023-05-16T11:00:00Z",
        "inventory-service",
        "ERROR",
        "Negative stock detected for product ID: P123. Current stock: -5.",
        "log-005",
    ),
)
datadog_logs = [dict(zip(_LOG_FIELDS, row)) for row in _LOG_ROWS]

# Mock Jira Issues
# Stand-in for tickets that a real application would fetch via Jira's API.
# Every row below is (issue_id, summary, description, status, priority); the
# rows are zipped with the field names to produce one dict per issue.
_ISSUE_FIELDS = ("issue_id", "summary", "description", "status", "priority")
_ISSUE_ROWS = (
    (
        "JIRA-101",
        "Authentication failure for invalid credentials",
        "Users are reporting issues logging in with correct credentials. Logs show 'Invalid credentials' error from authentication-service.",
        "Open",
        "High",
    ),
    (
        "JIRA-102",
        "Slow order processing due to DB latency",
        "Order processing times have increased. Datadog logs indicate high latency for 'get_orders_by_user' query.",
        "In Progress",
        "Medium",
    ),
    (
        "JIRA-103",
        "Stripe payment declines for some transactions",
        "Some payments are being declined via Stripe. Logs show 'Payment declined' errors.",
        "Open",
        "Critical",
    ),
    (
        "JIRA-104",
        "Implement rate limiting for login attempts",
        "Multiple login attempts from single IPs are not rate-limited, leading to potential brute-force attacks.",
        "To Do",
        "Low",
    ),
    (
        "JIRA-105",
        "Negative stock bug in inventory service",
        "Inventory service is reporting negative stock for some products, leading to incorrect availability.",
        "Open",
        "High",
    ),
)
jira_issues = [dict(zip(_ISSUE_FIELDS, row)) for row in _ISSUE_ROWS]

# Mock Codebase (simplified for MVP, imagine full files)
# A real application would read from the Git repository and use more
# sophisticated code parsing/chunking. Here each source excerpt lives in its
# own constant, and the records are assembled from
# (file_path, function_name, code) triples further down.
_AUTHENTICATE_USER_SRC = """
def authenticate_user(username, password):
    # ...
    # This is where the authentication logic resides.
    # A common bug here is incorrect password hashing or comparison.
    if not check_password_hash(user.password_hash, password):
        logger.error(f"Failed to authenticate user '{username}': Invalid credentials.")
        return False
    # ...
"""

_GET_ORDERS_BY_USER_SRC = """
def get_orders_by_user(user_id):
    start_time = time.time()
    # This query might be inefficient for large datasets or complex joins.
    orders = db.query(Order).filter_by(user_id=user_id).all()
    duration = (time.time() - start_time) * 1000
    if duration > 2000: # Threshold for high latency
        logger.warning(f"High latency detected for database query 'get_orders_by_user'. Duration: {duration:.0f}ms.")
    return orders
"""

_PROCESS_PAYMENT_SRC = """
import stripe
def process_payment(amount, token):
    try:
        charge = stripe.Charge.create(
            amount=amount,
            currency='usd',
            source=token,
            description='Payment for order'
        )
        return charge.id
    except stripe.error.CardError as e:
        # This catch block handles Stripe-specific card errors.
        # Ensure all possible Stripe exceptions are handled.
        logger.error(f"Stripe API call failed: {e.user_message}. Transaction ID: {e.request_id}.")
        raise # Re-raise to propagate the error
"""

_CHECK_RATE_LIMIT_SRC = """
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(
    key_func=get_remote_address,
    default_limits=["10 per minute"] # Example rate limit
)

def check_rate_limit(ip_address, user_id):
    # This function is defined but might not be properly integrated
    # into the main login flow, leading to unhandled rate limit scenarios.
    pass # Needs implementation to actually apply the limit
"""

_UPDATE_STOCK_SRC = """
def update_stock(product_id, quantity_change):
    current_stock = get_current_stock(product_id)
    new_stock = current_stock + quantity_change
    if new_stock < 0:
        # This condition indicates a bug where stock can go negative.
        # It should ideally prevent the update or trigger an immediate alert/rollback.
        logger.error(f"Negative stock detected for product ID: {product_id}. Current stock: {current_stock}, Change: {quantity_change}.")
        # For MVP, just logging the error. A real fix would prevent this state.
    db.update_stock(product_id, new_stock)
"""

code_snippets = [
    {"file_path": path, "function_name": name, "code": source}
    for path, name, source in (
        ("auth_service/handlers.py", "authenticate_user", _AUTHENTICATE_USER_SRC),
        ("order_service/db_utils.py", "get_orders_by_user", _GET_ORDERS_BY_USER_SRC),
        ("payment_service/stripe_api.py", "process_payment", _PROCESS_PAYMENT_SRC),
        ("auth_service/rate_limiter.py", "check_rate_limit", _CHECK_RATE_LIMIT_SRC),
        ("inventory_service/stock_manager.py", "update_stock", _UPDATE_STOCK_SRC),
    )
]

# Convert mock data into LangChain Document objects.
# Each document's page_content is a one-string rendering of the record (this is
# the text handed to the vector store), and its metadata keeps the structured
# fields. Comprehensions are used instead of append loops so that no loop
# temporaries (record, content, metadata) are left in the notebook namespace.
log_documents = [
    Document(
        page_content=(
            f"Datadog Log - Service: {log['service']}, Level: {log['level']}, "
            f"Message: {log['message']}"
        ),
        metadata={
            "source": "datadog_log",
            "timestamp": log["timestamp"],
            "service": log["service"],
            "level": log["level"],
            "log_id": log["log_id"],
        },
    )
    for log in datadog_logs
]

jira_documents = [
    Document(
        page_content=(
            f"Jira Issue - ID: {issue['issue_id']}, Summary: {issue['summary']}, "
            f"Description: {issue['description']}, Status: {issue['status']}, "
            f"Priority: {issue['priority']}"
        ),
        metadata={
            "source": "jira_issue",
            "issue_id": issue["issue_id"],
            "summary": issue["summary"],
            "status": issue["status"],
            "priority": issue["priority"],
        },
    )
    for issue in jira_issues
]

code_documents = [
    Document(
        # The code itself is embedded in the content, wrapped in a fenced
        # python block, so it takes part in similarity search.
        page_content=(
            f"Code Snippet - File: {snippet['file_path']}, "
            f"Function: {snippet['function_name']}\n"
            f"```python\n{snippet['code']}\n```"
        ),
        metadata={
            "source": "code_snippet",
            "file_path": snippet["file_path"],
            "function_name": snippet["function_name"],
        },
    )
    for snippet in code_snippets
]

# Combine all documents into a single list for the vector store
# (order: logs, then Jira issues, then code snippets).
all_documents = log_documents + jira_documents + code_documents

# --- 2. Embedding and Vector Store ---
# Initialize the OpenAI Embeddings model. This converts text into numerical vectors.
embeddings = OpenAIEmbeddings()

# Metadata fields that uniquely identify a document within each source type.
_DOCUMENT_ID_FIELDS = {
    "datadog_log": ("log_id",),
    "jira_issue": ("issue_id",),
    "code_snippet": ("file_path", "function_name"),
}


def _stable_document_id(doc: Document) -> str:
    """Return a deterministic vector-store id derived from ``doc.metadata``.

    The id is the document's ``source`` followed by its identifying metadata
    fields, joined with ``:`` (for example ``"datadog_log:log-001"``).

    Raises:
        KeyError: if the metadata lacks ``source``, the source is unknown, or
            an identifying field is missing.
    """
    source = doc.metadata["source"]
    id_parts = [doc.metadata[field] for field in _DOCUMENT_ID_FIELDS[source]]
    return ":".join([source, *id_parts])


# Create a ChromaDB vector store from the documents.
# ChromaDB is used here for simplicity as an in-memory or local file-based store.
# For production, consider persistent and scalable vector databases like Milvus, Pinecone, etc.
# Explicit, deterministic ids are passed so that re-running this cell against
# the same collection overwrites the existing entries; without them every
# re-run would add the same documents again under fresh random ids and the
# top-k results would fill up with duplicates.
vectorstore = Chroma.from_documents(
    documents=all_documents,
    embedding=embeddings,
    ids=[_stable_document_id(doc) for doc in all_documents],
)

# Create a retriever from the vector store.
# It will fetch the top 'k' (here, 5) most semantically similar documents to a given query.
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

# --- 3. LLM Setup ---
# Initialize the ChatOpenAI model.
# gpt-4o-mini is chosen for its balance of capability and cost-effectiveness.
# temperature=0 makes the LLM's responses more deterministic and factual.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# --- 4. RAG Pipeline with LangGraph ---

# Define the state for our LangGraph workflow.
# This TypedDict specifies the data structure that will be passed between nodes.
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the LangGraph workflow.

    The example runs invoke the graph with only ``query`` set;
    ``retrieve_documents`` returns the ``documents`` entry and
    ``generate_answer`` returns the ``answer`` entry.
    """

    query: str  # The initial user query or log message
    documents: List[Document]  # Documents returned by the retriever for the query (logs, Jira, code)
    answer: str  # The final answer text produced by the LLM chain

# Define the 'retrieve_documents' node function.
# This node is responsible for fetching relevant documents from the vector store.
def retrieve_documents(state: AgentState):
    """Graph node: fetch the documents most similar to the current query.

    Args:
        state: Workflow state; only ``state["query"]`` is read.

    Returns:
        A partial state update ``{"documents": [...]}`` holding whatever the
        module-level ``retriever`` returned for the query.
    """
    print("\n---NODE: Retrieving Documents---")
    # Similarity search against the vector store via the shared retriever.
    hits = retriever.invoke(state["query"])
    print(f"Retrieved {len(hits)} documents.")
    return {"documents": hits}

# Define the 'generate_answer' node function.
# This node takes the query and retrieved documents and uses the LLM to synthesize an answer.
def generate_answer(state: AgentState):
    """Graph node: have the LLM answer the query from the retrieved documents.

    Args:
        state: Workflow state; ``state["query"]`` and ``state["documents"]``
            are read.

    Returns:
        A partial state update ``{"answer": <text>}`` with the parsed output
        of the prompt -> ``llm`` -> ``StrOutputParser`` chain.
    """
    print("\n---NODE: Generating Answer with LLM---")
    question, retrieved = state["query"], state["documents"]

    # All retrieved texts are merged into one context block, separated by
    # blank lines.
    joined_context = "\n\n".join(doc.page_content for doc in retrieved)

    # The system message fixes the assistant's role and carries the context;
    # the user message is the query itself.
    rag_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are an expert AI assistant specializing in bug detection, root cause analysis, and code localization using logs, Jira issues, and code snippets.
        Analyze the provided context carefully. Identify potential bugs, their root causes, and suggest relevant code locations.
        If the query is about a bug, try to link it to existing Jira issues or logs.
        Be concise and factual. If you cannot find relevant information, state that you don't have enough information.

        Context:
        {context}"""),
        ("user", "{query}")
    ])

    # Pipe the prompt into the model and reduce the model output to a string.
    rag_chain = rag_prompt | llm | StrOutputParser()
    llm_reply = rag_chain.invoke({"context": joined_context, "query": question})
    print("LLM generated answer.")
    return {"answer": llm_reply}

# Build the LangGraph workflow.
# AgentState is the schema of the state dictionary shared by all nodes.
workflow = StateGraph(AgentState)

# Register the two node functions under the names the edges below refer to.
workflow.add_node("retrieve", retrieve_documents)
workflow.add_node("generate", generate_answer)

# Execution begins at the retrieval node.
workflow.set_entry_point("retrieve")

# The graph is a straight line: retrieve -> generate -> END.
# After retrieving documents, the workflow moves to generating the answer.
workflow.add_edge("retrieve", "generate")
# After generating the answer, the workflow ends.
workflow.add_edge("generate", END)

# Compile the graph into the executable `app` invoked by the example queries.
app = workflow.compile()

# --- 5. Arize Phoenix Integration ---
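# Phoenix shows traces only for calls that have been instrumented. Depending on
# the installed Phoenix version, `px.launch_app()` alone may not be enough and an
# explicit LangChain instrumentation step may be required - check the Phoenix docs.
# Once tracing is active, each step (retrieval, LLM invocation) appears in the Phoenix UI.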

# --- Example Usage ---
def _run_example(number, inputs):
    """Run one example through the compiled graph, printing query and answer.

    Args:
        number: Label used in the printed headings (1, 2, ...).
        inputs: Initial graph state; must contain the ``"query"`` key.

    Returns:
        The final state returned by ``app.invoke``; its ``"answer"`` entry is
        what gets printed.
    """
    print(f"\nProcessing Query {number}: {inputs['query']}")
    result = app.invoke(inputs)
    print(f"\nQuery {number} Result:\n{result['answer']}")
    return result


print("\n--- Running MVP Pipeline with Example Queries ---")

# Example Query 1: A log error message
query1 = "I see an error log 'Failed to authenticate user: Invalid credentials' from authentication-service. What could be the issue and relevant code?"
inputs1 = {"query": query1}
output1 = _run_example(1, inputs1)

# Example Query 2: A Jira issue ID
query2 = "We have a Jira issue JIRA-105 about negative stock. Can you find related logs or code snippets that explain this?"
inputs2 = {"query": query2}
output2 = _run_example(2, inputs2)

# Example Query 3: A code-centric query
query3 = "Review the 'process_payment' function in 'payment_service/stripe_api.py' for potential errors or common issues."
inputs3 = {"query": query3}
output3 = _run_example(3, inputs3)

# Example Query 4: A general bug description
query4 = "We're experiencing slow responses in the order processing service. What might be the cause?"
inputs4 = {"query": query4}
output4 = _run_example(4, inputs4)

print("\n--- MVP Pipeline Finished ---")
print("Check your Arize Phoenix UI for detailed traces and evaluations of these runs.")
# NOTE(review): px.launch_app() appears to return no session object when the
# Phoenix server fails to start - confirm for the installed version. Guarding
# here keeps a missing session from raising after all queries already ran.
if session is not None:
    print(f"Phoenix UI URL: {session.url}")
else:
    print("Phoenix UI URL: unavailable (no Phoenix session was started).")

# --- Optional: Custom Evaluation with Arize Phoenix (More advanced) ---
# For a more detailed RAG evaluation, you'd typically define specific evaluation criteria
# and potentially run this on a larger, labeled dataset.

# Here's a placeholder for how you might define a custom evaluator if needed.
# This would be used to programmatically evaluate the quality of the RAG responses.
class BugDetectionEvaluator(SpanEvaluator):
    """Placeholder evaluator that flags answers containing bug-related terms.

    NOTE(review): ``SpanEvaluator`` and ``px.get_trace_span`` are used exactly
    as in the original sketch; confirm that both exist in the installed
    Phoenix version before enabling this evaluator.
    """

    # Lower-case phrases whose presence in an answer counts as bug-related.
    BUG_KEYWORDS = ("bug", "error", "issue", "root cause", "code location")

    def evaluate(self, trace_id: str, span_id: str) -> dict:
        """Score one span with a simple keyword heuristic.

        Args:
            trace_id: Identifier of the trace containing the span.
            span_id: Identifier of the span to evaluate.

        Returns:
            ``{"contains_bug_keywords": bool}`` when the span is a
            ``"langchain_run"`` operation whose output holds a string under
            ``"answer"``; an empty dict for any other span.
        """
        # Retrieve the specific span from the trace.
        span = px.get_trace_span(trace_id, span_id)
        if not span or span.operation_name != "langchain_run":
            return {}

        # A span without output (or without an answer) is skipped rather than
        # letting the membership test below raise on a missing value.
        output = span.output
        if not output or "answer" not in output:
            return {}

        answer = output["answer"]
        if not isinstance(answer, str):
            return {}

        # Very basic check: lower-case once, then look for any keyword.
        # A real evaluator would compare against ground truth or use more
        # sophisticated NLP, and could add hallucination/relevance checks
        # (e.g. whether mentioned services or files were in the retrieved context).
        answer_text = answer.lower()
        contains_bug_keywords = any(
            keyword in answer_text for keyword in self.BUG_KEYWORDS
        )

        return {
            "contains_bug_keywords": contains_bug_keywords,
            # Add more custom metrics here
            # "is_relevant_to_query": True/False,
            # "suggests_code_fix": True/False,
        }

# To use a custom evaluator, you would typically add it to Phoenix after launching:
# px.add_evaluator(BugDetectionEvaluator())
# Note: Activating evaluators and running evaluations usually involves more setup
# like creating datasets and running evaluation jobs within Phoenix.