In [3]:
import os
import json

import requests
from autogen_agentchat.agents import AssistantAgent
from autogen_core.models import UserMessage
from autogen_ext.models.azure import AzureAIChatCompletionClient
from azure.core.credentials import AzureKeyCredential
from autogen_core import CancellationToken
from autogen_core.tools import FunctionTool
from autogen_agentchat.messages import TextMessage
from autogen_agentchat.ui import Console
from typing import Any, Callable, Set, Dict, List, Optional

In [11]:
# Chat-completion client for the "gpt-4o-mini" model served from the endpoint below.
client = AzureAIChatCompletionClient(
    model="gpt-4o-mini",
    endpoint="https://models.inference.ai.azure.com",
    # Authentication uses a GitHub personal access token (PAT), read here from the GITHUB_TOKEN environment variable.
    # Create a PAT by following the instructions here: https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens
    credential=AzureKeyCredential(os.environ["GITHUB_TOKEN"]),
    # Capability flags declared for the model.
    # NOTE(review): presumably required because the client cannot infer them for this endpoint — confirm against the AutoGen docs.
    model_info={
        "json_output": True,
        "function_calling": True,
        "vision": True,
        "family": "unknown",
    },
)

# Smoke test: send a single user message and print the raw completion result
# to confirm the endpoint and credential work before building the agent.
result = await client.create([UserMessage(content="What is the capital of France?", source="user")])
print(result)

finish_reason='stop' content='The capital of France is Paris.' usage=RequestUsage(prompt_tokens=14, completion_tokens=7) cached=False logprobs=None thought=None


In [5]:
from typing import Dict, List, Optional


def get_weather(city: str = "new york") -> Optional[Dict]:
    """
    Fetches the realtime weather for a city from the Tomorrow.io API.

    The API key is read from the ``TOMORROW_IO_API_KEY`` environment variable
    so that no credential is stored in the notebook itself.

    Args:
        city (str): City name for the weather lookup (default: "new york")

    Returns:
        Dict containing the decoded JSON weather data, or None if the API key
        is not configured, the request fails, or the response is not valid JSON
    """
    # Check for the key first so a missing configuration is reported clearly
    # instead of surfacing as an opaque HTTP 401/403 from the API.
    api_key = os.environ.get("TOMORROW_IO_API_KEY")
    if not api_key:
        print("Error fetching weather data: TOMORROW_IO_API_KEY environment variable is not set")
        return None

    url = "https://api.tomorrow.io/v4/weather/realtime"
    # Let requests build and URL-encode the query string rather than
    # interpolating values into the URL by hand.
    params = {"location": city, "apikey": api_key}
    headers = {
        "accept": "application/json",
        "accept-encoding": "deflate, gzip, br"
    }

    try:
        # A timeout keeps an unresponsive server from hanging the kernel.
        response = requests.get(url, params=params, headers=headers, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        # The exception text can include the full request URL; mask the key so
        # it does not end up in saved notebook output.
        print(f"Error fetching weather data: {str(e).replace(api_key, '***')}")
        return None
    except ValueError as e:
        # Depending on the requests version, a malformed body raises either
        # json.JSONDecodeError or another ValueError subclass; catch them all.
        print(f"Error decoding JSON response: {e}")
        return None

In [7]:
def vacation_destinations(city: str) -> tuple[str, str]:
    """
    Looks up whether a vacation destination can currently be booked.

    The lookup is an exact, case-sensitive match against a fixed table.

    Args:
        city (str): Name of the city to check

    Returns:
        tuple: The city name as given, followed by its status: 'Available',
        'Unavailable', or 'City not found' when the city is not in the table
    """
    availability_by_city = {
        "Barcelona": "Available",
        "Tokyo": "Unavailable",
        "Cape Town": "Available",
        "Vancouver": "Available",
        "Dubai": "Unavailable",
    }

    # Unknown cities fall back to a sentinel status instead of raising.
    status = availability_by_city.get(city, "City not found")
    return city, status

# Example usage:
# city, status = vacation_destinations("Barcelona")
# print(f"How about visiting {city}? It's currently {status} there!")

In [8]:
# Wrap the availability lookup as a tool the agent can call.
# NOTE(review): the description is presumably surfaced to the model when it
# decides which tool to use, so it should be free of typos — confirm in the AutoGen docs.
get_vacations = FunctionTool(
    vacation_destinations,
    description="Search for vacation destinations and whether they are available or not.",
)

In [12]:
# Travel-agent assistant backed by the chat client defined earlier and
# equipped with the vacation-availability tool.
agent = AssistantAgent(
    name="assistant",
    model_client=client,
    # Tools the model may call while answering.
    tools=[get_vacations],
    system_message="You are a travel agent that helps users find vacation destinations.",
    # NOTE(review): presumably makes the agent turn tool results into a natural-language
    # reply rather than returning the raw tool output — confirm against the AutoGen docs.
    reflect_on_tool_use=True,
)

In [14]:
async def assistant_run() -> None:
    """Send a single travel request to the agent, then print its intermediate messages and final reply."""
    user_request = TextMessage(content="I would like to take a trip to Tokoyo", source="user")
    token = CancellationToken()

    reply = await agent.on_messages([user_request], cancellation_token=token)

    # Intermediate events first (tool call request / execution), then the final chat message.
    print(reply.inner_messages)
    print(reply.chat_message)


# Top-level await works here because this is a notebook cell.
# When running in a script, use asyncio.run(assistant_run()) instead.
await assistant_run()

[ToolCallRequestEvent(source='assistant', models_usage=RequestUsage(prompt_tokens=362, completion_tokens=17), content=[FunctionCall(id='call_opch2ZMS86SkDU8L1MNal1hZ', arguments='{"city":"Tokyo"}', name='vacation_destinations')], type='ToolCallRequestEvent'), ToolCallExecutionEvent(source='assistant', models_usage=None, content=[FunctionExecutionResult(content="('Tokyo', 'Unavailable')", call_id='call_opch2ZMS86SkDU8L1MNal1hZ')], type='ToolCallExecutionEvent')]
source='assistant' models_usage=RequestUsage(prompt_tokens=352, completion_tokens=65) content="It looks like there are currently no available vacation options for Tokyo. However, if you're flexible about your destination or have any other places in mind, I can help you explore different travel options! Alternatively, I can provide information about Tokyo if you're planning a trip in the future. Just let me know how you'd like to proceed!" type='TextMessage'
