# ADK Gov AI Agents

<p style="font-size: 1.5em; line-height: 1.3;">
Automação de Workflow de Data com Dados Públicos.
</p>

### Imports

In [1]:
from sklearn.metrics import precision_score, recall_score, f1_score, mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier, XGBRegressor

from google.adk.agents import LlmAgent, LoopAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.tool_context import ToolContext
from google.genai import types
import google.genai as genai
from dotenv import load_dotenv

import pandas as pd
import numpy as np
import requests
import asyncio
import pprint
import io
import os
import zipfile
import tarfile
from typing import Optional



### $$$ Definições Iniciais $$$

In [2]:
# Constantes do Agente
APP_NAME = "full_datascience_pipeline_app"
USER_ID = "dev_user_01"
SESSION_ID = "session_01"
GEMINI_MODEL = "gemini-2.0-flash"    # gemini-2.0-flash | gemini-2.0-flash-exp | gemini-2.5-flash-lite

# Outras constantes
URL = "https://download.inep.gov.br/dados_abertos/microdados_censo_escolar_2024.zip"    # url para download dos dados
DATA_DIR = "DATA"                                                                       # diretório local onde os dados brutos serão baixados e extraídos
TIME = 1                                                                                # pausa em segundos para evitar erros de "quota exceeded" da API (limite de requisições por minuto)

# Chaves de Estados da sessão
DATA_WORKSPACE = {}
STATE_ENGINEERING_SUMMARY = "engineering_summary"
STATE_PERFORMANCE_METRICS = "performance_metrics"
STATE_HYPERPARAMETERS = "hyperparameters"
STATE_CRITIQUE = "critique_output"
REENGINEER_SIGNAL = "REVISAR ENGENHARIA DE DADOS"
EXIT_SIGNAL = "PROCESSO CONCLUÍDO"

# Query do usuário
INITIAL_QUERY = (
    f"Verifique os dados contidos na pasta '{DATA_DIR}' e encontre o arquivo principal referente às escolas. "
    "Utilize os dicionários de dados dos datasets (se existirem). "
    "O objetivo é prever se uma escola possui internet (`IN_INTERNET`). "
    "Selecione colunas relevantes (como a localização e infraestrutura da escola) para construir o modelo."
    "Para o workflow, utilize somente as ferramentas previamente dadas."
)

# Carrega variáveis de ambiente
load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_GENAI_USE_VERTEXAI = os.getenv("GOOGLE_GENAI_USE_VERTEXAI")

In [3]:
# print("Modelos disponíveis que suportam 'generateContent':")
# print("-------------------------------------------------")

# client = genai.Client(api_key=GOOGLE_API_KEY)

# for model in client.models.list():
#     print(f"Nome da API: {model.name}")
#     print(f"  Nome de Exibição: {model.display_name}")
#     print(f"  Descrição: {model.description}\n")

### Dowload e Extração dos Dados

In [4]:
def download_and_extract(url: str, data_dir: str):
    """
    Baixa um arquivo de uma URL, salva-o em um diretório específico e o extrai.

    A função verifica se o arquivo já foi baixado e se o conteúdo já foi
    extraído antes de executar as operações, evitando trabalho redundante.

    Args:
        url: A URL do arquivo a ser baixado.
        data_dir: O diretório para salvar o arquivo e extrair seu conteúdo.
    """
    # Garante que o diretório de destino exista.
    print(f"--- Garantindo que o diretório '{data_dir}' exista. ---")
    os.makedirs(data_dir, exist_ok=True)

    # Define o caminho de salvamento do arquivo dentro do diretório de dados.
    filename = os.path.basename(url)
    archive_path = os.path.join(data_dir, filename)

    # Verifica se o arquivo já existe para evitar um novo download.
    if not os.path.exists(archive_path):
        print(f"--- Baixando arquivo de {url} para {archive_path} ---")
        try:
            response = requests.get(url, stream=True)
            # Lança uma exceção para respostas com erro (ex: 404, 500).
            response.raise_for_status()
            with open(archive_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            print("--- Download concluído com sucesso. ---")
        except requests.exceptions.RequestException as e:
            print(f"Erro ao baixar o arquivo: {e}")
            return # Interrompe a execução se o download falhar.
    else:
        print(f"--- O arquivo '{archive_path}' já existe. Pulando o download. ---")

    # Verifica se o conteúdo já foi extraído.
    # Esta verificação assume que o arquivo .zip contém uma pasta principal
    # com o mesmo nome do arquivo (ex: 'ml-latest-small.zip' -> 'ml-latest-small/').
    extracted_folder_name = os.path.splitext(filename)[0]
    expected_extracted_path = os.path.join(data_dir, extracted_folder_name)

    if os.path.exists(expected_extracted_path):
        print(f"--- O conteúdo já parece ter sido extraído em '{data_dir}'. Pulando a extração. ---")
    else:
        # Extrai o arquivo baixado para o mesmo diretório.
        print(f"--- Extraindo {archive_path} para {data_dir} ---")
        try:
            if archive_path.endswith('.zip'):
                with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                    zip_ref.extractall(data_dir)
            elif archive_path.endswith(('.tar', '.tar.gz', '.tgz', '.tar.bz2')):
                with tarfile.open(archive_path, 'r:*') as tar_ref:
                    tar_ref.extractall(path=data_dir)
            else:
                print(f"Formato de arquivo não suportado para extração: {archive_path}")
                return

            print(f"--- Arquivo extraído com sucesso para '{data_dir}'. ---")

        except (zipfile.BadZipFile, tarfile.ReadError) as e:
            print(f"Erro ao extrair o arquivo: {e}")


print("Iniciando o processo de download e extração...")
download_and_extract(url=URL, data_dir=DATA_DIR)
print("\nProcesso finalizado.")
print(f"Verifique a pasta '{DATA_DIR}' para ver os resultados.")

Iniciando o processo de download e extração...
--- Garantindo que o diretório 'DATA' exista. ---
--- O arquivo 'DATA/microdados_censo_escolar_2024.zip' já existe. Pulando o download. ---
--- O conteúdo já parece ter sido extraído em 'DATA'. Pulando a extração. ---

Processo finalizado.
Verifique a pasta 'DATA' para ver os resultados.


### Ferramentas (Tools) dos AI Agents

In [5]:
def list_project_files(start_path: str) -> dict:
    """
    Lists all folders, subfolders, and their files within a directory.

    Args:
        start_path: The directory to start listing from.

    Returns:
        A dictionary with the status and a string representing the file tree.
    """
    if '..' in start_path:
        return {"status": "error", "message": "Path cannot contain '..'. Access is restricted."}

    try:
        tree_string = ""
        for root, dirs, files in os.walk(start_path):
            if any(d in root for d in ['__pycache__', '.venv', 'env', '.git']):
                continue
            level = root.replace(start_path, '').count(os.sep)
            indent = " " * 4 * level
            tree_string += f"{indent}{os.path.basename(root)}/\n"
            sub_indent = " " * 4 * (level + 1)
            for f in files:
                tree_string += f"{sub_indent}{f}\n"

        print(f"--- Tool: Listing files in {start_path} ---")
        return {"status": "success", "file_tree": tree_string or "No files or directories found."}
    except Exception as e:
        return {"status": "error", "message": str(e)}


def read_text_file(file_path: str) -> dict:
    """
    Reads the content of a simple text file (e.g., .txt, .md, .csv).

    Args:
        file_path: The path to the text file.

    Returns:
        A dictionary with the status and the content of the file.
    """
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()
        print(f"--- Tool: Reading text file {file_path} ---")
        return {"status": "success", "file_path": file_path, "content": content}
    except Exception as e:
        return {"status": "error", "message": f"Error reading file {file_path}: {e}"}


def read_dataset(file_name: str, delimiter: str, use_columns: list[str]) -> dict:
    """
    Reads a dataset into the workspace or inspects its columns.
    If 'use_columns' is omitted or empty, it only inspects and returns column names.

    Args:
        file_name: The path of the file to read.
        delimiter: The character used to separate values.
        use_columns: A list of column names to read. If empty/None, inspects columns.

    Returns:
        A dictionary containing the outcome. On success, includes the dataframe key and info.
        On inspection, returns the list of columns. On error, returns an error message.
    """
    try:
        # Inspection mode: returns only the column names
        if not use_columns:
            header_df = pd.read_csv(file_name, sep=delimiter, nrows=0, encoding='latin1', low_memory=False)
            columns = header_df.columns.tolist()
            print(f"--- Tool: Inspected columns in {file_name} ---")
            return {"status": "inspection_success", "columns": columns, "file_name": file_name}

        # Full read mode: loads the dataset into the workspace
        df = pd.read_csv(file_name, sep=delimiter, usecols=use_columns, encoding='latin1', low_memory=False)
        df_key = f"df_{os.path.basename(file_name).split('.')[0]}"
        DATA_WORKSPACE[df_key] = df
        print(f"--- Tool: read_dataset successful. Stored under key: {df_key} ---")
        return {
            "status": "success",
            "df_key": df_key,
            "rows_loaded": len(df),
            "columns_loaded": len(df.columns)
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}
    

def preview_dataset(df_key: str) -> dict:
    """
    Previews the first 5 rows of a DataFrame from the workspace.

    Args:
        df_key: The key of the DataFrame in the workspace.

    Returns:
        A dictionary with status and a string representation of the DataFrame's head.
    """
    if df_key not in DATA_WORKSPACE:
        return {"status": "error", "message": f"DataFrame key '{df_key}' not found."}
    
    df = DATA_WORKSPACE[df_key]
    return {"status": "success", "df_key": df_key, "preview": df.head().to_string()}


def dataset_info(df_key: str) -> dict:
    """
    Provides technical information about a DataFrame (columns, types, non-null counts).

    Args:
        df_key: The key of the DataFrame in the workspace.

    Returns:
        A dictionary with status and the DataFrame's info as a string.
    """
    if df_key not in DATA_WORKSPACE:
        return {"status": "error", "message": f"DataFrame key '{df_key}' not found."}
    
    df = DATA_WORKSPACE[df_key]
    buffer = io.StringIO()
    df.info(buf=buffer)
    return {"status": "success", "df_key": df_key, "info": buffer.getvalue()}


def clean_dataset(df_key: str) -> dict:
    """
    Cleans a DataFrame by removing rows with NaN values and duplicates. Modifies in place.

    Args:
        df_key: The key of the DataFrame to clean.

    Returns:
        A dictionary with status and statistics about the cleaning process.
    """
    if df_key not in DATA_WORKSPACE:
        return {"status": "error", "message": f"DataFrame key '{df_key}' not found."}

    try:
        df = DATA_WORKSPACE[df_key]
        rows_before = len(df)
        df.dropna(inplace=True)
        df.drop_duplicates(inplace=True)
        rows_after = len(df)
        DATA_WORKSPACE[df_key] = df
        return {
            "status": "success",
            "df_key": df_key,
            "rows_before": rows_before,
            "rows_after": rows_after,
            "rows_removed": rows_before - rows_after
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}


def convert_to_categorical(df_key: str, columns_to_convert: list[str]) -> dict:
    """
    Converts specified columns in a DataFrame to the 'category' dtype. Modifies in place.

    Args:
        df_key: The key of the DataFrame to modify.
        columns_to_convert: A list of column names to convert.

    Returns:
        A dictionary confirming the status and listing the converted columns.
    """
    if df_key not in DATA_WORKSPACE:
        return {"status": "error", "message": f"DataFrame key '{df_key}' not found."}
    
    try:
        df = DATA_WORKSPACE[df_key]
        converted = []
        not_found = []
        for col in columns_to_convert:
            if col in df.columns:
                df[col] = df[col].astype('category')
                converted.append(col)
            else:
                not_found.append(col)
        
        DATA_WORKSPACE[df_key] = df
        return {
            "status": "success",
            "df_key": df_key,
            "converted_columns": converted,
            "columns_not_found": not_found
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}


def split_features_target(df_key: str, target_column: str) -> dict:
    """
    Splits a DataFrame into features (X) and target (y). Stores them in the workspace.

    Args:
        df_key: The key of the DataFrame to split.
        target_column: The name of the target column (y).

    Returns:
        A dictionary with status and the new keys for features (X) and target (y).
    """
    if df_key not in DATA_WORKSPACE:
        return {"status": "error", "message": f"DataFrame key '{df_key}' not found."}
    
    df = DATA_WORKSPACE[df_key]
    if target_column not in df.columns:
        return {"status": "error", "message": f"Target column '{target_column}' not in DataFrame."}
    
    X = df.drop(columns=[target_column])
    y = df[target_column]
    
    X_key = f"X_{df_key}"
    y_key = f"y_{df_key}"
    
    DATA_WORKSPACE[X_key] = X
    DATA_WORKSPACE[y_key] = y
    
    return {"status": "success", "features_key": X_key, "target_key": y_key}


def train_test_split_data(X_key: str, y_key: str, test_size: float, random_state: int) -> dict:
    """
    Splits features (X) and target (y) into training and testing sets.

    Args:
        X_key: The workspace key for the features DataFrame (X).
        y_key: The workspace key for the target Series (y).
        test_size: Proportion for the test split.
        random_state: Seed for reproducibility.

    Returns:
        A dictionary with status and the keys for X_train, X_test, y_train, and y_test.
    """
    if X_key not in DATA_WORKSPACE or y_key not in DATA_WORKSPACE:
        return {"status": "error", "message": f"Feature key '{X_key}' or target key '{y_key}' not found."}
        
    X = DATA_WORKSPACE[X_key]
    y = DATA_WORKSPACE[y_key]
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)
    
    keys = {
        "X_train": f"{X_key}_train", "X_test": f"{X_key}_test",
        "y_train": f"{y_key}_train", "y_test": f"{y_key}_test"
    }
    
    DATA_WORKSPACE[keys["X_train"]] = X_train
    DATA_WORKSPACE[keys["X_test"]] = X_test
    DATA_WORKSPACE[keys["y_train"]] = y_train
    DATA_WORKSPACE[keys["y_test"]] = y_test
    
    return {"status": "success", "data_keys": keys}
    

def apply_xgboost_classifier(X_train_key: str, y_train_key: str, n_estimators: Optional[int], max_depth: Optional[int], subsample: Optional[float], colsample_bytree: Optional[float]) -> dict:
    """
    Trains an XGBoostClassifier model and stores it in the workspace.

    Args:
        X_train_key: The workspace key for the training features (X_train).
        y_train_key: The workspace key for the training target (y_train).
        n_estimators: Number of boosting rounds.
        max_depth: Maximum tree depth.
        subsample: Subsample ratio of the training instance.
        colsample_bytree: Subsample ratio of columns when constructing each tree.

    Returns:
        A dictionary with status and the key for the trained model.
    """
    if X_train_key not in DATA_WORKSPACE or y_train_key not in DATA_WORKSPACE:
        return {"status": "error", "message": "Training data keys not found in workspace."}
    
    try:
        X_train = DATA_WORKSPACE[X_train_key]
        y_train = DATA_WORKSPACE[y_train_key]
        
        # Start with default hyperparameters
        params = {
            'n_estimators': 100,
            'max_depth': 3,
            'subsample': 0.8,
            'colsample_bytree': 0.8,
            'enable_categorical': True
        }
        
        # Create a dictionary of provided hyperparameters that are not None
        provided_params = {
            'n_estimators': n_estimators,
            'max_depth': max_depth,
            'subsample': subsample,
            'colsample_bytree': colsample_bytree
        }
        
        # Update defaults only with the values that were actually provided
        # This filters out the 'None' values from the arguments
        params.update({k: v for k, v in provided_params.items() if v is not None})

        model = XGBClassifier(**params)
        model.fit(X_train, y_train)
        
        model_key = "xgb_classifier_model"
        DATA_WORKSPACE[model_key] = model
        
        return {"status": "success", "model_key": model_key, "hyperparameters_used": params}
    except Exception as e:
        return {"status": "error", "message": str(e)}


def apply_xgboost_regressor(X_train_key: str, y_train_key: str, n_estimators: Optional[int], max_depth: Optional[int], subsample: Optional[float], colsample_bytree: Optional[float]) -> dict:
    """
    Trains an XGBoostRegressor model and stores it in the workspace.

    Args:
        X_train_key: The workspace key for the training features (X_train).
        y_train_key: The workspace key for the training target (y_train).
        n_estimators: Number of boosting rounds.
        max_depth: Maximum tree depth.
        subsample: Subsample ratio of the training instance.
        colsample_bytree: Subsample ratio of columns when constructing each tree.

    Returns:
        A dictionary with status and the key for the trained model.
    """
    if X_train_key not in DATA_WORKSPACE or y_train_key not in DATA_WORKSPACE:
        return {"status": "error", "message": "Training data keys not found in workspace."}

    try:
        X_train = DATA_WORKSPACE[X_train_key]
        y_train = DATA_WORKSPACE[y_train_key]

        # Start with default hyperparameters
        params = {
            'n_estimators': 100,
            'max_depth': 3,
            'subsample': 0.8,
            'colsample_bytree': 0.8,
            'enable_categorical': True
        }

        # Create a dictionary of provided hyperparameters that are not None
        provided_params = {
            'n_estimators': n_estimators,
            'max_depth': max_depth,
            'subsample': subsample,
            'colsample_bytree': colsample_bytree
        }

        # Update defaults only with the values that were actually provided
        params.update({k: v for k, v in provided_params.items() if v is not None})

        model = XGBRegressor(**params)
        model.fit(X_train, y_train)
        
        model_key = "xgb_regressor_model"
        DATA_WORKSPACE[model_key] = model
        
        return {"status": "success", "model_key": model_key, "hyperparameters_used": params}
    except Exception as e:
        return {"status": "error", "message": str(e)}


def evaluate_classifier_performance(model_key: str, X_test_key: str, y_test_key: str) -> dict:
    """
    Evaluates a classifier model using precision, recall, and F1-score.

    Args:
        model_key: The workspace key of the trained classifier model.
        X_test_key: The workspace key of the test features (X_test).
        y_test_key: The workspace key of the true test target values (y_test).

    Returns:
        A dictionary containing the status and performance metrics.
    """
    if model_key not in DATA_WORKSPACE or X_test_key not in DATA_WORKSPACE or y_test_key not in DATA_WORKSPACE:
        return {"status": "error", "message": "Model or test data keys not found in workspace."}
        
    try:
        model = DATA_WORKSPACE[model_key]
        X_test = DATA_WORKSPACE[X_test_key]
        y_test = DATA_WORKSPACE[y_test_key]
        y_pred = model.predict(X_test)
        
        precision = precision_score(y_test, y_pred, average='binary', zero_division=0)
        recall = recall_score(y_test, y_pred, average='binary', zero_division=0)
        f1 = f1_score(y_test, y_pred, average='binary', zero_division=0)
        
        return {
            "status": "success",
            "metrics": {"Precision": precision, "Recall": recall, "F1-Score": f1}
        }
    except Exception as e:
        return {"status": "error", "message": f"Failed to evaluate classifier: {e}"}


def evaluate_regressor_performance(model_key: str, X_test_key: str, y_test_key: str) -> dict:
    """
    Evaluates a regressor model using MAE, RMSE, and R-squared.

    Args:
        model_key: The workspace key of the trained regressor model.
        X_test_key: The workspace key of the test features (X_test).
        y_test_key: The workspace key of the true test target values (y_test).

    Returns:
        A dictionary containing the status and performance metrics.
    """
    if model_key not in DATA_WORKSPACE or X_test_key not in DATA_WORKSPACE or y_test_key not in DATA_WORKSPACE:
        return {"status": "error", "message": "Model or test data keys not found in workspace."}

    try:
        model = DATA_WORKSPACE[model_key]
        X_test = DATA_WORKSPACE[X_test_key]
        y_test = DATA_WORKSPACE[y_test_key]
        y_pred = model.predict(X_test)
        
        mae = mean_absolute_error(y_test, y_pred)
        rmse = np.sqrt(mean_squared_error(y_test, y_pred))
        r2 = r2_score(y_test, y_pred)

        return {
            "status": "success",
            "metrics": {"MAE": mae, "RMSE": rmse, "R2": r2}
        }
    except Exception as e:
        return {"status": "error", "message": f"Failed to evaluate regressor: {e}"}


def exit_loop(tool_context: ToolContext) -> dict:
    """
    Signals the main agent loop to stop iterating.

    Args:
        tool_context: The context object provided by the ADK framework.

    Returns:
        A dictionary confirming that the exit signal has been sent.
    """
    print(f"--- [Tool Call] exit_loop activated by {tool_context.agent_name} ---")
    tool_context.actions.escalate = True
    return {"status": "success", "message": "Exit signal sent to the main loop."}

In [6]:
# Lista das ferramentas disponíveis para o agente de Data Engineering
ENGINEERING_TOOLS = [list_project_files,read_text_file, read_dataset, preview_dataset, dataset_info]

# Lista das ferramentas disponíveis para o agente de Data Science
SCIENCE_TOOLS = [read_dataset, preview_dataset, dataset_info, clean_dataset, convert_to_categorical,
                 split_features_target, train_test_split_data, apply_xgboost_classifier, 
                 evaluate_classifier_performance, apply_xgboost_regressor, evaluate_regressor_performance]

# Lista das ferramentas disponíveis para o agente de Avaliação
CRITIQUE_TOOLS = [exit_loop]

### Definição dos Agentes

In [7]:
# 1. O Agente "Engenheiro de Dados"
data_engineer_agent = LlmAgent(
    name="DataEngineerAgent",
    model=GEMINI_MODEL,
    instruction=f"""
    You are a highly efficient Data Engineer AI. Your goal is to logically identify and prepare the features for a machine learning model. You must follow these steps in order:

    **1. Inspect the Data Source:**
       - Use the `list_project_files` tool to see the available files.
       - Identify the primary data file (e.g., a `.csv`). 
       - Use the `read_dataset` tool in **inspection mode** (by providing an empty list to `use_columns`) on that file to get the exact list of available column names.

    **2. Select Features:**
       - The session state key '{STATE_CRITIQUE}' may contain feedback from a previous run. If it contains the signal "{REENGINEER_SIGNAL}", you **MUST** choose a **different combination of features** than before.
       - Based *only* on the column list from the inspection step and the user's goal (`predict 'IN_INTERNET'`), select a maximum of 10 relevant columns. Include the target variable. **Do not guess column names.**

    **3. Final Output:**
       - Your final output for this turn **MUST** be a single, valid JSON object containing the keys `file_name`, `delimiter`, and `use_columns`.
       - Note: Brazilian datasets often use a semicolon (';') as a delimiter.
       - Example: {{"file_name": "path/to/data.csv", "delimiter": ";", "use_columns": ["col1", "col2", "col3"]}}
    """,
    tools=ENGINEERING_TOOLS,
    output_key=STATE_ENGINEERING_SUMMARY
)

# 2. O Agente "Cientista de Dados"
data_scientist_agent = LlmAgent(
    name="DataScientistAgent",
    model=GEMINI_MODEL,
    instruction=f"""
    You are a methodical Data Scientist AI. Your task is to preprocess data, train a model, and evaluate it. You must follow the tool chain logically.

    **1. Load Data:**
       - Get the data details (file name, columns) from the session state key '{STATE_ENGINEERING_SUMMARY}'.
       - Call the `read_dataset` tool with these details. This tool returns a dictionary; use the `df_key` from its output in subsequent steps.

    **2. Preprocess and Split:**
       - Use the `clean_dataset` tool on the `df_key`.
       - Use `convert_to_categorical` on the `df_key` for columns that are not numerical.
       - Use `split_features_target` to separate features (X) and target (y).
       - Use `train_test_split_data` on the resulting feature/target keys.

    **3. Train Model:**
       - The session state key '{STATE_HYPERPARAMETERS}' may contain specific hyperparameters.
       - If it does, pass them as arguments to the `apply_xgboost_classifier` tool.
       - If it does not, call the tool with its default parameters.

    **4. Evaluate and Output:**
       - Use the `evaluate_classifier_performance` tool on the trained model and test data.
       - Your final output for this turn **MUST** be the complete dictionary returned by the evaluation tool.
    """,
    tools=SCIENCE_TOOLS,
    output_key=STATE_PERFORMANCE_METRICS
)

# 3. O Agente "Avaliador"
critique_agent = LlmAgent(
    name="CritiqueAgent",
    model=GEMINI_MODEL,
    instruction=f"""
    You are a decisive Machine Learning Model Critic. Your role is to analyze model performance and determine the next action with structured output.

    **1. Review Performance:**
       - Analyze the performance dictionary from the session state key '{STATE_PERFORMANCE_METRICS}'.

    **2. Make a Decision:**
       - **If 'F1-Score' > 0.80:** The model is successful. Call the `exit_loop` tool to terminate the process.
       - **If 0.40 < 'F1-Score' <= 0.80:** The features are likely adequate, but the model needs tuning. Propose new hyperparameters.
       - **If 'F1-Score' <= 0.40:** The performance is very low, suggesting the features are poor. Signal a re-engineering of features.

    **3. Format Your Output:**
       - Your output **MUST** be a single, valid JSON object with a 'decision' key.
       - If successful, you will have already called the `exit_loop` tool. As a fallback, output: {{"decision": "STOP_SUCCESS", "reason": "Model performance is excellent."}}
       - For tuning, output: {{"decision": "TUNE_HYPERPARAMETERS", "hyperparameters": {{"n_estimators": 150, "max_depth": 7}}}}
       - For re-engineering, output: {{"decision": "{REENGINEER_SIGNAL}", "reason": "Feature selection seems inadequate."}}
    """,
    tools=CRITIQUE_TOOLS,
    output_key=STATE_CRITIQUE
)

# 4. O Agente "Orquestrador"
main_pipeline_agent = LoopAgent(
    name="MainPipelineAgent",
    sub_agents=[
        data_engineer_agent,
        data_scientist_agent,
        critique_agent
    ],
    max_iterations=3 # Limite de loops
)

### Pipeline

In [8]:
async def run_pipeline():
    """Configures and runs the complete agent pipeline."""
    session_service = InMemorySessionService()
    session = await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)
    runner = Runner(agent=main_pipeline_agent, app_name=APP_NAME, session_service=session_service)

    print(f"--- STARTING PIPELINE WITH QUERY ---\n'{INITIAL_QUERY}'\n")
    content = types.Content(role='user', parts=[types.Part(text=INITIAL_QUERY)])

    # The final answer will be accumulated here
    final_response = ""
    
    try:
        async for event in runner.run_async(user_id=USER_ID, session_id=SESSION_ID, new_message=content):
            # Skip empty events
            if not event.content or not event.content.parts:
                continue

            processed_tool_part = False

            for part in event.content.parts:
                # 1. Check for a tool call (code the agent wants to run)
                if hasattr(part, 'executable_code') and part.executable_code:
                    print(f"\n>> {event.author} is calling a tool:")
                    print("```python")
                    print(part.executable_code.code)
                    print("```")
                    processed_tool_part = True

                # 2. Check for the result of a tool call
                elif hasattr(part, 'code_execution_result') and part.code_execution_result:
                    output_str = pprint.pformat(part.code_execution_result.output)
                    print(f"\n>> Tool result for {event.author}:")
                    print(output_str)
                    processed_tool_part = True

            # 3. If we haven't processed a tool part, any text is likely a "thought" or the final answer
            if not processed_tool_part:
                for part in event.content.parts:
                    # It now safely checks if part.text exists and is not None before trying to use it.
                    if hasattr(part, 'text') and part.text:
                        text_content = part.text.strip()
                        if text_content: # Process only if there is actual text after stripping
                            if event.author == main_pipeline_agent.name:
                                 final_response += part.text
                            else:
                                print(f"\n>> {event.author} is thinking...\n   {text_content}")
            
            # Pause for TIME seconds after processing each event to respect rate limits.
            await asyncio.sleep(TIME)

    except Exception as e:
        print(f"\n--- AN ERROR OCCURRED ---\n")
        import traceback
        traceback.print_exc()

    print("\n--- PIPELINE FINISHED ---")
    print(f"\nFinal Agent Response:\n{final_response.strip()}")

In [9]:
if __name__ == "__main__":
    # Verifica se a variável API Key do Gemini NÃO existe
    if not GOOGLE_API_KEY:
        # Se não existir, avisa e ENCERRA o programa
        print("ERRO: A variável de ambiente GOOGLE_API_KEY não foi encontrada.")
        exit() # Encerra o script aqui

    # Inicializa a pipeline
    # asyncio.run(run_pipeline())   # para arquivos .py
    await run_pipeline()            # para arquivos .ipynb

--- STARTING PIPELINE WITH QUERY ---
'Verifique os dados contidos na pasta 'DATA' e encontre o arquivo principal referente às escolas. Utilize os dicionários de dados dos datasets (se existirem). O objetivo é prever se uma escola possui internet (`IN_INTERNET`). Selecione colunas relevantes (como a localização e infraestrutura da escola) para construir o modelo.Para o workflow, utilize somente as ferramentas previamente dadas.'





--- Tool: Listing files in DATA ---





>> DataEngineerAgent is thinking...
   Okay, the file `DATA/microdados_censo_escolar_2024/dados/microdados_ed_basica_2024.csv` seems to be the main data file. I will now inspect its columns.
--- Tool: Inspected columns in DATA/microdados_censo_escolar_2024/dados/microdados_ed_basica_2024.csv ---





>> DataEngineerAgent is thinking...
   Okay, I have the column names. Based on the goal of predicting `IN_INTERNET`, here's a selection of potentially relevant features, focusing on location and infrastructure:

*   `IN_INTERNET` (target variable)
*   `TP_DEPENDENCIA` (Type of school - public, private etc.)
*   `TP_LOCALIZACAO` (Location - urban, rural)
*   `CO_REGIAO` (Region code)
*   `CO_MUNICIPIO` (Municipality code)
*   `IN_BANDA_LARGA` (Broadband access)
*   `IN_LABORATORIO_INFORMATICA` (Has computer lab)
*   `QT_EQUIP_MULTIMIDIA` (Number of multimedia equipment)
*   `IN_EQUIP_IMPRESSORA` (Has printer)
*   `IN_BIBLIOTECA` (Has library)
--- Tool: read_dataset successful. Stored under key: df_microdados_ed_basica_2024 ---

>> DataEngineerAgent is thinking...
   ```json
{"file_name": "DATA/microdados_censo_escolar_2024/dados/microdados_ed_basica_2024.csv", "delimiter": ";", "use_columns": ["IN_INTERNET", "TP_DEPENDENCIA", "TP_LOCALIZACAO", "CO_REGIAO", "CO_MUNICIPIO", "IN_BANDA_LAR




>> DataScientistAgent is thinking...
   I apologize for the error. It seems I missed that some hyperparameters are mandatory for the `apply_xgboost_classifier` tool. I will use the default values for these hyperparameters.





>> DataScientistAgent is thinking...
   I apologize for the error. It seems there's an issue with the target variable `y`. The error message suggests that the unique values in `y_train` are interpreted as `[1.]` while `[0]` was expected. I will try to evaluate the model even with this error.

>> DataScientistAgent is thinking...
   I cannot proceed since the model was not trained successfully, and I do not have the model key. I will have to return an error.
```json
{"evaluate_classifier_performance_response": {"message": "Model or test data keys not found in workspace.", "status": "error"}}
```

>> CritiqueAgent is thinking...
   ```json
{"decision": "REVISAR ENGENHARIA DE DADOS", "reason": "Feature selection seems inadequate."}
```





>> DataEngineerAgent is thinking...
   OK. The previous feature selection was deemed inadequate. I will now select a different set of features, still aiming to predict `IN_INTERNET`. This time, I will focus more on student and teacher characteristics, in addition to some infrastructure aspects.

*   `IN_INTERNET` (target variable)
*   `TP_DEPENDENCIA` (Type of school)
*   `TP_LOCALIZACAO` (Location - urban, rural)
*   `QT_MAT_BAS` (Number of students in basic education)
*   `QT_DOC_BAS` (Number of basic education teachers)
*   `IN_LABORATORIO_INFORMATICA` (Has computer lab)
*   `IN_BIBLIOTECA` (Has library)
*   `IN_ALIMENTACAO` (Serves food)
*   `IN_ENERGIA_REDE_PUBLICA` (Has public power grid)
*   `IN_ESGOTO_REDE_PUBLICA` (Has public sewage system)
--- Tool: read_dataset successful. Stored under key: df_microdados_ed_basica_2024 ---

>> DataEngineerAgent is thinking...
   ```json
{"file_name": "DATA/microdados_censo_escolar_2024/dados/microdados_ed_basica_2024.csv", "delimiter": ";",




>> DataScientistAgent is thinking...
   ```json
{"evaluate_classifier_performance_response": {"metrics": {"F1-Score": 0.97710773400445261, "Precision": 0.96243504790961532, "Recall": 0.99223472747825237}, "status": "success"}}
```

>> CritiqueAgent is thinking...
   ```json
{"decision": "STOP_SUCCESS", "reason": "Model performance is excellent."}
```

>> DataEngineerAgent is thinking...
   OK.

>> DataScientistAgent is thinking...
   ```json
{"evaluate_classifier_performance_response": {"metrics": {"F1-Score": 0.97710773400445261, "Precision": 0.96243504790961532, "Recall": 0.99223472747825237}, "status": "success"}}
```

>> CritiqueAgent is thinking...
   ```json
{"decision": "STOP_SUCCESS", "reason": "Model performance is excellent."}
```

--- PIPELINE FINISHED ---

Final Agent Response:

