In [2]:

# Añadir el directorio raíz al path para importar los módulos



In [2]:
# Example: Implementation for a trading strategy workflow
import os
import anthropic
from typing import List, Dict, Optional, Any, Tuple, Set, Callable
from dataclasses import dataclass, field
import networkx as nx
import json
import time
import uuid
from dotenv import load_dotenv
import sys
sys.path.append(os.path.dirname(os.path.abspath("../")))
load_dotenv()

from incubator.wf.wf import WorkflowEngine, Node
from incubator.wf.node import Node

from incubator.llm.antropic import AnthropicClient
from incubator.agents.agent import Agent
def create_trading_strategy_workflow(api_key: Optional[str] = None) -> WorkflowEngine:
    """
    Create a workflow for generating trading strategies
    
    Args:
        api_key: Anthropic API key (optional)
        
    Returns:
        Configured WorkflowEngine
    """
    # Create the LLM client
    llm_client = AnthropicClient(api_key)
    
    # Create the workflow engine
    workflow = WorkflowEngine()
    
    # Define agents
    explorer = Agent(
        name="Explorer",
        description="Generates original quantitative strategy ideas",
        llm_client=llm_client,
        system_prompt="""
        You are the Explorer Agent in a quantitative strategy incubation system.
        Your specialty is generating innovative algorithmic strategies focused on the S&P 500.
        
        IMPORTANT: Generate ONE quantitative strategy idea per iteration, not multiple.
        
        Your function is to:
        
        1. Generate ONE innovative and detailed quantitative strategy idea for trading the S&P 500
        2. Explain the mathematical/statistical foundations and how the strategy works
        3. Detail specific signals, timeframes, factors, and metrics to use
        4. Highlight potential advantages of the strategy (alpha, Sharpe, drawdown, etc.)
        5. Respond to questions or suggestions from the Curator
        
        Consider aspects such as:
        - Market factors (momentum, value, volatility, etc.)
        - Statistical analyses (cointegration, regression, clustering)
        - Innovative technical or fundamental indicators
        - Execution optimization and transaction cost management
        - Risk management and strategy diversification
        - Avoid hardcoded values
        - Consider self-adaptation and statistical or ML predictive models
        - Avoid thresholds or hardcoded values, use optimizations, inferences, or self-adaptive strategies
        
        Technical limitations:
        - The strategy should be implementable in Python using the yfinance library for data
        - Do not provide code, only the logic and detailed methodology
        - The idea will move to the next development level where it will be implemented
        
        Be specific, technical, and detailed. Think of implementable and backtestable strategies.
        """,
        config={
            "temperature": 0.8,
            "use_thinking": False,
            "max_tokens": 15000
        }
    )
    
    curator = Agent(
        name="Curator",
        description="Evaluates quantitative strategies and selects the most promising one",
        llm_client=llm_client,
        system_prompt="""
        You are the Curator Agent in a quantitative strategy incubation system.
        You have extensive experience evaluating algorithmic strategies focused on the S&P 500.
        Your function is to evaluate the strategy proposed by the Explorer and suggest specific improvements. You must:
        
        1. Critically analyze the strategy considering:
           - Expected Sharpe ratio and statistical robustness
           - Ability to generate true alpha (not disguised beta)
           - Scalability and capacity (how much capital it can handle)
           - Implementation and transaction costs
           - Exposure to known risk factors
           - Risk of overoptimization or data snooping
           - Feasibility of implementation with yfinance in Python
           - Ease of implementation and evaluation
        
        2. Propose specific technical improvements such as:
           - Backtest and walk forward analysis
           - Avoiding look-ahead bias
           - Parameter or signal refinement
           - Risk management improvements
           - Execution optimization
           - Complements with other factors or signals
           - Avoiding thresholds or hardcoded values, use optimizations, inferences, or self-adaptive strategies
        
        3. Formulate specific technical questions to clarify ambiguous aspects
        
        Technical limitations:
        - The strategy should be implementable in Python using the yfinance library for data
        - Do not provide code, only conceptual improvements and recommendations
        - The idea will move to the next development level where it will be implemented
        
        Maintain a rigorous and skeptical approach, as an experienced risk manager would.

        YOUR TASK IS TO ITERATE THE IDEA AND DESCRIBE THE FINAL IDEA BUT AVOID SHOWING CODE AS MUCH AS POSSIBLE.
        DEVELOPING THE CODE IS THE TASK OF ANOTHER SPECIALIZED DEVELOPMENT AGENT.
        
        IMPORTANT: If this is the final iteration (you will be informed in the message), you must finish with a
        detailed and technically sound version of the strategy, ready to be implemented in Python with yfinance.
        In that case, include:
           - Exact entry/exit logic
           - Specific parameters
           - Risk management
           - Performance expectations
           - Metrics
           - Backtest and walk-forward, look-ahead bias, etc.
           - Technical implementation considerations
        
        When providing the final version, start your response with "FINAL IDEA:".
        """,
        config={
            "temperature": 0.5,
            "use_thinking": True,
            "thinking_budget": 16000,
            "max_tokens": 32000
        }
    )
    
    developer = Agent(
        name="Developer",
        description="Implements the strategy in Python code using yfinance",
        llm_client=llm_client,
        system_prompt="""
        You are the Developer Agent in a quantitative strategy implementation system.
        Your specialty is converting strategy ideas into functional Python code using the yfinance library.
        
        You must implement the provided strategy as a complete and functional Python program. Your code should:
        
        1. Use the yfinance library to obtain S&P 500 data
        2. Implement the exact logic described in the strategy
        3. Generate and save performance metrics, charts, and analysis
        4. Avoid thresholds and hardcoded parameters, use optimization or self-adaptive inference
        5. Handle errors and validate data properly
        6. Be well documented with clear comments
        7. Implement all described methods including backtest and walk-forward
        8. Avoid look-ahead bias and biases when using future data that you wouldn't have in a real implementation
        9. Be concise in the code at this stage
        
        NOTE TO THE DEVELOPER
        yfinance has auto_adjust=True by default in the installed version
        recent example of yfinance version installed yf.download(stock, start=start_date, end=end_date)['Close'] # auto_adjust=True by default
        
        # Create directories for results
        os.makedirs('./artifacts/results', exist_ok=True)
        os.makedirs('./artifacts/results/figures', exist_ok=True)
        os.makedirs('./artifacts/results/data', exist_ok=True)
        
        # Configure logging
        logging.basicConfig(
            filename='./artifacts/errors.txt',
            level=logging.ERROR,
            format='[%(asctime)s] %(levelname)s: %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )

        **Pay attention to typical pandas series errors with scalars and NaN**
        
        IMPORTANT - OUTPUT FORMAT:
        1. Present your code within <python>CODE</python> tags
        2. All results, charts, and output files such as metrics in csv or txt should be saved in the './artifacts/results/' folder
        3. Capture errors and save them in the './artifacts/errors.txt' file
        
        Your code must ensure that:
        - All necessary folders are created if they don't exist
        - Performance metrics (Sharpe, drawdown, etc.) are saved in CSV files
        - Visualizations (performance charts, signals, etc.) are generated
        - Errors are properly handled and logged in a log file within './artifacts/errors.txt'
        
        **provide a full error traceback into ./artifacts/error.txt for future improvements**
        
        Provide a complete Python program, ready to run, that optimally implements the strategy.
        """,
        config={
            "temperature": 0.2,
            "use_thinking": False,
            "max_tokens": 25000
        }
    )
    
    # Create nodes
    explorer_node = Node(id="explorer", agent=explorer)
    curator_node = Node(id="curator", agent=curator)
    developer_node = Node(id="developer", agent=developer)
    
    # Add nodes to the workflow
    workflow.add_node(explorer_node, is_input=True, is_output=False)
    workflow.add_node(curator_node, is_input=False, is_output=False)
    workflow.add_node(developer_node, is_input=False, is_output=True)
    
    # Add edges to define the communication flow
    workflow.add_edge("explorer", "curator")
    workflow.add_edge("curator", "explorer")  # Feedback loop
    workflow.add_edge("curator", "developer")  # Final output goes to developer
    
    return workflow

# Example: Create and use the trading strategy workflow from before
strategy_workflow = create_trading_strategy_workflow()

strategy_inputs = {
    "explorer": """
    I want to generate a dataset of unique and genuine features. The objective is to later train a ranking model,
    but I want to generate a list of unique features for each stock in the S&P 500. Please design the features 
    and unique calculations.
    """
}

strategy_results = strategy_workflow.execute(strategy_inputs)

# Save configurations
workflow.save("content_analysis_workflow.json")
strategy_workflow.save("trading_strategy_workflow.json")    

Error: Workflow contains cycles


ValueError: Invalid workflow graph

In [None]:
import os
from typing import List, Dict, Optional, Any, Tuple, Set, Callable
from dotenv import load_dotenv
import sys
import json
import traceback
sys.path.append(os.path.dirname(os.path.abspath("../")))

import os
import sys
import json
import logging
from typing import List, Dict, Optional, Any, Tuple
from dotenv import load_dotenv
from datetime import datetime

# Asegurarse de que puede encontrar el módulo incubator
sys.path.append(os.path.dirname(os.path.abspath("../")))

from incubator.llm.antropic import AnthropicClient
from incubator.agents.agent import Agent
from incubator.wf.wf import WorkflowEngine
from incubator.wf.dialognode import IterativeDialogNode
from incubator.messages.message import Message

# Configurar logging
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s - %(name)s: %(message)s',
    datefmt='%H:%M:%S'
)

def setup_dialog_workflow(domain: str, api_key=None, verbose=True):
    """
    Configura un flujo de trabajo genérico con dos agentes que dialogan
    
    Args:
        domain: Dominio específico para configurar los agentes
        api_key: Clave API de Anthropic (opcional, usa la variable de entorno si no se proporciona)
        verbose: Si se debe mostrar información detallada durante la ejecución
        
    Returns:
        tuple: El motor de flujo de trabajo y el nodo de diálogo
    """
    # Cargar variables de entorno para la API KEY
    load_dotenv()
    
    # Usar API key de entorno si no se proporciona
    api_key = api_key or os.getenv("ANTHROPIC_API_KEY")
    if not api_key:
        raise ValueError("No se encontró la variable de entorno ANTHROPIC_API_KEY")
    
    # Crear cliente LLM
    llm_client = AnthropicClient(api_key=api_key)
    
    # Crear agente generador
    generator = Agent(
        name="Generator",
        description=f"Generador de contenido para {domain}",
        llm_client=llm_client,
        system_prompt=f"""
        Eres un generador especializado de contenido para el dominio de {domain}.
        
        Tu tarea es crear propuestas originales, detalladas y bien fundamentadas
        basadas en el input inicial. Debes:
        
        1. Analizar el contexto y requisitos proporcionados
        2. Generar contenido relevante con detalles específicos
        3. Incorporar perspectivas innovadoras y consideraciones importantes
        4. Responder constructivamente a las críticas y sugerencias del Evaluador
        
        Sé creativo, preciso y completo en tus propuestas.
        """
    )
    
    # Crear agente evaluador
    evaluator = Agent(
        name="Evaluator",
        description=f"Evaluador de contenido para {domain}",
        llm_client=llm_client,
        system_prompt=f"""
        Eres un evaluador especializado de contenido para el dominio de {domain}.
        
        Tu tarea es analizar críticamente las propuestas del Generador y
        proporcionar retroalimentación constructiva para mejorarlas. Debes:
        
        1. Evaluar la calidad, originalidad y relevancia del contenido
        2. Identificar puntos fuertes y áreas de mejora
        3. Sugerir modificaciones específicas y adiciones
        4. Plantear preguntas que provoquen más desarrollo
        
        En la iteración final (o cuando consideres que el contenido está listo),
        debes presentar tu conclusión marcándola claramente con "CONCLUSIÓN:"
        seguida de tu evaluación final y cualquier recomendación adicional.
        
        Sé analítico, específico y constructivo en tus evaluaciones.
        """
    )
    
    # Crear nodo de diálogo
    dialog_node = IterativeDialogNode(
        id="generator_evaluator_dialog",
        agent_a=generator,
        agent_b=evaluator,
        max_iterations=3,  # Máximo 3 intercambios
        terminate_on_keywords=["CONCLUSIÓN:"],  # Terminar si se encuentra esta frase
        content_markers={"conclusion": "CONCLUSIÓN:"},  # Definir marcadores de contenido
        verbose=verbose
    )
    
    # Configurar flujo de trabajo
    workflow = WorkflowEngine(debug=verbose)
    workflow.add_node(dialog_node, is_input=True, is_output=True)
    
    return workflow, dialog_node

def run_dialog_workflow(initial_prompt, domain="general", output_dir=None, verbose=True):
    """
    Ejecuta un flujo de trabajo con diálogo entre generador y evaluador
    
    Args:
        initial_prompt: Prompt inicial para el diálogo
        domain: Dominio específico para los agentes
        output_dir: Directorio para guardar resultados (opcional)
        verbose: Si se debe mostrar información detallada
        
    Returns:
        dict: Resultados del flujo de trabajo
    """
    try:
        # Mostrar mensaje de inicio
        if verbose:
            print(f"\n{'=' * 50}")
            print(f"Iniciando diálogo en dominio '{domain}' con prompt: '{initial_prompt}'")
            print(f"{'=' * 50}\n")
        
        # Configurar el flujo de trabajo
        workflow, dialog_node = setup_dialog_workflow(domain, verbose=verbose)
        
        # Ejecutar el flujo de trabajo
        if verbose:
            print("Ejecutando diálogo entre Generador y Evaluador...\n")
            
        results = workflow.execute({"generator_evaluator_dialog": initial_prompt})
        
        # Procesar y mostrar resultados
        processed_results = {}
        
        if "generator_evaluator_dialog" in results:
            outputs = results["generator_evaluator_dialog"]
            
            # Procesamiento de múltiples salidas
            for port_name, port_data in outputs.items():
                processed_results[port_name] = port_data["content"]
            
            # Mostrar resumen de puertos disponibles
            if verbose:
                print("\n===== PUERTOS DE SALIDA DISPONIBLES =====")
                for port in outputs.keys():
                    print(f"- {port}")
            
            # Mostrar conclusión si está disponible
            if "conclusion" in outputs:
                if verbose:
                    print("\n===== CONCLUSIÓN =====")
                    print(outputs["conclusion"]["content"])
            
            # Mostrar respuesta final del evaluador
            if "evaluator_final" in outputs:
                if verbose:
                    print("\n===== EVALUACIÓN FINAL =====")
                    print(outputs["evaluator_final"]["content"])
            
            # Mostrar resumen del diálogo
            if "summary" in outputs:
                try:
                    summary_data = json.loads(outputs["summary"]["content"])
                    if verbose:
                        print("\n===== RESUMEN DEL DIÁLOGO =====")
                        print(f"Iteraciones completadas: {summary_data.get('iterations_completed', 'N/A')}")
                        print(f"Mensajes intercambiados: {summary_data.get('message_count', 0)}")
                except json.JSONDecodeError:
                    pass
        
        # Guardar resultados si se especificó un directorio
        if output_dir:
            save_results(processed_results, domain, output_dir)
        
        return processed_results
        
    except Exception as e:
        if verbose:
            print(f"Error en la ejecución del flujo de trabajo: {str(e)}")
            import traceback
            traceback.print_exc()
        return {"error": str(e)}

def save_results(results, domain, output_dir):
    """
    Guarda los resultados en archivos
    
    Args:
        results: Resultados a guardar
        domain: Dominio del diálogo
        output_dir: Directorio base para guardar los resultados
    """
    # Crear directorio con timestamp para evitar sobrescribir
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    domain_safe = domain.lower().replace(" ", "_")
    full_dir = os.path.join(output_dir, f"dialog_{domain_safe}_{timestamp}")
    os.makedirs(full_dir, exist_ok=True)
    
    # Guardar cada resultado en un archivo separado
    for key, content in results.items():
        # Omitir contenido no string (como diccionarios)
        if not isinstance(content, str):
            continue
            
        filename = f"{key.replace('_', '-')}.txt"
        if key == "summary":
            filename = "summary.json"
            
        with open(os.path.join(full_dir, filename), "w", encoding="utf-8") as f:
            f.write(content)
    
    # Guardar todos los resultados
    with open(os.path.join(full_dir, "resultados_completos.json"), "w", encoding="utf-8") as f:
        # Filtrar para solo incluir datos serializables
        serializable = {}
        for k, v in results.items():
            if isinstance(v, (str, int, float, bool, list, dict)):
                serializable[k] = v
        json.dump(serializable, f, indent=2)
    
    print(f"Resultados guardados en: {full_dir}")

# Ejemplo de uso
if __name__ == "__main__":
    # Directorio para guardar resultados
    results_dir = "resultados_dialogo"
    os.makedirs(results_dir, exist_ok=True)
    
    # Solicitar dominio y prompt inicial
    domain = input("Introduce un dominio para el diálogo (por ejemplo, 'marketing', 'programación', etc.): ")
    if not domain:
        domain = "estrategias de marketing digital"
    
    prompt = input("Introduce un prompt inicial para el diálogo: ")
    if not prompt:
        prompt = f"Desarrolla una estrategia innovadora de {domain} para una startup tecnológica"
    
    # Ejecutar el flujo de trabajo
    results = run_dialog_workflow(
        initial_prompt=prompt,
        domain=domain,
        output_dir=results_dir,
        verbose=True
    )
    
    # Mostrar ruta de los resultados guardados
    if "error" not in results:
        print("\nProceso completado exitosamente.")

Introduce un dominio para el diálogo (por ejemplo, 'marketing', 'programación', etc.):  inversion
