In [None]:
import os
from datetime import datetime
from enum import Enum
from typing import Annotated, Awaitable, Callable, TypedDict

from dotenv import load_dotenv
from openai import AsyncAzureOpenAI
from pydantic import BaseModel

from semantic_kernel import Kernel
from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior
from semantic_kernel.connectors.ai.open_ai import (
    AzureChatCompletion,
    AzureTextEmbedding,
    OpenAIEmbeddingPromptExecutionSettings,
)
from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings
from semantic_kernel.connectors.memory.azure_ai_search import (
    AzureAISearchCollection,
    AzureAISearchStore,
)
from semantic_kernel.contents import ChatHistory
from semantic_kernel.core_plugins import HttpPlugin, MathPlugin
from semantic_kernel.data import (
    VectorStoreRecordDataField,
    VectorStoreRecordKeyField,
    VectorStoreRecordVectorField,
    vectorstoremodel,
)
from semantic_kernel.filters import FilterTypes, FunctionInvocationContext
from semantic_kernel.functions import (
    kernel_function,
    KernelArguments,
    KernelParameterMetadata,
)
from semantic_kernel.prompt_template import KernelPromptTemplate, PromptTemplateConfig

# Setup: load configuration for the later cells (Azure OpenAI / Azure AI Search settings
# read via os.getenv) from a local .env file. override=True asks python-dotenv to let
# values from .env take precedence over variables already set in the environment.
load_dotenv(override=True)

# Step 1: Create the kernel

In [None]:
# Shared kernel instance for the whole notebook: the following cells register their
# services, plugins, and filters on it and invoke functions through it.
kernel = Kernel()

### 🧪 Try It Out!

Below is a chat function using the kernel. With just the **kernel**, try to chat and observe the behavior.

> 📝 *Notice that you cannot do much yet — the kernel is limited until you add an AI service (Step 2) and additional orchestration or tools (Step 3).*


In [None]:
# Define the chat function
async def chat():
    """Run an interactive console chat loop on top of the shared ``kernel``.

    Each turn reads one line with ``input`` and echoes it. Typing "exit" or
    "quit" (case-insensitive) ends the loop. Any other message is rendered into
    the prompt template together with the accumulated chat history, sent through
    ``kernel.invoke_prompt`` with ``FunctionChoiceBehavior.Auto()``, and the
    reply is printed and appended to the history.
    """
    # Prompt template; the {{$...}} placeholders are filled from the KernelArguments below.
    prompt_text = """
    You are a helpful assistant. Solve the problems for the user using tools when possible. 

    Previous information from chat:
    {{$chat_history}}

    User: {{$user_input}}
    Assistant:
    """

    stop_words = ("exit", "quit")

    # Conversation memory, created once per chat() call and grown turn by turn.
    history = ChatHistory()

    while True:
        message = input("Enter your message:")
        print(f"\nYou: {message}")
        if message.lower() in stop_words:
            return

        # Execution settings are rebuilt for every turn, exactly one per invocation.
        settings = PromptExecutionSettings(function_choice_behavior=FunctionChoiceBehavior.Auto())
        turn_arguments = KernelArguments(settings, user_input=message, chat_history=history)
        response = await kernel.invoke_prompt(prompt=prompt_text, arguments=turn_arguments)
        reply = str(response)

        # The history is only extended after a successful invocation.
        history.add_user_message(message)
        history.add_assistant_message(reply)

        print(f"Bot: {reply}")


# Step 2: Add a chat completion service

In [None]:
# One Azure OpenAI client configured from environment variables. The same client object
# is passed to the embedding service in Step 5. os.getenv returns None for unset
# variables, so the .env file should define all three settings.
async_openai_client = AsyncAzureOpenAI(
    api_key=os.getenv("AZURE_OPENAI_KEY"),
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    api_version=os.getenv("MODEL_DEPLOYMENT_API_VERSION"),
)

# Register the chat completion service on the kernel. Later cells select it by the
# service_id "azure_openai_chat".
kernel.add_service(
    AzureChatCompletion(
        service_id="azure_openai_chat",
        async_client=async_openai_client,
        deployment_name=os.getenv("MODEL_DEPLOYMENT_NAME"),
    )
)

# 🚀 Test It!
# Smoke test: send a single plain prompt through the service registered above.
print("--- 📋 Results ---")
prompt = "You are a helpful assistant. What's the capital of Japan?"
result = await kernel.invoke_prompt(
    prompt=prompt,
    service_id="azure_openai_chat"
)
print(str(result)) 

### 🧪 Try It Out!

Now that you've added a **chat completion service**, you can chat with the kernel!

> 🧠 Keep in mind: it can only respond based on what it was trained on — it doesn't know real-time facts.

🔍 **Try asking:**  
> *"What’s the date today?"*

You’ll see that the kernel can’t answer accurately — yet. Let’s fix that next!

# Step 3: Add some plugins

- ## Add a simple function

In [None]:
@kernel_function(description="Get the current date")
def get_current_date():
    """Return the current local date and time as a ``datetime`` object."""
    return datetime.now()
    
# Register the function on the kernel under the "Utils" plugin. The returned handle
# is what the test below passes to kernel.invoke.
get_current_date_fn = kernel.add_function(
    plugin_name="Utils",
    function=get_current_date
)

# 🚀 Test It!
# Invoke the registered function directly, without going through the model.
print("--- 📋 Results ---")
result = await kernel.invoke(function=get_current_date_fn)
print(str(result)) 

- ## Add a plugin

In [None]:
# Discrete brightness levels a light can be set to.
class Brightness(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"

# State of a single light. This is both the element type of the plugin's storage and
# the shape of the change request accepted by LightsPlugin.change_state, so the keys
# used by the plugin and by the seed data below must match these field names.
class LightModel(TypedDict):
    id: int
    name: str
    is_on: bool | None
    brightness: Brightness | None
    color: Annotated[str | None, "The color of the light with a hex code (ensure you include the # symbol)"]

# Kernel plugin exposing an in-memory list of lights.
# NOTE: the method docstrings are left exactly as they were on purpose; with a bare
# @kernel_function they serve as the function descriptions, so extra documentation
# lives in comments instead.
class LightsPlugin:
    def __init__(self, lights: list[LightModel]):
        # The list is kept by reference; change_state mutates its entries in place.
        self._lights = lights

    @kernel_function
    async def get_lights(self) -> list[LightModel]:
        """Gets a list of lights and their current state."""
        return self._lights

    @kernel_function
    async def change_state(
        self,
        change_state: LightModel
    ) -> LightModel | None:
        """Changes the state of the light."""
        # The light is matched on "id". Fields absent from the request keep their
        # current value. Returns the updated light, or None when no id matches.
        for light in self._lights:
            if light["id"] == change_state["id"]:
                light["is_on"] = change_state.get("is_on", light["is_on"])
                light["brightness"] = change_state.get("brightness", light["brightness"])
                # Fix: read and write the "color" key declared on LightModel. The
                # previous "hex" key is not part of the schema, so a requested color
                # change was silently ignored.
                light["color"] = change_state.get("color", light["color"])
                return light
        return None

# Create dependencies for the plugin
# Seed data follows LightModel: Brightness members instead of raw numbers, and a
# "color" entry holding a '#'-prefixed hex code as the field annotation requires.
lights: list[LightModel] = [
    {"id": 1, "name": "Table Lamp", "is_on": False, "brightness": Brightness.HIGH, "color": "#FF0000"},
    {"id": 2, "name": "Porch light", "is_on": False, "brightness": Brightness.LOW, "color": "#00FF00"},
    {"id": 3, "name": "Chandelier", "is_on": True, "brightness": Brightness.MEDIUM, "color": "#0000FF"},
]

# Create the plugin
lights_plugin = LightsPlugin(lights)

# Add the plugin to the kernel
# No plugin_name is passed here; the test below addresses the plugin as "LightsPlugin".
kernel.add_plugin(lights_plugin)

# 🚀 Test It!
# Invoke get_lights directly by plugin and function name, without going through the model.
print("--- 📋 Results ---")
result = await kernel.invoke(
    plugin_name="LightsPlugin",
    function_name="get_lights"
)
print(str(result)) 

- ## Add an OpenAPI plugin

In [None]:
plugin = kernel.add_plugin_from_openapi(
    plugin_name="Currency",
    openapi_document_path="../data/exchangerate_api.yaml",
    description="Currency conversion tool",
)

# 🚀 Test It!
print("--- 📋 Results ---")
args = KernelArguments()
args["from"] = "USD"
args["to"] = "JPY"
args["amount"] = 10.0
result = await kernel.invoke(
    plugin_name="Currency",
    function_name="convertCurrency",
    arguments=args
)
print(str(result)) 

- ## Add a function from a prompt template

In [None]:
# Adding a semantic kernel core plugin
# It is registered under the name "http", which is the prefix the template below uses
# in {{http.getAsync ...}}.
kernel.add_plugin(HttpPlugin(), "http")

# Defining a function from prompt template
# The template embeds the result of calling http.getAsync on wttr.in and then shows
# the model an example of the expected answer layout.
function_definition = """
The weather is: {{http.getAsync "https://wttr.in/?format=4"}}

Answer in the format:
Location: Tokyo, Japan
Weather: Sunny (Add an emoji)
Temperature: +30°C
"""

prompt_template = KernelPromptTemplate(prompt_template_config=PromptTemplateConfig(template=function_definition))
# NOTE(review): the function is registered as TimePlugin.kind_of_day although it
# describes the weather. Renaming it would change the name exposed to the model, so
# it is left unchanged here.
kind_of_day = kernel.add_function(
    plugin_name="TimePlugin",
    function_name="kind_of_day",
    description="Describe the weather.",
    prompt_template=prompt_template,
)

# 🚀 Test It!
# Invoke the prompt function directly through its handle.
print("--- 📋 Results ---")
result = await kernel.invoke(function=kind_of_day)
print(result)

### 🧪 Try it out!

Now that you've added several plugins, try chatting with the kernel and see what it can do!

You now have access to the following capabilities:

- 📅 **Get the current date**
- 💡 **Control smart lights**
- 💱 **Convert currency values**
- 🌤️ **Get the current weather for your location**

💬 Try asking questions like:

- *"What is the date today?"*
- *"Turn on the table lamp and set its brightness to high."*
- *"How much is 10 USD in JPY?"*
- *"What's the weather like right now?"*


# Step 4: Add some filters

In [None]:
async def logger_filter(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
    """Function-invocation filter that prints a colored log entry for plugin calls.

    Args:
        context: Invocation context. Its function metadata and arguments are read
            before the call, and its result is read after the call.
        next: The remainder of the filter pipeline; awaited exactly once, before
            anything is printed.

    Nothing is printed when the invoked function has no plugin name.
    """
    invoked = context.function
    plugin_name, function_name = invoked.plugin_name, invoked.name
    call_args = context.arguments
    # Taken before the call, so the timestamp marks when the invocation started.
    started_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    await next(context)

    if not plugin_name:
        return

    rule = "═" * 60
    entry = [
        "\n\033[95m" + rule,
        f"\033[90m🕒 [{started_at}]\033[0m",
        f"🔧 \033[94mFunction Invoked:\033[0m \033[93m{plugin_name}.{function_name} with args{call_args}\033[0m",
        f"📤 \033[94mResult:\033[0m {context.result}",
        "\033[95m" + rule + "\033[0m\n",
    ]
    # A single print of the joined lines emits the same text as one print per line.
    print("\n".join(entry))

# Attach the logger as a function-invocation filter on the kernel.
kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, logger_filter)

# 🚀 Test It!
print("--- 📋 Results ---")

# MathPlugin provides a simple function ("Add") to exercise the filter with.
kernel.add_plugin(MathPlugin(), plugin_name="math")
args = KernelArguments()
args["input"] = 1
args["amount"] = 1
result = await kernel.invoke(plugin_name="math", function_name="Add", arguments=args)
print(result)


### 🧪 Try it out! 
Now that you’ve added a logging filter to the kernel, try chatting with it again!
Observe how the log shows which functions are being invoked and their results. This helps you **debug**, **understand tool usage**, and **trace behavior** during execution.

🎯 Bonus Challenge

Add a **prompt rendering filter** to your kernel!

🛠️ Try to log the following:

- 📝 The fully rendered prompt text  
- 🤖 Which function is generating the prompt  
- 📦 Any input arguments passed

This will give you complete visibility into the kernel’s orchestration behavior.

# Step 5: Add a memory connector to perform RAG

In [None]:
# Add a text embedding service
# It reuses the Azure OpenAI client created in Step 2. overwrite=True is passed to
# add_service — presumably so that re-running this cell replaces an earlier
# registration with the same service_id; confirm against the add_service docs.
kernel.add_service(
    AzureTextEmbedding(
        service_id="azure_openai_embeddings",
        async_client=async_openai_client,
        deployment_name=os.getenv("EMBEDDING_MODEL_DEPLOYMENT_NAME"),
    ),
    overwrite=True
)

# Name of the Azure AI Search index to query, read from the environment (None if unset).
index_name = os.getenv("AZURE_SEARCH_INDEX_NAME")

# Define the data model
@vectorstoremodel
class HotelBrochures(BaseModel):
    """Record model for the hotel brochure search index.

    Every field except ``id`` is optional and defaults to ``None``.
    """

    # Record key.
    id: Annotated[str, VectorStoreRecordKeyField()]
    # Plain data fields.
    city: Annotated[str | None, VectorStoreRecordDataField()] = None
    filename: Annotated[str | None, VectorStoreRecordDataField()] = None
    # Brochure text: full-text searchable, with its embedding stored in content_vector.
    # Fix: annotated as `str | None` because the default is None. The previous plain
    # `str` annotation contradicted that default and rejected an explicit null value
    # during validation; this also matches the other optional fields.
    content: Annotated[
        str | None,
        VectorStoreRecordDataField(
            has_embedding=True, embedding_property_name="content_vector", is_full_text_searchable=True
        ),
    ] = None
    # Embedding of `content`; 1536 dimensions, matching the embedding settings below.
    content_vector: Annotated[
        list[float] | None,
        VectorStoreRecordVectorField(
            dimensions=1536,
            local_embedding=True,
            embedding_settings={"embedding": OpenAIEmbeddingPromptExecutionSettings(dimensions=1536)},
        ),
    ] = None


# Connect to Azure AI Search using the endpoint and key from the environment.
ai_search_store = AzureAISearchStore(
    search_endpoint=os.getenv("AZURE_SEARCH_ENDPOINT"),
    api_key=os.getenv("AZURE_SEARCH_KEY"),
)
# Typed handle on the index named by index_name, with records shaped like HotelBrochures.
collection: AzureAISearchCollection = ai_search_store.get_collection(index_name, HotelBrochures)

# Build a search function named "hotel_search_brochure" from the collection. It declares
# two parameters: a required "query" string and an integer "top" that defaults to 2.
# Despite its name, `text_search` holds the function returned by create_search(), which
# is what gets registered on the kernel below.
text_search = collection.create_text_search_from_vector_text_search().create_search(
        function_name="hotel_search_brochure",
        description="A search engine for hotel brochures in different cities. Use this tool to lookup for places to stay.",
        parameters=[
            KernelParameterMetadata(
                name="query", description="The question.", type="str", is_required=True, type_object=str
            ),
            KernelParameterMetadata(
                name="top",
                description="Number of results to return.",
                type="int",
                default_value=2,
                type_object=int,
            ),
        ]
    )

# Register the search function under the "azure_ai_search" plugin.
kernel.add_function(
    plugin_name="azure_ai_search",
    function=text_search,
)

# 🚀 Test It!
# Ask a question with FunctionChoiceBehavior.Auto() so the model may call the
# registered functions, including the search function added above.
print("--- 📋 Results ---")
result = await kernel.invoke_prompt(
    prompt="Where can I stay in Las Vegas?",
    service_id="azure_openai_chat",
    arguments=KernelArguments(PromptExecutionSettings(function_choice_behavior=FunctionChoiceBehavior.Auto()))
)
print(str(result)) 

### 🧪 Try it out!

You’ve just added a **RAG (Retrieval-Augmented Generation)** plugin using your own hotel brochure data.

💡 Try asking:

- *"Can you find me the most expensive hotel?"*
- *"Any hotels for cultural appreciation?"*

🔍 Notice:
- It **doesn’t rely on general model knowledge**.
- It works even without **exact keyword matches**, thanks to **semantic search**.
