# The Semantic Kernel Agent Framework

The Semantic Kernel Agent Framework is part of the core Semantic Kernel framework. It provides structured components for defining AI-driven workflows, enabling agents to interact with users, APIs, and external services.

## Core concepts
- **Agents**: AI-driven entities that use language models, functions, and memory to make decisions dynamically.
- **Agent collaboration**: Agents of different types can collaborate through an agent group chat. The group chat determines which agent responds next and when the conversation is finished.
- **Kernel**: The central component of Semantic Kernel. It acts as the execution engine, managing AI interactions, function orchestration, and memory.
- **Tools and plugins**: Agents use tools and plugins to perform specific tasks.
- **History**: Agents can maintain chat history across multiple interactions, allowing them to track previous exchanges and adapt responses accordingly. The conversation history is always accessible to the agents, either as a whole or as a specific agent's chat history.

## Types of agents
- **Azure AI Agent**: a specialized agent within the Semantic Kernel Agent Framework. The `AzureAIAgent` type supports a variety of built-in tools, including file retrieval, code execution, and data interaction via Bing, Azure AI Search, Azure Functions, and OpenAPI.
- **Chat Completion Agent**: designed for chat completion and conversation interfaces. The `ChatCompletionAgent` type mirrors the features and patterns of the underlying AI service to support natural language processing, contextual understanding, and dialogue management.
- **OpenAI Assistant Agent**: designed for more advanced capabilities and multi-step tasks. The `OpenAIAssistantAgent` type supports goal-driven interactions with additional features like code interpretation and file search.

## Agent group chat
- **Single-turn conversation**: a designated agent provides a response based on user input.
    - You can invoke a response from a single-turn chat by using `AgentGroupChat.invoke` and specifying the agent that should respond.
- **Multi-turn conversation**: multiple agents take turns responding, continuing the conversation until a termination condition is met.
    - Agent responses are returned asynchronously as they are generated, allowing the conversation to unfold in real-time.
    - You can invoke a response from a multi-turn chat by using `AgentGroupChat.invoke`.
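
The multi-turn pattern can be sketched without the framework. The following is a minimal, framework-free illustration, not the Semantic Kernel API: `run_turns`, `Agent`, and `run_demo` are hypothetical names, each agent is just a name plus a reply function, and the round-robin turn order and the "done" keyword are assumptions chosen for the demo. It shows replies being yielded one at a time until a termination condition is met.

```python
import asyncio
from typing import AsyncIterator, Callable, List, Tuple

# Hypothetical stand-in: an "agent" is a name plus a function from history to reply text.
Agent = Tuple[str, Callable[[List[str]], str]]


async def run_turns(
    agents: List[Agent], history: List[str], max_turns: int = 10
) -> AsyncIterator[str]:
    """Yield one reply per turn until a reply signals completion or max_turns is hit."""
    for turn in range(max_turns):
        name, reply_fn = agents[turn % len(agents)]  # assumed round-robin order
        reply = f"{name} > {reply_fn(history)}"
        history.append(reply)
        yield reply  # the caller sees each reply as soon as it is produced
        if "done" in reply.lower():  # assumed termination condition
            break


async def run_demo() -> List[str]:
    agents: List[Agent] = [
        ("PLANNER", lambda h: "Restart the service" if len(h) < 2 else "Done"),
        ("WORKER", lambda h: "Service restarted"),
    ]
    history = ["USER > service is failing"]
    return [reply async for reply in run_turns(agents, history)]


# In a notebook, use `transcript = await run_demo()` instead of asyncio.run.
transcript = asyncio.run(run_demo())
print("\n".join(transcript))
```

A single-turn conversation corresponds to asking one chosen agent for one reply, rather than looping over all of them.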

## Agent selection strategy
It's important to choose the agent that's best suited to respond to a user's query, especially in multi-agent systems where the agents specialize in different domains.

**Single-turn conversations**: 
- **Intent recognition**: The framework analyzes the user's query to identify the intent and match it with the most relevant agent.
- **Predefined rules**: Developers can configure routing rules to direct specific queries to designated agents in their application.

**Multi-turn conversations**:
- **Context tracking**: The framework maintains a record of the conversation history to understand the user's intent and select the appropriate agent.
- **Dynamic switching**: If the topic shifts, the framework dynamically switches to an agent specializing in the new domain in the middle of the conversation.

The selection strategy is defined either by using a predefined selection strategy or by extending the `SelectionStrategy` base class. The available classes are:
- `SequentialSelectionStrategy`: selects agents in the order in which they were added to the chat, with the option to specify an initial agent.
- `KernelFunctionSelectionStrategy`: allows you to define your selection strategy by creating a kernel function from a prompt.
- `SelectionStrategy`: base class with an overridable `select_agent` method that returns the agent name.
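
To make the sequential behavior concrete, here is a small sketch in plain Python. It only illustrates the idea of "next agent in insertion order, with an optional initial agent"; the function `select_sequential` and its parameters are hypothetical, and the framework's own `SequentialSelectionStrategy` may track its position differently.

```python
from typing import List, Optional


def select_sequential(
    agent_names: List[str],
    last_speaker: Optional[str],
    initial: Optional[str] = None,
) -> str:
    """Return the next agent in insertion order, wrapping around at the end."""
    if last_speaker is None or last_speaker not in agent_names:
        # Nobody has spoken yet: start with the initial agent if one was given.
        return initial if initial in agent_names else agent_names[0]
    next_index = (agent_names.index(last_speaker) + 1) % len(agent_names)
    return agent_names[next_index]


names = ["INCIDENT_MANAGER", "DEVOPS_ASSISTANT"]
first = select_sequential(names, last_speaker=None)
second = select_sequential(names, last_speaker=first)
third = select_sequential(names, last_speaker=second)
print(first, second, third)
```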

Truncating chat history: To reduce token usage and help improve performance, use the `KernelFunctionSelectionStrategy.history_reducer` parameter.
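
As a rough picture of what a history reducer does, the sketch below keeps only the most recent messages before they would be passed to a strategy prompt. This is an assumption-laden illustration: `keep_recent` and `target_count` are hypothetical names, messages are plain strings, and a real reducer might summarize older messages rather than drop them.

```python
from typing import List


def keep_recent(history: List[str], target_count: int) -> List[str]:
    """Return only the last target_count messages (hypothetical truncation rule)."""
    if target_count <= 0:
        return []  # guard: history[-0:] would return the whole list
    return history[-target_count:]


messages = [f"message {i}" for i in range(1, 6)]
reduced = keep_recent(messages, target_count=2)
print(reduced)
```

Fewer messages in the prompt means fewer tokens per strategy call, at the cost of the strategy no longer seeing older context.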

## Termination strategy
A termination strategy ensures that conversations or tasks conclude appropriately. 

Each termination strategy supports a `maximum_iterations` parameter that ends the chat after the given number of iterations. The default value is 99 iterations.

- `DefaultTerminationStrategy`: terminates only after the specified maximum number of iterations.
- `KernelFunctionTerminationStrategy`: allows you to define your termination strategy by creating a kernel function from a prompt. This class requires a `result_parser` parameter, which is a Boolean function that processes the output of your prompt function to determine whether the termination condition has been met.
- `TerminationStrategy`: base class with an overridable `should_agent_terminate` method that returns a Boolean.
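
The role of a result parser can be sketched as follows. This is a framework-free illustration under stated assumptions: `judge` is a hypothetical stand-in for the prompt function (here a plain Python function that answers "yes" or "no"), and `should_stop` is a made-up helper that combines the parsed answer with an iteration cap.

```python
from typing import Callable, List


def result_parser(model_output: str) -> bool:
    """Interpret the judge's text output as a Boolean decision."""
    return model_output.strip().lower().startswith("yes")


def should_stop(
    history: List[str],
    judge: Callable[[List[str]], str],
    iteration: int,
    maximum_iterations: int = 99,
) -> bool:
    """Stop when the iteration cap is reached or the judge says the chat is done."""
    if iteration >= maximum_iterations:
        return True
    return result_parser(judge(history))


def judge(history: List[str]) -> str:
    # Hypothetical stand-in for a prompt asking "Is the incident resolved?"
    return "Yes" if "no action needed" in history[-1].lower() else "No"


ongoing = should_stop(["Restart service ServiceX"], judge, iteration=1)
resolved = should_stop(["No action needed."], judge, iteration=2)
capped = should_stop(["Restart service ServiceX"], judge, iteration=99)
print(ongoing, resolved, capped)
```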

Truncating chat history: To reduce token usage and help improve performance, use the `KernelFunctionTerminationStrategy.history_reducer` parameter.

## Conversation state
The AgentGroupChat state updates to 'completed' once it meets the termination criteria.

To keep using the same chat instance, you'll need to reset the completion state to False. Without a state reset, the AgentGroupChat can't accept new interactions.

When a conversation hits the maximum number of iterations allowed, the conversation will end but won't be marked as 'completed'. In this case, you can extend the conversation without resetting the conversation state.
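
The difference between "completed" and "stopped at the iteration cap" can be illustrated with a toy state object. The class below is a hypothetical model written for this explanation, not the `AgentGroupChat` implementation; the `ChatState` name, the scripted replies, and the "no action needed" criterion are all assumptions.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ChatState:
    maximum_iterations: int = 3
    is_complete: bool = False
    history: List[str] = field(default_factory=list)

    def run(self, replies: List[str]) -> int:
        """Consume scripted replies until termination or the cap; return turns taken."""
        if self.is_complete:
            raise RuntimeError("Chat is complete; reset is_complete before continuing.")
        turns = 0
        for reply in replies[: self.maximum_iterations]:
            self.history.append(reply)
            turns += 1
            if "no action needed" in reply.lower():
                self.is_complete = True  # termination criterion met
                break
        return turns


chat_state = ChatState(maximum_iterations=2)

# Cap reached first: the run ends, but the chat is not marked complete.
turns_first = chat_state.run(["Restart service", "Service restarted", "No action needed."])
complete_after_cap = chat_state.is_complete

# The same instance can continue without a reset, and now terminates normally.
turns_second = chat_state.run(["No action needed."])
complete_after_termination = chat_state.is_complete

# A completed chat rejects further runs until the flag is reset.
try:
    chat_state.run(["anything"])
    rejected = False
except RuntimeError:
    rejected = True

chat_state.is_complete = False
turns_third = chat_state.run(["Restart service"])
```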

# Setup

In [7]:
import os
import textwrap
from datetime import datetime
from dotenv import load_dotenv
from pathlib import Path

from azure.identity.aio import DefaultAzureCredential
from semantic_kernel.agents import AgentGroupChat
from semantic_kernel.agents import AzureAIAgent, AzureAIAgentSettings
from semantic_kernel.agents.strategies import TerminationStrategy, SequentialSelectionStrategy
from semantic_kernel.contents.chat_message_content import ChatMessageContent
from semantic_kernel.contents.utils.author_role import AuthorRole
from semantic_kernel.functions.kernel_function_decorator import kernel_function

load_dotenv()

AGENT_PROJECT_ENDPOINT = os.getenv("AGENT_PROJECT_ENDPOINT")
AGENT_MODEL_DEPLOYMENT = os.getenv("AGENT_MODEL_DEPLOYMENT")
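
`os.getenv` returns `None` when a variable is missing, which would only surface later as a confusing client error. One optional way to fail early is a small helper like the one below; `require_env` is a hypothetical name introduced here, not part of the notebook or of any library.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or raise a clear error."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name} is not set; add it to your .env file."
        )
    return value
```

For example, `AGENT_PROJECT_ENDPOINT = require_env("AGENT_PROJECT_ENDPOINT")` would stop the notebook at the setup cell instead of at agent creation.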

In [12]:
ai_agent_settings = AzureAIAgentSettings(
    endpoint=AGENT_PROJECT_ENDPOINT,
    model_deployment_name=AGENT_MODEL_DEPLOYMENT,
)

project_client = AzureAIAgent.create_client(
    endpoint=AGENT_PROJECT_ENDPOINT,
    credential=DefaultAzureCredential(
        exclude_environment_credential=True,
        exclude_managed_identity_credential=True
    )
)

In [10]:
# Agent parameters

INCIDENT_MANAGER = "INCIDENT_MANAGER"
INCIDENT_MANAGER_INSTRUCTIONS = """
Analyze the given log file or the response from the devops assistant.
Recommend which one of the following actions should be taken:

Restart service {service_name}
Rollback transaction
Redeploy resource {resource_name}
Increase quota

If there are no issues or if the issue has already been resolved, respond with "INCIDENT_MANAGER > No action needed."
If none of the options resolve the issue, respond with "Escalate issue."

RULES:
- Do not perform any corrective actions yourself.
- Read the log file on every turn.
- Prepend your response with this text: "INCIDENT_MANAGER > {logfilepath} | "
- Only respond with the corrective action instructions.
"""

DEVOPS_ASSISTANT = "DEVOPS_ASSISTANT"
DEVOPS_ASSISTANT_INSTRUCTIONS = """
Read the instructions from the INCIDENT_MANAGER and apply the appropriate resolution function. 
Return the response as "{function_response}"
If the instructions indicate there are no issues or actions needed, 
take no action and respond with "No action needed."

RULES:
- Use the instructions provided.
- Do not read any log files yourself.
- Prepend your response with this text: "DEVOPS_ASSISTANT > "
"""

In [11]:
# Plugin setup

# class for DevOps functions
class DevopsPlugin:
    """A plugin that performs developer operation tasks."""
    
    def append_to_log_file(self, filepath: str, content: str) -> None:
        with open(filepath, 'a', encoding='utf-8') as file:
            file.write('\n' + textwrap.dedent(content).strip())

    @kernel_function(description="A function that restarts the named service")
    def restart_service(self, service_name: str = "", logfile: str = "") -> str:
        log_entries = [
            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ALERT  DevopsAssistant: Multiple failures detected in {service_name}. Restarting service.",
            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] INFO  {service_name}: Restart initiated.",
            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] INFO  {service_name}: Service restarted successfully.",
        ]

        log_message = "\n".join(log_entries)
        self.append_to_log_file(logfile, log_message)
        
        return f"Service {service_name} restarted successfully."

    @kernel_function(description="A function that rolls back the transaction")
    def rollback_transaction(self, logfile: str = "") -> str:
        log_entries = [
            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ALERT  DevopsAssistant: Transaction failure detected. Rolling back transaction batch.",
            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] INFO   TransactionProcessor: Rolling back transaction batch.",
            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] INFO   Transaction rollback completed successfully.",
        ]

        log_message = "\n".join(log_entries)
        self.append_to_log_file(logfile, log_message)
        
        return "Transaction rolled back successfully."

    @kernel_function(description="A function that redeploys the named resource")
    def redeploy_resource(self, resource_name: str = "", logfile: str = "") -> str:
        log_entries = [
            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ALERT  DevopsAssistant: Resource deployment failure detected in '{resource_name}'. Redeploying resource.",
            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] INFO   DeploymentManager: Redeployment request submitted.",
            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] INFO   DeploymentManager: Service successfully redeployed, resource '{resource_name}' created successfully.",
        ]

        log_message = "\n".join(log_entries)
        self.append_to_log_file(logfile, log_message)
        
        return f"Resource '{resource_name}' redeployed successfully."

    @kernel_function(description="A function that increases the quota")
    def increase_quota(self, logfile: str = "") -> str:
        log_entries = [
            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ALERT  DevopsAssistant: High request volume detected. Increasing quota.",
            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] INFO   APIManager: Quota increase request submitted.",
            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] INFO   APIManager: Quota successfully increased to 150% of previous limit.",
        ]

        log_message = "\n".join(log_entries)
        self.append_to_log_file(logfile, log_message)

        return "Successfully increased quota."

    @kernel_function(description="A function that escalates the issue")
    def escalate_issue(self, logfile: str = "") -> str:
        log_entries = [
            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ALERT  DevopsAssistant: Cannot resolve issue.",
            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ALERT  DevopsAssistant: Requesting escalation.",
        ]
        
        log_message = "\n".join(log_entries)
        self.append_to_log_file(logfile, log_message)
        
        return "Submitted escalation request."


# class for Log File functions
class LogFilePlugin:
    """A plugin that reads and writes log files."""

    @kernel_function(description="Accesses the given file path string and returns the file contents as a string")
    def read_log_file(self, filepath: str = "") -> str:
        with open(filepath, 'r', encoding='utf-8') as file:
            return file.read()

In [13]:
# Create the incident manager agent on the Azure AI agent service
incident_agent_definition = await project_client.agents.create_agent(
     model=ai_agent_settings.model_deployment_name,
     name=INCIDENT_MANAGER,
     instructions=INCIDENT_MANAGER_INSTRUCTIONS
)

# Create a Semantic Kernel agent for the Azure AI incident manager agent
agent_incident = AzureAIAgent(
     client=project_client,
     definition=incident_agent_definition,
     plugins=[LogFilePlugin()]
)

# Create the devops agent on the Azure AI agent service
devops_agent_definition = await project_client.agents.create_agent(
     model=ai_agent_settings.model_deployment_name,
     name=DEVOPS_ASSISTANT,
     instructions=DEVOPS_ASSISTANT_INSTRUCTIONS,
)

# Create a Semantic Kernel agent for the devops Azure AI agent
agent_devops = AzureAIAgent(
     client=project_client,
     definition=devops_agent_definition,
     plugins=[DevopsPlugin()]
)

In [19]:
# Agent Selection Strategy
class SelectionStrategy(SequentialSelectionStrategy):
     """A strategy for determining which agent should take the next turn in the chat."""
     async def select_agent(self, agents, history):
          """Check which agent should take the next turn in the chat."""
          # The Incident Manager should go after the User or the Devops Assistant
          if (history[-1].name == DEVOPS_ASSISTANT or history[-1].role == AuthorRole.USER):
               agent_name = INCIDENT_MANAGER
               return next((agent for agent in agents if agent.name == agent_name), None)
          # Otherwise it is the Devops Assistant's turn
          return next((agent for agent in agents if agent.name == DEVOPS_ASSISTANT), None)

# Agent Termination Strategy
class ApprovalTerminationStrategy(TerminationStrategy):
     """A strategy for determining when an agent should terminate."""
     async def should_agent_terminate(self, agent, history):
          """Check if the agent should terminate."""
          return "no action needed" in history[-1].content.lower()
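
The turn-taking and termination rules above can be checked offline, without calling any model. The sketch below re-expresses both rules as plain functions and replays a scripted conversation; `next_speaker`, `is_finished`, the `SimpleNamespace` message stubs, and the string role values are hypothetical stand-ins, and the sketch ignores the `agents` filter that a termination strategy can be given.

```python
from types import SimpleNamespace

INCIDENT_MANAGER = "INCIDENT_MANAGER"
DEVOPS_ASSISTANT = "DEVOPS_ASSISTANT"


def next_speaker(history):
    """Incident manager goes after the user or the devops assistant; otherwise devops."""
    last = history[-1]
    if last.role == "user" or last.name == DEVOPS_ASSISTANT:
        return INCIDENT_MANAGER
    return DEVOPS_ASSISTANT


def is_finished(history):
    """The chat ends once the latest message says no action is needed."""
    return "no action needed" in history[-1].content.lower()


# Scripted replies standing in for real agent output.
scripted = [
    "INCIDENT_MANAGER > data/logs/log1.log | Restart service ServiceX",
    "DEVOPS_ASSISTANT > Service ServiceX restarted successfully.",
    "INCIDENT_MANAGER > data/logs/log1.log | No action needed.",
]

history = [SimpleNamespace(role="user", name=None, content="USER > data/logs/log1.log")]
turns = []
for content in scripted:
    speaker = next_speaker(history)
    turns.append(speaker)
    history.append(SimpleNamespace(role="assistant", name=speaker, content=content))
    if is_finished(history):
        break

print(turns)
```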

In [20]:
chat = AgentGroupChat(
     agents=[agent_incident, agent_devops],
     termination_strategy=ApprovalTerminationStrategy(
         agents=[agent_incident], 
         maximum_iterations=10, 
         automatic_reset=True
     ),
     selection_strategy=SelectionStrategy(agents=[agent_incident,agent_devops]),      
)

# Execution

In [21]:
# Current log file
logfile = "data/logs/log1.log"
with open(logfile, "r", encoding="utf-8") as f:
    print(f.read())

[2025-02-21 10:05:12] INFO  ServiceX: Initialization complete.
[2025-02-21 10:15:45] ERROR  ServiceX: Connection timeout with DatabaseY
[2025-02-21 10:16:30] ERROR  ServiceX: Connection retry failed.
[2025-02-21 10:18:14] ERROR  ServiceX: Critical failure - unable to connect to DatabaseY.



In [22]:
logfile_msg = ChatMessageContent(role=AuthorRole.USER, content=f"USER > {logfile}")
print(f"\nReady to process log file: {logfile}\n")


Ready to process log file: data/logs/log1.log



In [23]:
# Append the current log file to the chat
await chat.add_chat_message(logfile_msg)
print()




In [24]:
async for response in chat.invoke():
     if response is None or not response.name:
         continue
     print(f"{response.content}")

INCIDENT_MANAGER > data/logs/log1.log | Restart service ServiceX
DEVOPS_ASSISTANT > Service ServiceX restarted successfully.
INCIDENT_MANAGER > data/logs/log1.log | No action needed.


In [26]:
# Log file after the run
with open(logfile, "r", encoding="utf-8") as f:
    print(f.read())

[2025-02-21 10:05:12] INFO  ServiceX: Initialization complete.
[2025-02-21 10:15:45] ERROR  ServiceX: Connection timeout with DatabaseY
[2025-02-21 10:16:30] ERROR  ServiceX: Connection retry failed.
[2025-02-21 10:18:14] ERROR  ServiceX: Critical failure - unable to connect to DatabaseY.

[2025-06-29 16:14:58] ALERT  DevopsAssistant: Multiple failures detected in ServiceX. Restarting service.
[2025-06-29 16:14:58] INFO  ServiceX: Restart initiated.
[2025-06-29 16:14:58] INFO  ServiceX: Service restarted successfully.


# Cleaning up

In [25]:
await project_client.agents.delete_agent(incident_agent_definition.id)
await project_client.agents.delete_agent(devops_agent_definition.id)