In [None]:
%env OPENAI_API_KEY="keyhere"

In [None]:
#!/bin/bash

helm upgrade --install argo-rollouts argo-rollouts \
  --repo https://argoproj.github.io/argo-helm \
  --version 2.37.6 \
  --namespace argo-rollouts \
  --create-namespace \
  --wait

kubectl apply -f- <<EOF
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rollouts-demo
  labels:
    app: rollouts-demo
spec:
  replicas: 4
  selector:
    matchLabels:
      app: rollouts-demo
  template:
    metadata:
      labels:
        app: rollouts-demo
    spec:
      containers:
      - name: rollouts-demo
        image: argoproj/rollouts-demo:blue
        ports:
        - name: http
          containerPort: 8080
          protocol: TCP
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 1
---
apiVersion: v1
kind: Service
metadata:
  name: rollouts-demo
  labels:
    app: rollouts-demo
spec:
  ports:
  - port: 80
    targetPort: http
    protocol: TCP
    name: http
  selector:
    app: rollouts-demo
apiVersion: gateway.networking.k8s.io/v1
kind: Gateway
metadata:
  name: gateway
spec:
  gatewayClassName: istio
  listeners:
  - name: default
    port: 80
    protocol: HTTP
    allowedRoutes:
      namespaces:
        from: All
---
kind: HTTPRoute
apiVersion: gateway.networking.k8s.io/v1beta1
metadata:
  name: rollouts-demo
spec:
  parentRefs:
    - name: gateway
  rules:
  - matches:
    - path:
        type: PathPrefix
        value: /  
    backendRefs:
    - name: rollouts-demo
      kind: Service
      port: 80
EOF

In [5]:
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

from kagent.tools.argo import (
    CreateRolloutResource,
    GenerateResource,
    GetRollout,
    PauseRollout,
    PromoteRollout,
    SetRolloutImage,
    StatusRollout,
    VerifyArgoRolloutsControllerInstall,
    VerifyKubectlPluginInstall,
)
from kagent.tools.docs import Config, QueryTool
from kagent.tools.k8s import ApplyManifest, GetPods, GetResources, GetServices

model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
)

argo_agent = AssistantAgent(
    "argo_agent",
    description="An agent for Argo Rollouts operations",
    tools=[
        CreateRolloutResource(),
        GetRollout(),
        PauseRollout(),
        PromoteRollout(),
        SetRolloutImage(),
        StatusRollout(),
        VerifyKubectlPluginInstall(),
        VerifyArgoRolloutsControllerInstall(),
        GenerateResource(),
    ],
    model_client=model_client,
    system_message="""
    You are an Argo Rollouts agent, specialized in managing and automating the deployment of applications using Argo Rollouts 
    in a Kubernetes cluster. Your expertise includes creating, updating, and monitoring rollouts, as well as handling traffic
    management and canary deployments. Your job is to assist with Argo Rollouts operations efficiently and effectively. 
    DO NOT MAKE UP ADDITIONAL AND UNNECESSARY SUBTASKS.
    """,
)

docs_config = Config(
    docs_path="",  # empty string means the database will be downloaded
)

cncf_docs_agent = AssistantAgent(
    "kubernetes_docs_agent",
    description="This agent allows you to get data from the CNCF docs databases.",
    tools=[
        QueryTool(config=docs_config),
    ],
    model_client=model_client,
    system_message="""
    You are a support agent.

    You have access to several tools:
    - 'searchDocumentation' to search in the documentation.

    Execute all the following steps:
    1. Product identification
    - Check if you know what product the question is about (Only supported products are: 'kubernetes', 'istio', 'argo', 'helm', 'prometheus')
    - If it's not the case, ask what the product is

    2. Information Gathering
    - Search in the documentation for information related to the question the user has submitted

    3. Answer the question
    - Use all the information you gathered to provide a valuable answer to the user
    - Provide links to the documentation whenever possible`,
    """,
)

k8s_agent = AssistantAgent(
    "k8s_agent",
    description="An agent for k8s operations",
    tools=[GetPods(), GetServices(), GetResources(), ApplyManifest()],
    model_client=model_client,
    system_message="""
    You are a k8s agent. You know how to interact with a Kubernetes cluster.

    If you don't have any explicit tasks left to complete, return TERMINATE.
    """,
)

team = RoundRobinGroupChat(
    max_turns=3,
    participants=[k8s_agent, argo_agent, cncf_docs_agent],
    termination_condition=TextMentionTermination(text="TERMINATE"),
)

# task = "Create an Argo Rollout to deploy a new version of the demo application with the color purple using the Kubernetes Gateway API in my cluster."
task = "Check if there are any argo rollout in the cluster in the process of promotion?"
# task = "Use the Kubernetes Gateway API and Argo Rollouts to create rollout resources for the canary and stable services for the demo application in my cluster."
# task = "Check if the Argo Rollouts controller is running and in a healthy state in the cluster?"

# Use asyncio.run(...) if you are running this in a script.
await Console(team.run_stream(task=task))

---------- user ----------
Check if there are any argo rollout in the cluster in the process of promotion?
---------- k8s_agent ----------
[FunctionCall(id='call_1XGIGxSEMPMM3XDV99lyDFof', arguments='{"name":"","resource_type":"rollout","all_namespaces":true,"ns":null,"output":"wide"}', name='get_resources')]
---------- k8s_agent ----------
[FunctionExecutionResult(content='NAMESPACE   NAME               DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE\ndefault     demo-application   3                                            21h\n', call_id='call_1XGIGxSEMPMM3XDV99lyDFof')]
---------- k8s_agent ----------
NAMESPACE   NAME               DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
default     demo-application   3                                            21h

---------- argo_agent ----------
There is an Argo Rollout named "demo-application" in the "default" namespace with 3 desired replicas, but it currently doesn't have any up-to-date or available replicas, indicating it may no

TaskResult(messages=[TextMessage(source='user', models_usage=None, content='Check if there are any argo rollout in the cluster in the process of promotion?', type='TextMessage'), ToolCallRequestEvent(source='k8s_agent', models_usage=RequestUsage(prompt_tokens=436, completion_tokens=33), content=[FunctionCall(id='call_1XGIGxSEMPMM3XDV99lyDFof', arguments='{"name":"","resource_type":"rollout","all_namespaces":true,"ns":null,"output":"wide"}', name='get_resources')], type='ToolCallRequestEvent'), ToolCallExecutionEvent(source='k8s_agent', models_usage=None, content=[FunctionExecutionResult(content='NAMESPACE   NAME               DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE\ndefault     demo-application   3                                            21h\n', call_id='call_1XGIGxSEMPMM3XDV99lyDFof')], type='ToolCallExecutionEvent'), ToolCallSummaryMessage(source='k8s_agent', models_usage=None, content='NAMESPACE   NAME               DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE\ndefau

In [None]:
print(team.dump_component().model_dump_json(indent=2))