LangGraph Components

In [None]:
import os
from kubernetes import client, config
from kubernetes.client.exceptions import ApiException

os.environ["NRP_API_KEY"] = "Api key here"
config.load_incluster_config()

v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()
batch_v1 = client.BatchV1Api()
networking_v1 = client.NetworkingV1Api()


In [20]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage

from tabulate import tabulate

In [21]:
from openai import OpenAI

client = OpenAI(
    api_key=os.environ.get("NRP_API_KEY"),
    base_url="https://llm.nrp-nautilus.io/"
)


In [22]:
def describe_pods(namespace="gsoc"):
    """
    Describe pods and print only fields useful for Prometheus metric queries.
    """
    try:
        pods = v1.list_namespaced_pod(namespace=namespace) if namespace else v1.list_pod_for_all_namespaces()

        rows = []
        for pod in pods.items:
            pod_name = pod.metadata.name
            ns = pod.metadata.namespace
            pod_ip = pod.status.pod_ip
            node = pod.spec.node_name
            container_names = [c.name for c in pod.spec.containers]
            container = ", ".join(container_names)

            rows.append([pod_name, ns, pod_ip, node, container])

        headers = ["Pod", "Namespace", "Pod IP", "Node", "Container"]
        print(tabulate(rows, headers=headers, tablefmt="fancy_grid"))

    except ApiException as e:
        print(f"❌ Error fetching pods: {e}")


In [23]:
describe_pods()

╒══════════════════════════════════╤═════════════╤════════════════╤════════════════════════════╤═════════════╕
│ Pod                              │ Namespace   │ Pod IP         │ Node                       │ Container   │
╞══════════════════════════════════╪═════════════╪════════════════╪════════════════════════════╪═════════════╡
│ agno-deployment-55c55964db-lzhkx │ gsoc        │ 10.244.215.212 │ hcc-nrp-shor-c6013.unl.edu │ jupyter     │
├──────────────────────────────────┼─────────────┼────────────────┼────────────────────────────┼─────────────┤
│ my-postgres-cluster-0            │ gsoc        │ 10.244.91.149  │ k8s-gen4-02.ampath.net     │ postgres    │
├──────────────────────────────────┼─────────────┼────────────────┼────────────────────────────┼─────────────┤
│ shellshock-cluster-0             │ gsoc        │ 10.244.19.231  │ dtn-gpu2.kreonet.net       │ postgres    │
╘══════════════════════════════════╧═════════════╧════════════════╧════════════════════════════╧═════════════╛


In [24]:
#namespace gpu utilization

In [25]:
import requests


def namespace_gpu_utilization(prom_url="https://prometheus.nrp-nautilus.io", threshold=0):
    """
    Display average GPU utilization per namespace using PromQL.
    Args:
        prom_url (str): Base Prometheus URL.
        threshold (float): Minimum % utilization to show (filtering).
    """
    query = 'avg by (namespace) (DCGM_FI_DEV_GPU_UTIL)'
    url = f"{prom_url}/api/v1/query"

    try:
        response = requests.get(url, params={"query": query}, timeout=10)
        response.raise_for_status()
        data = response.json()

        if data.get("status") != "success":
            print("❌ Prometheus query failed.")
            return

        results = data["data"]["result"]
        if not results:
            print("✅ Query successful, but no GPU usage data returned.")
            return

        rows = []
        for r in results:
            ns = r["metric"].get("namespace", "unknown")
            util = float(r["value"][1])
            if util >= threshold:
                status = (
                    "🟢 Low" if util < 40 else
                    "🟡 Moderate" if util < 70 else
                    "🔴 High"
                )
                rows.append([ns, f"{util:.2f}%", status])

        headers = ["Namespace", "Avg GPU Utilization", "Status"]
        print(tabulate(rows, headers=headers, tablefmt="fancy_grid"))

    except Exception as e:
        print(f"❌ Error querying Prometheus: {e}")


In [26]:
namespace_gpu_utilization()


╒════════════════════════════════════╤═══════════════════════╤═════════════╕
│ Namespace                          │ Avg GPU Utilization   │ Status      │
╞════════════════════════════════════╪═══════════════════════╪═════════════╡
│ gpu-mon                            │ 0.15%                 │ 🟢 Low      │
├────────────────────────────────────┼───────────────────────┼─────────────┤
│ csusb-hpc                          │ 0.00%                 │ 🟢 Low      │
├────────────────────────────────────┼───────────────────────┼─────────────┤
│ nrp-llm                            │ 7.08%                 │ 🟢 Low      │
├────────────────────────────────────┼───────────────────────┼─────────────┤
│ csusb-xli                          │ 0.00%                 │ 🟢 Low      │
├────────────────────────────────────┼───────────────────────┼─────────────┤
│ sdsu-goldberg                      │ 0.00%                 │ 🟢 Low      │
├────────────────────────────────────┼───────────────────────┼─────────────┤
│ sp

In [27]:
import requests
from tabulate import tabulate

def fetch_dcgm_gpu_util_data(prom_url="https://prometheus.nrp-nautilus.io"):
    """
    Fetch rich GPU utilization data from Prometheus using DCGM_FI_DEV_GPU_UTIL.
    
    Returns:
        list of dicts with context: [{hostname, gpu_id, model, namespace, pod, utilization, ...}]
    """
    query = 'DCGM_FI_DEV_GPU_UTIL'
    url = f"{prom_url}/api/v1/query"

    try:
        response = requests.get(url, params={"query": query}, timeout=10)
        response.raise_for_status()
        data = response.json()

        if data.get("status") != "success":
            print("❌ Prometheus query failed.")
            return []

        results = data["data"]["result"]
        if not results:
            print("✅ Query successful, but no GPU data returned.")
            return []

        enriched = []
        for r in results:
            m = r["metric"]
            val = float(r["value"][1])
            enriched.append({
                "hostname": m.get("Hostname", "unknown"),
                "ip_port": m.get("instance", "unknown"),
                "gpu_id": m.get("gpu", "N/A"),
                "device": m.get("device", "N/A"),
                "uuid": m.get("UUID", "N/A"),
                "model": m.get("modelName", "unknown"),
                "namespace": m.get("namespace", "N/A"),
                "pod": m.get("pod", "N/A"),
                "utilization": val
            })

        return enriched

    except Exception as e:
        print(f"❌ Error querying Prometheus: {e}")
        return []


def display_gpu_data_head(data, n=5):
    """
    Display the first `n` GPU entries with rich context.
    """
    if not data:
        print("No data to display.")
        return

    rows = [
        [d["hostname"], d["gpu_id"], d["model"], f"{d['utilization']:.2f}%", d["namespace"], d["pod"]]
        for d in data[:n]
    ]
    print(tabulate(rows, headers=["Host", "GPU", "Model", "Utilization", "Namespace", "Pod"], tablefmt="fancy_grid"))


def analyze_dcgm_gpu_data(data):
    """
    Analyze DCGM GPU data with statistics and top utilization.
    """
    if not data:
        print("No data to analyze.")
        return

    total = len(data)
    avg_util = sum(d["utilization"] for d in data) / total
    maxed = [d for d in data if d["utilization"] >= 99.0]
    idle = [d for d in data if d["utilization"] < 1.0]
    available = [d for d in data if d["utilization"] < 100.0]
    unique_hosts = set(d["hostname"] for d in data)
    unique_models = set(d["model"] for d in data)

    print(f"\n🔍 Total GPUs: {total}")
    print(f"📊 Average Utilization: {avg_util:.2f}%")
    print(f"🔴 Fully Utilized GPUs (>=99%): {len(maxed)}")
    print(f"🟢 Idle GPUs (<1%): {len(idle)}")
    print(f"💻 Unique Host Machines: {len(unique_hosts)}")
    print(f"🧠 Unique GPU Models: {len(unique_models)}")
    print(f"🧮 GPUs Available (<100%): {len(available)}\n")

    print("📈 Top 10 GPUs by Utilization:")
    top = sorted(data, key=lambda x: x["utilization"], reverse=True)[:10]
    rows = [[d["hostname"], d["gpu_id"], d["model"], f"{d['utilization']:.2f}%", d["namespace"], d["pod"]] for d in top]
    print(tabulate(rows, headers=["Host", "GPU", "Model", "Utilization", "Namespace", "Pod"], tablefmt="github"))


# Run it
if __name__ == "__main__":
    data = fetch_dcgm_gpu_util_data()
    display_gpu_data_head(data, n=5)
    analyze_dcgm_gpu_data(data)


╒═══════════════════════════════╤═══════╤════════════════════════════╤═══════════════╤═════════════╤════════════════════════════════════════════════════════╕
│ Host                          │   GPU │ Model                      │ Utilization   │ Namespace   │ Pod                                                    │
╞═══════════════════════════════╪═══════╪════════════════════════════╪═══════════════╪═════════════╪════════════════════════════════════════════════════════╡
│ k8s-gpu-03.sdsc.optiputer.net │     7 │ NVIDIA GeForce GTX 1080 Ti │ 0.00%         │ gpu-mon     │ dcgm-export-dcgm-exporter-r5cz4                        │
├───────────────────────────────┼───────┼────────────────────────────┼───────────────┼─────────────┼────────────────────────────────────────────────────────┤
│ gpu-01.csusb.edu              │     0 │ NVIDIA RTX A5000           │ 0.00%         │ csusb-hpc   │ jupyter-anthony-2elopez6979-40coyote-2ecsusb-2eedu     │
├───────────────────────────────┼───────┼───────────

In [28]:
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage
from typing import Optional
from io import StringIO
import sys
import json

# Helper to capture and truncate printed output
def capture_stdout_truncated(func, max_length=2000, *args, **kwargs):
    """Capture stdout and truncate if too long to prevent LLM loops"""
    old_stdout = sys.stdout
    sys.stdout = mystdout = StringIO()
    try:
        func(*args, **kwargs)
    finally:
        sys.stdout = old_stdout
    
    output = mystdout.getvalue()
    if len(output) > max_length:
        output = output[:max_length] + f"\n\n... [Output truncated - showing first {max_length} characters]"
    return output




In [29]:
# Define tools with truncated outputs
@tool
def describe_pods_tool(namespace: Optional[str] = "gsoc") -> str:
    """Describe pods in a given Kubernetes namespace. Defaults to 'gsoc'."""
    return capture_stdout_truncated(describe_pods, 1500, namespace=namespace)

@tool
def namespace_gpu_util_tool(threshold: Optional[float] = 0.0) -> str:
    """Get average GPU utilization per namespace with optional threshold filter."""
    return capture_stdout_truncated(namespace_gpu_utilization, 1500, threshold=threshold)

@tool
def dcgm_gpu_inspect_tool(threshold: float = 0.0) -> str:
    """
    Inspect raw GPU usage with model name, host, pod, and utilization.
    Shows top 10 GPUs above threshold to prevent overwhelming output.
    """
    data = fetch_dcgm_gpu_util_data()
    if not data:
        return "⚠️ No GPU data available."

    filtered = [d for d in data if d["utilization"] >= threshold]
    if not filtered:
        return f"✅ No GPUs over {threshold}% utilization."

    # Limit to top 10 to prevent massive output
    top = sorted(filtered, key=lambda x: x["utilization"], reverse=True)[:10]
    rows = [
        [d["hostname"][:20], d["gpu_id"], d["model"][:25], f"{d['utilization']:.2f}%", d["namespace"], d["pod"][:20]]
        for d in top
    ]
    
    from tabulate import tabulate
    result = tabulate(rows, headers=["Host", "GPU", "Model", "Util%", "Namespace", "Pod"], tablefmt="grid")
    
    # Add summary info
    result += f"\n\nShowing top 10 of {len(filtered)} GPUs above {threshold}% threshold."
    return result

@tool
def calculate_dcgm_gpu_stats(threshold: float = 0.0) -> str:
    """
    Analyze GPU utilization across nodes and return statistical breakdown.
    Includes averages, idle/overloaded counts, and model/host distribution.
    """
    data = fetch_dcgm_gpu_util_data()
    if not data:
        return "⚠️ No GPU data available."

    filtered = [d for d in data if d["utilization"] >= threshold]
    total = len(filtered)
    if total == 0:
        return f"✅ No GPUs over the threshold of {threshold}% utilization."

    avg_util = sum(d["utilization"] for d in filtered) / total
    maxed = [d for d in filtered if d["utilization"] >= 99.0]
    idle = [d for d in filtered if d["utilization"] < 1.0]
    moderate = [d for d in filtered if 1.0 <= d["utilization"] < 70.0]
    available = [d for d in filtered if d["utilization"] < 100.0]
    unique_models = set(d["model"] for d in filtered)
    unique_hosts = set(d["hostname"] for d in filtered)

    return f"""📊 GPU Utilization Stats (threshold: {threshold}%):

🔍 Total GPUs: {total}
📈 Average Utilization: {avg_util:.2f}%
🔴 Fully Utilized (>=99%): {len(maxed)}
🟢 Idle (<1%): {len(idle)}
⚙️ Moderate (1-70%): {len(moderate)}
💻 Unique Hosts: {len(unique_hosts)}
🧠 Unique Models: {len(unique_models)}
🧮 Available (<100%): {len(available)}"""

In [30]:
class NRPModel:
    def __init__(self, client):
        self.client = client
        self.tools = []

    def bind_tools(self, tools):
        self.tools = tools
        return self

    def _convert_tool_to_openai_format(self, tool):
        """Convert LangChain tool to OpenAI tool format"""
        return {
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.args_schema.model_json_schema() if tool.args_schema else {
                    "type": "object",
                    "properties": {},
                    "required": []
                }
            }
        }

    def invoke(self, messages):
        # Convert messages to proper format
        formatted_messages = []
        for msg in messages:
            if hasattr(msg, 'content'):
                if msg.__class__.__name__ == "SystemMessage":
                    formatted_messages.append({"role": "system", "content": msg.content})
                elif msg.__class__.__name__ == "HumanMessage":
                    formatted_messages.append({"role": "user", "content": msg.content})
                elif msg.__class__.__name__ == "AIMessage":
                    formatted_messages.append({"role": "assistant", "content": msg.content})
                elif msg.__class__.__name__ == "ToolMessage":
                    # Truncate tool message content if too long
                    content = str(msg.content)
                    if len(content) > 2000:
                        content = content[:2000] + "\n[Content truncated...]"
                    formatted_messages.append({
                        "role": "tool", 
                        "content": content,
                        "tool_call_id": getattr(msg, 'tool_call_id', 'unknown')
                    })
            else:
                formatted_messages.append(msg)

        # Convert tools to OpenAI format
        openai_tools = None
        if self.tools:
            openai_tools = [self._convert_tool_to_openai_format(t) for t in self.tools]

        try:
            response = self.client.chat.completions.create(
                model="gemma3",
                temperature=0,
                messages=formatted_messages,
                tool_choice="auto" if openai_tools else None,
                tools=openai_tools,
            )

            choice = response.choices[0].message

            tool_calls = []
            if hasattr(choice, "tool_calls") and choice.tool_calls:
                for t in choice.tool_calls:
                    args = t.function.arguments
                    if isinstance(args, str):
                        try:
                            args = json.loads(args)
                        except json.JSONDecodeError:
                            args = {}
                    
                    tool_calls.append({
                        "name": t.function.name,
                        "args": args,
                        "id": t.id
                    })

            return AIMessage(
                content=choice.content or "",
                tool_calls=tool_calls
            )
        except Exception as e:
            return AIMessage(content=f"Error calling model: {str(e)}")

In [31]:
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]


# %%

In [32]:
class Agent:
    def __init__(self, model, tools, system: str = ""):
        self.system = system
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)
        self.max_iterations = 5  # Prevent infinite loops
        self.current_iteration = 0

        from langgraph.graph import StateGraph, END
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")

        self.raw_graph = graph
        self.graph = graph.compile()

    def exists_action(self, state: AgentState) -> bool:
        """Check if the last message has tool calls and we haven't exceeded max iterations"""
        if self.current_iteration >= self.max_iterations:
            print(f"⚠️ Reached max iterations ({self.max_iterations}). Stopping.")
            return False
            
        try:
            result = state["messages"][-1]
            return (hasattr(result, "tool_calls") and 
                    result.tool_calls is not None and 
                    len(result.tool_calls) > 0)
        except (IndexError, KeyError, AttributeError):
            return False

    def call_openai(self, state: AgentState) -> dict:
        messages = state["messages"]
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {"messages": [message]}

    def take_action(self, state: AgentState) -> dict:
        self.current_iteration += 1
        tool_calls = state["messages"][-1].tool_calls
        results = []
        
        for t in tool_calls:
            tool_name = t["name"]
            tool_args = t["args"]
            print(f"🔧 Calling tool: {tool_name} with args: {tool_args}")
            
            if tool_name not in self.tools:
                result = "❌ Tool name not recognized. Available tools: " + ", ".join(self.tools.keys())
            else:
                try:
                    result = self.tools[tool_name].invoke(tool_args)
                    # Ensure result is string and truncate if needed
                    result = str(result)
                    if len(result) > 3000:
                        result = result[:3000] + "\n\n[Output truncated to prevent loops]"
                except Exception as e:
                    result = f"❌ Tool error: {str(e)}"
            
            results.append(ToolMessage(tool_call_id=t["id"], name=tool_name, content=result))
        
        print("✅ Tool(s) executed. Returning to model.")
        return {"messages": results}

In [33]:
# %%
from langchain_core.messages import HumanMessage

# Updated system prompt to be more specific
system_prompt = """You are a Kubernetes monitoring assistant. 

Use these tools to answer questions:
- 'describe_pods_tool': View pod/container info in a namespace
- 'namespace_gpu_util_tool': View average GPU utilization per namespace  
- 'dcgm_gpu_inspect_tool': View detailed GPU metrics (top 10 results)
- 'calculate_dcgm_gpu_stats': Get statistical breakdown of GPU usage

IMPORTANT: Only call each tool ONCE per question. Use the tool output to provide a direct answer. Do not repeat tool calls."""

# Create agent with updated tools
model = NRPModel(client)
tools = [describe_pods_tool, namespace_gpu_util_tool, dcgm_gpu_inspect_tool, calculate_dcgm_gpu_stats]
abot = Agent(model=model, tools=tools, system=system_prompt)



In [34]:
# Reset iteration counter before each use
def ask_agent(question):
    abot.current_iteration = 0  # Reset counter
    messages = [HumanMessage(content=question)]
    response = abot.graph.invoke({"messages": messages})
    return response["messages"][-1].content

# Test cases
print("=== Test 1: List pods ===")
print(ask_agent("List pods in gsoc namespace"))

print("\n=== Test 2: GPU usage by namespace ===")  
print(ask_agent("Show me GPU usage across namespaces"))

print("\n=== Test 3: GPU statistics ===")
print(ask_agent("Give me overall GPU statistics for the cluster"))

=== Test 1: List pods ===
🔧 Calling tool: describe_pods_tool with args: {'namespace': 'gsoc'}
✅ Tool(s) executed. Returning to model.


Here are the pods currently running in the `gsoc` namespace:

1. **agno-deployment-55c55964db-lzhkx**  
   - Container: `jupyter`  
   - Node: `hcc-nrp-shor-c6013.unl.edu`

2. **my-postgres-cluster-0**  
   - Container: `postgres`  
   - Node: `k8s-gen4-02.ampath.net`

3. **shellshock-cluster-0**  
   - Container: `postgres`  
   - Node: `dtn-gpu2.kreonet.net`

Each pod is associated with its respective container and Kubernetes node host.

=== Test 2: GPU usage by namespace ===
🔧 Calling tool: namespace_gpu_util_tool with args: {'threshold': 0.0}
✅ Tool(s) executed. Returning to model.
🔧 Calling tool: namespace_gpu_util_tool with args: {'threshold': 0.0}
✅ Tool(s) executed. Returning to model.


Here's the GPU utilization breakdown across Kubernetes namespaces:

**GPU Usage Summary by Namespace**  
🟢 = Low utilization (<10% threshold)

| Namespace     

In [36]:
# Complex A100 analysis queries you can try:

# 1. Comprehensive A100 analysis
query1 = """
Analyze all modelName="NVIDIA A100 80GB PCIe" I want to know:
- How many A100s are available vs fully utilized
- Which namespaces are using A100s the most
- Show me the top A100s by utilization with their host details
- Give me overall statistics for A100s specifically
"""

# 2. A100 availability analysis
query2 = """
I need to deploy a new workload that requires A100 GPUs. Can you:
- Find all idle or low-utilization A100s (under 10% usage)
- Show me which hosts have available A100s
- Tell me which namespaces have the most A100 capacity available
"""

# 3. A100 performance comparison
query3 = """
Compare A100 usage patterns across different namespaces:
- Which namespace is using A100s most efficiently
- Are there any A100s that are consistently underutilized
- Show me the distribution of A100 utilization levels
"""

# 4. A100 resource optimization
query4 = """
Help me optimize A100 resource allocation:
- Find A100s with less than 50% utilization that could be reallocated
- Identify hosts with mixed A100 utilization (some high, some low on same host)
- Show me the overall A100 efficiency across the cluster
"""

# 5. Specific A100 investigation
query5 = """
I'm investigating A100 performance issues. Please:
- Show me all A100s with utilization above 95%
- Identify any A100s that might be stuck or problematic (0% utilization)
- Give me detailed host and pod information for the most utilized A100s
"""

# Test one of these complex queries
print("Testing complex A100 analysis query...")
response = ask_agent(query1)
print(response)

Testing complex A100 analysis query...
🔧 Calling tool: calculate_dcgm_gpu_stats with args: {'threshold': 0.0}
🔧 Calling tool: dcgm_gpu_inspect_tool with args: {'threshold': 0.0}
✅ Tool(s) executed. Returning to model.


### Analysis of NVIDIA A100 80GB PCIe GPUs

#### 1. **Available vs Fully Utilized A100s**
- **Fully Utilized (≥99%)**: **4 A100s** (all observed at **100% utilization** in the top GPU list).  
- **Available (<100%)**: **0 observed** in the top utilization list (all visible A100s are fully utilized).  
  *(Note: Total A100 count isn't explicitly provided in the data, but the top 10 GPUs include 4 A100s—all maxed out.)*

---

#### 2. **Top Namespace Using A100s**
- **`sdsc-llm`** is the **only namespace** observed using A100s (all 4 fully utilized GPUs).  
  - Workload: `llama3-3-vllm-inferer` (LLM inference pod).  
  - *No other namespaces are using A100s in the top utilization data.*

---

#### 3. **Top A100s by Utilization (Host Details)**
| Host                 | GPU 