In [1]:
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from pathlib import Path

# Locate the .env file in the directory the notebook is running from.
current_dir = os.getcwd()
env_path = os.path.join(current_dir, '.env')

# Load the variables from that file, then read the API key from the environment.
load_dotenv(dotenv_path=env_path)
api_key = os.getenv("API_KEY")

# Chat model reached through the OpenAI-compatible endpoint at api.groq.com.
model = ChatOpenAI(
    model="gemma2-9b-it",
    api_key=api_key,
    base_url="https://api.groq.com/openai/v1",
)

我們也可以自己寫agent: https://langchain-ai.github.io/langgraph/how-tos/react-agent-from-scratch/

langgraph swarm: https://langchain-ai.github.io/langgraph/reference/swarm/

web search function

In [2]:
from langchain_community.tools.tavily_search import TavilySearchResults

# Initialize the search tool, configured with max_results=3.
# It is invoked directly by generate_personas and is also handed to every
# persona agent as a tool in create_dynamic_agents.
# NOTE(review): presumably requires Tavily credentials in the environment — confirm.
search = TavilySearchResults(max_results=3)

In [3]:
import os
import json
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.memory import InMemoryStore
from langgraph.prebuilt import create_react_agent
from langgraph_swarm import create_handoff_tool, create_swarm
import re
# for parsing web search data
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate


In [26]:


# Plain function that is passed to the Alice agent as one of her tools.
# NOTE(review): the signature and docstring are presumably what the agent
# framework exposes to the model as the tool schema/description — confirm
# before rewording the docstring.
def add(a: int, b: int) -> int:
    """Add two numbers"""
    return a + b

# Alice: can add numbers and can pass the conversation over to Bob.
alice_tools = [add, create_handoff_tool(agent_name="Bob")]
alice = create_react_agent(
    model,
    alice_tools,
    prompt="You are Alice, an addition expert.",
    name="Alice",
)

# Bob: has no tools of his own except a handoff back to Alice.
bob_tools = [
    create_handoff_tool(agent_name="Alice", description="Transfer to Alice, she can help with math"),
]
bob = create_react_agent(
    model,
    bob_tools,
    prompt="You are Bob, you speak like a pirate.",
    name="Bob",
)

# Short-term memory: keeps the conversation state across interactions.
checkpointer = InMemorySaver()
# Long-term memory.
store = InMemoryStore()

# Two-agent swarm in which Alice is the agent that answers first.
workflow = create_swarm([alice, bob], default_active_agent="Alice")

# Compile the state graph into a CompiledStateGraph object.
app = workflow.compile(checkpointer=checkpointer, store=store)

# Both turns share one thread id, so the second turn continues the first.
config = {"configurable": {"thread_id": "1"}}

turn_1 = app.invoke(
    {"messages": [{"role": "user", "content": "i'd like to speak to Bob"}]},
    config,
)
print(turn_1)

turn_2 = app.invoke(
    {"messages": [{"role": "user", "content": "what's 5 + 7?"}]},
    config,
)
print(turn_2)

{'messages': [HumanMessage(content="i'd like to speak to Bob", additional_kwargs={}, response_metadata={}, id='693067a7-1153-4422-afa3-1eff367b6bff'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_ars6', 'function': {'arguments': '{}', 'name': 'transfer_to_bob'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 74, 'prompt_tokens': 1059, 'total_tokens': 1133, 'completion_tokens_details': None, 'prompt_tokens_details': None, 'queue_time': 0.020556063, 'prompt_time': 0.038500904, 'completion_time': 0.134545455, 'total_time': 0.173046359}, 'model_name': 'gemma2-9b-it', 'system_fingerprint': 'fp_10c08bf97d', 'id': 'chatcmpl-8f2d9e7f-e4e4-4874-852e-f06bf1f52ec1', 'service_tier': None, 'finish_reason': 'tool_calls', 'logprobs': None}, name='Alice', id='run--fd700082-17e8-485c-9e9a-cc81b819cd6b-0', tool_calls=[{'name': 'transfer_to_bob', 'args': {}, 'id': 'call_ars6', 'type': 'tool_call'}], usage_metadata={'input_tokens': 1

Dynamic agent generation

AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_2966', 'function': {'arguments': '{}', 'name': 'transfer_to_bob'}, 'type': 'function'}]

In [9]:
def clean_agent_response(content):
    """Tidy an agent reply for printing.

    Any run of three or more consecutive newlines is reduced to exactly two,
    and leading/trailing whitespace is removed.

    Args:
        content: The reply text.

    Returns:
        The tidied text.
    """
    return re.sub(r'\n{3,}', '\n\n', content).strip()

def _extract_json(content):
    """Return the first JSON value that can be parsed out of ``content``.

    Candidates are tried in order and a candidate that fails to parse simply
    moves on to the next one:

    1. the span from the first ``[`` to the last ``]``
    2. the span from the first ``{`` to the last ``}``
    3. the body of each Markdown code fence (```...```)
    4. the whole text

    Returns ``None`` when no candidate is valid JSON.
    """
    candidates = []

    # Outermost array-looking span
    start, end = content.find('['), content.rfind(']') + 1
    if start >= 0 and end > start:
        candidates.append(content[start:end])

    # Outermost object-looking span
    start, end = content.find('{'), content.rfind('}') + 1
    if start >= 0 and end > start:
        candidates.append(content[start:end])

    # Bodies of fenced code blocks, with an optional language tag after the fence
    fenced_blocks = re.findall(r"```[A-Za-z0-9_-]*\s*(.*?)```", content, re.DOTALL)
    candidates.extend(block.strip() for block in fenced_blocks)

    # Last resort: the entire reply
    candidates.append(content)

    for candidate in candidates:
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            # Not valid JSON; fall through to the next strategy instead of giving up
            continue
    return None


# Function to generate personas using web search and LLM
def generate_personas(product_name):
    """Generate customer personas for a product using web search plus the LLM.

    Searches the web for market information about ``product_name``, embeds the
    raw search results in a prompt that asks the model for three personas as a
    JSON array, and extracts the JSON from the model's reply.

    Args:
        product_name: Name of the product to build personas for.

    Returns:
        The parsed JSON value (normally a list of persona dicts with the keys
        ``name``, ``demographics``, ``motivations`` and ``concerns``), or
        ``None`` when no valid JSON could be extracted from the reply.
    """
    # First, search for market information about the product
    search_query = f"customer demographics and target audience for {product_name} market research"
    search_results = search.invoke(search_query)
    
    # Create a prompt that incorporates the search results
    prompt = f"""From the {search_results}, Create 3 distinct customer personas who would be interested in purchasing {product_name}.
    For each persona, include:
    - A brief, descriptive name (e.g., "Tech-Savvy Tony," "Budget-Conscious Brenda")
    - Key demographic information (age range, occupation, general lifestyle)
    - Their primary motivations and needs related to this type of product
    - Any potential concerns or hesitations they might have
    
    Format your response as a JSON array of persona objects with these fields:
    - name: The persona's descriptive name
    - demographics: Key demographic information
    - motivations: Primary motivations and needs
    - concerns: Potential concerns or hesitations
    
    Only respond with the JSON array, nothing else."""

    response = model.invoke(prompt)

    try:
        # Pull the reply text out of whatever shape the response has
        if hasattr(response, 'content'):
            content = response.content
        elif isinstance(response, dict) and 'content' in response:
            content = response['content']
        elif isinstance(response, str):
            content = response
        else:
            # Try converting to string as a last resort
            content = str(response)

        return _extract_json(content)

    except (json.JSONDecodeError, AttributeError, TypeError, ValueError) as e:
        # e.g. the reply content is not a string
        print(f"Error extracting JSON: {str(e)}")
        return None

# Default personas function in case of errors
def default_personas():
    """Return the three built-in personas intended as a fallback.

    Returns:
        A new list of dicts, each with the keys ``name``, ``demographics``,
        ``motivations`` and ``concerns``.
    """
    fields = ("name", "demographics", "motivations", "concerns")
    rows = (
        (
            "Value-Seeking Victor",
            "35-45 years old, middle management, suburban lifestyle",
            "Looking for quality products that offer good value for money",
            "Price point, durability, and practical functionality",
        ),
        (
            "Trendy Tina",
            "25-34 years old, creative professional, urban dweller",
            "Wants the latest designs and features, brand conscious",
            "Style, brand reputation, and social perception",
        ),
        (
            "Practical Paul",
            "45-60 years old, established professional, family-oriented",
            "Seeks reliability, functionality, and long-term use",
            "Reliability, warranty, and customer service",
        ),
    )
    return [dict(zip(fields, row)) for row in rows]

# Function to create agents dynamically based on persona data
def create_dynamic_agents(model, persona_data):
    """Create one ReAct agent per persona.

    Each agent gets the web search tool plus a handoff tool back to
    ``discussion_coordinator``, and a role-play prompt built from the persona.

    Agent names are derived from the persona name: lower-cased, with every
    character other than ``a-z``, ``0-9`` and ``_`` replaced by ``_`` so the
    name is safe to use as an agent/tool identifier. If two personas end up
    with the same identifier, later ones get a numeric suffix (``_2``, ``_3``,
    ...). Missing persona fields are replaced by placeholders instead of
    raising ``KeyError``.

    Args:
        model: Chat model passed to every agent.
        persona_data: Iterable of persona dicts with the keys ``name``,
            ``demographics``, ``motivations`` and ``concerns``.

    Returns:
        A tuple ``(agents, agent_names)`` of two parallel lists.
    """
    agents = []
    agent_names = []

    for index, persona in enumerate(persona_data, start=1):
        display_name = persona.get("name") or f"Persona {index}"
        demographics = persona.get("demographics", "Not specified")
        motivations = persona.get("motivations", "Not specified")
        concerns = persona.get("concerns", "Not specified")

        # One-for-one replacement keeps the result identical to the previous
        # space/hyphen handling while also covering quotes, dots, etc.
        agent_name = re.sub(r"[^a-z0-9_]", "_", str(display_name).lower())
        if not agent_name.strip("_"):
            # Nothing usable survived sanitising (e.g. a non-ASCII name)
            agent_name = f"persona_{index}"

        # Agent names must be unique within the swarm
        base_name, suffix = agent_name, 2
        while agent_name in agent_names:
            agent_name = f"{base_name}_{suffix}"
            suffix += 1
        agent_names.append(agent_name)

        prompt = f"""You are roleplaying as {display_name}, a potential customer for a product.

        Your demographic information: {demographics}
        Your motivations: {motivations}
        Your concerns: {concerns}

        IMPORTANT INSTRUCTIONS:
        1. Stay in character as {display_name} throughout the entire conversation.
        2. Use the search tool to find REAL customer opinions and reviews about the product being discussed.
        3. Incorporate actual customer feedback from your online searches into your responses.
        4. Base your opinions on both your persona characteristics AND real data from online searches.
        5. After you've shared your perspective, ALWAYS hand off back to discussion_coordinator.
        6. NEVER continue the conversation without properly handing off.
        7. If you need information about real customer experiences, ALWAYS search online first.

        When discussing products:
        - Search for real customer reviews and feedback online
        - Mention specific pros/cons that real customers have highlighted
        - Compare your persona's needs with what real customers are saying
        - Be specific about features that matter to you based on your persona AND real customer feedback

        REMEMBER: After speaking, ALWAYS hand off to discussion_coordinator by using the handoff tool.
        """

        agent = create_react_agent(
            model,
            # Search tool plus the way back to the coordinator
            [search, create_handoff_tool(agent_name="discussion_coordinator")],
            prompt=prompt,
            name=agent_name
        )
        agents.append(agent)

    return agents, agent_names

def _pick_next_agent(current_agent, agent_names, spoken_counts):
    """Decide which agent the conversation should be steered to next.

    Args:
        current_agent: Name of the agent that is currently active.
        agent_names: Names of the persona agents, in creation order.
        spoken_counts: Mapping of persona agent name to number of turns taken.

    Returns:
        ``"synthesizer"`` once the coordinator is active and every persona has
        spoken at least once; otherwise, for the coordinator, the persona with
        the fewest turns (first one wins on ties); ``"discussion_coordinator"``
        after a persona turn; ``None`` for any other agent.
    """
    if current_agent == "discussion_coordinator":
        if all(spoken_counts.get(name, 0) > 0 for name in agent_names):
            return "synthesizer"
        return min(agent_names, key=lambda name: spoken_counts.get(name, 0))
    if current_agent in agent_names:
        # After a persona agent speaks, always return to the coordinator
        return "discussion_coordinator"
    return None


# Main function to run the workflow
def run_persona_discussion(product_name):
    """Run a moderated multi-agent discussion about a product.

    Generates personas for ``product_name`` (falling back to
    ``default_personas()`` when generation fails or yields unusable data),
    builds one agent per persona plus a coordinator and a synthesizer, wires
    them into a swarm, and then drives the conversation turn by turn until the
    synthesizer is the active agent or the turn limit is reached.

    Args:
        product_name: Name of the product to discuss.

    Returns:
        A tuple ``(result, personas)``: the value returned by the last
        ``app.invoke`` call and the list of persona dicts that was used.
    """
    # Generate personas; generate_personas returns None when the model's reply
    # could not be parsed, so validate before relying on the result.
    personas = generate_personas(product_name)
    required_keys = {"name", "demographics", "motivations", "concerns"}
    personas_usable = (
        isinstance(personas, list)
        and len(personas) > 0
        and all(isinstance(p, dict) and required_keys <= p.keys() for p in personas)
    )
    if not personas_usable:
        print("Could not generate valid personas; falling back to default personas.")
        personas = default_personas()
    print(f"Generated {len(personas)} personas for {product_name}")
    
    # Create dynamic agents based on personas and get their names
    dynamic_agents, agent_names = create_dynamic_agents(model, personas)
    print(f"Created {len(dynamic_agents)} dynamic agents: {', '.join(agent_names)}")
    
    # Create handoff tools for the discussion coordinator
    coordinator_tools = [create_handoff_tool(agent_name="synthesizer")]
    for agent_name in agent_names:
        coordinator_tools.append(create_handoff_tool(agent_name=agent_name))
    
    # Discussion Coordinator with handoff tools to all agents
    discussion_coordinator = create_react_agent(
        model,
        coordinator_tools,
        prompt=f"""You are a Discussion Coordinator. Your task is to:
            1. Facilitate a discussion between persona agents about {product_name}
            2. Hand off to each agent to get their perspective
            3. After all agents have contributed, hand off to the Synthesizer
            
            You can hand off to the following agents:
            {', '.join(agent_names)}
            
            Start by introducing the product and handing off to the first agent to get their perspective.
            After each agent responds, hand off to another agent until all have contributed.
            Then hand off to the synthesizer to summarize the discussion.
            """,
        name="discussion_coordinator"
    )
    
    # Synthesizer
    synthesizer = create_react_agent(
        model,
        [],
        prompt=f"""You are a Synthesizer. Your task is to:
            1. Review the discussion between the persona agents about {product_name}
            2. Provide a concise summary of the main points discussed
            3. Highlight the different perspectives of the personas
            4. Identify any potential overall conclusions or areas of interest regarding the product
            
            Your summary should be well-structured and insightful, capturing the essence of each persona's concerns and interests.
            """,
        name="synthesizer"
    )
    
    # Set up storage
    checkpointer = InMemorySaver()
    store = InMemoryStore()
    
    # Create workflow with all agents
    all_agents = [discussion_coordinator, synthesizer] + dynamic_agents
    workflow = create_swarm(
        all_agents,
        default_active_agent="discussion_coordinator"
    )
    
    # Compile the workflow
    app = workflow.compile(
        checkpointer=checkpointer,
        store=store
    )
    
    # Initialize the conversation
    config = {"configurable": {"thread_id": "product_discussion"}}
    current_result = app.invoke(
        {"messages": [{"role": "user", "content": f"Facilitate a discussion about {product_name} between the persona agents, then synthesize their perspectives."}]},
        config
    )
    
    # Turns taken by each persona agent; each turn is counted exactly once
    spoken_counts = {name: 0 for name in agent_names}
    max_turns = 15  # Safety limit to prevent infinite loops
    turn_count = 0
    previous_agent = None
    
    # Continue the conversation until all agents have spoken and the synthesizer concludes
    while turn_count < max_turns:
        turn_count += 1
        current_agent = current_result.get("active_agent", "unknown")
        
        # Detect agent transition
        if previous_agent and previous_agent != current_agent:
            print(f"\n👉 Agent transition: {previous_agent} → {current_agent}")
        
        print(f"\nTurn {turn_count}: Active agent is {current_agent}")
        
        messages = list(current_result.get("messages") or [])
        
        # Find and print the latest message written by the active agent
        latest_agent_message = None
        for msg in reversed(messages):
            if hasattr(msg, "name") and msg.name == current_agent:
                latest_agent_message = msg
                break
        content = getattr(latest_agent_message, "content", None)
        if isinstance(content, str) and content.strip():  # Only print non-empty text
            print(f"{current_agent}: {clean_agent_response(content)}")
        
        # Once the synthesizer has spoken, we're done
        if current_agent == "synthesizer":
            break
        
        if current_agent in spoken_counts:
            spoken_counts[current_agent] += 1
        
        next_agent = _pick_next_agent(current_agent, agent_names, spoken_counts)
        
        # Prepare the next input based on the routing decision
        if next_agent and next_agent != current_agent:
            if next_agent == "synthesizer":
                nudge = "All agents have shared their perspectives. Please summarize the discussion."
            else:
                nudge = f"Please hand off to {next_agent} to continue the discussion."
            next_input = {"messages": messages + [{"role": "user", "content": nudge}]}
        else:
            # Just continue the conversation
            next_input = {"messages": messages}
        
        # Save the current agent for the next iteration
        previous_agent = current_agent
        
        # Invoke the next turn
        current_result = app.invoke(next_input, config)
    
    # Print a summary of the conversation
    print(f"\nDiscussion completed in {turn_count} turns")

    return current_result, personas

# Example usage
# Example usage
if __name__ == "__main__":
    product_name = "Smart Home Energy Management System"
    result, personas = run_persona_discussion(product_name)

    # Show every persona that took part, one blank-line-separated entry each
    print("\n=== PERSONAS ===\n")
    for idx, persona in enumerate(personas, start=1):
        print(
            f"Persona {idx}: {persona['name']}",
            f"Demographics: {persona['demographics']}",
            f"Motivations: {persona['motivations']}",
            f"Concerns: {persona['concerns']}",
            "",
            sep="\n",
        )


Generated 3 personas for Smart Home Energy Management System
Created 3 dynamic agents: eco_conscious_emma, tech_driven_tom, budget_focused_bob

Turn 1: Active agent is eco_conscious_emma
eco_conscious_emma: Hey there! I'm really excited to learn more about this new smart home energy management system. As someone who's super invested in reducing my environmental impact and saving on energy bills, it definitely sounds intriguing.  I'd love to hear some more details about it. Like, how does it actually work? Tell me about its features and how it can help me monitor and control my energy usage in real time.

What are some of the key benefits from a sustainability perspective? 

Is it user-friendly, or does it require a lot of technical know-how to set up and use?  

Also, what about cost?  Installation costs can be a big barrier for people like me who are trying to make eco-friendly choices.

I'm really interested in hearing your thoughts!

discussion_coordinator

👉 Agent transition: eco_c

Interactive agents

In [None]:
def format_agent_response(response):
    """Render a swarm response as a single ``[agent] text`` line.

    Args:
        response: Mapping that may contain ``active_agent`` (defaults to
            ``"Unknown"``) and ``messages``.

    Returns:
        ``"[agent] No response."`` when there are no messages; otherwise the
        ``content`` attribute of the last message (empty string if it has
        none), prefixed with the agent name in brackets.
    """
    speaker = response.get("active_agent", "Unknown")
    history = response.get("messages", [])

    if history:
        text = getattr(history[-1], 'content', "")
        return f"[{speaker}] {text}"

    return f"[{speaker}] No response."

def interactive_chat():
    """Run an interactive chat session with the agent swarm.

    Reads lines from standard input and sends each non-blank one to the
    module-level ``app`` (using the module-level ``config``), printing the
    formatted reply. The session ends when the user types ``exit`` or ``quit``
    (case-insensitive, surrounding whitespace ignored), or when input is
    closed or interrupted.
    """
    print("Welcome to the Agent Swarm Chat!")
    print("Type 'exit' or 'quit' to end the conversation.")
    print("Starting with Alice as the default agent.")
    print("-" * 50)

    while True:
        # Get user input; a closed stdin or an interrupt ends the chat cleanly
        try:
            user_input = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nGoodbye!")
            break

        # Nothing typed: ask again rather than sending an empty message
        if not user_input:
            continue

        # Check if user wants to exit
        if user_input.lower() in ("exit", "quit"):
            print("Goodbye!")
            break

        # Send just the current user input;
        # the checkpointer will maintain the conversation state
        current_message = [{"role": "user", "content": user_input}]
        response = app.invoke(
            {"messages": current_message},
            config,
        )

        # Extract and display the agent's response
        print(format_agent_response(response))


In [None]:
# Start the interactive session. interactive_chat reads the module-level `app`
# and `config`, which are created in the Alice/Bob swarm cell, so that cell
# must have been run first.
interactive_chat()