# Semantic Kernel 

In this code sample, you will use the [Semantic Kernel](https://aka.ms/ai-agents-beginners/semantic-kernel) AI Framework to create a basic agent. 

The goal of this sample is to show you the steps that we will later use in the additional code samples when implementing the different agentic patterns. 

## Import the Needed Python Packages 

In [1]:
import json
import os

from typing import Annotated

from dotenv import load_dotenv

from IPython.display import display, HTML

from openai import AsyncOpenAI

from semantic_kernel.agents import ChatCompletionAgent, ChatHistoryAgentThread
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
from semantic_kernel.contents import FunctionCallContent, FunctionResultContent, StreamingTextContent
from semantic_kernel.functions import kernel_function

## Creating the Client

In this sample, we will use [GitHub Models](https://aka.ms/ai-agents-beginners/github-models) for access to the LLM. 

The `ai_model_id` is defined as `gpt-4o-mini`. Try changing the model to another model available on the GitHub Models marketplace to see the different results. 

For us to use the `Azure Inference SDK` that is used for the `base_url` for GitHub Models, we will use the `OpenAIChatCompletion` connector within Semantic Kernel. There are also other [available connectors](https://learn.microsoft.com/semantic-kernel/concepts/ai-services/chat-completion) to use Semantic Kernel for other model providers.

In [2]:
import random
from typing import Annotated
from semantic_kernel.functions import kernel_function

class NiFiErrorPlugin:
    """Returns a random NiFi error for debugging or simulation."""

    def __init__(self):
        # Common NiFi error messages
        self.errors = [
            "S3 upload failed: Access Denied",
            "Kafka processor restart loop detected",
            "FlowFile queue overflow at connection: d365-outbound",
            "Processor 'ConsumeKafka' stuck in 'Validating' state",
            "Missing controller service: AWS_Credentials_Service",
            "Connection timeout to remote system",
            "JDBC connection pool exhausted",
            "NiFi UI unresponsive due to high CPU usage",
            "Deadlock detected between two processors",
            "Backpressure object threshold reached"
        ]
        self.last_error = None

    @kernel_function(description="Returns a random NiFi error message for simulation or testing.")
    def get_random_nifi_error(self) -> Annotated[str, "Returns a random NiFi error message"]:
        # Avoid repeating the same error
        available_errors = self.errors.copy()
        if self.last_error and len(available_errors) > 1:
            available_errors.remove(self.last_error)

        error = random.choice(available_errors)
        self.last_error = error

        return error


In [3]:
load_dotenv()
client = AsyncOpenAI(
    api_key=os.environ.get("GITHUB_TOKEN"), 
    base_url="https://models.inference.ai.azure.com/",
)

# Create an AI Service that will be used by the `ChatCompletionAgent`
chat_completion_service = OpenAIChatCompletion(
    ai_model_id="gpt-4o-mini",
    async_client=client,
)

## Creating the Agent 

Below we create the Agent called `TravelAgent`.

For this example, we are using very simple instructions. You can change these instructions to see how the agent responds differently. 

In [4]:
AGENT_INSTRUCTIONS = """You are a helpful AI Agent that assists engineers in simulating and understanding NiFi pipeline issues.

Important: When users describe a specific issue (e.g. "Kafka stuck" or "S3 upload failed"), focus your response on that specific case. Only suggest random NiFi errors when the user hasn't provided a clear problem.

When the conversation begins, introduce yourself with this message:
"Hello! I'm your NiFi Debug Agent. I can help simulate, analyze, or explain common NiFi pipeline issues. Here are some things you can ask me:
1. Simulate a random NiFi error
2. Diagnose a known issue (e.g. Kafka stuck, S3 upload failures)
3. Explain what a specific NiFi error message means
4. Suggest how to fix common flow problems

What NiFi issue are you facing or trying to simulate today?"

Always prioritize the user's input. If they mention a specific NiFi error or processor, address that directly instead of giving random messages.
"""
agent = ChatCompletionAgent(
    service=chat_completion_service,
    plugins=[NiFiErrorPlugin()],  # 🛠 nuovo plugin!
    name="NiFiDebugAgent",
    instructions=AGENT_INSTRUCTIONS,
)


## Running the Agents 

Now we can run the Agent by defining the `ChatHistory` and adding the `system_message` to it. We will use the `AGENT_INSTRUCTIONS` that we defined earlier. 

After these are defined, we create a `user_inputs` that will be what the user is sending to the agent. In this case, we have set this message to `Plan me a sunny vacation`. 

Feel free to change this message to see how the agent responds differently. 

In [5]:
import json
from IPython.display import display, HTML
from semantic_kernel.contents import FunctionCallContent, FunctionResultContent, StreamingTextContent

user_inputs = [
    "Simulate a NiFi error.",
    "Explain why Kafka might be stuck in restart loop.",
]

async def main():
    thread: ChatHistoryAgentThread | None = None

    for user_input in user_inputs:
        html_output = (
            f"<div style='margin-bottom:10px'>"
            f"<div style='font-weight:bold'>User:</div>"
            f"<div style='margin-left:20px'>{user_input}</div></div>"
        )

        agent_name = None
        full_response: list[str] = []
        function_calls: list[str] = []

        current_function_name = None
        argument_buffer = ""

        async for response in agent.invoke_stream(
            messages=user_input,
            thread=thread,
        ):
            thread = response.thread
            agent_name = response.name
            content_items = list(response.items)

            for item in content_items:
                if isinstance(item, FunctionCallContent):
                    if item.function_name:
                        current_function_name = item.function_name

                    if isinstance(item.arguments, str):
                        argument_buffer += item.arguments

                elif isinstance(item, FunctionResultContent):
                    if current_function_name:
                        formatted_args = argument_buffer.strip()
                        try:
                            parsed_args = json.loads(formatted_args)
                            formatted_args = json.dumps(parsed_args)
                        except Exception:
                            pass

                        function_calls.append(f"Calling function: {current_function_name}({formatted_args})")
                        current_function_name = None
                        argument_buffer = ""

                    function_calls.append(f"\nFunction Result:\n\n{item.result}")

                elif isinstance(item, StreamingTextContent) and item.text:
                    full_response.append(item.text)

        if function_calls:
            html_output += (
                "<div style='margin-bottom:10px'>"
                "<details>"
                "<summary style='cursor:pointer; font-weight:bold; color:#0066cc;'>Function Calls (click to expand)</summary>"
                "<div style='margin:10px; padding:10px; background-color:#f8f8f8; "
                "border:1px solid #ddd; border-radius:4px; white-space:pre-wrap; font-size:14px; color:#333;'>"
                f"{chr(10).join(function_calls)}"
                "</div></details></div>"
            )

        html_output += (
            "<div style='margin-bottom:20px'>"
            f"<div style='font-weight:bold'>{agent_name or 'Assistant'}:</div>"
            f"<div style='margin-left:20px; white-space:pre-wrap'>{''.join(full_response)}</div></div><hr>"
        )

        display(HTML(html_output))

await main()
