# Lab 4: Develop a Finance Analyst Multi-Agent System

In this lab, we’ll build a multi-agent system where a team of agents works together to generate detailed analyst reports about corporate finance data. The system consists of three task agents plus an orchestrator:

1. FinanceDataAgent – This agent searches an Azure AI Search index to retrieve recent financial information and performance data for your company.
2. AnalystReportAgent – This agent writes a detailed analyst report synthesizing the retrieved data, including insights on financial performance, finance trends, and risk analysis.
3. ValidationAgent – This agent validates that the final report includes a detailed risk assessment.
4. FinanceOrchestratorAgent – The orchestrator that communicates with the above agents to create the final analyst report.

We use the Azure AI Agent Service for the individual task agents and Semantic Kernel to build the orchestrator.

### Part 1: Create the FinanceData, AnalystReport, and Validation Agents

In [2]:
import os
import logging
import json
from semantic_kernel.functions import kernel_function
from dotenv import load_dotenv
from azure.ai.projects import AIProjectClient
from azure.ai.projects.models import ConnectionType
from azure.identity import DefaultAzureCredential
from azure.ai.projects.models import AzureAISearchTool
from semantic_kernel.agents import ChatCompletionAgent
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.contents.chat_message_content import ChatMessageContent
from semantic_kernel.contents.chat_history import ChatHistory
from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior
from semantic_kernel.contents.utils.author_role import AuthorRole
from semantic_kernel.kernel import Kernel

load_dotenv()

True

#### Step 2: Create the FinanceDataAgent

This agent from Lab 2a searches your existing vector store and retrieves relevant financial information.

In [3]:
import json
import time
from lida import Manager, TextGenerationConfig
from llmx import llm, TextGenerationConfig
import os
from typing import Set, Callable, Dict, Any, List, Optional
from pathlib import Path
from dotenv import load_dotenv
import datetime
import base64
from openai import AzureOpenAI

# Azure and telemetry imports
from azure.ai.projects.aio import AIProjectClient
from azure.ai.projects.models import (
    AsyncFunctionTool, 
    RequiredFunctionToolCall, 
    SubmitToolOutputsAction, 
    ToolOutput, 
    AsyncToolSet,
    CodeInterpreterTool,
    BingGroundingTool
)
from azure.ai.projects.telemetry.agents import AIAgentsInstrumentor
from azure.identity.aio import DefaultAzureCredential
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace
import pandas as pd


def fetch_current_datetime(format: Optional[str] = None) -> str:
    """
    Return the current local time wrapped in a JSON object.

    :param format (Optional[str]): strftime pattern to apply. When omitted (or empty),
        "%Y-%m-%d %H:%M:%S" is used.
    :return: JSON text of the form {"current_time": "<formatted time>"}.
    :rtype: str
    """
    # A falsy format (None or "") falls back to the default pattern.
    pattern = format or "%Y-%m-%d %H:%M:%S"
    stamp = datetime.datetime.now().strftime(pattern)
    return json.dumps({"current_time": stamp})

async def generate_analysis(question: str, model_deployment: str, data_input: str, output_folder: str) -> str:
    """
    Chart the workbook data for a question with LIDA and save the result to disk.

    :param question (str): The analysis question, passed to LIDA as the visualization goal.
    :param model_deployment (str): Model name handed to the text-generation config.
    :param data_input (str): Path of the Excel workbook to load.
    :param output_folder (str): Folder that receives the generated code and chart image;
        created when a chart is produced and the folder does not exist yet.
    :return: Path to the saved chart image, or a message listing the available fields
        when no chart could be produced.
    :rtype: str
    """
    # Text generator backed by Azure OpenAI, configured from the environment
    generator = llm(
        provider="openai",
        api_type="azure",
        azure_endpoint=os.environ.get("CHAT_MODEL_ENDPOINT"),
        api_key=os.environ.get("CHAT_MODEL_API_KEY"),
        api_version="2023-07-01-preview",
    )

    # Read the Excel workbook into a DataFrame
    frame = pd.read_excel(data_input)
    print(f"Workbook '{data_input}' successfully loaded.")

    manager = Manager(text_gen=generator)
    config = TextGenerationConfig(n=1, temperature=0.5, model=model_deployment, use_cache=False)

    # Summarize the data first, then ask for a visualization that targets the question
    summary = manager.summarize(frame, summary_method="default", textgen_config=config)
    charts = manager.visualize(summary=summary, goal=question, textgen_config=config)

    # Nothing could be charted: tell the caller which fields exist
    if len(charts) == 0:
        field_list = str(summary['field_names'])
        return f"Unable to visualize question, please try again with these fields: {field_list}"

    first_chart = charts[0]
    generated_code = first_chart.code

    # The epoch-seconds suffix ties the code file and the image of one run together
    stamp = str(int(time.time()))
    os.makedirs(output_folder, exist_ok=True)

    with open(f'{output_folder}/code_{stamp}.py', 'w') as code_file:
        code_file.write(generated_code)

    image_path = f'{output_folder}/chart_{stamp}.png'
    first_chart.savefig(image_path)
    return image_path
    
async def visual_analysis(question: str, img_path: str, deployment_model: str) -> str:
    """
    Send a chart image to a chat model and return its analysis of the chart.

    :param question (str): The analysis question included in the user prompt.
    :param img_path (str): Path to the chart image.
    :param deployment_model (str): Model deployment used for the chat completion.
    :return: The chat completion serialized with ``to_json()``.
    :rtype: str
    """
    import mimetypes  # local import: only needed to label the data URL

    # Label the data URL with the real image type. generate_analysis saves PNG
    # charts, so the previously hard-coded "image/jpeg" mislabeled them.
    mime_type, _ = mimetypes.guess_type(img_path)
    if mime_type is None or not mime_type.startswith("image/"):
        mime_type = "image/jpeg"  # former hard-coded value, kept as the fallback

    # Read and encode the image before opening any network client, so a bad
    # path fails fast.
    with open(img_path, 'rb') as image_file:
        encoded_image = base64.b64encode(image_file.read()).decode('ascii')

    # Define the chat prompt: a system message plus a user message carrying the
    # question and the inline image
    chat_prompt = [
        {
            "role": "system",
            "content": [
                {
                    "type": "text",
                    "text": "You are an AI assistant that helps people find information."
                }
            ]
        },
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": f"Please analyze the image and provide a detailed analysis of the chart for this question: {question}"
                },
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:{mime_type};base64,{encoded_image}"
                    }
                }
            ]
        }
    ]

    async with DefaultAzureCredential() as creds:
        async with AIProjectClient.from_connection_string(
            credential=creds, conn_str=os.environ["AIPROJECT_CONNECTION_STRING"],
        ) as project_client:
            client = await project_client.inference.get_azure_openai_client(api_version="2024-06-01")

            # Generate the completion
            completion = await client.chat.completions.create(
                model=deployment_model,
                messages=chat_prompt,
                max_tokens=800,
                temperature=0,
                top_p=0.95,
                frequency_penalty=0,
                presence_penalty=0,
                stop=None,
                stream=False
            )

            return completion.to_json()
                
        

# Async callables exposed to the data-science agent as function tools
# (wrapped in an AsyncFunctionTool by FinanceDataScientistAgent).
user_async_function_tools: Set[Callable[..., Any]] = {generate_analysis, visual_analysis}

In [4]:


class FinanceDataScientistAgent:
    """
    Semantic Kernel plugin that answers finance questions by running a
    "data-science-assistant" agent on the Azure AI Agent Service.

    The agent is given the async function tools in ``user_async_function_tools``
    (chart generation and chart analysis) and works on a local Excel workbook.
    """

    @kernel_function(description='An agent that analyzes financial information from internal data.')
    async def search_finance_data(self, question: str) -> str:
        """
        Run the data-science agent for one question and return its answer.

        Parameters:
        question (str): The analysis question forwarded to the agent.

        Returns:
        str: Text of the last assistant message in the thread, or a short
        explanatory message when the agent produced no answer.
        """
        async with DefaultAzureCredential() as creds:
            async with AIProjectClient.from_connection_string(
                credential=creds, conn_str=os.environ["AIPROJECT_CONNECTION_STRING"],
            ) as project_client:
                model_deployment = os.environ["CHAT_MODEL"]
                data_input = "./data/financial_sample.xlsx"
                output_folder = "output"

                # Configure Azure Monitor for telemetry
                application_insights_connection_string = await project_client.telemetry.get_connection_string()
                configure_azure_monitor(connection_string=application_insights_connection_string)

                # Expose the async user functions to the agent
                functions = AsyncFunctionTool(functions=user_async_function_tools)
                toolset = AsyncToolSet()
                toolset.add(functions)

                agent_name = "data-science-assistant"

                # Reuse an agent with this name if one already exists
                agents = await project_client.agents.list_agents()
                agent = next((a for a in agents.data if a.name == agent_name), None)

                if agent is None:
                    agent = await project_client.agents.create_agent(
                        model=model_deployment,
                        name=agent_name,
                        instructions=(
                            'You are a data scientist with access to a tool called generate_analysis '
                            'which can perform analysis and save results for you. Use the data_input provided. '
                            'Use the generate_analysis tool to answer the question, then use the visual_analysis '
                            'to process the image. The answer should be no greater than 1000 characters in length.'
                        ),
                        tools=functions.definitions
                    )
                    print(f"Created agent, agent ID: {agent.id}")
                else:
                    print(f"Found existing agent: {agent.id}")

                try:
                    # Create a thread for communication
                    thread = await project_client.agents.create_thread()
                    print(f"Created thread, ID: {thread.id}")

                    # The tool arguments (model, folders, workbook) travel inside the prompt text
                    message = await project_client.agents.create_message(
                        thread_id=thread.id,
                        role="user",
                        content=f"Current date is {datetime.datetime.now().strftime('%Y-%m-%d')}.model_deployment:{model_deployment}, {question},output_folder:{output_folder}, data_input:{data_input}"
                    )
                    print(f"Created message, ID: {message.id}")

                    # Process the agent run with the provided tools
                    run = await project_client.agents.create_and_process_run(
                        thread_id=thread.id,
                        agent_id=agent.id,
                        toolset=toolset
                    )
                    print(f"Run finished with status: {run.status}")

                    if run.status == "failed":
                        print(f"Run failed: {run.last_error}")

                    # Fetch and log all messages from the thread
                    messages = await project_client.agents.list_messages(thread_id=thread.id)
                    print(f"Messages: {messages}")

                    # Save any generated files (e.g., images)
                    for file_path_annotation in messages.file_path_annotations:
                        print(f"File Paths:")
                        print(f"Type: {file_path_annotation.type}")
                        print(f"Text: {file_path_annotation.text}")
                        print(f"File ID: {file_path_annotation.file_path.file_id}")
                        print(f"Start Index: {file_path_annotation.start_index}")
                        print(f"End Index: {file_path_annotation.end_index}")
                        file_name = Path(file_path_annotation.text).name
                        await project_client.agents.save_file(
                            file_id=file_path_annotation.file_path.file_id,
                            file_name=file_name
                        )
                        print(f"Saved image file to: {Path.cwd() / file_name}")
                finally:
                    # Delete the agent even when the run raised, so it is not left behind
                    await project_client.agents.delete_agent(agent.id)

                # Return the assistant's text itself. The previous code returned the
                # text wrapper object of the newest message regardless of its role.
                reply = messages.get_last_text_message_by_role("assistant")
                if reply is None:
                    return f"The data science agent returned no answer (run status: {run.status})."
                return reply.text.value

#### Step 3: Create the AnalystReportAgent

In [5]:
class AnalystReportAgent:
    """
    Semantic Kernel plugin that writes an analyst report by running a
    temporary "analyst-report-agent" on the Azure AI Agent Service.
    """
    @kernel_function(description='An agent that writes detailed analyst reports on finance data.')
    async def write_report(self, finance_data:str, data_description: str) -> str:
        """
        Writes a detailed analyst report for a company.

        Parameters:
        finance_data (str): the financial data to be included in the report.
        data_description (str): the topic of the report.

        Returns:
        str: Text of the last assistant message (the report), or a short
        explanatory message when the agent returned nothing.
        """
        print("Calling AnalystReportAgent...")

        # async with closes the credential and the client; they were previously
        # never closed.
        async with DefaultAzureCredential() as creds:
            async with AIProjectClient.from_connection_string(
                credential=creds,
                conn_str=os.environ["AIPROJECT_CONNECTION_STRING"],
            ) as project_client:
                report_agent = await project_client.agents.create_agent(
                    model="gpt-4o",
                    name="analyst-report-agent",
                    instructions="You are a helpful agent specializing in writing comprehensive analyst reports. Your report should include full analysis of the data.",
                )

                try:
                    thread = await project_client.agents.create_thread()

                    await project_client.agents.create_message(
                        thread_id=thread.id,
                        role="user",
                        content=f"Write a detailed analyst report regarding {data_description}. Include insights provided by the {finance_data}.",
                    )

                    run = await project_client.agents.create_and_process_run(thread_id=thread.id, agent_id=report_agent.id)

                    if run.status == "failed":
                        print(f"Run failed: {run.last_error}")

                    messages = await project_client.agents.list_messages(thread_id=thread.id)
                finally:
                    # The agent is single-use: remove it even when the run raised
                    await project_client.agents.delete_agent(report_agent.id)

        last_msg = messages.get_last_text_message_by_role("assistant")
        if last_msg is None:
            print("AnalystReportAgent returned no message.")
            return "The analyst report could not be generated: the agent returned no message."

        print("AnalystReportAgent completed successfully.")
        # Return the message text so the declared str return type holds
        return last_msg.text.value

#### Step 4: Create the Validation Agent

This agent validates that the generated analyst report meets our standards – specifically, it checks that the report includes a detailed risk assessment.

In [6]:
class ValidationAgent:
    """
    Semantic Kernel plugin that checks a generated analyst report by running a
    temporary "validation-agent" on the Azure AI Agent Service.
    """
    @kernel_function(description='An agent that runs validation checks to ensure that the generated analyst report meets required standards.')
    async def validate_report(self, report: str) -> str:
        """
        Validates the generated analyst report.
        Requirement: The report must include a detailed risk assessment.

        Parameters:
        report (str): The analyst report produced by the AnalystReportAgent.

        Returns:
        str: Text of the last assistant message (the agent is instructed to
        answer 'Pass' or 'Fail'), or a 'Fail: ...' message when the agent
        returned nothing.
        """
        print("Calling ValidationAgent...")

        # async with closes the credential and the client; they were previously
        # never closed.
        async with DefaultAzureCredential() as creds:
            async with AIProjectClient.from_connection_string(
                credential=creds,
                conn_str=os.environ["AIPROJECT_CONNECTION_STRING"],
            ) as project_client:
                # NOTE(review): the prompts below ask for a "detailed assessment",
                # while the requirement above says "risk assessment" - confirm
                # which wording is intended.
                validation_agent = await project_client.agents.create_agent(
                    model="gpt-4o",
                    name="validation-agent",
                    instructions="You are an expert agent that validates analyst reports. Return 'Pass' if the report includes a detailed assessment, otherwise return 'Fail'. You must only return 'Pass' or 'Fail'.",
                )

                try:
                    thread = await project_client.agents.create_thread()

                    await project_client.agents.create_message(
                        thread_id=thread.id,
                        role="user",
                        content=f"Validate that the generated analyst report includes a detailed assessment of data. Here is the report: {report}",
                    )

                    run = await project_client.agents.create_and_process_run(thread_id=thread.id, agent_id=validation_agent.id)

                    if run.status == "failed":
                        print(f"Run failed: {run.last_error}")

                    messages = await project_client.agents.list_messages(thread_id=thread.id)
                finally:
                    # The agent is single-use: remove it even when the run raised
                    await project_client.agents.delete_agent(validation_agent.id)

        last_msg = messages.get_last_text_message_by_role("assistant")
        if last_msg is None:
            print("ValidationAgent returned no message.")
            # Without a verdict the report cannot be treated as validated
            return "Fail: the validation agent returned no message."

        print("ValidationAgent completed successfully.")
        # Return the message text so the declared str return type holds
        return last_msg.text.value

### Part 2: Create a Multi-Agent System for Generating Analyst Reports

The orchestrator (FinanceOrchestratorAgent) will coordinate the above agents to generate an analyst report. When you run the notebook, you will be prompted for a publicly traded company name. If the generated report meets the validation criteria, it will be saved as a file.

Try the following prompts:

##

In [8]:
# Environment variables to connect to the gpt-4o model
# (os.environ[...] raises KeyError when a variable is not set)
deployment_name = os.environ["CHAT_MODEL"]  # deployment name passed to AzureChatCompletion in main()
endpoint = os.environ["CHAT_MODEL_ENDPOINT"]
api_key = os.environ["CHAT_MODEL_API_KEY"]
output_folder = "output"  # folder where main() writes the generated report

async def main():
    """
    Build the FinanceOrchestratorAgent, ask it one hard-coded question and
    save the resulting analyst report.

    The orchestrator is a Semantic Kernel ChatCompletionAgent with the three
    task agents registered as plugins. Each response is expected to be a JSON
    object with ``report_was_generated`` and ``content``. When a report was
    generated, it is written as a Markdown file into ``output_folder``;
    otherwise the returned message is printed.
    """
    # Initialize the Semantic Kernel.
    kernel = Kernel()

    # Add services and plugins to the kernel.
    service_id = "orchestrator_agent"
    kernel.add_service(AzureChatCompletion(service_id=service_id, deployment_name=deployment_name, endpoint=endpoint, api_key=api_key))
    kernel.add_plugin(AnalystReportAgent(), plugin_name="AnalystReportAgent")
    kernel.add_plugin(FinanceDataScientistAgent(), plugin_name="FinanceDataScientistAgent")
    kernel.add_plugin(ValidationAgent(), plugin_name="ValidationAgent")

    # Let the model decide which plugin functions to call.
    settings = kernel.get_prompt_execution_settings_from_service_id(service_id=service_id)
    settings.function_choice_behavior = FunctionChoiceBehavior.Auto()

    # Create the FinanceOrchestratorAgent to coordinate the agents.
    agent = ChatCompletionAgent(
        service_id=service_id,
        kernel=kernel,
        name="FinanceOrchestratorAgent",
        instructions=f"""
        You are an agent designed to create detailed analyst reports for finance data. The user will provide a data_description, and you will generate an analyst report by orchestrating the plugin agents:
        
        - AnalystReportAgent: Formats finance data into report, writes comprehensive analyst reports.
        - FinanceDataScientistAgent: Retrieves financial data and Finance performance information.
        - ValidationAgent: Checks that the report includes a detailed risk assessment.
        
        It is crucial that the final report includes a risk assessment section. If the report does not meet this requirement, it should be rejected.
        Format your final response as a JSON object with two attributes, report_was_generated and content:
        
        - report_was_generated: Boolean that is true if a valid report was produced, otherwise false.
        - content: A string containing the source finance data from the FinanceDataAgent and detailed analyst report if valid, or an error message if not.
        
        Example response:
        {{"report_was_generated": false, "content": "The analyst report for the requested company could not be generated because it lacks a risk assessment section."}}
        
        Your response must be a single valid JSON object using lowercase booleans (true/false) and double quotes for all keys and string values.
        """,
        execution_settings=settings,
    )

    history = ChatHistory()

    print("FinanceOrchestratorAgent is starting...")

    question = 'Analyze Velo discount sales for all segments in Europe'

    history.add_message(ChatMessageContent(role=AuthorRole.USER, content=question))

    async for response in agent.invoke(history=history):
        raw_content = response.content
        print(f"Response: {raw_content}")
        try:
            try:
                # Parse the response untouched first, so the words "True"/"False"
                # inside the report text are not rewritten.
                response_json = json.loads(raw_content)
            except json.JSONDecodeError:
                # Fallback for Python-style booleans emitted by the model
                fixed_content = raw_content.replace("False", "false").replace("True", "true")
                response_json = json.loads(fixed_content)
            report_was_generated = response_json['report_was_generated']
            report_content = response_json['content']
        except json.JSONDecodeError as e:
            print(f"JSONDecodeError: {e}")
            print(f"Problematic content: {raw_content}")
            report_was_generated = False
            report_content = f"Error decoding JSON: {e}. Please check the agent's response format."
        except (KeyError, TypeError) as e:
            # Valid JSON, but not an object with the two expected attributes
            print(f"Unexpected response structure: {e!r}")
            report_was_generated = False
            report_content = f"Unexpected response structure: {e!r}. Please check the agent's response format."

        if report_was_generated:
            # Replace characters that are not allowed in file names on common
            # file systems, because the question is free text.
            safe_question = "".join("_" if ch in '<>:"/\\|?*' else ch for ch in question)
            report_name = f"Analyst Report - {safe_question}.md"
            report_path = os.path.join(output_folder, report_name)

            os.makedirs(output_folder, exist_ok=True)
            # Explicit UTF-8: the report may contain non-ASCII characters
            with open(report_path, "w", encoding="utf-8") as f:
                if isinstance(report_content, str):
                    f.write(report_content)
                else:
                    # The model returned structured content instead of a string
                    f.write(json.dumps(report_content, indent=2, ensure_ascii=False))
            print(f"The analyst report for '{question}' has been generated. Please check the file {report_path}.")
        else:
            print(report_content)
# Top-level await: valid in a notebook/IPython cell; in a plain script use asyncio.run(main()).
await main()

FinanceOrchestratorAgent is starting...
Created agent, agent ID: asst_0MkxNY1wDrYPbE4Rk2ty5vSk
Created thread, ID: thread_AJNvHKJ6eXXsKbbb6bg12ZR7
Created message, ID: msg_eBNvb7HKlUZCyUTvBZpLWWce
Workbook './data/financial_sample.xlsx' successfully loaded.
Run finished with status: RunStatus.COMPLETED
Run completed with status: RunStatus.COMPLETED
Messages: {'object': 'list', 'data': [{'id': 'msg_wQUP2hsoFmywho9DLtuu3b5Q', 'object': 'thread.message', 'created_at': 1743445528, 'assistant_id': 'asst_0MkxNY1wDrYPbE4Rk2ty5vSk', 'thread_id': 'thread_AJNvHKJ6eXXsKbbb6bg12ZR7', 'run_id': 'run_UbHdqnWHkowPEpdPLeqaeDn1', 'role': 'assistant', 'content': [{'type': 'text', 'text': {'value': "The financial performance of Velo discount sales in Europe, analyzed across various segments, shows the following:\n\n1. **Government Segment**: \n   - **Gross Sales**: Highest at approximately 5 million units.\n   - **Discounts**: Moderate, indicating discounts don't heavily impact the volume.\n   - **Profit