# Multi-agent data analysis workflow

![image info](../../images/agent_demo2.png)

You can chain multiple agents into a single workflow. This notebook walks through an example with three stages:

1. Data Analysis Planning:

  The planning agent writes a comprehensive data analysis plan, outlining the steps required to analyze the data.

2. Code Generation and Execution:

  For each step in the analysis plan, the Python agent generates the corresponding code, then executes it to perform the specified analysis.

3. Analysis Report Summarization:

  Based on the output of the executed code, the summarization agent writes an analysis report that summarizes the findings and insights derived from the data analysis.
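
The three stages above can be sketched without any API calls. This is a minimal illustration, not the notebook's implementation: `run_pipeline`, the three stub functions, and the convention that each step stores its answer in a variable named `result` are all assumptions made for the example. The notebook itself calls Mistral agents and captures printed output instead.

```python
from typing import Callable, Dict, List


def run_pipeline(
    dataset_description: str,
    plan: Callable[[str], List[str]],
    generate_code: Callable[[str], str],
    summarize: Callable[[str, List[str]], str],
) -> str:
    """Chain the stages: plan, generate and run code per step, summarize."""
    state: Dict[str, object] = {}  # namespace shared by all executed steps
    outputs: List[str] = []
    for step in plan(dataset_description):
        code = generate_code(step)
        exec(code, state)  # generated code is untrusted: sandbox it in practice
        outputs.append(str(state.get("result")))
    return summarize(dataset_description, outputs)


# Hypothetical stand-ins for the planning, Python, and summarization agents.
def fake_plan(description: str) -> List[str]:
    return ["compute total", "double total"]


def fake_codegen(step: str) -> str:
    snippets = {
        "compute total": "result = sum([1, 2, 3])",
        "double total": "result = result * 2",
    }
    return snippets[step]


def fake_summarize(description: str, outputs: List[str]) -> str:
    return f"{description}: " + ", ".join(outputs)


report = run_pipeline("toy data", fake_plan, fake_codegen, fake_summarize)
print(report)
```

Passing the agents in as plain callables keeps the control flow visible and makes it easy to swap a stub for a real agent call later.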


## Install dependencies

First, install the Python SDK and set your API key.

In [None]:
!pip install mistralai==1.0.0

In [2]:
import os
from mistralai import Mistral
import re
from getpass import getpass

if not (api_key := os.getenv("MISTRAL_API_KEY")):
    api_key = getpass("🔑 Enter your Mistral API key: ")
os.environ["MISTRAL_API_KEY"] = api_key

client = Mistral(api_key=api_key)

## Agents
You can create an Agent at https://console.mistral.ai/build/agents/new. For this notebook, we use `mistral-large-2407` as the model powering our agents.

Here are the instructions provided to the agents we created:

### Planning agent:

```
You are a data analytical planning assistant. Given a dataset and its description,
your task is to provide specific and simple analysis plans, detailed instructions,
and suggested Python code that can later be given to a separate Python agent to generate
the Python code for executing the analysis plan.
Do not create figures.

Return output with the following format:

## Total number of steps:

## Step 1:
```

### Python agent:
```
You are a Python coding assistant that only outputs Python code without any explanations or comments.
Given an instruction and the suggested Python code, return the correct Python code.
```

### Summarization agent:
```
You are an analysis summarization assistant.
Given a dataset's description and the analysis results. Provide an analysis report.
```

### Agent IDs
Next, we retrieve the agent IDs from the UI where we created the agents.


In [3]:
planning_agent_id =  "ag:ad73bfd7:20241009:planning-agent:40a0d3e8"
summarization_agent_id = "ag:ad73bfd7:20241009:summarization-agent:3036db8a"
python_agent_id = "ag:ad73bfd7:20240912:python-codegen-agent:0375a7cf"

## Analysis Planning

In [4]:
def run_analysis_planning_agent(query):
    """
    Sends a user query to the planning agent and returns the response.

    Args:
        query (str): The user query to be sent to the planning agent.

    Returns:
        str: The response content from the planning agent, or None if the request failed.
    """
    print("### Run Planning agent")
    print(f"User query: {query}")
    try:
        response = client.agents.complete(
            agent_id= planning_agent_id,
            messages = [
                {
                    "role": "user",
                    "content":  query
                },
            ]
        )
        result = response.choices[0].message.content
        return result
    except Exception as e:
        print(f"Request failed: {e}. Please check your request.")
        return None

In [5]:
query = """
Load this data: https://raw.githubusercontent.com/fivethirtyeight/data/master/bad-drivers/bad-drivers.csv

The dataset consists of 51 datapoints and has eight columns:
- State
- Number of drivers involved in fatal collisions per billion miles
- Percentage Of Drivers Involved In Fatal Collisions Who Were Speeding
- Percentage Of Drivers Involved In Fatal Collisions Who Were Alcohol-Impaired
- Percentage Of Drivers Involved In Fatal Collisions Who Were Not Distracted
- Percentage Of Drivers Involved In Fatal Collisions Who Had Not Been Involved In Any Previous Accidents
- Car Insurance Premiums ($)
- Losses incurred by insurance companies for collisions per insured driver ($)
"""

In [None]:
planning_result = run_analysis_planning_agent(query)

In [None]:
print(planning_result)
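
The planning agent is instructed to return a `## Total number of steps:` line followed by `## Step N:` sections, and the next section relies on that layout to split the plan. The sketch below shows the idea on a hand-written sample; `sample_plan` is a hypothetical output invented for illustration, not a real agent response, and splitting with `re.split` is an alternative to the per-step regex the notebook uses.

```python
import re

# Hypothetical planner output, written by hand to match the requested format.
sample_plan = """## Total number of steps: 2

## Step 1:
Load the CSV into a pandas DataFrame.

## Step 2:
Print summary statistics for each numeric column.
"""

# Read the declared step count from the header line.
n_steps = int(re.search(r"## Total number of steps:\s*(\d+)", sample_plan).group(1))

# Split on the step headers; the first chunk is the preamble before Step 1.
chunks = re.split(r"^## Step \d+:", sample_plan, flags=re.MULTILINE)
steps = [chunk.strip() for chunk in chunks[1:]]

for number, text in enumerate(steps, start=1):
    print(f"Step {number}: {text}")
```

Comparing `len(steps)` with `n_steps` is a cheap sanity check that the agent followed the format before any code is generated.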

## Generate and execute Python code for each planning step

In [8]:
class PythonAgentWorkflow:
    def __init__(self):
        pass

    def extract_pattern(self, text, pattern):
        """
        Extracts a pattern from the given text.

        Args:
            text (str): The text to search within.
            pattern (str): The regex pattern to search for.

        Returns:
            str: The extracted pattern or None if not found.
        """
        match = re.search(pattern, text, flags=re.DOTALL)
        if match:
            return match.group(1).strip()
        return None

    def extract_step_i(self, planning_result, i, n_step):
        """
        Extracts the content of a specific step from the planning result.

        Args:
            planning_result (str): The planning result text.
            i (int): The step number to extract.
            n_step (int): The total number of steps.

        Returns:
            str: The extracted step content or None if not found.
        """
        if i < n_step:
            pattern = rf'## Step {i}:(.*?)## Step {i+1}'
        elif i == n_step:
            pattern = rf'## Step {i}:(.*)'
        else:
            print(f"Invalid step number {i}. It should be between 1 and {n_step}.")
            return None

        step_i = self.extract_pattern(planning_result, pattern)
        if not step_i:
            print(f"Failed to extract Step {i} content.")
            return None

        return step_i

    def extract_code(self, python_agent_result):
        """
        Extracts the fenced Python code block from the response content.

        Args:
            python_agent_result (str): The response content from the Python agent.

        Returns:
            tuple: A tuple containing the extracted Python code and a retry flag.
        """
        retry = False
        print("### Extracting Python code")
        python_code = self.extract_pattern(python_agent_result, r'```python(.*?)```')
        if not python_code:
            retry = True
            print("Python function failed to generate or wrong output format. Setting retry to True.")

        return python_code, retry

    def run_python_agent(self, query):
        """
        Sends a user query to a Python agent and returns the response.

        Args:
            query (str): The user query to be sent to the Python agent.

        Returns:
            str: The response content from the Python agent.
        """
        print("### Run Python agent")
        print(f"User query: {query}")
        try:
            response = client.agents.complete(
                agent_id= python_agent_id,
                messages = [
                    {
                        "role": "user",
                        "content":  query
                    },
                ]
            )
            result = response.choices[0].message.content
            return result

        except Exception as e:
            print(f"Request failed: {e}. Please check your request.")
            return None

    def check_code(self, python_function, state):
        """
        Executes the generated Python code and checks for any errors.

        Args:
            python_function (str): The Python code to be executed.
            state (dict): The namespace shared across steps, so that variables
                defined in one step remain available to later steps.

        Returns:
            bool: A flag indicating whether the code execution needs to be retried.

        Warning:
            This code is designed to run code that’s been generated by a model, which may not be entirely reliable.
            It's strongly recommended to run this in a sandbox environment.
        """
        retry = False
        try:
            print(f"### Python function to run: {python_function}")
            exec(python_function, state)
            print("Code executed successfully.")
        except Exception as e:
            # Include the error so it shows up in the captured output
            print(f"Code failed: {e}")
            retry = True
            print("Setting retry to True")
        return retry

    def process_step(self, planning_result, i, n_step, max_retries, state):
        """
        Processes a single step, including retries.

        Args:
            planning_result (str): The planning result text.
            i (int): The step number to process.
            n_step (int): The total number of steps.
            max_retries (int): The maximum number of attempts.
            state (dict): The namespace shared across steps.

        Returns:
            None
        """
        # The step text does not change between attempts, so extract it once
        step_i = self.extract_step_i(planning_result, i, n_step)
        if not step_i:
            return None
        print(step_i)

        retry = True
        j = 0
        while j < max_retries and retry:
            print(f"TRY # {j}")
            j += 1
            python_agent_result = self.run_python_agent(step_i)
            if python_agent_result is None:
                # The request failed; use the next attempt
                continue
            python_code, retry = self.extract_code(python_agent_result)
            print(python_code)
            if not retry:
                retry = self.check_code(python_code, state)
        return None

    def workflow(self, planning_result):
        """
        Executes the workflow for processing planning results.

        Args:
            planning_result (str): The planning result text.
        """
        state = {}
        print("### ENTER WORKFLOW")
        n_step = int(self.extract_pattern(planning_result, r'## Total number of steps:\s*(\d+)'))
        for i in range(1, n_step + 1):
            print(f"STEP # {i}")
            self.process_step(planning_result, i, n_step, max_retries=2, state=state)


        print("### Exit WORKFLOW")
        return None
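
Two ideas in the class above are easy to miss: the same `state` dictionary is passed to every `exec` call, so variables defined by one step remain available to the next, and a failed execution triggers another attempt. The sketch below isolates both. `run_with_retries` is a hypothetical helper, and the list of candidate snippets stands in for repeated calls to the Python agent.

```python
def run_with_retries(candidates, state, max_retries=2):
    """Try successive code candidates until one executes without raising.

    Returns the index of the candidate that succeeded, or None if all failed.
    """
    for attempt, code in enumerate(candidates[:max_retries]):
        try:
            exec(code, state)  # untrusted code: sandbox this in practice
            return attempt
        except Exception as exc:
            print(f"Attempt {attempt} failed: {exc!r}")
    return None


state = {}

# Step 1 defines a variable; it persists in `state` after exec returns.
run_with_retries(["values = [3, 1, 2]"], state)

# Step 2: the first candidate has a typo (NameError), the second succeeds
# and reads `values` from the shared namespace.
used = run_with_retries(["total = sum(valuez)", "total = sum(values)"], state)
print(used, state["total"])
```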

In [None]:
import sys
import io

# Echo print output to the console while also capturing it in a variable
class Tee(io.StringIO):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.original_stdout = sys.stdout

    def write(self, data):
        self.original_stdout.write(data)
        super().write(data)

    def flush(self):
        self.original_stdout.flush()
        super().flush()

# Create an instance of the Tee class
tee_stream = Tee()

# Redirect stdout to the Tee instance
sys.stdout = tee_stream

try:
    Python_agent = PythonAgentWorkflow()
    Python_agent.workflow(planning_result)
finally:
    # Restore the original stdout, even if the workflow raises
    sys.stdout = tee_stream.original_stdout

# Get the captured output
captured_output = tee_stream.getvalue()
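
The same capture can be packaged as a context manager so that stdout is restored automatically when the block ends. This is an alternative sketch rather than the notebook's approach; `tee_stdout` and `_Tee` are hypothetical names introduced here.

```python
import io
import sys
from contextlib import contextmanager


@contextmanager
def tee_stdout():
    """Echo prints to the real stdout while also collecting them in a buffer."""
    original = sys.stdout
    buffer = io.StringIO()

    class _Tee:
        def write(self, data):
            original.write(data)
            return buffer.write(data)

        def flush(self):
            original.flush()

    sys.stdout = _Tee()
    try:
        yield buffer
    finally:
        sys.stdout = original  # restored even if the body raises


with tee_stdout() as captured:
    print("step output")

captured_text = captured.getvalue()
```

The workflow call would go inside the `with` block, and `captured_text` would play the role of `captured_output`.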

## Summarization

In [10]:
response = client.agents.complete(
    agent_id= summarization_agent_id,
    messages = [
        {
            "role": "user",
            "content":  query + captured_output
        },
    ]
)
result = response.choices[0].message.content


In [None]:
print(result)

## (Optional) Trace and Evaluate your Agent

Now that your agent is running, you can optionally trace and evaluate it with Arize Phoenix. Phoenix is an open-source framework for tracing and evaluating LLM applications, including agents and RAG pipelines.

Tracing refers to the process of recording the calls made between your application and the LLM. Evaluation can be thought of as the performance testing of your agent. Phoenix provides a UI for you to view traces and evaluations, as well as a suite of evaluation templates.

To start off, create a Phoenix account and get your API key [here](https://phoenix.arize.com).

### Set up Phoenix

In [31]:
!pip install -q openinference-instrumentation-mistralai arize-phoenix 'arize-phoenix-evals>=0.18.0'

In [13]:
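from getpass import getpass

# Use a separate variable so the Mistral key stored in `api_key` is not overwritten
if not (phoenix_api_key := os.getenv("PHOENIX_API_KEY")):
    phoenix_api_key = getpass("🔑 Enter your Phoenix API key: ")
os.environ["PHOENIX_API_KEY"] = phoenix_api_key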

In [None]:
from openinference.instrumentation.mistralai import MistralAIInstrumentor
from phoenix.otel import register
import os

# Add Phoenix API Key for tracing
os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={os.getenv('PHOENIX_API_KEY')}"
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com"

# Configure the Phoenix tracer
tracer_provider = register()

# Phoenix provides an openinference package that automatically traces all requests to Mistral
MistralAIInstrumentor().instrument(tracer_provider=tracer_provider, skip_dep_check=True)

### Run your agent

In [None]:
planning_result = run_analysis_planning_agent(query)

In [None]:
# Create an instance of the Tee class
tee_stream = Tee()

# Redirect stdout to the Tee instance
sys.stdout = tee_stream

try:
    Python_agent = PythonAgentWorkflow()
    Python_agent.workflow(planning_result)
finally:
    # Restore the original stdout, even if the workflow raises
    sys.stdout = tee_stream.original_stdout

# Get the captured output
captured_output = tee_stream.getvalue()

In [17]:
response = client.agents.complete(
    agent_id= summarization_agent_id,
    messages = [
        {
            "role": "user",
            "content":  query + captured_output
        },
    ]
)
result = response.choices[0].message.content

In [None]:
print(result)

### View your traces
You should now be able to view traces in [Phoenix](https://app.phoenix.arize.com).

### Evaluate your agent

Now let's evaluate your agent. The flow for batch evaluation is as follows:

1. Export traces from Phoenix
2. Attach labels to the traces. These can be created using an LLM as a judge, using code-based evaluation, or using a combination of both.
3. Import the labeled traces into Phoenix.

#### Export traces from Phoenix

In [None]:
import phoenix as px

spans = px.Client().get_spans_dataframe()

spans.head()

When it comes to evaluating agents, a good general approach is to break down the steps your agent must complete, and evaluate each step individually.

In this case, we can evaluate:
1. The code generated by the Python agent
2. The analysis report written by the summarization agent

We'll evaluate each of these steps individually.

#### Evaluate Code Generation

Phoenix has a [built-in LLM Judge template that can be used to evaluate Code Generation Agents](https://docs.arize.com/phoenix/evaluation/how-to-evals/running-pre-tested-evals/code-generation-eval). We'll use that template here.

The template requires two columns to be added to the dataframe:
- output: The code generated by the agent
- input: The original user query

We already have the input, so we only need to extract the generated code from the `attributes.llm.output_messages` column:

In [None]:
code_gen_spans = spans[spans['attributes.llm.invocation_parameters'].str.contains(python_agent_id)].copy()
code_gen_spans.loc[:,'input'] = code_gen_spans['attributes.input.value']
# extract_code returns a (code, retry) tuple; keep only the code
code_gen_spans.loc[:,'output'] = code_gen_spans['attributes.llm.output_messages'].apply(lambda x: PythonAgentWorkflow().extract_code(x[0]['message.content'])[0])
code_gen_spans.head()
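
To make the extraction step concrete, here is a standalone sketch of reducing one span's output messages to the code they contain. The message layout (a list of dicts with a `message.content` key) follows the cell above; the sample reply and the helper name `extract_python_block` are made up for illustration.

```python
import re

FENCE = "`" * 3  # built dynamically so this example can sit inside a fence


def extract_python_block(text):
    """Return the body of the first fenced python block, or None if absent."""
    pattern = FENCE + r"python(.*?)" + FENCE
    match = re.search(pattern, text, flags=re.DOTALL)
    return match.group(1).strip() if match else None


# Hypothetical span payload shaped like `attributes.llm.output_messages`.
output_messages = [
    {"message.content": f"{FENCE}python\nprint('hi')\n{FENCE}"}
]

code = extract_python_block(output_messages[0]["message.content"])
print(code)
```

A reply without a fenced block yields `None`, which is worth filtering out before the rows are passed to an evaluator.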

In [None]:
from phoenix.evals import (CODE_READABILITY_PROMPT_TEMPLATE, 
                           CODE_READABILITY_PROMPT_RAILS_MAP, 
                           llm_classify, 
                           MistralAIModel)

import nest_asyncio
nest_asyncio.apply()

eval_model = MistralAIModel(api_key=os.getenv("MISTRAL_API_KEY"))

code_gen_evals = llm_classify(
    model=eval_model,
    template=CODE_READABILITY_PROMPT_TEMPLATE,
    dataframe=code_gen_spans,
    concurrency=20,
    provide_explanation=True,
    rails=list(CODE_READABILITY_PROMPT_RAILS_MAP.values())
)
code_gen_evals['score'] = code_gen_evals['label'].apply(lambda x: 1 if x == "readable" else 0)

This leaves us with a dataframe of evaluations. We'll add this back into Phoenix later on.

In [None]:
code_gen_evals.head()

#### Evaluate Summarization

We'll evaluate the summarization agent the same way, this time using Phoenix's prebuilt summarization template.

In [None]:
# Filter the spans to only include those from our summarization agent
summarization_spans = spans[spans['attributes.llm.invocation_parameters'].str.contains(summarization_agent_id)].copy()
summarization_spans = summarization_spans.assign(
    input=summarization_spans['attributes.input.value'],
    output=summarization_spans['attributes.llm.output_messages']
)
summarization_spans.head()

In [None]:
import phoenix.evals.default_templates as templates

summarization_evals = llm_classify(
    model=eval_model,
    template=templates.SUMMARIZATION_PROMPT_TEMPLATE,
    dataframe=summarization_spans,
    concurrency=20,
    provide_explanation=True,
    rails=list(templates.SUMMARIZATION_PROMPT_RAILS_MAP.values())
)
summarization_evals['score'] = summarization_evals['label'].apply(lambda x: 1 if x == "good" else 0)

In [None]:
summarization_evals.head()

#### Import labeled traces into Phoenix

In [None]:
from phoenix.trace import SpanEvaluations

px.Client().log_evaluations(
    SpanEvaluations(eval_name="Code Quality", dataframe=code_gen_evals),
    SpanEvaluations(eval_name="Summarization", dataframe=summarization_evals)
)

You can now view your evaluations in the Phoenix UI:

![image info](../../third_party/Phoenix/images/phoenix-agent-summarization-eval.png)

Congratulations! You've now successfully evaluated your agent.

Check out [Phoenix](https://phoenix.arize.com) for more!
