In [None]:
import os
import asyncio
import json
import re
import textwrap
from dotenv import load_dotenv
from semantic_kernel import Kernel
from semantic_kernel.agents import ChatCompletionAgent, AgentGroupChat
from semantic_kernel.agents.strategies import KernelFunctionTerminationStrategy, KernelFunctionSelectionStrategy
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion, AzureChatPromptExecutionSettings
from semantic_kernel.functions import KernelFunctionFromPrompt, KernelArguments
from pydantic import BaseModel, Field
from typing import List
from IPython.display import display, Markdown



load_dotenv()

True

In [None]:
observability = False
if observability :
    import asyncio
import logging

from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import DropAggregation, View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.trace import set_tracer_provider




# Endpoint to the Aspire Dashboard
endpoint = "http://localhost:4317"

# Create a resource to represent the service/sample
resource = Resource.create({ResourceAttributes.SERVICE_NAME: "telemetry-aspire-dashboard-quickstart"})


def set_up_logging():
    exporter = OTLPLogExporter(endpoint=endpoint)

    # Create and set a global logger provider for the application.
    logger_provider = LoggerProvider(resource=resource)
    # Log processors are initialized with an exporter which is responsible
    # for sending the telemetry data to a particular backend.
    logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
    # Sets the global default logger provider
    set_logger_provider(logger_provider)

    # Create a logging handler to write logging records, in OTLP format, to the exporter.
    handler = LoggingHandler()
    # Add filters to the handler to only process records from semantic_kernel.
    handler.addFilter(logging.Filter("semantic_kernel"))
    # Attach the handler to the root logger. `getLogger()` with no arguments returns the root logger.
    # Events from all child loggers will be processed by this handler.
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)


def set_up_tracing():
    exporter = OTLPSpanExporter(endpoint=endpoint)

    # Initialize a trace provider for the application. This is a factory for creating tracers.
    tracer_provider = TracerProvider(resource=resource)
    # Span processors are initialized with an exporter which is responsible
    # for sending the telemetry data to a particular backend.
    tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
    # Sets the global default tracer provider
    set_tracer_provider(tracer_provider)


def set_up_metrics():
    exporter = OTLPMetricExporter(endpoint=endpoint)

    # Initialize a metric provider for the application. This is a factory for creating meters.
    meter_provider = MeterProvider(
        metric_readers=[PeriodicExportingMetricReader(exporter, export_interval_millis=5000)],
        resource=resource,
        views=[
            # Dropping all instrument names except for those starting with "semantic_kernel"
            View(instrument_name="*", aggregation=DropAggregation()),
            View(instrument_name="semantic_kernel*"),
        ],
    )
    # Sets the global default meter provider
    set_meter_provider(meter_provider)


# This must be done before any other telemetry calls
set_up_logging()
set_up_tracing()
set_up_metrics()



  

In [None]:
class ContentOutput(BaseModel):
    flightnumber: str
    gate: str

In [None]:
service_id: str = "default_service_id"
def create_kernel(service_id= service_id) -> Kernel:
    kernel = Kernel()
    kernel.add_service(AzureChatCompletion(service_id=service_id))


    return kernel

kernel = create_kernel()

In [None]:
from utils.agent_instructions import (
    FLIGHT_INFO_PROVIDER_AGENT_INSTRUCTIONS,
    AIRPORT_GATE_COORDINATOR_AGENT_INSTRUCTIONS

)

def format_instruction(instruction_template: str, subject: str) -> str:
    return instruction_template.format(subject=subject)

subject = "We need to figure out the best use of gate space for the flights provided."

flight_info_instr = format_instruction(FLIGHT_INFO_PROVIDER_AGENT_INSTRUCTIONS, subject)
gate_coordindator_instr = format_instruction(AIRPORT_GATE_COORDINATOR_AGENT_INSTRUCTIONS, subject)


In [None]:
class Flight(BaseModel):
            arriveOrDepart: str
            airline: str
            flightNumber: str
            gate: str
            terminal: str
            scheduledDepartureDate: str
            scheduledArrivalDate: str
            estimatedDepartureDate: str
            estimatedArrivalDate: str
            originDate: str
            aircraftTypeIATA: str
            aircraftRegistrationNumber: str
            departureAirport: str
            arrivalAirport: str

In [None]:
from semantic_kernel.functions import kernel_function
import requests
from bs4 import BeautifulSoup
from typing import List
import os

class FlightInfoPlugin:
    def __init__(self):
        self.base_url = "https://api.phx.aero/open-data/FlightInformation/details"
        self.api_key = "c663f585665c45fdb2507b3f677b4842"
    
    def _convert_to_flight_objects(self, data: List[dict]) -> List[Flight]:
        flights = []
        for item in data:
            flight = Flight(
                arriveOrDepart=item.get('arriveOrDepart', ''),
                airline=item.get('airline', ''),
                flightNumber=item.get('flightNumber', ''),
                gate=item.get('gate', ''),
                terminal=item.get('terminal', ''),
                scheduledDepartureDate=item.get('scheduledDepartureDate', ''),
                scheduledArrivalDate=item.get('scheduledArrivalDate', ''),
                estimatedDepartureDate=item.get('estimatedDepartureDate', ''),
                estimatedArrivalDate=item.get('estimatedArrivalDate', ''),
                originDate=item.get('originDate', ''),
                aircraftTypeIATA=item.get('aircraftTypeIATA', ''),
                aircraftRegistrationNumber=item.get('aircraftRegistrationNumber', ''),
                departureAirport=item.get('departureAirport', ''),
                arrivalAirport=item.get('arrivalAirport', '')
            )
            flights.append(flight)
        return flights

    @kernel_function(
        name="GetFlightDetails",
        description="Fetches flight information for a given date range."
    )
    async def GetFlightDetails(self) -> List[Flight]:
        print("Fetching flight information...")
        from_date = "2025-04-23"
        to_date = "2025-04-23"
        url = f"{self.base_url}/daterange?fromDate={from_date}&toDate={to_date}&subscription-key=c663f585665c45fdb2507b3f677b4842"
        
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            data = response.json()
            # Filter for specific gates
            valid_gates = ['B21', 'B22', 'B23', 'B24', 'B25']
            filtered_data = [flight for flight in data if flight.get('gate') in valid_gates]
            # Convert to Flight objects and return first 5 records
            return self._convert_to_flight_objects(filtered_data[:5])
        except requests.exceptions.RequestException as e:
            print(f"Error fetching flight information: {e}")
            return []

client = FlightInfoPlugin()

flight_info_tool = FlightInfoPlugin()


In [None]:
class GateAssignmentPlugin:
    def __init__(self):
# Define Flight model using Pydantic
        
        # Mock data for demonstration
        self.gate_assignments = [
            {'terminal': 2, 'gate': 'B21', 'currentFlight': Flight},
            {'terminal': 2, 'gate': 'B22', 'currentFlight': Flight},
            {'terminal': 3, 'gate': 'B23', 'currentFlight': Flight},
            {'terminal': 3, 'gate': 'B24', 'currentFlight': Flight},
            {'terminal': 4, 'gate': 'B25', 'currentFlight': Flight},
            {'terminal': 2, 'gate': 'B26', 'currentFlight': Flight},
            {'terminal': 2, 'gate': 'B27', 'currentFlight': Flight},
            {'terminal': 3, 'gate': 'B28', 'currentFlight': Flight},
            {'terminal': 3, 'gate': 'E1', 'currentFlight': Flight},
            {'terminal': 4, 'gate': 'E2', 'currentFlight': Flight},
            {'terminal': 2, 'gate': 'E3', 'currentFlight': Flight},
            {'terminal': 2, 'gate': 'E4', 'currentFlight': Flight},
            {'terminal': 3, 'gate': 'E5', 'currentFlight': Flight},
            {'terminal': 4, 'gate': 'F1', 'currentFlight': Flight},
            {'terminal': 3, 'gate': 'F2', 'currentFlight': Flight},
            {'terminal': 4, 'gate': 'F3', 'currentFlight': Flight},
            {'terminal': 3, 'gate': 'F4', 'currentFlight': Flight},
            {'terminal': 4, 'gate': 'F5', 'currentFlight': Flight},
        ]

        
        # Allowed aircraft types per gate
        self.allowed_aircraft_types = {
            "B21": ["319", "320", "321", "737", "E75", "E90", "M80", "M88"],
            "B22": ["E70", "E75", "319", "32D", "320", "32A", "32N", "738", "73H"],
            "B23": ["319", "320", "321", "737", "738", "739", "752", "753", "CRJ", "DC9", "E70", "E75", "E75L", "E75S", "E90", "ER4", "M80", "M88", "M89", "M90"],
            "B24": ["E70", "E75", "E75L", "E75S", "E90", "319", "320", "737"],
            "B25": ["319", "320", "321", "332", "333", "351", "737", "738", "739", "744", "752", "753", "762", "763", "772", "773", "788", "789", "B727", "E70", "E75", "E75L", "E75S", "E90", "ER4", "M80"],
            "E1": ["319", "320", "321", "738", "73G", "752", "753", "7M9", "CR2", "CR5", "CR7", "CR9", "E70", "E75", "E75L", "E75S", "E90", "E75", "ER3", "ER4"],
            "E2": ["319", "320", "321", "738", "73G", "7M9", "CR2", "CR7", "CR9", "E75", "ER3", "ER4"],
            "E3": ["319", "320", "321", "738", "73G", "752", "7M9", "CR2", "CR5", "CR7", "CR9", "E70", "E75", "E75L", "E75S", "E90", "E75", "ER3", "ER4"],
            "E4": ["319", "320", "321", "738", "73G", "752", "753", "762", "763", "7M9", "E75", "E90"],
            "E5": ["319", "320", "321", "738", "73G", "752", "753", "7M9", "CR2", "CR5", "CR7", "CR9", "E70", "E75", "E90"],
            "F1": ["319", "320", "321", "717", "737", "738", "739", "CR7", "CR9", "E75", "E90"],
            "F2": ["319", "320", "321", "717", "737", "738", "739", "752", "753", "CR2", "CR7", "CR9", "E75", "E90"],
            "F3": ["221", "223", "319", "320", "321", "717", "737", "738", "739", "CR7", "CR9", "E75", "E90"],
            "F4": ["221", "223", "319", "320", "321", "717", "737", "738", "739", "CR7", "CR9", "E75", "E90", "ER3"],
            "F5": ["221", "223", "319", "320", "321", "717", "737", "738", "739", "E75", "E90", "ER3"]
        }

    def is_aircraft_allowed(self, gate: str, aircraft_type: str) -> bool:
        """
        Check if an aircraft type is allowed at a specific gate.
        """
        if gate not in self.allowed_aircraft_types:
            return False
        return aircraft_type in self.allowed_aircraft_types[gate]

    @kernel_function(
        name="GetGateAssignments",
        description="Retrieves current gate assignments and aircraft information"
    )
    async def get_gate_assignments(self, gate_number: str = None) -> dict:
        """
        Fetches current gate assignments and aircraft information.
        
        Args:
            gate_number (str, optional): Specific gate to query. If None, returns all gates.
            
        Returns:
            dict: JSON response containing gate assignment information.
        """
        if gate_number:
            assignments = [gate for gate in self.gate_assignments if gate['gate'] == gate_number]
            if assignments and gate_number in self.allowed_aircraft_types:
                assignments[0]['allowedAircraftTypes'] = self.allowed_aircraft_types[gate_number]
            return assignments
        
        # Add allowed aircraft types to each gate assignment
        result = []
        for gate in self.gate_assignments:
            gate_info = gate.copy()
            if gate['gate'] in self.allowed_aircraft_types:
                gate_info['allowedAircraftTypes'] = self.allowed_aircraft_types[gate['gate']]
            result.append(gate_info)
        return result

    @kernel_function(
        name="AssignFlight",
        description="Assigns a flight to a specific gate"
    )
    async def assign_flight(self, gate: str, flight: str, aircraft_type: str) -> dict:
        """
        Assigns a flight to a specific gate.
        
        Args:
            gate (str): Gate number to assign
            flight (str): Flight number to assign
            aircraft_type (str): Type of aircraft
            
        Returns:
            dict: Updated gate assignment or error message
        """
        # Validate aircraft type compatibility
        if not self.is_aircraft_allowed(gate, aircraft_type):
            return {
                "error": f"Aircraft type {aircraft_type} is not allowed at gate {gate}. " 
                f"Allowed types: {', '.join(self.allowed_aircraft_types.get(gate, []))}"
            }

        # Assign flight if aircraft type is compatible
        for g in self.gate_assignments:
            if g['gate'] == gate:
                g['currentFlight'] = flight
                g['currentAircraftType'] = aircraft_type
                g['allowedAircraftTypes'] = self.allowed_aircraft_types[gate]
                return g
        return {"error": f"Gate {gate} not found"}

gate_assignment_tool = GateAssignmentPlugin()

In [None]:
selection_function = KernelFunctionFromPrompt(
    function_name="selection",
    prompt="""
    Based on the conversation history, determine the next agent to take a turn. 
    Only return the name of the agent from the following list:
    - FlightInfoRetriever
    - GateCoordinator


    Rules:
    - GateCoordinator starts first.
    - If GateCoordinator needs flight information they will ask the FlightInfoRetriever to get it.
    - The GateCoordinator will assign the flights based on his instructions

    History:
    {{$history}}
    """,
)

In [None]:
termination_function = KernelFunctionFromPrompt(
    function_name="termination",
    prompt="""
    Determine if the gates are used in the most efficient way possible.
    If the output is complete and valid, respond with "yes". Otherwise, respond with "no".

    History:
    {{$history}}
    """
)

In [None]:
output_settings = AzureChatPromptExecutionSettings(response_format=ContentOutput)

In [None]:
flight_info_provider_agent = ChatCompletionAgent(kernel=kernel,
                                        name="FlightInfoRetriever",
                                        instructions=flight_info_instr,
                                        plugins=[flight_info_tool])

                                           
gate_coordindator_agent = ChatCompletionAgent(
    kernel=kernel,
    name="GateCoordinator",
    instructions=gate_coordindator_instr,
    plugins=[gate_assignment_tool]
)


agent_group = AgentGroupChat(
    agents=[flight_info_provider_agent, gate_coordindator_agent],
    termination_strategy = KernelFunctionTerminationStrategy(
        agents=[gate_coordindator_agent],
        function=termination_function,
        kernel=kernel,
        result_parser=lambda result: any("yes" in item.content.lower() for item in result.value) if isinstance(result.value, list) else "yes" in result.value.content.lower(),
        history_variable_name="history",
        maximum_iterations=10
    ),
    selection_strategy=KernelFunctionSelectionStrategy(
        function=selection_function,
        kernel=kernel,
        result_parser=lambda result: result.value[0].content.strip() if isinstance(result.value, list) and result.value else result.value.content.strip(),
        agent_variable_name="agents",
        history_variable_name="history"
    )
)

In [None]:
async def main(subject: str):

    initial_message = f"Subject: {subject}\n\nRetrieve flight information.\n\n"
    await agent_group.add_chat_message(message=initial_message)

    final_response = ""
    async for content in agent_group.invoke():
        print(f"# {content.name}: {content.content}\n")
        final_response = content.content

    return final_response

In [None]:
await main("Let's optimize the gate space for the flights provided.")

Fetching flight information...
# FlightInfoRetriever: I have retrieved the flight information. Here are the details of the flights:

1. **Flight Number**: 6512
   - **Airline**: American Airlines
   - **Gate**: B22
   - **Arrival Airport**: LBB
   - **Scheduled Arrival Date**: 2025-04-23 at 06:40 AM
   - **Estimated Arrival Date**: 2025-04-23 at 06:28 AM
   - **Aircraft Type**: CR7

2. **Flight Number**: 6304
   - **Airline**: American Airlines
   - **Gate**: B22
   - **Departure Airport**: TUS
   - **Scheduled Departure Date**: 2025-04-23 at 07:20 AM
   - **Estimated Departure Date**: 2025-04-23 at 07:20 AM
   - **Aircraft Type**: CR7

3. **Flight Number**: 2027
   - **Airline**: American Airlines
   - **Gate**: B23
   - **Arrival Airport**: LAX
   - **Scheduled Arrival Date**: 2025-04-23 at 07:29 AM
   - **Estimated Arrival Date**: 2025-04-23 at 07:07 AM
   - **Aircraft Type**: 73H

4. **Flight Number**: 6212
   - **Airline**: American Airlines
   - **Gate**: B21
   - **Departure Air

'### Current Flight and Gate Status:\n\n1. **Gate B21**: \n   - Current Flight: 6212 \n   - Aircraft Type: E75 \n   - Assigned: **Compatible**\n\n2. **Gate B22**: \n   - Current Flights: 6304, 6512 (both CR7) \n   - **Not Assigned**: CR7 incompatible.\n\n3. **Gate B23**: \n   - Current Flight: 2027 \n   - Aircraft Type: 73H \n   - **Not Assigned**: 73H incompatible.\n\n### Issues Encountered:\n- **Flight Compatibility**: \n   - All attempts to assign CR7 aircraft to Gates B22, B24, and B25 have failed due to restrictions on aircraft type.\n   - Flights with 73H also cannot be assigned to Gate B23.\n\n### Options Moving Forward:\n1. **Alternative Gates**: Identify any other gates that may support CR7 or 73H aircraft. \n2. **Review Flight Schedules**: Reassess arrival and departure timings to create openings for compatible gate assignments. \n3. **Consider Aircraft Adjustments**: If feasible, exploring options to modify aircraft types for affected flights.\n\nGiven the current restrictio