In [2]:
# Import necessary libraries
import os
import json
import asyncio
from typing import Dict, Any

from semantic_kernel import Kernel
from semantic_kernel.connectors.ai.open_ai import (
    AzureChatCompletion,
    AzureChatPromptExecutionSettings
)
from semantic_kernel.functions import kernel_function, KernelArguments
from semantic_kernel.contents.chat_history import ChatHistory
from semantic_kernel.agents import AgentGroupChat, ChatCompletionAgent
from semantic_kernel.functions import KernelFunctionFromPrompt
from semantic_kernel.contents import ChatHistoryTruncationReducer
from semantic_kernel.agents.strategies import (
    KernelFunctionSelectionStrategy,
    KernelFunctionTerminationStrategy,
)

from dotenv import load_dotenv

# Load environment variables
load_dotenv()

True

In [3]:
def create_kernel():
    """Create a kernel with Azure OpenAI service"""
    kernel = Kernel()
    
    # Add Azure OpenAI service
    kernel.add_service(
        AzureChatCompletion(
            service_id="azure_openai",
            deployment_name=os.environ.get("AZURE_OPENAI_COMPLETION_DEPLOYMENT_NAME"),
            api_key=os.environ.get("AZURE_OPENAI_API_KEY"),
            endpoint=os.environ.get("AZURE_OPENAI_ENDPOINT"),
            api_version=os.environ.get("AZURE_OPENAI_API_VERSION"),
        )
    )
    
    return kernel

# Let's create our kernel
kernel = create_kernel()

In [5]:
REVIEWER_NAME = "Reviewer"
WRITER_NAME = "Writer"

execution_settings = AzureChatPromptExecutionSettings(
    service_id="azure_openai",
    max_tokens=1000,
)

reviewer = ChatCompletionAgent(
    kernel=kernel,
    name=REVIEWER_NAME,
    instructions="""Your responsibility is to review and identify how to improve user provided content.
If the user has provided input or direction for content already provided, specify how to address this input.
Never directly perform the correction or provide an example.
Once the content has been updated in a subsequent response, review it again until it is satisfactory.

RULES:
- Only identify suggestions that are specific and actionable.
- Verify previous suggestions have been addressed.
- Never repeat previous suggestions.",
)
"""
)

copywriter = ChatCompletionAgent(
    kernel=kernel,
    name=WRITER_NAME,
    instructions="""
Your sole responsibility is to rewrite content according to review suggestions.
- Always apply all review directions.
- Always revise the content in its entirety without explanation.
- Never address the user.  
    """,
    arguments=KernelArguments(settings=execution_settings)
)

In [6]:
chat = AgentGroupChat(agents=[reviewer, copywriter])

In [7]:
selection_function = KernelFunctionFromPrompt(
    function_name="selection", 
    prompt=f"""
Examine the provided RESPONSE and choose the next participant.
State only the name of the chosen participant without explanation.
Never choose the participant named in the RESPONSE.

Choose only from these participants:
- {REVIEWER_NAME}
- {WRITER_NAME}

Rules:
- If RESPONSE is user input, it is {REVIEWER_NAME}'s turn.
- If RESPONSE is by {REVIEWER_NAME}, it is {WRITER_NAME}'s turn.
- If RESPONSE is by {WRITER_NAME}, it is {REVIEWER_NAME}'s turn.

RESPONSE:
{{{{$lastmessage}}}}
"""
)

In [9]:
termination_keyword = "yes"

termination_function = KernelFunctionFromPrompt(
    function_name="termination", 
    prompt=f"""
Examine the RESPONSE and determine whether the content has been deemed satisfactory.
If the content is satisfactory, respond with a single word without explanation: {termination_keyword}.
If specific suggestions are being provided, it is not satisfactory.
If no correction is suggested, it is satisfactory.

RESPONSE:
{{{{$lastmessage}}}}
"""
)

In [10]:
history_reducer = ChatHistoryTruncationReducer(target_count=1)

In [11]:
chat = AgentGroupChat(
    agents=[reviewer, copywriter],
    selection_strategy=KernelFunctionSelectionStrategy(
        initial_agent=reviewer,
        function=selection_function,
        kernel=kernel,
        result_parser=lambda result: str(result.value[0]).strip() if result.value[0] is not None else WRITER_NAME,
        history_variable_name="lastmessage",
        history_reducer=history_reducer,
    ),
    termination_strategy=KernelFunctionTerminationStrategy(
        agents=[reviewer],
        function=termination_function,
        kernel=kernel,
        result_parser=lambda result: termination_keyword in str(result.value[0]).lower(),
        history_variable_name="lastmessage",
        maximum_iterations=10,
        history_reducer=history_reducer,
    ),
)

In [12]:
async def run(user_input: str):
    is_complete = False
    while not is_complete:
        print()

        user_input = input("User > ").strip()
        if not user_input:
            continue

        if user_input.lower() == "exit":
            is_complete = True
            break

        if user_input.lower() == "reset":
            await chat.reset()
            print("[Conversation has been reset]")
            continue

        # Try to grab files from the script's current directory
        if user_input.startswith("@") and len(user_input) > 1:
            file_name = user_input[1:]
            script_dir = os.path.dirname(os.path.abspath(__file__))
            file_path = os.path.join(script_dir, file_name)
            try:
                if not os.path.exists(file_path):
                    print(f"Unable to access file: {file_path}")
                    continue
                with open(file_path, "r", encoding="utf-8") as file:
                    user_input = file.read()
            except Exception:
                print(f"Unable to access file: {file_path}")
                continue

        # Add the current user_input to the chat
        await chat.add_chat_message(message=user_input)

        try:
            async for response in chat.invoke():
                if response is None or not response.name:
                    continue
                print()
                print(f"# {response.name.upper()}:\n{response.content}")
        except Exception as e:
            print(f"Error during chat invocation: {e}")

        # Reset the chat's complete flag for the new conversation round.
        chat.is_complete = False

In [None]:
is_complete = False
while not is_complete:
    print()

    user_input = input("User > ").strip()
    if not user_input:
        continue

    if user_input.lower() == "exit":
        is_complete = True
        break

    if user_input.lower() == "reset":
        await chat.reset()
        print("[Conversation has been reset]")
        continue

    # Try to grab files from the script's current directory
    if user_input.startswith("@") and len(user_input) > 1:
        file_name = user_input[1:]
        script_dir = os.path.dirname(os.path.abspath(__file__))
        file_path = os.path.join(script_dir, file_name)
        try:
            if not os.path.exists(file_path):
                print(f"Unable to access file: {file_path}")
                continue
            with open(file_path, "r", encoding="utf-8") as file:
                user_input = file.read()
        except Exception:
            print(f"Unable to access file: {file_path}")
            continue

    # Add the current user_input to the chat
    await chat.add_chat_message(message=user_input)

    try:
        async for response in chat.invoke():
            if response is None or not response.name:
                continue
            print()
            print(f"# {response.name.upper()}:\n{response.content}")
    except Exception as e:
        print(f"Error during chat invocation: {e}")

    # Reset the chat's complete flag for the new conversation round.
    chat.is_complete = False
