In [4]:
from collections import deque
from langchain_core.tools import tool
from pathlib import Path
from typing import List
from dotenv import load_dotenv
import os

# Populate the process environment from the local .env file; values found
# there take precedence over variables that are already set.
load_dotenv(dotenv_path=".env", override=True)

# Directory the log tools operate on; defaults to ./logs when LOG_DIRECTORY is unset
LOG_DIR = os.environ.get("LOG_DIRECTORY", "./logs")

@tool
def list_log_files() -> List[str]:
    """Lists all .log files in the configured log directory."""
    # The docstring above is kept verbatim: @tool uses it as the tool description.
    # Failures are reported as a single-element list holding an error message
    # instead of being raised.
    try:
        log_root = Path(LOG_DIR)
        if log_root.is_dir():
            names = []
            for entry in log_root.glob("*.log"):
                names.append(entry.name)
            return names
        return [f"Error: {LOG_DIR} is not a valid directory."]
    except Exception as exc:
        return [f"Error listing logs: {exc}"]

@tool
def read_log_file(filename: str, last_n_lines: int = 20) -> str:
    """
    Reads the last N lines of a specific log file from the log directory.
    Only the filename (e.g., 'server.log') is required.
    """
    # The docstring is kept short because @tool uses it as the tool description.
    #
    # Contract: always returns a string, either the tail of the file or a
    # message starting with "Error"; exceptions are converted to such messages.
    try:
        if last_n_lines < 1:
            return "Error: last_n_lines must be a positive integer."

        base_dir = Path(LOG_DIR).resolve()
        path = (base_dir / filename).resolve()

        # Joining alone does not confine the path: '../x' or an absolute
        # filename would escape LOG_DIR, so check containment after resolving.
        if base_dir not in path.parents:
            return f"Error: {filename} is outside the log directory {LOG_DIR}"

        # is_file() also rejects directories, which open() could not read anyway
        if not path.is_file():
            return f"Error: File {filename} not found in {LOG_DIR}"

        with open(path, 'r', encoding='utf-8', errors='replace') as f:
            # A bounded deque keeps only the trailing lines while streaming the file
            return "".join(deque(f, maxlen=last_n_lines))

    except Exception as e:
        return f"Error reading log: {str(e)}"


In [5]:
import os
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode
# from log_reader import read_log_file, list_log_files
from dotenv import load_dotenv

# The .env file has to be loaded before ChatOpenAI() is constructed below
load_dotenv(dotenv_path=".env", override=True)

# Fail fast with a readable message when no API key is configured
if not os.environ.get("OPENAI_API_KEY"):
    raise ValueError(
        "OPENAI_API_KEY not found. Please set it in your .env file or environment variables."
    )

# Model name is configurable through OPENAI_MODEL
model_name = os.environ.get("OPENAI_MODEL", "gpt-5-mini")

# Tools exposed to the model and executed by the tool node
tools = [read_log_file, list_log_files]

# The chat model gets the tool schemas bound; the ToolNode runs the requested calls
model = ChatOpenAI(model=model_name).bind_tools(tools)
tool_node = ToolNode(tools)


In [6]:
import os
from pathlib import Path
from typing import Annotated, Sequence, TypedDict, Literal
from dotenv import load_dotenv

from langchain_core.messages import BaseMessage, HumanMessage
from langsmith import traceable
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
# from openai_model import model, tool_node

# Optional import for LangSmith upload functionality.
# When the helper module cannot be imported, the name is bound to None so that
# later code can test it for truthiness before calling it.
try:
    from upload_to_langsmith import upload_logs_to_langsmith
except ImportError:
    upload_logs_to_langsmith = None

# 1. Configuration & State
# Load variables from a .env file (override=True lets them replace existing
# environment values), then force the LANGSMITH_TRACING flag to "true".
load_dotenv(override=True)
os.environ["LANGSMITH_TRACING"] = "true"

class AgentState(TypedDict):
    """State passed between the graph nodes; it carries the conversation messages."""
    # Annotated with add_messages (LangGraph's message reducer); every node in
    # this file returns its new messages under this key.
    messages: Annotated[Sequence[BaseMessage], add_messages]

# 2. Nodes
@traceable(run_type="llm")
def call_model(state: AgentState):
    """Agent node: send the conversation so far to the tool-bound model.

    Returns a state update containing only the model's reply.
    """
    history = state["messages"]
    reply = model.invoke(history)
    return {"messages": [reply]}

def summarize_results(state: AgentState):
    """Final node: ask the model to turn the gathered log data into a report.

    A one-off instruction is appended to the existing conversation for this
    call only; the returned state update contains just the model's reply.
    """
    instruction = (
        "You have finished reading the logs. Provide a structured 'Log Analysis Summary' "
        "including: 1. Root Cause, 2. Timestamp, and 3. Suggested Fix."
    )
    summary_prompt = HumanMessage(content=instruction)
    # The model sees the whole history followed by the summary instruction
    conversation = state["messages"] + [summary_prompt]
    return {"messages": [model.invoke(conversation)]}

# 3. Routing Logic (The Conditional Edge)
@traceable
def router(state: AgentState) -> Literal["tools", "summarize", "agent"]:
    """Choose the next node based on the most recent message.

    Args:
        state: Graph state; ``state["messages"]`` must contain at least one message.

    Returns:
        "tools" when the last message requests tool calls, "agent" when its
        text mentions a traceback and the conversation holds fewer than six
        messages, otherwise "summarize".
    """
    messages = state["messages"]
    last_msg = messages[-1]

    # Path A: the model asked for a tool call
    if getattr(last_msg, "tool_calls", None):
        return "tools"

    # Message content is either a plain string or a list of content blocks
    # (strings or dicts carrying a "text" entry). Flatten it so the substring
    # check below also works for the list form.
    content = getattr(last_msg, "content", "")
    if isinstance(content, str):
        text = content
    else:
        parts = []
        for block in content or []:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and isinstance(block.get("text"), str):
                parts.append(block["text"])
        text = "".join(parts)

    # Path B: a crash was mentioned; let the agent run again for more detail.
    # The message-count cap keeps this from looping indefinitely.
    if "Traceback" in text and len(messages) < 6:
        return "agent"

    # Path C: nothing more to gather, produce the summary
    return "summarize"

# 4. Graph Construction
# Build a state graph over AgentState with three nodes and wire them together.
workflow = StateGraph(AgentState)

# Nodes: the model call, the tool executor, and the final summary step
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
workflow.add_node("summarize", summarize_results)

# Edges: execution starts at the agent; router() picks the next node after each
# agent turn; tool results go back to the agent; the summary ends the run.
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", router)
workflow.add_edge("tools", "agent")
workflow.add_edge("summarize", END)

# Compiled graph object used by the execution code below (app.stream)
app = workflow.compile()

# 5. Main Execution
if __name__ == "__main__":
    log_dir = os.getenv("LOG_DIRECTORY", "./logs")

    # Single user request that kicks off the analysis
    user_request = f"Analyze the logs in '{log_dir}'. Identify any recurring crashes."
    inputs = {"messages": [("user", user_request)]}

    # Print the latest message of every node update as the graph runs
    for update in app.stream(inputs, stream_mode="updates"):
        for node_name, node_update in update.items():
            print(f"--- Node: {node_name} ---")
            latest = node_update["messages"][-1]
            # Empty content is shown as a placeholder instead of a blank line
            print(latest.content or "Calling Tools...")

    # Optional: upload the logs to LangSmith when the helper was importable
    if upload_logs_to_langsmith:
        upload_logs_to_langsmith("log-analyzer", log_dir)


--- Node: agent ---
Calling Tools...
--- Node: tools ---
["server.log"]
--- Node: agent ---
Calling Tools...
--- Node: tools ---
93.184.216.34 - - [31/Jan/2026:15:00:00 +0000] "GET /index.html HTTP/1.1" 200 1045 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
192.168.1.10 - specific_user [31/Jan/2026:15:05:30 +0000] "POST /api/login HTTP/1.1" 401 120 "https://www.example.com/login" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15"
10.0.0.5 - - [31/Jan/2026:15:10:15 +0000] "GET /images/logo.png HTTP/1.1" 304 0 "https://www.example.com/" "Googlebot/2.1 (+http://www.google.com)" 

--- Node: agent ---
I inspected the log directory and the only log file present is server.log. I read the most recent entries from that file; they are standard HTTP access entries (GET/POST) and show no server crashes or error stack traces. Summary:

- Files found: ser

Evaluate Agent

In [None]:
"""
LangSmith SDK Evaluation Script

This script runs programmatic evaluations of the log analyzer agent using LangSmith SDK.
It tests the agent against a dataset of realistic queries and evaluates the responses.
"""

import os
import json
from pathlib import Path
from typing import Dict, List, Any
from dotenv import load_dotenv

from langsmith import Client, traceable
from langsmith.evaluation import evaluate
from langsmith.schemas import Run

from main import app

# Read settings from the local .env file (values there override the environment)
load_dotenv(dotenv_path=".env", override=True)

# LangSmith client and default project name, both taken from the environment
client = Client(api_key=os.environ.get("LANGSMITH_API_KEY"))
PROJECT_NAME = os.environ.get("LANGSMITH_PROJECT", "log-analyzer")


def load_evaluation_dataset() -> List[Dict[str, Any]]:
    """Load the evaluation test cases from ``evaluation_dataset.json``.

    The file is looked up next to this script. When ``__file__`` is not
    defined (for example when the code runs in a notebook cell), the current
    working directory is used instead.

    Returns:
        The parsed JSON content of the dataset file.

    Raises:
        FileNotFoundError: If the dataset file does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    try:
        base_dir = Path(__file__).parent
    except NameError:
        # No __file__ in interactive/notebook execution
        base_dir = Path.cwd()

    dataset_path = base_dir / "evaluation_dataset.json"
    # Read as UTF-8 explicitly instead of relying on the platform default encoding
    with open(dataset_path, "r", encoding="utf-8") as f:
        return json.load(f)


@traceable(name="log_analyzer_agent")
def run_agent(query: str) -> Dict[str, Any]:
    """
    Run the agent with a given query and return the final response.
    This is wrapped with @traceable for LangSmith tracking.

    Args:
        query: The user question sent to the agent as a single human message.

    Returns:
        ``{"output": <text>}`` where the text is the content of the last
        message emitted by the graph, or "No response generated" when the
        graph produced no messages.
    """
    from langchain_core.messages import HumanMessage

    inputs = {"messages": [HumanMessage(content=query)]}

    # Only the most recent message matters, so track it instead of collecting
    # every message from the stream.
    last_message = None
    for update in app.stream(inputs, stream_mode="updates"):
        for node_update in update.values():
            # Skip nodes that emitted an empty update
            if not node_update:
                continue
            if "messages" in node_update and node_update["messages"]:
                last_message = node_update["messages"][-1]

    if last_message is None:
        return {"output": "No response generated"}

    # Fall back to the string form for objects without a content attribute
    if hasattr(last_message, "content"):
        return {"output": last_message.content}
    return {"output": str(last_message)}


def agent_predict(inputs: Dict[str, str]) -> Dict[str, str]:
    """
    Evaluation target: adapts a dataset example's inputs to run_agent.

    Reads the "query" entry (empty string when absent), runs the agent, and
    returns its "output" entry (empty string when absent) under the same key.
    """
    agent_result = run_agent(inputs.get("query", ""))
    answer = agent_result.get("output", "")
    return {"output": answer}


def _prediction_text(run) -> str:
    """Return the run's "output" value as lower-cased text ("" when there is none)."""
    # outputs is None for runs that failed before producing a result
    outputs = getattr(run, "outputs", None) or {}
    value = outputs.get("output", "")
    return "" if value is None else str(value).lower()


def _expected_outputs(example) -> Dict[str, Any]:
    """Return the example's reference outputs, or an empty dict when it has none."""
    return getattr(example, "outputs", None) or {}


def _coverage_result(run, example, field: str, key: str, noun: str) -> Dict[str, Any]:
    """Score the fraction of the strings listed under ``field`` found in the prediction."""
    expected = _expected_outputs(example).get(field) or []
    if not expected:
        # Nothing to look for counts as a pass
        return {"key": key, "score": 1.0}

    prediction = _prediction_text(run)
    # Case-insensitive substring match for every expected item
    found_count = sum(1 for item in expected if str(item).lower() in prediction)
    return {
        "key": key,
        "score": found_count / len(expected),
        "comment": f"Found {found_count}/{len(expected)} expected {noun}",
    }


def contains_evaluator(run: Run, example) -> Dict[str, Any]:
    """
    Custom evaluator that checks if the output contains expected keywords.

    Args:
        run: The run to score; its ``outputs["output"]`` is the prediction.
        example: The dataset example; ``outputs["expected_contains"]`` lists the keywords.

    Returns:
        A dict with key "contains_check" and a score in [0, 1]: the fraction
        of keywords found (1.0 when no keywords are listed, 0.0 when the run
        has no output).
    """
    return _coverage_result(run, example, "expected_contains", "contains_check", "keywords")


def structure_evaluator(run: Run, example) -> Dict[str, Any]:
    """
    Custom evaluator that checks if the output has expected structure elements.

    Args:
        run: The run to score; its ``outputs["output"]`` is the prediction.
        example: The dataset example; ``outputs["expected_structure"]`` lists the elements.

    Returns:
        A dict with key "structure_check" and a score in [0, 1]: the fraction
        of elements found (1.0 when none are listed, 0.0 when the run has no
        output).
    """
    return _coverage_result(
        run, example, "expected_structure", "structure_check", "structure elements"
    )


def min_score_evaluator(run: Run, example) -> Dict[str, Any]:
    """
    Evaluator that checks if the overall score meets the minimum threshold.

    The overall score is the mean of the contains and structure scores. The
    threshold comes from ``example.outputs["min_score"]`` and defaults to 0.7
    when missing or null.

    Returns:
        A dict with key "min_score_check" and score 1.0 (threshold met) or 0.0.
    """
    min_score = _expected_outputs(example).get("min_score")
    if min_score is None:
        min_score = 0.7

    # Calculate average of other evaluators
    contains_score = contains_evaluator(run, example).get("score", 0.0)
    structure_score = structure_evaluator(run, example).get("score", 0.0)
    avg_score = (contains_score + structure_score) / 2

    passed = avg_score >= min_score

    return {
        "key": "min_score_check",
        "score": 1.0 if passed else 0.0,
        "comment": f"Average score {avg_score:.2f} {'meets' if passed else 'below'} minimum {min_score}"
    }


def _read_field(obj, name: str, default=None):
    """Read ``name`` from a dict key or an object attribute, whichever ``obj`` offers."""
    if isinstance(obj, dict):
        return obj.get(name, default)
    return getattr(obj, name, default)


def _dataset_has_examples(dataset_name: str) -> bool:
    """Return True when the LangSmith dataset already holds at least one example."""
    try:
        existing = client.list_examples(dataset_name=dataset_name, limit=1)
        return next(iter(existing), None) is not None
    except Exception:
        # State unknown (e.g. the dataset is missing): let the caller try the upload
        return False


def _collect_scores(results) -> tuple:
    """Group numeric evaluator scores by key; returns ``(scores_by_key, number_of_rows)``."""
    scores_by_key: Dict[str, List[float]] = {}
    n_results = 0
    for row in results:
        n_results += 1
        # Rows and their evaluation results may be dicts or objects
        eval_results = _read_field(row, "evaluation_results")
        if eval_results is None:
            continue
        for r in _read_field(eval_results, "results", []) or []:
            key = _read_field(r, "key", "unknown")
            score = _read_field(r, "score")
            if score is not None and isinstance(score, (int, float)):
                scores_by_key.setdefault(key, []).append(float(score))
    return scores_by_key, n_results


def run_evaluation(project_name: str = None, project_url: str = None):
    """
    Run the evaluation experiment using LangSmith SDK.

    Loads the local test cases, makes sure a LangSmith dataset named
    ``<project_name>-dataset`` exists, uploads the test cases only when that
    dataset has no examples yet (so repeated runs do not duplicate them), runs
    the evaluators, and prints the average score per evaluator.

    Args:
        project_name: LangSmith project name. If not provided, uses LANGSMITH_PROJECT env var.
        project_url: LangSmith project URL. If not provided, constructs from project_name.
    """
    if project_name is None:
        project_name = os.getenv("LANGSMITH_PROJECT", "log-analyzer")

    if project_url is None:
        project_url = f"https://smith.langchain.com/projects/{project_name}"

    print(f"🚀 Starting LangSmith evaluation for project: {project_name}")
    print("=" * 60)

    # Load dataset
    dataset = load_evaluation_dataset()
    print(f"📊 Loaded {len(dataset)} test cases")

    # Create or get dataset in LangSmith
    dataset_name = f"{project_name}-dataset"
    try:
        # Try to get existing dataset
        client.read_dataset(dataset_name=dataset_name)
        print(f"📁 Using existing dataset: {dataset_name}")
    except Exception:
        # Create new dataset
        try:
            client.create_dataset(
                dataset_name=dataset_name,
                description="Log analyzer agent evaluation dataset"
            )
            print(f"📁 Created new dataset: {dataset_name}")
        except Exception as e:
            print(f"⚠️  Dataset creation issue (may already exist): {e}")

    # Upload examples only into an empty dataset; uploading on every run would
    # append the same examples again.
    if _dataset_has_examples(dataset_name):
        print("📁 Dataset already contains examples; skipping upload")
    else:
        try:
            client.create_examples(
                inputs=[item["inputs"] for item in dataset],
                outputs=[item["outputs"] for item in dataset],
                dataset_name=dataset_name
            )
            print(f"✅ Uploaded {len(dataset)} examples to dataset")
        except Exception as e:
            print(f"⚠️  Example upload issue (may already exist): {e}")
            print("   Continuing with existing examples...")

    # Run evaluation
    print("\n🔍 Running evaluation...")
    print("-" * 60)

    results = evaluate(
        agent_predict,
        data=dataset_name,
        evaluators=[
            contains_evaluator,
            structure_evaluator,
            min_score_evaluator,
        ],
        experiment_prefix=f"{project_name}-experiment",
        max_concurrency=1,  # Run sequentially to avoid rate limits
    )

    print("\n" + "=" * 60)
    print("📈 Evaluation Results Summary")
    print("=" * 60)

    # Wait for experiment feedback to be processed so we can read results
    results.wait()

    # Aggregate scores by evaluator key
    scores_by_key, n_results = _collect_scores(results)

    if not scores_by_key and n_results == 0:
        print("\n⚠️  No evaluation results found. Results may still be processing.")
        print("   Check the LangSmith UI in a minute for full feedback.")
    else:
        print(f"\n✅ Evaluation completed ({n_results} runs)")
        if scores_by_key:
            print("\n  Evaluator scores (average):")
            for key in sorted(scores_by_key.keys()):
                vals = scores_by_key[key]
                avg = sum(vals) / len(vals) if vals else 0
                print(f"    {key}: {avg:.2f}  (n={len(vals)})")
        print("\n📊 View full results in LangSmith:")
        print(f"   {project_url}")
    print()


if __name__ == "__main__":
    # Refuse to start without LangSmith credentials
    langsmith_key_present = bool(os.getenv("LANGSMITH_API_KEY"))
    if not langsmith_key_present:
        raise ValueError("LANGSMITH_API_KEY not found. Please set it in your .env file.")

    run_evaluation()
