In [1]:
# Step 1: Install all required packages
# Note: In a local environment, you would run these commands in your terminal.
# In a Jupyter/Colab notebook, the '!' prefix executes them as shell commands.
try:
    import langchain, langgraph, chromadb, sentence_transformers, lightgbm, pandas, numpy, matplotlib, sklearn, asyncio, aiohttp, langsmith, mcp, guardrails
    print("✅ All packages already installed.")
except ImportError:
    print("⏳ Installing required packages...")
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "langchain", "langchain-openai", "langgraph", "langchain-community", "chromadb", "sentence-transformers", "lightgbm", "pandas", "numpy", "matplotlib", "scikit-learn", "asyncio", "aiohttp", "langsmith", "mcp-client", "guardrails-ai"])
    print("✅ All packages installed successfully!")


# Step 2: Import libraries
import os
import asyncio
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import uuid

# LangChain imports
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.schema import Document
from langchain_community.vectorstores import Chroma
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser

# LangSmith for AI Observability
import langsmith
from langsmith import Client, traceable, run_trees

# MCP (Model Context Protocol) imports
# This is kept for conceptual context, though no specific client functions are called.
import mcp

print("📚 Libraries imported successfully!")


# Step 3: Configure API Keys and Observability
print("🔑 Setting up API Keys and Observability...")

# It's highly recommended to use environment variables or a secrets management tool.
# For this example, we'll use placeholders.
# IMPORTANT: Replace these with your actual keys.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY', "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")
LANGSMITH_API_KEY = os.environ.get('LANGSMITH_API_KEY', "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")
LANGSMITH_PROJECT = os.environ.get('LANGSMITH_PROJECT', "retail-agent-pool")

# Set environment variables for LangChain and LangSmith
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
os.environ["LANGSMITH_API_KEY"] = LANGSMITH_API_KEY
os.environ["LANGSMITH_PROJECT"] = LANGSMITH_PROJECT
os.environ["LANGCHAIN_TRACING_V2"] = "true" # Enables LangSmith tracing

# Initialize LangSmith client
try:
    if LANGSMITH_API_KEY != "YOUR_LANGSMITH_API_KEY":
        langsmith_client = Client()
        print("✅ LangSmith client initialized successfully!")
    else:
        print("⚠️ LangSmith API key not set. Skipping client initialization.")
        langsmith_client = None
except Exception as e:
    print(f"⚠️ LangSmith initialization warning: {e}")
    langsmith_client = None

# Test the OpenAI API key
try:
    if OPENAI_API_KEY != "YOUR_OPENAI_API_KEY":
        test_llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
        test_response = test_llm.invoke("Test")
        print("✅ OpenAI API key verified successfully!")
    else:
         print("❌ OpenAI API key is not set. Please replace 'YOUR_OPENAI_API_KEY'.")
except Exception as e:
    print(f"❌ OpenAI API key error: {e}")


# Step 4: MCP Server Implementation for Retail
class RetailMCPServer:
    """
    A simulated Model Context Protocol (MCP) server that provides specialized tools
    for retail analysis. Each tool is traceable for observability.
    """
    def __init__(self):
        self.tools = {
            "price_optimizer": self.price_optimization_tool,
            "demand_forecaster": self.demand_forecasting_tool,
            "inventory_optimizer": self.inventory_optimization_tool,
            "customer_analyzer": self.customer_analytics_tool,
            "competitor_monitor": self.competitive_intelligence_tool,
            "market_analyzer": self.market_analysis_tool
        }
        self.session_id = str(uuid.uuid4())

    @traceable
    async def price_optimization_tool(self, product_data: Dict, market_conditions: Dict) -> Dict:
        """MCP Tool: Advanced price optimization using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Perform advanced price optimization analysis:

            PRODUCT: {product_data}
            MARKET: {market_conditions}

            Calculate:
            1. Price elasticity estimation
            2. Optimal price point with confidence intervals
            3. Competitive positioning strategy
            4. Revenue maximization approach
            5. Risk assessment

            Return structured JSON analysis.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "price_optimizer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "analysis": response.content,
                "recommendations": {
                    "optimal_price_range": {"min": 45.0, "max": 125.0, "recommended": 89.99},
                    "confidence_score": 0.87,
                    "expected_revenue_impact": "+15-25%",
                    "risk_level": "medium"
                }
            }
        except Exception as e:
            return {"error": str(e), "tool": "price_optimizer"}

    @traceable
    async def demand_forecasting_tool(self, historical_data: List, market_signals: Dict) -> Dict:
        """MCP Tool: Demand forecasting with market signals."""
        try:
            forecast_data = {
                "next_30_days": np.random.normal(100, 20, 30).tolist(),
                "confidence_interval": {"lower": 80, "upper": 120},
                "seasonality_factor": 1.15,
                "trend_direction": "upward"
            }
            return {
                "tool": "demand_forecaster",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "forecast": forecast_data,
                "market_signals_used": list(market_signals.keys()),
                "data_points_analyzed": len(historical_data)
            }
        except Exception as e:
            return {"error": str(e), "tool": "demand_forecaster"}

    @traceable
    async def inventory_optimization_tool(self, current_stock: Dict, demand_forecast: Dict) -> Dict:
        """MCP Tool: Inventory optimization based on demand forecasting."""
        try:
            optimization_recommendations = {
                "safety_stock_levels": {
                    "high_demand": current_stock.get('current_stock', 0) * 0.3,
                    "medium_demand": current_stock.get('current_stock', 0) * 0.2,
                    "low_demand": current_stock.get('current_stock', 0) * 0.1
                },
                "reorder_points": {
                    "urgent": current_stock.get('reorder_point', 0) * 0.8,
                    "standard": current_stock.get('reorder_point', 0),
                    "relaxed": current_stock.get('reorder_point', 0) * 1.2
                },
                "cost_optimization": {
                    "expected_savings": "15-20%",
                    "stockout_risk_reduction": "40-60%",
                    "carrying_cost_optimization": "10-15%"
                }
            }
            return {
                "tool": "inventory_optimizer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "optimization": optimization_recommendations,
                "demand_forecast_incorporated": True
            }
        except Exception as e:
            return {"error": str(e), "tool": "inventory_optimizer"}

    @traceable
    async def customer_analytics_tool(self, customer_data: Dict, purchase_history: List) -> Dict:
        """MCP Tool: Advanced customer analytics using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Analyze customer behavior and generate insights:

            CUSTOMER: {customer_data}
            PURCHASE HISTORY: {purchase_history[:5]}

            Provide:
            1. Customer lifetime value prediction
            2. Churn risk assessment
            3. Personalization opportunities
            4. Retention strategy
            5. Cross-selling recommendations

            Return structured analysis.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "customer_analyzer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "customer_insights": response.content,
                "metrics": {
                    "lifetime_value_score": self._calculate_ltv(purchase_history),
                    "churn_risk": "low" if len(purchase_history) > 3 else "medium",
                    "engagement_level": "high" if len(purchase_history) > 5 else "medium"
                }
            }
        except Exception as e:
            return {"error": str(e), "tool": "customer_analyzer"}

    @traceable
    async def competitive_intelligence_tool(self, product_data: Dict, market_scope: str = "local") -> Dict:
        """MCP Tool: Competitive market intelligence."""
        try:
            competitor_analysis = {
                "main_competitors": ["Competitor A", "Competitor B", "Competitor C"],
                "price_comparison": {
                    "our_price": product_data.get('current_price', 0),
                    "competitor_avg": product_data.get('current_price', 0) * 0.9,
                    "market_range": {
                        "min": product_data.get('current_price', 0) * 0.7,
                        "max": product_data.get('current_price', 0) * 1.3
                    }
                },
                "competitive_positioning": "market_leader" if product_data.get('current_price', 0) > 100 else "value_player",
                "recommendations": [
                    "Monitor competitor pricing weekly",
                    "Differentiate through value-added services",
                    "Consider bundle pricing strategies"
                ]
            }
            return {
                "tool": "competitor_monitor",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "analysis": competitor_analysis,
                "market_scope": market_scope
            }
        except Exception as e:
            return {"error": str(e), "tool": "competitor_monitor"}

    @traceable
    async def market_analysis_tool(self, product_category: str, timeframe: str = "30d") -> Dict:
        """MCP Tool: Comprehensive market analysis using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Analyze market conditions for {product_category} over {timeframe}:

            Provide:
            1. Market trends and growth projections
            2. Consumer behavior insights
            3. Regulatory considerations
            4. Technology impacts
            5. Strategic recommendations

            Focus on actionable insights for retail pricing and inventory.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "market_analyzer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "market_analysis": response.content,
                "timeframe": timeframe,
                "category": product_category
            }
        except Exception as e:
            return {"error": str(e), "tool": "market_analyzer"}

    def _calculate_ltv(self, purchase_history: List) -> float:
        """Helper function to calculate customer lifetime value."""
        if not purchase_history:
            return 0.0
        total_revenue = sum(p.get('revenue', 0) for p in purchase_history)
        purchase_count = len(purchase_history)
        avg_order_value = total_revenue / purchase_count if purchase_count > 0 else 0
        return min(round(avg_order_value * purchase_count * 0.1, 2), 1000.0)

print("🛠️ Initializing MCP Server...")
mcp_server = RetailMCPServer()


# Step 5: A2A (Agent-to-Agent) Communication Layer
class A2ACommunicationLayer:
    """
    Manages communication between agents, including direct messages, broadcasts,
    and consensus requests, with full tracing.
    """
    def __init__(self):
        self.agent_registries = {}
        self.conversation_history = {}
        self.performance_metrics = {}

    @traceable
    async def register_agent(self, agent_id: str, agent_type: str, capabilities: List[str]):
        """Register an agent in the A2A network."""
        self.agent_registries[agent_id] = {
            "agent_type": agent_type,
            "capabilities": capabilities,
            "status": "active",
            "last_heartbeat": datetime.now().isoformat()
        }
        self.performance_metrics[agent_id] = {
            "messages_sent": 0, "messages_received": 0,
            "response_time_avg": 0.0, "error_rate": 0.0
        }
        print(f"✅ Agent registered: {agent_id} ({agent_type})")

    @traceable
    async def send_message(self, from_agent: str, to_agent: str, message: Dict, message_type: str = "direct"):
        """Send a message between agents with tracing."""
        try:
            message_id = str(uuid.uuid4())
            timestamp = datetime.now().isoformat()
            run_tree = langsmith.get_run_tree()
            trace_id = run_tree.id if run_tree else 'unknown'

            message_package = {
                "message_id": message_id, "from_agent": from_agent,
                "to_agent": to_agent, "message_type": message_type,
                "content": message, "timestamp": timestamp, "trace_id": trace_id
            }

            if from_agent not in self.conversation_history:
                self.conversation_history[from_agent] = []
            self.conversation_history[from_agent].append(message_package)

            self.performance_metrics[from_agent]["messages_sent"] += 1
            if to_agent in self.performance_metrics:
                self.performance_metrics[to_agent]["messages_received"] += 1

            print(f"📨 A2A Message: {from_agent} → {to_agent} ({message_type})")
            return message_id
        except Exception as e:
            print(f"❌ A2A Message Error: {e}")
            return None

    @traceable
    async def broadcast_message(self, from_agent: str, message: Dict, agent_filter: List[str] = None):
        """Broadcast a message to multiple agents."""
        try:
            target_agents = agent_filter if agent_filter else list(self.agent_registries.keys())
            target_agents = [agent for agent in target_agents if agent != from_agent]

            tasks = [self.send_message(from_agent, agent_id, message, "broadcast") for agent_id in target_agents]
            message_ids = await asyncio.gather(*tasks)
            return message_ids
        except Exception as e:
            print(f"❌ A2A Broadcast Error: {e}")
            return []

    def get_communication_metrics(self) -> Dict:
        """Get A2A communication performance metrics."""
        total_messages = sum(m["messages_sent"] for m in self.performance_metrics.values())
        active_agents = len([a for a, i in self.agent_registries.items() if i["status"] == "active"])
        return {
            "total_agents_registered": len(self.agent_registries),
            "active_agents": active_agents,
            "total_messages_exchanged": total_messages,
            "agent_performance": self.performance_metrics,
            "system_health": "optimal" if active_agents > 0 else "degraded"
        }

print("🔄 Initializing A2A Communication Layer...")
a2a_communication = A2ACommunicationLayer()


# Step 6: Enhanced Vector Database with Observability
class ObservableVectorDB:
    """
    A wrapper for a Chroma vector database that adds observability and performance tracking.
    """
    def __init__(self):
        self.embeddings = OpenAIEmbeddings()
        self.vector_store = None
        self.query_metrics = {"total_queries": 0, "successful_queries": 0, "average_response_time": 0.0}
        self.setup_sample_data()

    @traceable
    def setup_sample_data(self):
        """Initialize with comprehensive retail sample data."""
        sample_docs = [
            Document(page_content="Premium pricing strategy works best for electronics with 40-60% markup", metadata={"category": "pricing", "product_type": "electronics"}),
            Document(page_content="Competitive pricing essential for smartphone accessories market", metadata={"category": "pricing", "product_type": "accessories"}),
            Document(page_content="Safety stock levels should cover 2-week demand for fast-moving goods", metadata={"category": "inventory", "metric": "safety_stock"}),
            Document(page_content="Holiday season demand increases 60-80% for electronics", metadata={"category": "trends", "season": "Q4"}),
            Document(page_content="Price sensitivity is highest in the budget segment", metadata={"category": "customer", "insight": "pricing"})
        ]
        self.vector_store = Chroma.from_documents(documents=sample_docs, embedding=self.embeddings, persist_directory="./chroma_retail_db")
        print("✅ Observable Vector database initialized")

    @traceable
    async def enhanced_search(self, query: str, category: str = None, k: int = 3, agent_id: str = "unknown") -> List[Dict]:
        """Enhanced semantic search with observability."""
        start_time = datetime.now()
        self.query_metrics["total_queries"] += 1
        try:
            filter_dict = {"category": category} if category else {}
            results = self.vector_store.similarity_search_with_score(query, k=k, filter=filter_dict)

            response_time = (datetime.now() - start_time).total_seconds()
            self.query_metrics["successful_queries"] += 1
            total_queries = self.query_metrics["successful_queries"]
            self.query_metrics["average_response_time"] = ((self.query_metrics["average_response_time"] * (total_queries - 1)) + response_time) / total_queries

            formatted_results = [{"content": doc.page_content, "metadata": doc.metadata, "similarity_score": float(score)} for doc, score in results]
            print(f"🔍 VectorDB Search: '{query}' → {len(formatted_results)} results")
            return formatted_results
        except Exception as e:
            print(f"❌ VectorDB Search Error: {e}")
            return []

    def get_performance_metrics(self) -> Dict:
        """Get vector database performance metrics."""
        success_rate = self.query_metrics["successful_queries"] / max(1, self.query_metrics["total_queries"])
        return {**self.query_metrics, "success_rate": success_rate, "health_status": "optimal"}

print("🛢️ Initializing Observable Vector Database...")
observable_vector_db = ObservableVectorDB()


# Step 7: Enhanced Base Agent with MCP and A2A Integration
class EnhancedRetailAgent:
    """
    A base class for retail AI agents, integrating MCP tools, A2A communication,
    and an observable vector database.
    """
    def __init__(self, agent_id: str, agent_type: str, model: str = "gpt-4"):
        self.agent_id = agent_id
        self.agent_type = agent_type
        self.llm = ChatOpenAI(model=model, temperature=0.1)
        self.mcp_server = mcp_server
        self.a2a = a2a_communication
        self.vector_db = observable_vector_db
        self.history = []
        self.performance_metrics = {
            "tasks_completed": 0, "mcp_tools_used": 0,
            "a2a_messages_sent": 0, "vector_db_queries": 0,
            "avg_response_time": 0.0, "success_rate": 1.0
        }
        asyncio.create_task(self._register_with_a2a())

    async def _register_with_a2a(self):
        """Register agent with the A2A communication layer."""
        await self.a2a.register_agent(self.agent_id, self.agent_type, self._get_agent_capabilities())

    def _get_agent_capabilities(self) -> List[str]:
        """Define agent capabilities for A2A registration."""
        caps = ["analysis", "decision_support", "collaboration"]
        if "pricing" in self.agent_type: caps.extend(["price_optimization", "competitive_analysis"])
        elif "inventory" in self.agent_type: caps.extend(["demand_forecasting", "stock_optimization"])
        return caps

    @traceable
    async def use_mcp_tool(self, tool_name: str, tool_params: Dict) -> Dict:
        """Use MCP tools with built-in observability and A2A notifications."""
        if tool_name in self.mcp_server.tools:
            self.performance_metrics["mcp_tools_used"] += 1
            result = await self.mcp_server.tools[tool_name](**tool_params)

            # Notify other agents about tool usage
            await self.a2a.broadcast_message(self.agent_id, {
                "type": "mcp_tool_used", "tool": tool_name,
                "summary": str(result)[:100] + "..."
            })
            return result
        return {"error": f"Tool {tool_name} not found"}

    def get_enhanced_status(self) -> Dict:
        """Get comprehensive agent status with observability metrics."""
        return {
            "agent_id": self.agent_id,
            "agent_type": self.agent_type,
            "performance_metrics": self.performance_metrics,
            "a2a_registered": self.agent_id in self.a2a.agent_registries,
            "health_status": "optimal"
        }


# Step 8: Enhanced Specialized Agents
class EnhancedPricingAgent(EnhancedRetailAgent):
    """A specialized agent for pricing strategy, using all system components."""
    def __init__(self, agent_id: str):
        super().__init__(agent_id, "enhanced_pricing_strategist", "gpt-4")

    @traceable
    async def analyze_pricing(self, product_data: Dict, market_conditions: Dict) -> Dict:
        """Enhanced pricing analysis workflow."""
        start_time = datetime.now()
        try:
            # 1. Get context from Vector DB
            self.performance_metrics["vector_db_queries"] += 1
            context = await self.vector_db.enhanced_search(
                f"pricing strategy for {product_data.get('category')}",
                category="pricing", agent_id=self.agent_id
            )

            # 2. Use MCP tools for advanced analysis
            mcp_analysis = await self.use_mcp_tool("price_optimizer", {
                "product_data": product_data,
                "market_conditions": {**market_conditions, "vector_context": context}
            })

            # 3. Generate final recommendation
            final_recommendation = await self._generate_final_recommendation(
                product_data, mcp_analysis, context
            )

            return {
                "recommendation": final_recommendation,
                "mcp_analysis_used": mcp_analysis,
                "context_used": context,
                "processing_time": (datetime.now() - start_time).total_seconds()
            }
        except Exception as e:
            return {"error": str(e)}

    async def _generate_final_recommendation(self, product_data: Dict, mcp_analysis: Dict, context: List) -> str:
        """Synthesize a final recommendation using an LLM."""
        prompt = f"""
        Synthesize a final pricing recommendation based on the following data:
        PRODUCT: {product_data}
        MCP ANALYSIS: {mcp_analysis}
        VECTOR DB CONTEXT: {context}

        Provide a comprehensive pricing strategy for executive decision-making.
        """
        response = await self.llm.ainvoke(prompt)
        return response.content


# Step 9: Enhanced Orchestrator with Full Observability
class ObservableRetailOrchestrator:
    """
    Orchestrates the agent army, runs workflows, and provides a centralized
    view of system-wide observability.
    """
    def __init__(self):
        self.agents = {}
        self.workflow_history = []
        self.observability_metrics = {
            "total_workflows": 0, "successful_workflows": 0, "average_workflow_time": 0.0
        }
        self.setup_enhanced_agent_army()

    @traceable
    def setup_enhanced_agent_army(self):
        """Initialize the enhanced agent army."""
        self.agents = {
            "pricing_alpha_enhanced": EnhancedPricingAgent("pricing_alpha_enhanced"),
            "pricing_beta_enhanced": EnhancedPricingAgent("pricing_beta_enhanced"),
        }
        print(f"✅ Enhanced Agent Army Initialized: {len(self.agents)} observable agents")

    @traceable
    async def run_observable_pricing_workflow(self, product_data: Dict) -> Dict:
        """Run a pricing workflow with full observability."""
        workflow_start = datetime.now()
        self.observability_metrics["total_workflows"] += 1
        try:
            print(f"🚀 Starting OBSERVABLE pricing workflow for {product_data.get('name')}...")
            market_context = await observable_vector_db.enhanced_search(f"market analysis for {product_data.get('category')}", k=3)

            pricing_agents = [agent for aid, agent in self.agents.items() if "pricing" in aid]
            tasks = [agent.analyze_pricing(product_data, {"context": market_context}) for agent in pricing_agents]
            pricing_results = await asyncio.gather(*tasks)

            aggregated_recommendation = await self.aggregate_enhanced_recommendations(pricing_results, product_data)

            workflow_time = (datetime.now() - workflow_start).total_seconds()
            self.observability_metrics["successful_workflows"] += 1
            total_workflows = self.observability_metrics["successful_workflows"]
            self.observability_metrics["average_workflow_time"] = ((self.observability_metrics["average_workflow_time"] * (total_workflows-1)) + workflow_time) / total_workflows

            workflow_result = {
                "workflow_type": "observable_pricing",
                "aggregated_recommendation": aggregated_recommendation,
                "workflow_duration": workflow_time
            }
            self.workflow_history.append(workflow_result)
            print(f"✅ Observable pricing workflow completed in {workflow_time:.2f}s")
            return workflow_result
        except Exception as e:
            print(f"❌ Observable workflow error: {e}")
            return {"error": str(e)}

    @traceable
    async def aggregate_enhanced_recommendations(self, results: List[Dict], context_data: Dict) -> Dict:
        """Aggregate recommendations from multiple agents using an LLM."""
        llm = ChatOpenAI(model="gpt-4", temperature=0)
        prompt = f"""
        Aggregate these pricing recommendations from multiple AI agents:
        CONTEXT: {context_data}
        AGENT ANALYSES: {json.dumps(results, indent=2)}
        Provide a single, consolidated strategy.
        """
        response = await llm.ainvoke(prompt)
        return {"consolidated_strategy": response.content}

    def get_comprehensive_observability(self) -> Dict:
        """Get comprehensive observability data from all system components."""
        agent_statuses = {aid: agent.get_enhanced_status() for aid, agent in self.agents.items()}
        return {
            "timestamp": datetime.now().isoformat(),
            "orchestrator_metrics": self.observability_metrics,
            "agent_army_status": {
                "total_agents": len(self.agents),
                "agent_breakdown": {"pricing_agents": len([a for a in self.agents if "pricing" in a])}
            },
            "a2a_communication_metrics": a2a_communication.get_communication_metrics(),
            "vector_db_performance": observable_vector_db.get_performance_metrics(),
            "workflow_history_summary": {
                "total_workflows": len(self.workflow_history),
                "success_rate": self.observability_metrics["successful_workflows"] / max(1, self.observability_metrics["total_workflows"])
            }
        }


# Step 10: Generate Sample Data
def generate_sample_data():
    """Generate sample data for the demonstration."""
    return {
        "products": [
            {"product_id": "PROD_001", "name": "Wireless Bluetooth Headphones", "category": "electronics", "cost_price": 45.0, "current_price": 99.99},
            {"product_id": "PROD_002", "name": "Smartphone Protective Case", "category": "accessories", "cost_price": 4.50, "current_price": 19.99}
        ]
    }

print("📊 Generating sample data...")
sample_data = generate_sample_data()


# Step 11: Initialize Enhanced Orchestrator
print("🎯 Initializing Observable Retail Agent Army Orchestrator...")
observable_orchestrator = ObservableRetailOrchestrator()


# Step 12: Enhanced Demo with Full Observability
async def run_observable_demo():
    """Run the main demo workflow and print observability metrics."""
    print("\n" + "="*70)
    print("🔍 OBSERVABLE RETAIL AI AGENT ARMY - ENHANCED DEMO")
    print("="*70)

    # Run pricing workflows
    for product in sample_data['products']:
        await observable_orchestrator.run_observable_pricing_workflow(product)

    # Show final dashboard data
    final_observability = observable_orchestrator.get_comprehensive_observability()
    print("\n3. 📈 FINAL OBSERVABILITY DASHBOARD DATA")
    print(json.dumps(final_observability, indent=2, default=str))

    return final_observability


# Step 13: Main execution block
async def main():
    if OPENAI_API_KEY == "YOUR_OPENAI_API_KEY":
        print("\n\n❌ Cannot run demo. Please set your OpenAI API key at the top of the script.")
        return

    print("\n🚀 Starting enhanced observable demo...")
    final_observability = await run_observable_demo()

    # Step 14: Visualization is better in a notebook environment
    # Here we print the key insights
    print("\n" + "="*70)
    print("🎊 DEMO COMPLETED!")
    print("="*70)
    print("🔍 OBSERVABILITY FEATURES DEMONSTRATED:")
    print("  ✅ MCP (Model Context Protocol) Integration")
    print("  ✅ A2A (Agent-to-Agent) Communication Layer")
    print("  ✅ LangSmith AI Observability & Tracing")
    print("  ✅ Real-time Performance Monitoring")


if __name__ == "__main__":
    # In a script, you run the asyncio event loop.
    # In a Jupyter notebook, you can often just use `await main()` in a cell.
    try:
        asyncio.run(main())
    except Exception as e:
        print(f"An error occurred during execution: {e}")

⏳ Installing required packages...


CalledProcessError: Command '['/usr/bin/python3', '-m', 'pip', 'install', 'langchain', 'langchain-openai', 'langgraph', 'langchain-community', 'chromadb', 'sentence-transformers', 'lightgbm', 'pandas', 'numpy', 'matplotlib', 'scikit-learn', 'asyncio', 'aiohttp', 'langsmith', 'mcp-client', 'guardrails-ai']' returned non-zero exit status 1.

In [1]:
# Step 1: Install all required packages
# Note: In a local environment, you would run these commands in your terminal.
# In a Jupyter/Colab notebook, the '!' prefix executes them as shell commands.
try:
    import langchain, chromadb, sentence_transformers, lightgbm, pandas, numpy, matplotlib, sklearn, asyncio, aiohttp, langsmith, mcp, guardrails
    print("✅ All packages already installed.")
except ImportError:
    print("⏳ Installing required packages...")
    # Use !pip install for Colab environment
    !pip install langchain langchain-openai langgraph langchain-community chromadb sentence-transformers lightgbm pandas numpy matplotlib scikit-learn asyncio aiohttp langsmith mcp-client guardrails-ai
    print("✅ All packages installed successfully!")


# Step 2: Import libraries
import os
import asyncio
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import uuid

# LangChain imports
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.schema import Document
from langchain_community.vectorstores import Chroma
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser

# LangSmith for AI Observability
import langsmith
from langsmith import Client, traceable, run_trees

# MCP (Model Context Protocol) imports
# This is kept for conceptual context, though no specific client functions are called.
import mcp

print("📚 Libraries imported successfully!")


# Step 3: Configure API Keys and Observability
print("🔑 Setting up API Keys and Observability...")

# It's highly recommended to use environment variables or a secrets management tool.
# For this example, we'll use placeholders.
# IMPORTANT: Replace these with your actual keys.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY', "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")
LANGSMITH_API_KEY = os.environ.get('LANGSMITH_API_KEY', "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")
LANGSMITH_PROJECT = os.environ.get('LANGSMITH_PROJECT', "retail-agent-pool")

# Set environment variables for LangChain and LangSmith
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
os.environ["LANGSMITH_API_KEY"] = LANGSMITH_API_KEY
os.environ["LANGSMITH_PROJECT"] = LANGSMITH_PROJECT
os.environ["LANGCHAIN_TRACING_V2"] = "true" # Enables LangSmith tracing

# Initialize LangSmith client
try:
    if LANGSMITH_API_KEY != "YOUR_LANGSMITH_API_KEY":
        langsmith_client = Client()
        print("✅ LangSmith client initialized successfully!")
    else:
        print("⚠️ LangSmith API key not set. Skipping client initialization.")
        langsmith_client = None
except Exception as e:
    print(f"⚠️ LangSmith initialization warning: {e}")
    langsmith_client = None

# Test the OpenAI API key
try:
    if OPENAI_API_KEY != "YOUR_OPENAI_API_KEY":
        test_llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
        test_response = test_llm.invoke("Test")
        print("✅ OpenAI API key verified successfully!")
    else:
         print("❌ OpenAI API key is not set. Please replace 'YOUR_OPENAI_API_KEY'.")
except Exception as e:
    print(f"❌ OpenAI API key error: {e}")


# Step 4: MCP Server Implementation for Retail
class RetailMCPServer:
    """
    A simulated Model Context Protocol (MCP) server that provides specialized tools
    for retail analysis. Each tool is traceable for observability.
    """
    def __init__(self):
        self.tools = {
            "price_optimizer": self.price_optimization_tool,
            "demand_forecaster": self.demand_forecasting_tool,
            "inventory_optimizer": self.inventory_optimization_tool,
            "customer_analyzer": self.customer_analytics_tool,
            "competitor_monitor": self.competitive_intelligence_tool,
            "market_analyzer": self.market_analysis_tool
        }
        self.session_id = str(uuid.uuid4())

    @traceable
    async def price_optimization_tool(self, product_data: Dict, market_conditions: Dict) -> Dict:
        """MCP Tool: Advanced price optimization using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Perform advanced price optimization analysis:

            PRODUCT: {product_data}
            MARKET: {market_conditions}

            Calculate:
            1. Price elasticity estimation
            2. Optimal price point with confidence intervals
            3. Competitive positioning strategy
            4. Revenue maximization approach
            5. Risk assessment

            Return structured JSON analysis.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "price_optimizer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "analysis": response.content,
                "recommendations": {
                    "optimal_price_range": {"min": 45.0, "max": 125.0, "recommended": 89.99},
                    "confidence_score": 0.87,
                    "expected_revenue_impact": "+15-25%",
                    "risk_level": "medium"
                }
            }
        except Exception as e:
            return {"error": str(e), "tool": "price_optimizer"}

    @traceable
    async def demand_forecasting_tool(self, historical_data: List, market_signals: Dict) -> Dict:
        """MCP Tool: Demand forecasting with market signals."""
        try:
            forecast_data = {
                "next_30_days": np.random.normal(100, 20, 30).tolist(),
                "confidence_interval": {"lower": 80, "upper": 120},
                "seasonality_factor": 1.15,
                "trend_direction": "upward"
            }
            return {
                "tool": "demand_forecaster",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "forecast": forecast_data,
                "market_signals_used": list(market_signals.keys()),
                "data_points_analyzed": len(historical_data)
            }
        except Exception as e:
            return {"error": str(e), "tool": "demand_forecaster"}

    @traceable
    async def inventory_optimization_tool(self, current_stock: Dict, demand_forecast: Dict) -> Dict:
        """MCP Tool: Inventory optimization based on demand forecasting."""
        try:
            optimization_recommendations = {
                "safety_stock_levels": {
                    "high_demand": current_stock.get('current_stock', 0) * 0.3,
                    "medium_demand": current_stock.get('current_stock', 0) * 0.2,
                    "low_demand": current_stock.get('current_stock', 0) * 0.1
                },
                "reorder_points": {
                    "urgent": current_stock.get('reorder_point', 0) * 0.8,
                    "standard": current_stock.get('reorder_point', 0),
                    "relaxed": current_stock.get('reorder_point', 0) * 1.2
                },
                "cost_optimization": {
                    "expected_savings": "15-20%",
                    "stockout_risk_reduction": "40-60%",
                    "carrying_cost_optimization": "10-15%"
                }
            }
            return {
                "tool": "inventory_optimizer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "optimization": optimization_recommendations,
                "demand_forecast_incorporated": True
            }
        except Exception as e:
            return {"error": str(e), "tool": "inventory_optimizer"}

    @traceable
    async def customer_analytics_tool(self, customer_data: Dict, purchase_history: List) -> Dict:
        """MCP Tool: Advanced customer analytics using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Analyze customer behavior and generate insights:

            CUSTOMER: {customer_data}
            PURCHASE HISTORY: {purchase_history[:5]}

            Provide:
            1. Customer lifetime value prediction
            2. Churn risk assessment
            3. Personalization opportunities
            4. Retention strategy
            5. Cross-selling recommendations

            Return structured analysis.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "customer_analyzer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "customer_insights": response.content,
                "metrics": {
                    "lifetime_value_score": self._calculate_ltv(purchase_history),
                    "churn_risk": "low" if len(purchase_history) > 3 else "medium",
                    "engagement_level": "high" if len(purchase_history) > 5 else "medium"
                }
            }
        except Exception as e:
            return {"error": str(e), "tool": "customer_analyzer"}

    @traceable
    async def competitive_intelligence_tool(self, product_data: Dict, market_scope: str = "local") -> Dict:
        """MCP Tool: Competitive market intelligence."""
        try:
            competitor_analysis = {
                "main_competitors": ["Competitor A", "Competitor B", "Competitor C"],
                "price_comparison": {
                    "our_price": product_data.get('current_price', 0),
                    "competitor_avg": product_data.get('current_price', 0) * 0.9,
                    "market_range": {
                        "min": product_data.get('current_price', 0) * 0.7,
                        "max": product_data.get('current_price', 0) * 1.3
                    }
                },
                "competitive_positioning": "market_leader" if product_data.get('current_price', 0) > 100 else "value_player",
                "recommendations": [
                    "Monitor competitor pricing weekly",
                    "Differentiate through value-added services",
                    "Consider bundle pricing strategies"
                ]
            }
            return {
                "tool": "competitor_monitor",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "analysis": competitor_analysis,
                "market_scope": market_scope
            }
        except Exception as e:
            return {"error": str(e), "tool": "competitor_monitor"}

    @traceable
    async def market_analysis_tool(self, product_category: str, timeframe: str = "30d") -> Dict:
        """MCP Tool: Comprehensive market analysis using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Analyze market conditions for {product_category} over {timeframe}:

            Provide:
            1. Market trends and growth projections
            2. Consumer behavior insights
            3. Regulatory considerations
            4. Technology impacts
            5. Strategic recommendations

            Focus on actionable insights for retail pricing and inventory.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "market_analyzer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "market_analysis": response.content,
                "timeframe": timeframe,
                "category": product_category
            }
        except Exception as e:
            return {"error": str(e), "tool": "market_analyzer"}

    def _calculate_ltv(self, purchase_history: List) -> float:
        """Helper function to calculate customer lifetime value."""
        if not purchase_history:
            return 0.0
        total_revenue = sum(p.get('revenue', 0) for p in purchase_history)
        purchase_count = len(purchase_history)
        avg_order_value = total_revenue / purchase_count if purchase_count > 0 else 0
        return min(round(avg_order_value * purchase_count * 0.1, 2), 1000.0)

print("🛠️ Initializing MCP Server...")
mcp_server = RetailMCPServer()


# Step 5: A2A (Agent-to-Agent) Communication Layer
class A2ACommunicationLayer:
    """
    Manages communication between agents, including direct messages, broadcasts,
    and consensus requests, with full tracing.
    """
    def __init__(self):
        self.agent_registries = {}
        self.conversation_history = {}
        self.performance_metrics = {}

    @traceable
    async def register_agent(self, agent_id: str, agent_type: str, capabilities: List[str]):
        """Register an agent in the A2A network."""
        self.agent_registries[agent_id] = {
            "agent_type": agent_type,
            "capabilities": capabilities,
            "status": "active",
            "last_heartbeat": datetime.now().isoformat()
        }
        self.performance_metrics[agent_id] = {
            "messages_sent": 0, "messages_received": 0,
            "response_time_avg": 0.0, "error_rate": 0.0
        }
        print(f"✅ Agent registered: {agent_id} ({agent_type})")

    @traceable
    async def send_message(self, from_agent: str, to_agent: str, message: Dict, message_type: str = "direct"):
        """Send a message between agents with tracing."""
        try:
            message_id = str(uuid.uuid4())
            timestamp = datetime.now().isoformat()
            run_tree = langsmith.get_run_tree()
            trace_id = run_tree.id if run_tree else 'unknown'

            message_package = {
                "message_id": message_id, "from_agent": from_agent,
                "to_agent": to_agent, "message_type": message_type,
                "content": message, "timestamp": timestamp, "trace_id": trace_id
            }

            if from_agent not in self.conversation_history:
                self.conversation_history[from_agent] = []
            self.conversation_history[from_agent].append(message_package)

            self.performance_metrics[from_agent]["messages_sent"] += 1
            if to_agent in self.performance_metrics:
                self.performance_metrics[to_agent]["messages_received"] += 1

            print(f"📨 A2A Message: {from_agent} → {to_agent} ({message_type})")
            return message_id
        except Exception as e:
            print(f"❌ A2A Message Error: {e}")
            return None

    @traceable
    async def broadcast_message(self, from_agent: str, message: Dict, agent_filter: List[str] = None):
        """Broadcast a message to multiple agents."""
        try:
            target_agents = agent_filter if agent_filter else list(self.agent_registries.keys())
            target_agents = [agent for agent in target_agents if agent != from_agent]

            tasks = [self.send_message(from_agent, agent_id, message, "broadcast") for agent_id in target_agents]
            message_ids = await asyncio.gather(*tasks)
            return message_ids
        except Exception as e:
            print(f"❌ A2A Broadcast Error: {e}")
            return []

    def get_communication_metrics(self) -> Dict:
        """Get A2A communication performance metrics."""
        total_messages = sum(m["messages_sent"] for m in self.performance_metrics.values())
        active_agents = len([a for a, i in self.agent_registries.items() if i["status"] == "active"])
        return {
            "total_agents_registered": len(self.agent_registries),
            "active_agents": active_agents,
            "total_messages_exchanged": total_messages,
            "agent_performance": self.performance_metrics,
            "system_health": "optimal" if active_agents > 0 else "degraded"
        }

print("🔄 Initializing A2A Communication Layer...")
a2a_communication = A2ACommunicationLayer()


# Step 6: Enhanced Vector Database with Observability
class ObservableVectorDB:
    """
    A wrapper for a Chroma vector database that adds observability and performance tracking.
    """
    def __init__(self):
        self.embeddings = OpenAIEmbeddings()
        self.vector_store = None
        self.query_metrics = {"total_queries": 0, "successful_queries": 0, "average_response_time": 0.0}
        self.setup_sample_data()

    @traceable
    def setup_sample_data(self):
        """Initialize with comprehensive retail sample data."""
        sample_docs = [
            Document(page_content="Premium pricing strategy works best for electronics with 40-60% markup", metadata={"category": "pricing", "product_type": "electronics"}),
            Document(page_content="Competitive pricing essential for smartphone accessories market", metadata={"category": "pricing", "product_type": "accessories"}),
            Document(page_content="Safety stock levels should cover 2-week demand for fast-moving goods", metadata={"category": "inventory", "metric": "safety_stock"}),
            Document(page_content="Holiday season demand increases 60-80% for electronics", metadata={"category": "trends", "season": "Q4"}),
            Document(page_content="Price sensitivity is highest in the budget segment", metadata={"category": "customer", "insight": "pricing"})
        ]
        self.vector_store = Chroma.from_documents(documents=sample_docs, embedding=self.embeddings, persist_directory="./chroma_retail_db")
        print("✅ Observable Vector database initialized")

    @traceable
    async def enhanced_search(self, query: str, category: str = None, k: int = 3, agent_id: str = "unknown") -> List[Dict]:
        """Enhanced semantic search with observability."""
        start_time = datetime.now()
        self.query_metrics["total_queries"] += 1
        try:
            filter_dict = {"category": category} if category else {}
            results = self.vector_store.similarity_search_with_score(query, k=k, filter=filter_dict)

            response_time = (datetime.now() - start_time).total_seconds()
            self.query_metrics["successful_queries"] += 1
            total_queries = self.query_metrics["successful_queries"]
            self.query_metrics["average_response_time"] = ((self.query_metrics["average_response_time"] * (total_queries - 1)) + response_time) / total_queries

            formatted_results = [{"content": doc.page_content, "metadata": doc.metadata, "similarity_score": float(score)} for doc, score in results]
            print(f"🔍 VectorDB Search: '{query}' → {len(formatted_results)} results")
            return formatted_results
        except Exception as e:
            print(f"❌ VectorDB Search Error: {e}")
            return []

    def get_performance_metrics(self) -> Dict:
        """Get vector database performance metrics."""
        success_rate = self.query_metrics["successful_queries"] / max(1, self.query_metrics["total_queries"])
        return {**self.query_metrics, "success_rate": success_rate, "health_status": "optimal"}

print("🛢️ Initializing Observable Vector Database...")
observable_vector_db = ObservableVectorDB()


# Step 7: Enhanced Base Agent with MCP and A2A Integration
class EnhancedRetailAgent:
    """
    A base class for retail AI agents, integrating MCP tools, A2A communication,
    and an observable vector database.
    """
    def __init__(self, agent_id: str, agent_type: str, model: str = "gpt-4"):
        self.agent_id = agent_id
        self.agent_type = agent_type
        self.llm = ChatOpenAI(model=model, temperature=0.1)
        self.mcp_server = mcp_server
        self.a2a = a2a_communication
        self.vector_db = observable_vector_db
        self.history = []
        self.performance_metrics = {
            "tasks_completed": 0, "mcp_tools_used": 0,
            "a2a_messages_sent": 0, "vector_db_queries": 0,
            "avg_response_time": 0.0, "success_rate": 1.0
        }
        asyncio.create_task(self._register_with_a2a())

    async def _register_with_a2a(self):
        """Register agent with the A2A communication layer."""
        await self.a2a.register_agent(self.agent_id, self.agent_type, self._get_agent_capabilities())

    def _get_agent_capabilities(self) -> List[str]:
        """Define agent capabilities for A2A registration."""
        caps = ["analysis", "decision_support", "collaboration"]
        if "pricing" in self.agent_type: caps.extend(["price_optimization", "competitive_analysis"])
        elif "inventory" in self.agent_type: caps.extend(["demand_forecasting", "stock_optimization"])
        return caps

    @traceable
    async def use_mcp_tool(self, tool_name: str, tool_params: Dict) -> Dict:
        """Use MCP tools with built-in observability and A2A notifications."""
        if tool_name in self.mcp_server.tools:
            self.performance_metrics["mcp_tools_used"] += 1
            result = await self.mcp_server.tools[tool_name](**tool_params)

            # Notify other agents about tool usage
            await self.a2a.broadcast_message(self.agent_id, {
                "type": "mcp_tool_used", "tool": tool_name,
                "summary": str(result)[:100] + "..."
            })
            return result
        return {"error": f"Tool {tool_name} not found"}

    def get_enhanced_status(self) -> Dict:
        """Get comprehensive agent status with observability metrics."""
        return {
            "agent_id": self.agent_id,
            "agent_type": self.agent_type,
            "performance_metrics": self.performance_metrics,
            "a2a_registered": self.agent_id in self.a2a.agent_registries,
            "health_status": "optimal"
        }


# Step 8: Enhanced Specialized Agents
class EnhancedPricingAgent(EnhancedRetailAgent):
    """A specialized agent for pricing strategy, using all system components."""
    def __init__(self, agent_id: str):
        super().__init__(agent_id, "enhanced_pricing_strategist", "gpt-4")

    @traceable
    async def analyze_pricing(self, product_data: Dict, market_conditions: Dict) -> Dict:
        """Enhanced pricing analysis workflow."""
        start_time = datetime.now()
        try:
            # 1. Get context from Vector DB
            self.performance_metrics["vector_db_queries"] += 1
            context = await self.vector_db.enhanced_search(
                f"pricing strategy for {product_data.get('category')}",
                category="pricing", agent_id=self.agent_id
            )

            # 2. Use MCP tools for advanced analysis
            mcp_analysis = await self.use_mcp_tool("price_optimizer", {
                "product_data": product_data,
                "market_conditions": {**market_conditions, "vector_context": context}
            })

            # 3. Generate final recommendation
            final_recommendation = await self._generate_final_recommendation(
                product_data, mcp_analysis, context
            )

            return {
                "recommendation": final_recommendation,
                "mcp_analysis_used": mcp_analysis,
                "context_used": context,
                "processing_time": (datetime.now() - start_time).total_seconds()
            }
        except Exception as e:
            return {"error": str(e)}

    async def _generate_final_recommendation(self, product_data: Dict, mcp_analysis: Dict, context: List) -> str:
        """Synthesize a final recommendation using an LLM."""
        prompt = f"""
        Synthesize a final pricing recommendation based on the following data:
        PRODUCT: {product_data}
        MCP ANALYSIS: {mcp_analysis}
        VECTOR DB CONTEXT: {context}

        Provide a comprehensive pricing strategy for executive decision-making.
        """
        response = await self.llm.ainvoke(prompt)
        return response.content


# Step 9: Enhanced Orchestrator with Full Observability
class ObservableRetailOrchestrator:
    """
    Orchestrates the agent army, runs workflows, and provides a centralized
    view of system-wide observability.
    """
    def __init__(self):
        self.agents = {}
        self.workflow_history = []
        self.observability_metrics = {
            "total_workflows": 0, "successful_workflows": 0, "average_workflow_time": 0.0
        }
        self.setup_enhanced_agent_army()

    @traceable
    def setup_enhanced_agent_army(self):
        """Initialize the enhanced agent army."""
        self.agents = {
            "pricing_alpha_enhanced": EnhancedPricingAgent("pricing_alpha_enhanced"),
            "pricing_beta_enhanced": EnhancedPricingAgent("pricing_beta_enhanced"),
        }
        print(f"✅ Enhanced Agent Army Initialized: {len(self.agents)} observable agents")

    @traceable
    async def run_observable_pricing_workflow(self, product_data: Dict) -> Dict:
        """Run a pricing workflow with full observability."""
        workflow_start = datetime.now()
        self.observability_metrics["total_workflows"] += 1
        try:
            print(f"🚀 Starting OBSERVABLE pricing workflow for {product_data.get('name')}...")
            market_context = await observable_vector_db.enhanced_search(f"market analysis for {product_data.get('category')}", k=3)

            pricing_agents = [agent for aid, agent in self.agents.items() if "pricing" in aid]
            tasks = [agent.analyze_pricing(product_data, {"context": market_context}) for agent in pricing_agents]
            pricing_results = await asyncio.gather(*tasks)

            aggregated_recommendation = await self.aggregate_enhanced_recommendations(pricing_results, product_data)

            workflow_time = (datetime.now() - workflow_start).total_seconds()
            self.observability_metrics["successful_workflows"] += 1
            total_workflows = self.observability_metrics["successful_workflows"]
            self.observability_metrics["average_workflow_time"] = ((self.observability_metrics["average_workflow_time"] * (total_workflows-1)) + workflow_time) / total_workflows

            workflow_result = {
                "workflow_type": "observable_pricing",
                "aggregated_recommendation": aggregated_recommendation,
                "workflow_duration": workflow_time
            }
            self.workflow_history.append(workflow_result)
            print(f"✅ Observable pricing workflow completed in {workflow_time:.2f}s")
            return workflow_result
        except Exception as e:
            print(f"❌ Observable workflow error: {e}")
            return {"error": str(e)}

    @traceable
    async def aggregate_enhanced_recommendations(self, results: List[Dict], context_data: Dict) -> Dict:
        """Aggregate recommendations from multiple agents using an LLM."""
        llm = ChatOpenAI(model="gpt-4", temperature=0)
        prompt = f"""
        Aggregate these pricing recommendations from multiple AI agents:
        CONTEXT: {context_data}
        AGENT ANALYSES: {json.dumps(results, indent=2)}
        Provide a single, consolidated strategy.
        """
        response = await llm.ainvoke(prompt)
        return {"consolidated_strategy": response.content}

    def get_comprehensive_observability(self) -> Dict:
        """Get comprehensive observability data from all system components."""
        agent_statuses = {aid: agent.get_enhanced_status() for aid, agent in self.agents.items()}
        return {
            "timestamp": datetime.now().isoformat(),
            "orchestrator_metrics": self.observability_metrics,
            "agent_army_status": {
                "total_agents": len(self.agents),
                "agent_breakdown": {"pricing_agents": len([a for a in self.agents if "pricing" in a])}
            },
            "a2a_communication_metrics": a2a_communication.get_communication_metrics(),
            "vector_db_performance": observable_vector_db.get_performance_metrics(),
            "workflow_history_summary": {
                "total_workflows": len(self.workflow_history),
                "success_rate": self.observability_metrics["successful_workflows"] / max(1, self.observability_metrics["total_workflows"])
            }
        }


# Step 10: Generate Sample Data
def generate_sample_data():
    """Generate sample data for the demonstration."""
    return {
        "products": [
            {"product_id": "PROD_001", "name": "Wireless Bluetooth Headphones", "category": "electronics", "cost_price": 45.0, "current_price": 99.99},
            {"product_id": "PROD_002", "name": "Smartphone Protective Case", "category": "accessories", "cost_price": 4.50, "current_price": 19.99}
        ]
    }

print("📊 Generating sample data...")
sample_data = generate_sample_data()


# Step 11: Initialize Enhanced Orchestrator
print("🎯 Initializing Observable Retail Agent Army Orchestrator...")
observable_orchestrator = ObservableRetailOrchestrator()


# Step 12: Enhanced Demo with Full Observability
async def run_observable_demo():
    """Run the main demo workflow and print observability metrics."""
    print("\n" + "="*70)
    print("🔍 OBSERVABLE RETAIL AI AGENT ARMY - ENHANCED DEMO")
    print("="*70)

    # Run pricing workflows
    for product in sample_data['products']:
        await observable_orchestrator.run_observable_pricing_workflow(product)

    # Show final dashboard data
    final_observability = observable_orchestrator.get_comprehensive_observability()
    print("\n3. 📈 FINAL OBSERVABILITY DASHBOARD DATA")
    print(json.dumps(final_observability, indent=2, default=str))

    return final_observability


# Step 13: Main execution block
async def main():
    if OPENAI_API_KEY == "YOUR_OPENAI_API_KEY":
        print("\n\n❌ Cannot run demo. Please set your OpenAI API key at the top of the script.")
        return

    print("\n🚀 Starting enhanced observable demo...")
    final_observability = await run_observable_demo()

    # Step 14: Visualization is better in a notebook environment
    # Here we print the key insights
    print("\n" + "="*70)
    print("🎊 DEMO COMPLETED!")
    print("="*70)
    print("🔍 OBSERVABILITY FEATURES DEMONSTRATED:")
    print("  ✅ MCP (Model Context Protocol) Integration")
    print("  ✅ A2A (Agent-to-Agent) Communication Layer")
    print("  ✅ LangSmith AI Observability & Tracing")
    print("  ✅ Real-time Performance Monitoring")


if __name__ == "__main__":
    # In a script, you run the asyncio event loop.
    # In a Jupyter notebook, you can often just use `await main()` in a cell.
    try:
        asyncio.run(main())
    except Exception as e:
        print(f"An error occurred during execution: {e}")

⏳ Installing required packages...
Collecting langgraph
  Using cached langgraph-0.6.10-py3-none-any.whl.metadata (6.8 kB)
Collecting asyncio
  Using cached asyncio-4.0.0-py3-none-any.whl.metadata (994 bytes)
[31mERROR: Ignored the following versions that require a different python version: 0.0.0 Requires-Python >=3.13[0m[31m
[0m[31mERROR: Could not find a version that satisfies the requirement mcp-client (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for mcp-client[0m[31m
[0m✅ All packages installed successfully!
📚 Libraries imported successfully!
🔑 Setting up API Keys and Observability...
✅ LangSmith client initialized successfully!
✅ OpenAI API key verified successfully!
🛠️ Initializing MCP Server...
🔄 Initializing A2A Communication Layer...
🛢️ Initializing Observable Vector Database...
✅ Observable Vector database initialized
📊 Generating sample data...
🎯 Initializing Observable Retail Agent Army Orchestrator...
✅ Enhanced Agent Army Initialize

  print(f"An error occurred during execution: {e}")


In [2]:
# !pip install chromadb

Collecting chromadb
  Using cached chromadb-1.1.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.2 kB)
Collecting pybase64>=1.4.1 (from chromadb)
  Downloading pybase64-1.4.2-cp312-cp312-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.whl.metadata (8.7 kB)
Collecting posthog<6.0.0,>=2.4.0 (from chromadb)
  Downloading posthog-5.4.0-py3-none-any.whl.metadata (5.7 kB)
Collecting onnxruntime>=1.14.1 (from chromadb)
  Downloading onnxruntime-1.23.1-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (5.0 kB)
Collecting opentelemetry-exporter-otlp-proto-grpc>=1.2.0 (from chromadb)
  Downloading opentelemetry_exporter_otlp_proto_grpc-1.37.0-py3-none-any.whl.metadata (2.4 kB)
Collecting pypika>=0.48.9 (from chromadb)
  Downloading PyPika-0.48.9.tar.gz (67 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.3/67.3 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [

In [2]:
# Step 1: Install all required packages
# Note: In a local environment, you would run these commands in your terminal.
# In a Jupyter/Colab notebook, the '!' prefix executes them as shell commands.
try:
    # Check for essential packages
    import langchain, chromadb, sentence_transformers, pandas, langsmith, mcp
    print("✅ Key packages already installed.")
except ImportError:
    print("⏳ Installing required packages...")
    import subprocess
    import sys
    # Install the full list of packages. 'mcp-client' and 'asyncio' are removed as they are invalid/unnecessary.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "langchain", "langchain-openai", "langgraph", "langchain-community", "chromadb", "sentence-transformers", "lightgbm", "pandas", "numpy", "matplotlib", "scikit-learn", "aiohttp", "langsmith", "guardrails-ai"])
    print("✅ All packages installed successfully!")


# Step 2: Import libraries
import os
import asyncio
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import uuid

# LangChain imports
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.schema import Document
from langchain_community.vectorstores import Chroma
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser

# LangSmith for AI Observability
import langsmith
from langsmith import Client, traceable

# MCP (Model Context Protocol) imports
# This is kept for conceptual context, though no specific client functions are called.
import mcp

print("📚 Libraries imported successfully!")


# Step 3: Configure API Keys and Observability
print("🔑 Setting up API Keys and Observability...")

# It's highly recommended to use environment variables or a secrets management tool.
# For this example, we'll use placeholders.
# IMPORTANT: Replace these with your actual keys.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY', "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")
LANGSMITH_API_KEY = os.environ.get('LANGSMITH_API_KEY', "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")
LANGSMITH_PROJECT = os.environ.get('LANGSMITH_PROJECT', "retail-agent-army")

# Set environment variables for LangChain and LangSmith

os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
os.environ["LANGSMITH_API_KEY"] = LANGSMITH_API_KEY
os.environ["LANGSMITH_PROJECT"] = LANGSMITH_PROJECT
os.environ["LANGCHAIN_TRACING_V2"] = "true" # Enables LangSmith tracing

# Initialize LangSmith client
langsmith_client = None
try:
    if LANGSMITH_API_KEY != "YOUR_LANGSMITH_API_KEY":
        langsmith_client = Client()
        print("✅ LangSmith client initialized successfully!")
    else:
        print("⚠️ LangSmith API key not set. Skipping client initialization.")
except Exception as e:
    print(f"⚠️ LangSmith initialization warning: {e}")

# Test the OpenAI API key
try:
    if OPENAI_API_KEY != "YOUR_OPENAI_API_KEY":
        test_llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
        test_response = test_llm.invoke("Test")
        print("✅ OpenAI API key verified successfully!")
    else:
         print("❌ OpenAI API key is not set. Please replace 'YOUR_OPENAI_API_KEY'.")
except Exception as e:
    print(f"❌ OpenAI API key error: {e}")


# Step 4: MCP Server Implementation for Retail
class RetailMCPServer:
    """
    A simulated Model Context Protocol (MCP) server that provides specialized tools
    for retail analysis. Each tool is traceable for observability.
    """
    def __init__(self):
        self.tools = {
            "price_optimizer": self.price_optimization_tool,
            "demand_forecaster": self.demand_forecasting_tool,
            "inventory_optimizer": self.inventory_optimization_tool,
            "customer_analyzer": self.customer_analytics_tool,
            "competitor_monitor": self.competitive_intelligence_tool,
            "market_analyzer": self.market_analysis_tool
        }
        self.session_id = str(uuid.uuid4())

    @traceable
    async def price_optimization_tool(self, product_data: Dict, market_conditions: Dict) -> Dict:
        """MCP Tool: Advanced price optimization using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Perform advanced price optimization analysis:

            PRODUCT: {product_data}
            MARKET: {market_conditions}

            Calculate:
            1. Price elasticity estimation
            2. Optimal price point with confidence intervals
            3. Competitive positioning strategy
            4. Revenue maximization approach
            5. Risk assessment

            Return structured JSON analysis.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "price_optimizer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "analysis": response.content,
                "recommendations": {
                    "optimal_price_range": {"min": 45.0, "max": 125.0, "recommended": 89.99},
                    "confidence_score": 0.87,
                    "expected_revenue_impact": "+15-25%",
                    "risk_level": "medium"
                }
            }
        except Exception as e:
            return {"error": str(e), "tool": "price_optimizer"}

    @traceable
    async def demand_forecasting_tool(self, historical_data: List, market_signals: Dict) -> Dict:
        """MCP Tool: Demand forecasting with market signals."""
        try:
            forecast_data = {
                "next_30_days": np.random.normal(100, 20, 30).tolist(),
                "confidence_interval": {"lower": 80, "upper": 120},
                "seasonality_factor": 1.15,
                "trend_direction": "upward"
            }
            return {
                "tool": "demand_forecaster",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "forecast": forecast_data,
                "market_signals_used": list(market_signals.keys()),
                "data_points_analyzed": len(historical_data)
            }
        except Exception as e:
            return {"error": str(e), "tool": "demand_forecaster"}

    @traceable
    async def inventory_optimization_tool(self, current_stock: Dict, demand_forecast: Dict) -> Dict:
        """MCP Tool: Inventory optimization based on demand forecasting."""
        try:
            optimization_recommendations = {
                "safety_stock_levels": {
                    "high_demand": current_stock.get('current_stock', 0) * 0.3,
                    "medium_demand": current_stock.get('current_stock', 0) * 0.2,
                    "low_demand": current_stock.get('current_stock', 0) * 0.1
                },
                "reorder_points": {
                    "urgent": current_stock.get('reorder_point', 0) * 0.8,
                    "standard": current_stock.get('reorder_point', 0),
                    "relaxed": current_stock.get('reorder_point', 0) * 1.2
                },
                "cost_optimization": {
                    "expected_savings": "15-20%",
                    "stockout_risk_reduction": "40-60%",
                    "carrying_cost_optimization": "10-15%"
                }
            }
            return {
                "tool": "inventory_optimizer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "optimization": optimization_recommendations,
                "demand_forecast_incorporated": True
            }
        except Exception as e:
            return {"error": str(e), "tool": "inventory_optimizer"}

    @traceable
    async def customer_analytics_tool(self, customer_data: Dict, purchase_history: List) -> Dict:
        """MCP Tool: Advanced customer analytics using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Analyze customer behavior and generate insights:

            CUSTOMER: {customer_data}
            PURCHASE HISTORY: {purchase_history[:5]}

            Provide:
            1. Customer lifetime value prediction
            2. Churn risk assessment
            3. Personalization opportunities
            4. Retention strategy
            5. Cross-selling recommendations

            Return structured analysis.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "customer_analyzer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "customer_insights": response.content,
                "metrics": {
                    "lifetime_value_score": self._calculate_ltv(purchase_history),
                    "churn_risk": "low" if len(purchase_history) > 3 else "medium",
                    "engagement_level": "high" if len(purchase_history) > 5 else "medium"
                }
            }
        except Exception as e:
            return {"error": str(e), "tool": "customer_analyzer"}

    @traceable
    async def competitive_intelligence_tool(self, product_data: Dict, market_scope: str = "local") -> Dict:
        """MCP Tool: Competitive market intelligence."""
        try:
            competitor_analysis = {
                "main_competitors": ["Competitor A", "Competitor B", "Competitor C"],
                "price_comparison": {
                    "our_price": product_data.get('current_price', 0),
                    "competitor_avg": product_data.get('current_price', 0) * 0.9,
                    "market_range": {
                        "min": product_data.get('current_price', 0) * 0.7,
                        "max": product_data.get('current_price', 0) * 1.3
                    }
                },
                "competitive_positioning": "market_leader" if product_data.get('current_price', 0) > 100 else "value_player",
                "recommendations": [
                    "Monitor competitor pricing weekly",
                    "Differentiate through value-added services",
                    "Consider bundle pricing strategies"
                ]
            }
            return {
                "tool": "competitor_monitor",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "analysis": competitor_analysis,
                "market_scope": market_scope
            }
        except Exception as e:
            return {"error": str(e), "tool": "competitor_monitor"}

    @traceable
    async def market_analysis_tool(self, product_category: str, timeframe: str = "30d") -> Dict:
        """MCP Tool: Comprehensive market analysis using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Analyze market conditions for {product_category} over {timeframe}:

            Provide:
            1. Market trends and growth projections
            2. Consumer behavior insights
            3. Regulatory considerations
            4. Technology impacts
            5. Strategic recommendations

            Focus on actionable insights for retail pricing and inventory.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "market_analyzer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "market_analysis": response.content,
                "timeframe": timeframe,
                "category": product_category
            }
        except Exception as e:
            return {"error": str(e), "tool": "market_analyzer"}

    def _calculate_ltv(self, purchase_history: List) -> float:
        """Helper function to calculate customer lifetime value."""
        if not purchase_history:
            return 0.0
        total_revenue = sum(p.get('revenue', 0) for p in purchase_history)
        purchase_count = len(purchase_history)
        avg_order_value = total_revenue / purchase_count if purchase_count > 0 else 0
        return min(round(avg_order_value * purchase_count * 0.1, 2), 1000.0)

print("🛠️ Initializing MCP Server...")
mcp_server = RetailMCPServer()


# Step 5: A2A (Agent-to-Agent) Communication Layer
class A2ACommunicationLayer:
    """
    Manages communication between agents, including direct messages, broadcasts,
    and consensus requests, with full tracing.
    """
    def __init__(self):
        self.agent_registries = {}
        self.conversation_history = {}
        self.performance_metrics = {}

    @traceable
    async def register_agent(self, agent_id: str, agent_type: str, capabilities: List[str]):
        """Register an agent in the A2A network."""
        self.agent_registries[agent_id] = {
            "agent_type": agent_type,
            "capabilities": capabilities,
            "status": "active",
            "last_heartbeat": datetime.now().isoformat()
        }
        self.performance_metrics[agent_id] = {
            "messages_sent": 0, "messages_received": 0,
            "response_time_avg": 0.0, "error_rate": 0.0
        }
        print(f"✅ Agent registered: {agent_id} ({agent_type})")

    @traceable
    async def send_message(self, from_agent: str, to_agent: str, message: Dict, message_type: str = "direct"):
        """Send a message between agents with tracing."""
        try:
            message_id = str(uuid.uuid4())
            timestamp = datetime.now().isoformat()

            message_package = {
                "message_id": message_id, "from_agent": from_agent,
                "to_agent": to_agent, "message_type": message_type,
                "content": message, "timestamp": timestamp
            }

            if from_agent not in self.conversation_history:
                self.conversation_history[from_agent] = []
            self.conversation_history[from_agent].append(message_package)

            self.performance_metrics[from_agent]["messages_sent"] += 1
            if to_agent in self.performance_metrics:
                self.performance_metrics[to_agent]["messages_received"] += 1

            print(f"📨 A2A Message: {from_agent} → {to_agent} ({message_type})")
            return message_id
        except Exception as e:
            print(f"❌ A2A Message Error: {e}")
            return None

    @traceable
    async def broadcast_message(self, from_agent: str, message: Dict, agent_filter: List[str] = None):
        """Broadcast a message to multiple agents."""
        try:
            target_agents = agent_filter if agent_filter else list(self.agent_registries.keys())
            target_agents = [agent for agent in target_agents if agent != from_agent]

            tasks = [self.send_message(from_agent, agent_id, message, "broadcast") for agent_id in target_agents]
            message_ids = await asyncio.gather(*tasks)
            return message_ids
        except Exception as e:
            print(f"❌ A2A Broadcast Error: {e}")
            return []

    def get_communication_metrics(self) -> Dict:
        """Get A2A communication performance metrics."""
        total_messages = sum(m["messages_sent"] for m in self.performance_metrics.values())
        active_agents = len([a for a, i in self.agent_registries.items() if i["status"] == "active"])
        return {
            "total_agents_registered": len(self.agent_registries),
            "active_agents": active_agents,
            "total_messages_exchanged": total_messages,
            "agent_performance": self.performance_metrics,
            "system_health": "optimal" if active_agents > 0 else "degraded"
        }

print("🔄 Initializing A2A Communication Layer...")
a2a_communication = A2ACommunicationLayer()


# Step 6: Enhanced Vector Database with Observability
class ObservableVectorDB:
    """
    A wrapper for a Chroma vector database that adds observability and performance tracking.
    """
    def __init__(self):
        self.embeddings = OpenAIEmbeddings()
        self.vector_store = None
        self.query_metrics = {"total_queries": 0, "successful_queries": 0, "average_response_time": 0.0}
        self.setup_sample_data()

    @traceable
    def setup_sample_data(self):
        """Initialize with comprehensive retail sample data."""
        sample_docs = [
            Document(page_content="Premium pricing strategy works best for electronics with 40-60% markup", metadata={"category": "pricing", "product_type": "electronics"}),
            Document(page_content="Competitive pricing essential for smartphone accessories market", metadata={"category": "pricing", "product_type": "accessories"}),
            Document(page_content="Safety stock levels should cover 2-week demand for fast-moving goods", metadata={"category": "inventory", "metric": "safety_stock"}),
            Document(page_content="Holiday season demand increases 60-80% for electronics", metadata={"category": "trends", "season": "Q4"}),
            Document(page_content="Price sensitivity is highest in the budget segment", metadata={"category": "customer", "insight": "pricing"})
        ]
        self.vector_store = Chroma.from_documents(documents=sample_docs, embedding=self.embeddings, persist_directory="./chroma_retail_db")
        print("✅ Observable Vector database initialized")

    @traceable
    async def enhanced_search(self, query: str, category: str = None, k: int = 3, agent_id: str = "unknown") -> List[Dict]:
        """Enhanced semantic search with observability."""
        start_time = datetime.now()
        self.query_metrics["total_queries"] += 1
        try:
            filter_dict = {"category": category} if category else {}
            results = self.vector_store.similarity_search_with_score(query, k=k, filter=filter_dict)

            response_time = (datetime.now() - start_time).total_seconds()
            self.query_metrics["successful_queries"] += 1
            total_queries = self.query_metrics["successful_queries"]
            self.query_metrics["average_response_time"] = ((self.query_metrics["average_response_time"] * (total_queries - 1)) + response_time) / total_queries

            formatted_results = [{"content": doc.page_content, "metadata": doc.metadata, "similarity_score": float(score)} for doc, score in results]
            print(f"🔍 VectorDB Search: '{query}' → {len(formatted_results)} results")
            return formatted_results
        except Exception as e:
            print(f"❌ VectorDB Search Error: {e}")
            return []

    def get_performance_metrics(self) -> Dict:
        """Get vector database performance metrics."""
        success_rate = self.query_metrics["successful_queries"] / max(1, self.query_metrics["total_queries"])
        return {**self.query_metrics, "success_rate": success_rate, "health_status": "optimal"}

print("🛢️ Initializing Observable Vector Database...")
observable_vector_db = ObservableVectorDB()


# Step 7: Enhanced Base Agent with MCP and A2A Integration
class EnhancedRetailAgent:
    """
    A base class for retail AI agents, integrating MCP tools, A2A communication,
    and an observable vector database.
    """
    def __init__(self, agent_id: str, agent_type: str, model: str = "gpt-4"):
        self.agent_id = agent_id
        self.agent_type = agent_type
        self.llm = ChatOpenAI(model=model, temperature=0.1)
        self.mcp_server = mcp_server
        self.a2a = a2a_communication
        self.vector_db = observable_vector_db
        self.history = []
        self.performance_metrics = {
            "tasks_completed": 0, "mcp_tools_used": 0,
            "a2a_messages_sent": 0, "vector_db_queries": 0,
            "avg_response_time": 0.0, "success_rate": 1.0
        }
        # In a notebook environment, it's better to explicitly call registration
        # after the event loop is confirmed to be running.
        # asyncio.create_task(self._register_with_a2a())

    async def _register_with_a2a(self):
        """Register agent with the A2A communication layer."""
        await self.a2a.register_agent(self.agent_id, self.agent_type, self._get_agent_capabilities())

    def _get_agent_capabilities(self) -> List[str]:
        """Define agent capabilities for A2A registration."""
        caps = ["analysis", "decision_support", "collaboration"]
        if "pricing" in self.agent_type: caps.extend(["price_optimization", "competitive_analysis"])
        elif "inventory" in self.agent_type: caps.extend(["demand_forecasting", "stock_optimization"])
        return caps

    @traceable
    async def use_mcp_tool(self, tool_name: str, tool_params: Dict) -> Dict:
        """Use MCP tools with built-in observability and A2A notifications."""
        if tool_name in self.mcp_server.tools:
            self.performance_metrics["mcp_tools_used"] += 1
            result = await self.mcp_server.tools[tool_name](**tool_params)

            # Notify other agents about tool usage
            await self.a2a.broadcast_message(self.agent_id, {
                "type": "mcp_tool_used", "tool": tool_name,
                "summary": str(result)[:100] + "..."
            })
            return result
        return {"error": f"Tool {tool_name} not found"}

    def get_enhanced_status(self) -> Dict:
        """Get comprehensive agent status with observability metrics."""
        return {
            "agent_id": self.agent_id,
            "agent_type": self.agent_type,
            "performance_metrics": self.performance_metrics,
            "a2a_registered": self.agent_id in self.a2a.agent_registries,
            "health_status": "optimal"
        }


# Step 8: Enhanced Specialized Agents
class EnhancedPricingAgent(EnhancedRetailAgent):
    """A specialized agent for pricing strategy, using all system components."""
    def __init__(self, agent_id: str):
        super().__init__(agent_id, "enhanced_pricing_strategist", "gpt-4")

    @traceable
    async def analyze_pricing(self, product_data: Dict, market_conditions: Dict) -> Dict:
        """Enhanced pricing analysis workflow."""
        start_time = datetime.now()
        try:
            # 1. Get context from Vector DB
            self.performance_metrics["vector_db_queries"] += 1
            context = await self.vector_db.enhanced_search(
                f"pricing strategy for {product_data.get('category')}",
                category="pricing", agent_id=self.agent_id
            )

            # 2. Use MCP tools for advanced analysis
            mcp_analysis = await self.use_mcp_tool("price_optimizer", {
                "product_data": product_data,
                "market_conditions": {**market_conditions, "vector_context": context}
            })

            # 3. Generate final recommendation
            final_recommendation = await self._generate_final_recommendation(
                product_data, mcp_analysis, context
            )

            return {
                "recommendation": final_recommendation,
                "mcp_analysis_used": mcp_analysis,
                "context_used": context,
                "processing_time": (datetime.now() - start_time).total_seconds()
            }
        except Exception as e:
            return {"error": str(e)}

    async def _generate_final_recommendation(self, product_data: Dict, mcp_analysis: Dict, context: List) -> str:
        """Synthesize a final recommendation using an LLM."""
        prompt = f"""
        Synthesize a final pricing recommendation based on the following data:
        PRODUCT: {product_data}
        MCP ANALYSIS: {mcp_analysis}
        VECTOR DB CONTEXT: {context}

        Provide a comprehensive pricing strategy for executive decision-making.
        """
        response = await self.llm.ainvoke(prompt)
        return response.content


# Step 9: Enhanced Orchestrator with Full Observability
class ObservableRetailOrchestrator:
    """
    Orchestrates the agent army, runs workflows, and provides a centralized
    view of system-wide observability.
    """
    def __init__(self):
        self.agents = {}
        self.workflow_history = []
        self.observability_metrics = {
            "total_workflows": 0, "successful_workflows": 0, "average_workflow_time": 0.0
        }

    @traceable
    async def setup_enhanced_agent_army(self):
        """Initialize the enhanced agent army and register them."""
        self.agents = {
            "pricing_alpha_enhanced": EnhancedPricingAgent("pricing_alpha_enhanced"),
            "pricing_beta_enhanced": EnhancedPricingAgent("pricing_beta_enhanced"),
        }
        # Register agents explicitly
        for agent in self.agents.values():
            await agent._register_with_a2a()

        print(f"✅ Enhanced Agent Army Initialized and Registered: {len(self.agents)} observable agents")

    @traceable
    async def run_observable_pricing_workflow(self, product_data: Dict) -> Dict:
        """Run a pricing workflow with full observability."""
        workflow_start = datetime.now()
        self.observability_metrics["total_workflows"] += 1
        try:
            print(f"🚀 Starting OBSERVABLE pricing workflow for {product_data.get('name')}...")
            market_context = await observable_vector_db.enhanced_search(f"market analysis for {product_data.get('category')}", k=3)

            pricing_agents = [agent for aid, agent in self.agents.items() if "pricing" in aid]
            tasks = [agent.analyze_pricing(product_data, {"context": market_context}) for agent in pricing_agents]
            pricing_results = await asyncio.gather(*tasks)

            aggregated_recommendation = await self.aggregate_enhanced_recommendations(pricing_results, product_data)

            workflow_time = (datetime.now() - workflow_start).total_seconds()
            self.observability_metrics["successful_workflows"] += 1
            total_workflows = self.observability_metrics["successful_workflows"]
            self.observability_metrics["average_workflow_time"] = ((self.observability_metrics["average_workflow_time"] * (total_workflows-1)) + workflow_time) / total_workflows

            workflow_result = {
                "workflow_type": "observable_pricing",
                "aggregated_recommendation": aggregated_recommendation,
                "workflow_duration": workflow_time
            }
            self.workflow_history.append(workflow_result)
            print(f"✅ Observable pricing workflow completed in {workflow_time:.2f}s")
            return workflow_result
        except Exception as e:
            print(f"❌ Observable workflow error: {e}")
            return {"error": str(e)}

    @traceable
    async def aggregate_enhanced_recommendations(self, results: List[Dict], context_data: Dict) -> Dict:
        """Aggregate recommendations from multiple agents using an LLM."""
        llm = ChatOpenAI(model="gpt-4", temperature=0)
        prompt = f"""
        Aggregate these pricing recommendations from multiple AI agents:
        CONTEXT: {context_data}
        AGENT ANALYSES: {json.dumps(results, indent=2)}
        Provide a single, consolidated strategy.
        """
        response = await llm.ainvoke(prompt)
        return {"consolidated_strategy": response.content}

    def get_comprehensive_observability(self) -> Dict:
        """Get comprehensive observability data from all system components."""
        agent_statuses = {aid: agent.get_enhanced_status() for aid, agent in self.agents.items()}
        return {
            "timestamp": datetime.now().isoformat(),
            "orchestrator_metrics": self.observability_metrics,
            "agent_army_status": {
                "total_agents": len(self.agents),
                "agent_breakdown": {"pricing_agents": len([a for a in self.agents if "pricing" in a])}
            },
            "a2a_communication_metrics": a2a_communication.get_communication_metrics(),
            "vector_db_performance": observable_vector_db.get_performance_metrics(),
            "workflow_history_summary": {
                "total_workflows": len(self.workflow_history),
                "success_rate": self.observability_metrics["successful_workflows"] / max(1, self.observability_metrics["total_workflows"])
            }
        }


# Step 10: Generate Sample Data
def generate_sample_data():
    """Generate sample data for the demonstration."""
    return {
        "products": [
            {"product_id": "PROD_001", "name": "Wireless Bluetooth Headphones", "category": "electronics", "cost_price": 45.0, "current_price": 99.99},
            {"product_id": "PROD_002", "name": "Smartphone Protective Case", "category": "accessories", "cost_price": 4.50, "current_price": 19.99}
        ]
    }

print("📊 Generating sample data...")
sample_data = generate_sample_data()

# This is the main execution function.
async def run_demo():
    """Initializes and runs the entire demo."""
    if OPENAI_API_KEY == "YOUR_OPENAI_API_KEY":
        print("\n\n❌ Cannot run demo. Please set your OpenAI API key at the top of the script.")
        return

    # Step 11: Initialize Enhanced Orchestrator
    print("🎯 Initializing Observable Retail Agent Army Orchestrator...")
    observable_orchestrator = ObservableRetailOrchestrator()
    await observable_orchestrator.setup_enhanced_agent_army()


    # Step 12: Enhanced Demo with Full Observability
    print("\n" + "="*70)
    print("🔍 OBSERVABLE RETAIL AI AGENT ARMY - ENHANCED DEMO")
    print("="*70)

    # Run pricing workflows
    for product in sample_data['products']:
        await observable_orchestrator.run_observable_pricing_workflow(product)

    # Show final dashboard data
    final_observability = observable_orchestrator.get_comprehensive_observability()
    print("\n3. 📈 FINAL OBSERVABILITY DASHBOARD DATA")
    print(json.dumps(final_observability, indent=2, default=str))

    # Step 13: Visualization and final summary
    print("\n" + "="*70)
    print("🎊 DEMO COMPLETED!")
    print("="*70)
    print("🔍 OBSERVABILITY FEATURES DEMONSTRATED:")
    print("  ✅ MCP (Model Context Protocol) Integration")
    print("  ✅ A2A (Agent-to-Agent) Communication Layer")
    print("  ✅ LangSmith AI Observability & Tracing")
    print("  ✅ Real-time Performance Monitoring")

# Execute the demo
# In a Jupyter/Colab notebook, you would run the following line in a cell:
# await run_demo()
# For a script, we handle the event loop.
if __name__ == "__main__":
    try:
        # This handles the "asyncio.run() cannot be called from a running event loop" error
        # by checking if a loop is already running, which is common in notebooks.
        loop = asyncio.get_running_loop()
        if loop.is_running():
            print("Event loop is already running. Creating a task.")
            loop.create_task(run_demo())
        else:
            asyncio.run(run_demo())
    except RuntimeError:
        # No event loop running, so we can safely start one.
        asyncio.run(run_demo())
    except Exception as e:
        print(f"An error occurred during execution: {e}")



✅ Key packages already installed.
📚 Libraries imported successfully!
🔑 Setting up API Keys and Observability...
✅ LangSmith client initialized successfully!
✅ OpenAI API key verified successfully!
🛠️ Initializing MCP Server...
🔄 Initializing A2A Communication Layer...
🛢️ Initializing Observable Vector Database...
✅ Observable Vector database initialized
📊 Generating sample data...
Event loop is already running. Creating a task.


In [3]:
# Step 1: Install all required packages
# Note: In a local environment, you would run these commands in your terminal.
# In a Jupyter/Colab notebook, the '!' prefix executes them as shell commands.
try:
    # Check for essential packages
    import langchain, chromadb, sentence_transformers, pandas, langsmith
    print("✅ Key packages already installed.")
except ImportError:
    print("⏳ Installing required packages...")
    import subprocess
    import sys
    # Install the full list of packages. 'mcp-client', 'langgraph', and 'asyncio' are removed as they are invalid/unnecessary.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "langchain", "langchain-openai", "langchain-community", "chromadb", "sentence-transformers", "lightgbm", "pandas", "numpy", "matplotlib", "scikit-learn", "aiohttp", "langsmith", "guardrails-ai"])
    print("✅ All packages installed successfully!")


# Step 2: Import libraries
import os
import asyncio
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import uuid

# LangChain imports
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.schema import Document
from langchain_community.vectorstores import Chroma
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser

# LangSmith for AI Observability
import langsmith
from langsmith import Client, traceable

print("📚 Libraries imported successfully!")


# Step 3: Configure API Keys and Observability
print("🔑 Setting up API Keys and Observability...")

# It's highly recommended to use environment variables or a secrets management tool.
# For this example, we'll use placeholders.
# IMPORTANT: Replace these with your actual keys.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY', "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")
LANGSMITH_API_KEY = os.environ.get('LANGSMITH_API_KEY', "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")
LANGSMITH_PROJECT = os.environ.get('LANGSMITH_PROJECT', "retail-agent-army")

# Set environment variables for LangChain and LangSmith
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
os.environ["LANGSMITH_API_KEY"] = LANGSMITH_API_KEY
os.environ["LANGSMITH_PROJECT"] = LANGSMITH_PROJECT
os.environ["LANGCHAIN_TRACING_V2"] = "true" # Enables LangSmith tracing

# Initialize LangSmith client
langsmith_client = None
try:
    if LANGSMITH_API_KEY != "YOUR_LANGSMITH_API_KEY":
        langsmith_client = Client()
        print("✅ LangSmith client initialized successfully!")
    else:
        print("⚠️ LangSmith API key not set. Skipping client initialization.")
except Exception as e:
    print(f"⚠️ LangSmith initialization warning: {e}")

# Test the OpenAI API key
try:
    if OPENAI_API_KEY != "YOUR_OPENAI_API_KEY":
        test_llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
        test_response = test_llm.invoke("Test")
        print("✅ OpenAI API key verified successfully!")
    else:
         print("❌ OpenAI API key is not set. Please replace 'YOUR_OPENAI_API_KEY'.")
except Exception as e:
    print(f"❌ OpenAI API key error: {e}")


# Step 4: MCP Server Implementation for Retail
class RetailMCPServer:
    """
    A simulated Model Context Protocol (MCP) server that provides specialized tools
    for retail analysis. Each tool is traceable for observability.
    """
    def __init__(self):
        self.tools = {
            "price_optimizer": self.price_optimization_tool,
            "demand_forecaster": self.demand_forecasting_tool,
            "inventory_optimizer": self.inventory_optimization_tool,
            "customer_analyzer": self.customer_analytics_tool,
            "competitor_monitor": self.competitive_intelligence_tool,
            "market_analyzer": self.market_analysis_tool
        }
        self.session_id = str(uuid.uuid4())

    @traceable
    async def price_optimization_tool(self, product_data: Dict, market_conditions: Dict) -> Dict:
        """MCP Tool: Advanced price optimization using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Perform advanced price optimization analysis:

            PRODUCT: {product_data}
            MARKET: {market_conditions}

            Calculate:
            1. Price elasticity estimation
            2. Optimal price point with confidence intervals
            3. Competitive positioning strategy
            4. Revenue maximization approach
            5. Risk assessment

            Return structured JSON analysis.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "price_optimizer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "analysis": response.content,
                "recommendations": {
                    "optimal_price_range": {"min": 45.0, "max": 125.0, "recommended": 89.99},
                    "confidence_score": 0.87,
                    "expected_revenue_impact": "+15-25%",
                    "risk_level": "medium"
                }
            }
        except Exception as e:
            return {"error": str(e), "tool": "price_optimizer"}

    @traceable
    async def demand_forecasting_tool(self, historical_data: List, market_signals: Dict) -> Dict:
        """MCP Tool: Demand forecasting with market signals."""
        try:
            forecast_data = {
                "next_30_days": np.random.normal(100, 20, 30).tolist(),
                "confidence_interval": {"lower": 80, "upper": 120},
                "seasonality_factor": 1.15,
                "trend_direction": "upward"
            }
            return {
                "tool": "demand_forecaster",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "forecast": forecast_data,
                "market_signals_used": list(market_signals.keys()),
                "data_points_analyzed": len(historical_data)
            }
        except Exception as e:
            return {"error": str(e), "tool": "demand_forecaster"}

    @traceable
    async def inventory_optimization_tool(self, current_stock: Dict, demand_forecast: Dict) -> Dict:
        """MCP Tool: Inventory optimization based on demand forecasting."""
        try:
            optimization_recommendations = {
                "safety_stock_levels": {
                    "high_demand": current_stock.get('current_stock', 0) * 0.3,
                    "medium_demand": current_stock.get('current_stock', 0) * 0.2,
                    "low_demand": current_stock.get('current_stock', 0) * 0.1
                },
                "reorder_points": {
                    "urgent": current_stock.get('reorder_point', 0) * 0.8,
                    "standard": current_stock.get('reorder_point', 0),
                    "relaxed": current_stock.get('reorder_point', 0) * 1.2
                },
                "cost_optimization": {
                    "expected_savings": "15-20%",
                    "stockout_risk_reduction": "40-60%",
                    "carrying_cost_optimization": "10-15%"
                }
            }
            return {
                "tool": "inventory_optimizer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "optimization": optimization_recommendations,
                "demand_forecast_incorporated": True
            }
        except Exception as e:
            return {"error": str(e), "tool": "inventory_optimizer"}

    @traceable
    async def customer_analytics_tool(self, customer_data: Dict, purchase_history: List) -> Dict:
        """MCP Tool: Advanced customer analytics using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Analyze customer behavior and generate insights:

            CUSTOMER: {customer_data}
            PURCHASE HISTORY: {purchase_history[:5]}

            Provide:
            1. Customer lifetime value prediction
            2. Churn risk assessment
            3. Personalization opportunities
            4. Retention strategy
            5. Cross-selling recommendations

            Return structured analysis.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "customer_analyzer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "customer_insights": response.content,
                "metrics": {
                    "lifetime_value_score": self._calculate_ltv(purchase_history),
                    "churn_risk": "low" if len(purchase_history) > 3 else "medium",
                    "engagement_level": "high" if len(purchase_history) > 5 else "medium"
                }
            }
        except Exception as e:
            return {"error": str(e), "tool": "customer_analyzer"}

    @traceable
    async def competitive_intelligence_tool(self, product_data: Dict, market_scope: str = "local") -> Dict:
        """MCP Tool: Competitive market intelligence."""
        try:
            competitor_analysis = {
                "main_competitors": ["Competitor A", "Competitor B", "Competitor C"],
                "price_comparison": {
                    "our_price": product_data.get('current_price', 0),
                    "competitor_avg": product_data.get('current_price', 0) * 0.9,
                    "market_range": {
                        "min": product_data.get('current_price', 0) * 0.7,
                        "max": product_data.get('current_price', 0) * 1.3
                    }
                },
                "competitive_positioning": "market_leader" if product_data.get('current_price', 0) > 100 else "value_player",
                "recommendations": [
                    "Monitor competitor pricing weekly",
                    "Differentiate through value-added services",
                    "Consider bundle pricing strategies"
                ]
            }
            return {
                "tool": "competitor_monitor",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "analysis": competitor_analysis,
                "market_scope": market_scope
            }
        except Exception as e:
            return {"error": str(e), "tool": "competitor_monitor"}

    @traceable
    async def market_analysis_tool(self, product_category: str, timeframe: str = "30d") -> Dict:
        """MCP Tool: Comprehensive market analysis using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Analyze market conditions for {product_category} over {timeframe}:

            Provide:
            1. Market trends and growth projections
            2. Consumer behavior insights
            3. Regulatory considerations
            4. Technology impacts
            5. Strategic recommendations

            Focus on actionable insights for retail pricing and inventory.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "market_analyzer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "market_analysis": response.content,
                "timeframe": timeframe,
                "category": product_category
            }
        except Exception as e:
            return {"error": str(e), "tool": "market_analyzer"}

    def _calculate_ltv(self, purchase_history: List) -> float:
        """Helper function to calculate customer lifetime value."""
        if not purchase_history:
            return 0.0
        total_revenue = sum(p.get('revenue', 0) for p in purchase_history)
        purchase_count = len(purchase_history)
        avg_order_value = total_revenue / purchase_count if purchase_count > 0 else 0
        return min(round(avg_order_value * purchase_count * 0.1, 2), 1000.0)

print("🛠️ Initializing MCP Server...")
mcp_server = RetailMCPServer()


# Step 5: A2A (Agent-to-Agent) Communication Layer
class A2ACommunicationLayer:
    """
    Manages communication between agents, including direct messages, broadcasts,
    and consensus requests, with full tracing.
    """
    def __init__(self):
        self.agent_registries = {}
        self.conversation_history = {}
        self.performance_metrics = {}

    @traceable
    async def register_agent(self, agent_id: str, agent_type: str, capabilities: List[str]):
        """Register an agent in the A2A network."""
        self.agent_registries[agent_id] = {
            "agent_type": agent_type,
            "capabilities": capabilities,
            "status": "active",
            "last_heartbeat": datetime.now().isoformat()
        }
        self.performance_metrics[agent_id] = {
            "messages_sent": 0, "messages_received": 0,
            "response_time_avg": 0.0, "error_rate": 0.0
        }
        print(f"✅ Agent registered: {agent_id} ({agent_type})")

    @traceable
    async def send_message(self, from_agent: str, to_agent: str, message: Dict, message_type: str = "direct"):
        """Send a message between agents with tracing."""
        try:
            message_id = str(uuid.uuid4())
            timestamp = datetime.now().isoformat()

            message_package = {
                "message_id": message_id, "from_agent": from_agent,
                "to_agent": to_agent, "message_type": message_type,
                "content": message, "timestamp": timestamp
            }

            if from_agent not in self.conversation_history:
                self.conversation_history[from_agent] = []
            self.conversation_history[from_agent].append(message_package)

            self.performance_metrics[from_agent]["messages_sent"] += 1
            if to_agent in self.performance_metrics:
                self.performance_metrics[to_agent]["messages_received"] += 1

            print(f"📨 A2A Message: {from_agent} → {to_agent} ({message_type})")
            return message_id
        except Exception as e:
            print(f"❌ A2A Message Error: {e}")
            return None

    @traceable
    async def broadcast_message(self, from_agent: str, message: Dict, agent_filter: List[str] = None):
        """Broadcast a message to multiple agents."""
        try:
            target_agents = agent_filter if agent_filter else list(self.agent_registries.keys())
            target_agents = [agent for agent in target_agents if agent != from_agent]

            tasks = [self.send_message(from_agent, agent_id, message, "broadcast") for agent_id in target_agents]
            message_ids = await asyncio.gather(*tasks)
            return message_ids
        except Exception as e:
            print(f"❌ A2A Broadcast Error: {e}")
            return []

    def get_communication_metrics(self) -> Dict:
        """Get A2A communication performance metrics."""
        total_messages = sum(m["messages_sent"] for m in self.performance_metrics.values())
        active_agents = len([a for a, i in self.agent_registries.items() if i["status"] == "active"])
        return {
            "total_agents_registered": len(self.agent_registries),
            "active_agents": active_agents,
            "total_messages_exchanged": total_messages,
            "agent_performance": self.performance_metrics,
            "system_health": "optimal" if active_agents > 0 else "degraded"
        }

print("🔄 Initializing A2A Communication Layer...")
a2a_communication = A2ACommunicationLayer()


# Step 6: Enhanced Vector Database with Observability
class ObservableVectorDB:
    """
    A wrapper for a Chroma vector database that adds observability and performance tracking.
    """
    def __init__(self):
        self.embeddings = OpenAIEmbeddings()
        self.vector_store = None
        self.query_metrics = {"total_queries": 0, "successful_queries": 0, "average_response_time": 0.0}
        self.setup_sample_data()

    @traceable
    def setup_sample_data(self):
        """Initialize with comprehensive retail sample data."""
        sample_docs = [
            Document(page_content="Premium pricing strategy works best for electronics with 40-60% markup", metadata={"category": "pricing", "product_type": "electronics"}),
            Document(page_content="Competitive pricing essential for smartphone accessories market", metadata={"category": "pricing", "product_type": "accessories"}),
            Document(page_content="Safety stock levels should cover 2-week demand for fast-moving goods", metadata={"category": "inventory", "metric": "safety_stock"}),
            Document(page_content="Holiday season demand increases 60-80% for electronics", metadata={"category": "trends", "season": "Q4"}),
            Document(page_content="Price sensitivity is highest in the budget segment", metadata={"category": "customer", "insight": "pricing"})
        ]
        self.vector_store = Chroma.from_documents(documents=sample_docs, embedding=self.embeddings, persist_directory="./chroma_retail_db")
        print("✅ Observable Vector database initialized")

    @traceable
    async def enhanced_search(self, query: str, category: str = None, k: int = 3, agent_id: str = "unknown") -> List[Dict]:
        """Enhanced semantic search with observability."""
        start_time = datetime.now()
        self.query_metrics["total_queries"] += 1
        try:
            filter_dict = {"category": category} if category else {}
            results = self.vector_store.similarity_search_with_score(query, k=k, filter=filter_dict)

            response_time = (datetime.now() - start_time).total_seconds()
            self.query_metrics["successful_queries"] += 1
            total_queries = self.query_metrics["successful_queries"]
            self.query_metrics["average_response_time"] = ((self.query_metrics["average_response_time"] * (total_queries - 1)) + response_time) / total_queries

            formatted_results = [{"content": doc.page_content, "metadata": doc.metadata, "similarity_score": float(score)} for doc, score in results]
            print(f"🔍 VectorDB Search: '{query}' → {len(formatted_results)} results")
            return formatted_results
        except Exception as e:
            print(f"❌ VectorDB Search Error: {e}")
            return []

    def get_performance_metrics(self) -> Dict:
        """Get vector database performance metrics."""
        success_rate = self.query_metrics["successful_queries"] / max(1, self.query_metrics["total_queries"])
        return {**self.query_metrics, "success_rate": success_rate, "health_status": "optimal"}

print("🛢️ Initializing Observable Vector Database...")
observable_vector_db = ObservableVectorDB()


# Step 7: Enhanced Base Agent with MCP and A2A Integration
class EnhancedRetailAgent:
    """
    A base class for retail AI agents, integrating MCP tools, A2A communication,
    and an observable vector database.
    """
    def __init__(self, agent_id: str, agent_type: str, model: str = "gpt-4"):
        self.agent_id = agent_id
        self.agent_type = agent_type
        self.llm = ChatOpenAI(model=model, temperature=0.1)
        self.mcp_server = mcp_server
        self.a2a = a2a_communication
        self.vector_db = observable_vector_db
        self.history = []
        self.performance_metrics = {
            "tasks_completed": 0, "mcp_tools_used": 0,
            "a2a_messages_sent": 0, "vector_db_queries": 0,
            "avg_response_time": 0.0, "success_rate": 1.0
        }

    async def register(self):
        """Register agent with the A2A communication layer."""
        await self.a2a.register_agent(self.agent_id, self.agent_type, self._get_agent_capabilities())

    def _get_agent_capabilities(self) -> List[str]:
        """Define agent capabilities for A2A registration."""
        caps = ["analysis", "decision_support", "collaboration"]
        if "pricing" in self.agent_type: caps.extend(["price_optimization", "competitive_analysis"])
        elif "inventory" in self.agent_type: caps.extend(["demand_forecasting", "stock_optimization"])
        return caps

    @traceable
    async def use_mcp_tool(self, tool_name: str, tool_params: Dict) -> Dict:
        """Use MCP tools with built-in observability and A2A notifications."""
        if tool_name in self.mcp_server.tools:
            self.performance_metrics["mcp_tools_used"] += 1
            result = await self.mcp_server.tools[tool_name](**tool_params)

            # Notify other agents about tool usage
            await self.a2a.broadcast_message(self.agent_id, {
                "type": "mcp_tool_used", "tool": tool_name,
                "summary": str(result)[:100] + "..."
            })
            return result
        return {"error": f"Tool {tool_name} not found"}

    def get_enhanced_status(self) -> Dict:
        """Get comprehensive agent status with observability metrics."""
        return {
            "agent_id": self.agent_id,
            "agent_type": self.agent_type,
            "performance_metrics": self.performance_metrics,
            "a2a_registered": self.agent_id in self.a2a.agent_registries,
            "health_status": "optimal"
        }


# Step 8: Enhanced Specialized Agents
class EnhancedPricingAgent(EnhancedRetailAgent):
    """A specialized agent for pricing strategy, using all system components."""
    def __init__(self, agent_id: str):
        super().__init__(agent_id, "enhanced_pricing_strategist", "gpt-4")

    @traceable
    async def analyze_pricing(self, product_data: Dict, market_conditions: Dict) -> Dict:
        """Enhanced pricing analysis workflow."""
        start_time = datetime.now()
        try:
            # 1. Get context from Vector DB
            self.performance_metrics["vector_db_queries"] += 1
            context = await self.vector_db.enhanced_search(
                f"pricing strategy for {product_data.get('category')}",
                category="pricing", agent_id=self.agent_id
            )

            # 2. Use MCP tools for advanced analysis
            mcp_analysis = await self.use_mcp_tool("price_optimizer", {
                "product_data": product_data,
                "market_conditions": {**market_conditions, "vector_context": context}
            })

            # 3. Generate final recommendation
            final_recommendation = await self._generate_final_recommendation(
                product_data, mcp_analysis, context
            )

            return {
                "recommendation": final_recommendation,
                "mcp_analysis_used": mcp_analysis,
                "context_used": context,
                "processing_time": (datetime.now() - start_time).total_seconds()
            }
        except Exception as e:
            return {"error": str(e)}

    async def _generate_final_recommendation(self, product_data: Dict, mcp_analysis: Dict, context: List) -> str:
        """Synthesize a final recommendation using an LLM."""
        prompt = f"""
        Synthesize a final pricing recommendation based on the following data:
        PRODUCT: {product_data}
        MCP ANALYSIS: {mcp_analysis}
        VECTOR DB CONTEXT: {context}

        Provide a comprehensive pricing strategy for executive decision-making.
        """
        response = await self.llm.ainvoke(prompt)
        return response.content


# Step 9: Enhanced Orchestrator with Full Observability
class ObservableRetailOrchestrator:
    """
    Orchestrates the agent army, runs workflows, and provides a centralized
    view of system-wide observability.
    """
    def __init__(self):
        self.agents = {}
        self.workflow_history = []
        self.observability_metrics = {
            "total_workflows": 0, "successful_workflows": 0, "average_workflow_time": 0.0
        }

    @traceable
    async def setup_enhanced_agent_army(self):
        """Initialize the enhanced agent army and register them."""
        self.agents = {
            "pricing_alpha_enhanced": EnhancedPricingAgent("pricing_alpha_enhanced"),
            "pricing_beta_enhanced": EnhancedPricingAgent("pricing_beta_enhanced"),
        }
        # Register agents explicitly
        for agent in self.agents.values():
            await agent.register()

        print(f"✅ Enhanced Agent Army Initialized and Registered: {len(self.agents)} observable agents")

    @traceable
    async def run_observable_pricing_workflow(self, product_data: Dict) -> Dict:
        """Run a pricing workflow with full observability."""
        workflow_start = datetime.now()
        self.observability_metrics["total_workflows"] += 1
        try:
            print(f"🚀 Starting OBSERVABLE pricing workflow for {product_data.get('name')}...")
            market_context = await observable_vector_db.enhanced_search(f"market analysis for {product_data.get('category')}", k=3)

            pricing_agents = [agent for aid, agent in self.agents.items() if "pricing" in aid]
            tasks = [agent.analyze_pricing(product_data, {"context": market_context}) for agent in pricing_agents]
            pricing_results = await asyncio.gather(*tasks)

            aggregated_recommendation = await self.aggregate_enhanced_recommendations(pricing_results, product_data)

            workflow_time = (datetime.now() - workflow_start).total_seconds()
            self.observability_metrics["successful_workflows"] += 1
            total_workflows = self.observability_metrics["successful_workflows"]
            self.observability_metrics["average_workflow_time"] = ((self.observability_metrics["average_workflow_time"] * (total_workflows-1)) + workflow_time) / total_workflows

            workflow_result = {
                "workflow_type": "observable_pricing",
                "aggregated_recommendation": aggregated_recommendation,
                "workflow_duration": workflow_time
            }
            self.workflow_history.append(workflow_result)
            print(f"✅ Observable pricing workflow completed in {workflow_time:.2f}s")
            return workflow_result
        except Exception as e:
            print(f"❌ Observable workflow error: {e}")
            return {"error": str(e)}

    @traceable
    async def aggregate_enhanced_recommendations(self, results: List[Dict], context_data: Dict) -> Dict:
        """Aggregate recommendations from multiple agents using an LLM."""
        llm = ChatOpenAI(model="gpt-4", temperature=0)
        prompt = f"""
        Aggregate these pricing recommendations from multiple AI agents:
        CONTEXT: {context_data}
        AGENT ANALYSES: {json.dumps(results, indent=2)}
        Provide a single, consolidated strategy.
        """
        response = await llm.ainvoke(prompt)
        return {"consolidated_strategy": response.content}

    def get_comprehensive_observability(self) -> Dict:
        """Get comprehensive observability data from all system components."""
        agent_statuses = {aid: agent.get_enhanced_status() for aid, agent in self.agents.items()}
        return {
            "timestamp": datetime.now().isoformat(),
            "orchestrator_metrics": self.observability_metrics,
            "agent_army_status": {
                "total_agents": len(self.agents),
                "agent_breakdown": {"pricing_agents": len([a for a in self.agents if "pricing" in a])}
            },
            "a2a_communication_metrics": a2a_communication.get_communication_metrics(),
            "vector_db_performance": observable_vector_db.get_performance_metrics(),
            "workflow_history_summary": {
                "total_workflows": len(self.workflow_history),
                "success_rate": self.observability_metrics["successful_workflows"] / max(1, self.observability_metrics["total_workflows"])
            }
        }


# Step 10: Generate Sample Data
def generate_sample_data():
    """Generate sample data for the demonstration."""
    return {
        "products": [
            {"product_id": "PROD_001", "name": "Wireless Bluetooth Headphones", "category": "electronics", "cost_price": 45.0, "current_price": 99.99},
            {"product_id": "PROD_002", "name": "Smartphone Protective Case", "category": "accessories", "cost_price": 4.50, "current_price": 19.99}
        ]
    }

print("📊 Generating sample data...")
sample_data = generate_sample_data()

# Step 11: Main Execution Function
async def main():
    """Initializes and runs the entire demo."""
    if OPENAI_API_KEY == "YOUR_OPENAI_API_KEY":
        print("\n\n❌ Cannot run demo. Please set your OpenAI API key at the top of the script.")
        return

    # Initialize Enhanced Orchestrator
    print("🎯 Initializing Observable Retail Agent Army Orchestrator...")
    observable_orchestrator = ObservableRetailOrchestrator()
    await observable_orchestrator.setup_enhanced_agent_army()


    # Run Enhanced Demo with Full Observability
    print("\n" + "="*70)
    print("🔍 OBSERVABLE RETAIL AI AGENT ARMY - ENHANCED DEMO")
    print("="*70)

    # Run pricing workflows
    for product in sample_data['products']:
        await observable_orchestrator.run_observable_pricing_workflow(product)

    # Show final dashboard data
    final_observability = observable_orchestrator.get_comprehensive_observability()
    print("\n3. 📈 FINAL OBSERVABILITY DASHBOARD DATA")
    print(json.dumps(final_observability, indent=2, default=str))

    # Visualization and final summary
    print("\n" + "="*70)
    print("🎊 DEMO COMPLETED!")
    print("="*70)
    print("🔍 OBSERVABILITY FEATURES DEMONSTRATED:")
    print("  ✅ MCP (Model Context Protocol) Integration")
    print("  ✅ A2A (Agent-to-Agent) Communication Layer")
    print("  ✅ LangSmith AI Observability & Tracing")
    print("  ✅ Real-time Performance Monitoring")

# Step 12: Run the Demo
# In a Jupyter/Colab notebook, this final block should be in its own cell.
# The `asyncio.run()` function starts the event loop and runs the `main` coroutine.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as e:
        print(f"An error occurred during the demo run: {e}")



✅ Key packages already installed.
📚 Libraries imported successfully!
🔑 Setting up API Keys and Observability...
✅ LangSmith client initialized successfully!
✅ OpenAI API key verified successfully!
🛠️ Initializing MCP Server...
🔄 Initializing A2A Communication Layer...
🛢️ Initializing Observable Vector Database...
✅ Observable Vector database initialized
📊 Generating sample data...
An error occurred during the demo run: asyncio.run() cannot be called from a running event loop


  print(f"An error occurred during the demo run: {e}")


In [4]:
# Step 1: Install all required packages
# Note: In a local environment, you would run these commands in your terminal.
# In a Jupyter/Colab notebook, the '!' prefix executes them as shell commands.
try:
    # Check for essential packages
    import langchain, chromadb, sentence_transformers, pandas, langsmith, mcp, nest_asyncio
    print("✅ Key packages already installed.")
except ImportError:
    print("⏳ Installing required packages...")
    import subprocess
    import sys
    # Install the full list of packages, including nest_asyncio to handle event loop issues.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "langchain", "langchain-openai", "langchain-community", "chromadb", "sentence-transformers", "lightgbm", "pandas", "numpy", "matplotlib", "scikit-learn", "aiohttp", "langsmith", "guardrails-ai", "nest_asyncio"])
    print("✅ All packages installed successfully!")


# Step 2: Import libraries
import os
import asyncio
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import uuid
import nest_asyncio

# Apply the patch to allow nested event loops, which is common in notebooks.
nest_asyncio.apply()

# LangChain imports
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.schema import Document
from langchain_community.vectorstores import Chroma
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser

# LangSmith for AI Observability
import langsmith
from langsmith import Client, traceable

# MCP (Model Context Protocol) conceptual import
import mcp

print("📚 Libraries imported successfully!")


# Step 3: Configure API Keys and Observability
print("🔑 Setting up API Keys and Observability...")

# It's highly recommended to use environment variables or a secrets management tool.
# For this example, we'll use placeholders.
# IMPORTANT: Replace these with your actual keys.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY', "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")
LANGSMITH_API_KEY = os.environ.get('LANGSMITH_API_KEY', "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")
LANGSMITH_PROJECT = os.environ.get('LANGSMITH_PROJECT', "retail-agents")

# Set environment variables for LangChain and LangSmith
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
os.environ["LANGSMITH_API_KEY"] = LANGSMITH_API_KEY
os.environ["LANGSMITH_PROJECT"] = LANGSMITH_PROJECT
os.environ["LANGCHAIN_TRACING_V2"] = "true" # Enables LangSmith tracing

# Initialize LangSmith client
langsmith_client = None
try:
    if LANGSMITH_API_KEY != "YOUR_LANGSMITH_API_KEY":
        langsmith_client = Client()
        print("✅ LangSmith client initialized successfully!")
    else:
        print("⚠️ LangSmith API key not set. Skipping client initialization.")
except Exception as e:
    print(f"⚠️ LangSmith initialization warning: {e}")

# Test the OpenAI API key
try:
    if OPENAI_API_KEY != "YOUR_OPENAI_API_KEY":
        test_llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
        test_response = test_llm.invoke("Test")
        print("✅ OpenAI API key verified successfully!")
    else:
         print("❌ OpenAI API key is not set. Please replace 'YOUR_OPENAI_API_KEY'.")
except Exception as e:
    print(f"❌ OpenAI API key error: {e}")


# Step 4: MCP Server Implementation for Retail
class RetailMCPServer:
    """
    A simulated Model Context Protocol (MCP) server that provides specialized tools
    for retail analysis. Each tool is traceable for observability.
    """
    def __init__(self):
        self.tools = {
            "price_optimizer": self.price_optimization_tool,
            "demand_forecaster": self.demand_forecasting_tool,
            "inventory_optimizer": self.inventory_optimization_tool,
            "customer_analyzer": self.customer_analytics_tool,
            "competitor_monitor": self.competitive_intelligence_tool,
            "market_analyzer": self.market_analysis_tool
        }
        self.session_id = str(uuid.uuid4())

    @traceable
    async def price_optimization_tool(self, product_data: Dict, market_conditions: Dict) -> Dict:
        """MCP Tool: Advanced price optimization using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Perform advanced price optimization analysis:

            PRODUCT: {product_data}
            MARKET: {market_conditions}

            Calculate:
            1. Price elasticity estimation
            2. Optimal price point with confidence intervals
            3. Competitive positioning strategy
            4. Revenue maximization approach
            5. Risk assessment

            Return structured JSON analysis.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "price_optimizer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "analysis": response.content,
                "recommendations": {
                    "optimal_price_range": {"min": 45.0, "max": 125.0, "recommended": 89.99},
                    "confidence_score": 0.87,
                    "expected_revenue_impact": "+15-25%",
                    "risk_level": "medium"
                }
            }
        except Exception as e:
            return {"error": str(e), "tool": "price_optimizer"}

    @traceable
    async def demand_forecasting_tool(self, historical_data: List, market_signals: Dict) -> Dict:
        """MCP Tool: Demand forecasting with market signals."""
        try:
            forecast_data = {
                "next_30_days": np.random.normal(100, 20, 30).tolist(),
                "confidence_interval": {"lower": 80, "upper": 120},
                "seasonality_factor": 1.15,
                "trend_direction": "upward"
            }
            return {
                "tool": "demand_forecaster",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "forecast": forecast_data,
                "market_signals_used": list(market_signals.keys()),
                "data_points_analyzed": len(historical_data)
            }
        except Exception as e:
            return {"error": str(e), "tool": "demand_forecaster"}

    @traceable
    async def inventory_optimization_tool(self, current_stock: Dict, demand_forecast: Dict) -> Dict:
        """MCP Tool: Inventory optimization based on demand forecasting."""
        try:
            optimization_recommendations = {
                "safety_stock_levels": {
                    "high_demand": current_stock.get('current_stock', 0) * 0.3,
                    "medium_demand": current_stock.get('current_stock', 0) * 0.2,
                    "low_demand": current_stock.get('current_stock', 0) * 0.1
                },
                "reorder_points": {
                    "urgent": current_stock.get('reorder_point', 0) * 0.8,
                    "standard": current_stock.get('reorder_point', 0),
                    "relaxed": current_stock.get('reorder_point', 0) * 1.2
                },
                "cost_optimization": {
                    "expected_savings": "15-20%",
                    "stockout_risk_reduction": "40-60%",
                    "carrying_cost_optimization": "10-15%"
                }
            }
            return {
                "tool": "inventory_optimizer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "optimization": optimization_recommendations,
                "demand_forecast_incorporated": True
            }
        except Exception as e:
            return {"error": str(e), "tool": "inventory_optimizer"}

    @traceable
    async def customer_analytics_tool(self, customer_data: Dict, purchase_history: List) -> Dict:
        """MCP Tool: Advanced customer analytics using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Analyze customer behavior and generate insights:

            CUSTOMER: {customer_data}
            PURCHASE HISTORY: {purchase_history[:5]}

            Provide:
            1. Customer lifetime value prediction
            2. Churn risk assessment
            3. Personalization opportunities
            4. Retention strategy
            5. Cross-selling recommendations

            Return structured analysis.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "customer_analyzer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "customer_insights": response.content,
                "metrics": {
                    "lifetime_value_score": self._calculate_ltv(purchase_history),
                    "churn_risk": "low" if len(purchase_history) > 3 else "medium",
                    "engagement_level": "high" if len(purchase_history) > 5 else "medium"
                }
            }
        except Exception as e:
            return {"error": str(e), "tool": "customer_analyzer"}

    @traceable
    async def competitive_intelligence_tool(self, product_data: Dict, market_scope: str = "local") -> Dict:
        """MCP Tool: Competitive market intelligence."""
        try:
            competitor_analysis = {
                "main_competitors": ["Competitor A", "Competitor B", "Competitor C"],
                "price_comparison": {
                    "our_price": product_data.get('current_price', 0),
                    "competitor_avg": product_data.get('current_price', 0) * 0.9,
                    "market_range": {
                        "min": product_data.get('current_price', 0) * 0.7,
                        "max": product_data.get('current_price', 0) * 1.3
                    }
                },
                "competitive_positioning": "market_leader" if product_data.get('current_price', 0) > 100 else "value_player",
                "recommendations": [
                    "Monitor competitor pricing weekly",
                    "Differentiate through value-added services",
                    "Consider bundle pricing strategies"
                ]
            }
            return {
                "tool": "competitor_monitor",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "analysis": competitor_analysis,
                "market_scope": market_scope
            }
        except Exception as e:
            return {"error": str(e), "tool": "competitor_monitor"}

    @traceable
    async def market_analysis_tool(self, product_category: str, timeframe: str = "30d") -> Dict:
        """MCP Tool: Comprehensive market analysis using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Analyze market conditions for {product_category} over {timeframe}:

            Provide:
            1. Market trends and growth projections
            2. Consumer behavior insights
            3. Regulatory considerations
            4. Technology impacts
            5. Strategic recommendations

            Focus on actionable insights for retail pricing and inventory.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "market_analyzer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "market_analysis": response.content,
                "timeframe": timeframe,
                "category": product_category
            }
        except Exception as e:
            return {"error": str(e), "tool": "market_analyzer"}

    def _calculate_ltv(self, purchase_history: List) -> float:
        """Helper function to calculate customer lifetime value."""
        if not purchase_history:
            return 0.0
        total_revenue = sum(p.get('revenue', 0) for p in purchase_history)
        purchase_count = len(purchase_history)
        avg_order_value = total_revenue / purchase_count if purchase_count > 0 else 0
        return min(round(avg_order_value * purchase_count * 0.1, 2), 1000.0)

print("🛠️ Initializing MCP Server...")
mcp_server = RetailMCPServer()


# Step 5: A2A (Agent-to-Agent) Communication Layer
class A2ACommunicationLayer:
    """
    Manages communication between agents, including direct messages, broadcasts,
    and consensus requests, with full tracing.
    """
    def __init__(self):
        self.agent_registries = {}
        self.conversation_history = {}
        self.performance_metrics = {}

    @traceable
    async def register_agent(self, agent_id: str, agent_type: str, capabilities: List[str]):
        """Register an agent in the A2A network."""
        self.agent_registries[agent_id] = {
            "agent_type": agent_type,
            "capabilities": capabilities,
            "status": "active",
            "last_heartbeat": datetime.now().isoformat()
        }
        self.performance_metrics[agent_id] = {
            "messages_sent": 0, "messages_received": 0,
            "response_time_avg": 0.0, "error_rate": 0.0
        }
        print(f"✅ Agent registered: {agent_id} ({agent_type})")

    @traceable
    async def send_message(self, from_agent: str, to_agent: str, message: Dict, message_type: str = "direct"):
        """Send a message between agents with tracing."""
        try:
            message_id = str(uuid.uuid4())
            timestamp = datetime.now().isoformat()

            message_package = {
                "message_id": message_id, "from_agent": from_agent,
                "to_agent": to_agent, "message_type": message_type,
                "content": message, "timestamp": timestamp
            }

            if from_agent not in self.conversation_history:
                self.conversation_history[from_agent] = []
            self.conversation_history[from_agent].append(message_package)

            self.performance_metrics[from_agent]["messages_sent"] += 1
            if to_agent in self.performance_metrics:
                self.performance_metrics[to_agent]["messages_received"] += 1

            print(f"📨 A2A Message: {from_agent} → {to_agent} ({message_type})")
            return message_id
        except Exception as e:
            print(f"❌ A2A Message Error: {e}")
            return None

    @traceable
    async def broadcast_message(self, from_agent: str, message: Dict, agent_filter: List[str] = None):
        """Broadcast a message to multiple agents."""
        try:
            target_agents = agent_filter if agent_filter else list(self.agent_registries.keys())
            target_agents = [agent for agent in target_agents if agent != from_agent]

            tasks = [self.send_message(from_agent, agent_id, message, "broadcast") for agent_id in target_agents]
            message_ids = await asyncio.gather(*tasks)
            return message_ids
        except Exception as e:
            print(f"❌ A2A Broadcast Error: {e}")
            return []

    def get_communication_metrics(self) -> Dict:
        """Get A2A communication performance metrics."""
        total_messages = sum(m["messages_sent"] for m in self.performance_metrics.values())
        active_agents = len([a for a, i in self.agent_registries.items() if i["status"] == "active"])
        return {
            "total_agents_registered": len(self.agent_registries),
            "active_agents": active_agents,
            "total_messages_exchanged": total_messages,
            "agent_performance": self.performance_metrics,
            "system_health": "optimal" if active_agents > 0 else "degraded"
        }

print("🔄 Initializing A2A Communication Layer...")
a2a_communication = A2ACommunicationLayer()


# Step 6: Enhanced Vector Database with Observability
class ObservableVectorDB:
    """
    A wrapper for a Chroma vector database that adds observability and performance tracking.
    """
    def __init__(self):
        self.embeddings = OpenAIEmbeddings()
        self.vector_store = None
        self.query_metrics = {"total_queries": 0, "successful_queries": 0, "average_response_time": 0.0}
        self.setup_sample_data()

    @traceable
    def setup_sample_data(self):
        """Initialize with comprehensive retail sample data."""
        sample_docs = [
            Document(page_content="Premium pricing strategy works best for electronics with 40-60% markup", metadata={"category": "pricing", "product_type": "electronics"}),
            Document(page_content="Competitive pricing essential for smartphone accessories market", metadata={"category": "pricing", "product_type": "accessories"}),
            Document(page_content="Safety stock levels should cover 2-week demand for fast-moving goods", metadata={"category": "inventory", "metric": "safety_stock"}),
            Document(page_content="Holiday season demand increases 60-80% for electronics", metadata={"category": "trends", "season": "Q4"}),
            Document(page_content="Price sensitivity is highest in the budget segment", metadata={"category": "customer", "insight": "pricing"})
        ]
        self.vector_store = Chroma.from_documents(documents=sample_docs, embedding=self.embeddings, persist_directory="./chroma_retail_db")
        print("✅ Observable Vector database initialized")

    @traceable
    async def enhanced_search(self, query: str, category: str = None, k: int = 3, agent_id: str = "unknown") -> List[Dict]:
        """Enhanced semantic search with observability."""
        start_time = datetime.now()
        self.query_metrics["total_queries"] += 1
        try:
            filter_dict = {"category": category} if category else {}
            results = self.vector_store.similarity_search_with_score(query, k=k, filter=filter_dict)

            response_time = (datetime.now() - start_time).total_seconds()
            self.query_metrics["successful_queries"] += 1
            total_queries = self.query_metrics["successful_queries"]
            self.query_metrics["average_response_time"] = ((self.query_metrics["average_response_time"] * (total_queries - 1)) + response_time) / total_queries

            formatted_results = [{"content": doc.page_content, "metadata": doc.metadata, "similarity_score": float(score)} for doc, score in results]
            print(f"🔍 VectorDB Search: '{query}' → {len(formatted_results)} results")
            return formatted_results
        except Exception as e:
            print(f"❌ VectorDB Search Error: {e}")
            return []

    def get_performance_metrics(self) -> Dict:
        """Get vector database performance metrics."""
        success_rate = self.query_metrics["successful_queries"] / max(1, self.query_metrics["total_queries"])
        return {**self.query_metrics, "success_rate": success_rate, "health_status": "optimal"}

print("🛢️ Initializing Observable Vector Database...")
observable_vector_db = ObservableVectorDB()


# Step 7: Enhanced Base Agent with MCP and A2A Integration
class EnhancedRetailAgent:
    """
    A base class for retail AI agents, integrating MCP tools, A2A communication,
    and an observable vector database.
    """
    def __init__(self, agent_id: str, agent_type: str, model: str = "gpt-4"):
        self.agent_id = agent_id
        self.agent_type = agent_type
        self.llm = ChatOpenAI(model=model, temperature=0.1)
        self.mcp_server = mcp_server
        self.a2a = a2a_communication
        self.vector_db = observable_vector_db
        self.history = []
        self.performance_metrics = {
            "tasks_completed": 0, "mcp_tools_used": 0,
            "a2a_messages_sent": 0, "vector_db_queries": 0,
            "avg_response_time": 0.0, "success_rate": 1.0
        }

    async def register(self):
        """Register agent with the A2A communication layer."""
        await self.a2a.register_agent(self.agent_id, self.agent_type, self._get_agent_capabilities())

    def _get_agent_capabilities(self) -> List[str]:
        """Define agent capabilities for A2A registration."""
        caps = ["analysis", "decision_support", "collaboration"]
        if "pricing" in self.agent_type: caps.extend(["price_optimization", "competitive_analysis"])
        elif "inventory" in self.agent_type: caps.extend(["demand_forecasting", "stock_optimization"])
        return caps

    @traceable
    async def use_mcp_tool(self, tool_name: str, tool_params: Dict) -> Dict:
        """Use MCP tools with built-in observability and A2A notifications."""
        if tool_name in self.mcp_server.tools:
            self.performance_metrics["mcp_tools_used"] += 1
            result = await self.mcp_server.tools[tool_name](**tool_params)

            # Notify other agents about tool usage
            await self.a2a.broadcast_message(self.agent_id, {
                "type": "mcp_tool_used", "tool": tool_name,
                "summary": str(result)[:100] + "..."
            })
            return result
        return {"error": f"Tool {tool_name} not found"}

    def get_enhanced_status(self) -> Dict:
        """Get comprehensive agent status with observability metrics."""
        return {
            "agent_id": self.agent_id,
            "agent_type": self.agent_type,
            "performance_metrics": self.performance_metrics,
            "a2a_registered": self.agent_id in self.a2a.agent_registries,
            "health_status": "optimal"
        }


# Step 8: Enhanced Specialized Agents
class EnhancedPricingAgent(EnhancedRetailAgent):
    """A specialized agent for pricing strategy, using all system components."""
    def __init__(self, agent_id: str):
        super().__init__(agent_id, "enhanced_pricing_strategist", "gpt-4")

    @traceable
    async def analyze_pricing(self, product_data: Dict, market_conditions: Dict) -> Dict:
        """Enhanced pricing analysis workflow."""
        start_time = datetime.now()
        try:
            # 1. Get context from Vector DB
            self.performance_metrics["vector_db_queries"] += 1
            context = await self.vector_db.enhanced_search(
                f"pricing strategy for {product_data.get('category')}",
                category="pricing", agent_id=self.agent_id
            )

            # 2. Use MCP tools for advanced analysis
            mcp_analysis = await self.use_mcp_tool("price_optimizer", {
                "product_data": product_data,
                "market_conditions": {**market_conditions, "vector_context": context}
            })

            # 3. Generate final recommendation
            final_recommendation = await self._generate_final_recommendation(
                product_data, mcp_analysis, context
            )

            return {
                "recommendation": final_recommendation,
                "mcp_analysis_used": mcp_analysis,
                "context_used": context,
                "processing_time": (datetime.now() - start_time).total_seconds()
            }
        except Exception as e:
            return {"error": str(e)}

    async def _generate_final_recommendation(self, product_data: Dict, mcp_analysis: Dict, context: List) -> str:
        """Synthesize a final recommendation using an LLM."""
        prompt = f"""
        Synthesize a final pricing recommendation based on the following data:
        PRODUCT: {product_data}
        MCP ANALYSIS: {mcp_analysis}
        VECTOR DB CONTEXT: {context}

        Provide a comprehensive pricing strategy for executive decision-making.
        """
        response = await self.llm.ainvoke(prompt)
        return response.content


# Step 9: Enhanced Orchestrator with Full Observability
class ObservableRetailOrchestrator:
    """
    Orchestrates the agent army, runs workflows, and provides a centralized
    view of system-wide observability.
    """
    def __init__(self):
        self.agents = {}
        self.workflow_history = []
        self.observability_metrics = {
            "total_workflows": 0, "successful_workflows": 0, "average_workflow_time": 0.0
        }

    @traceable
    async def setup_enhanced_agent_army(self):
        """Initialize the enhanced agent army and register them."""
        self.agents = {
            "pricing_alpha_enhanced": EnhancedPricingAgent("pricing_alpha_enhanced"),
            "pricing_beta_enhanced": EnhancedPricingAgent("pricing_beta_enhanced"),
        }
        # Register agents explicitly
        for agent in self.agents.values():
            await agent.register()

        print(f"✅ Enhanced Agent Army Initialized and Registered: {len(self.agents)} observable agents")

    @traceable
    async def run_observable_pricing_workflow(self, product_data: Dict) -> Dict:
        """Run a pricing workflow with full observability."""
        workflow_start = datetime.now()
        self.observability_metrics["total_workflows"] += 1
        try:
            print(f"🚀 Starting OBSERVABLE pricing workflow for {product_data.get('name')}...")
            market_context = await observable_vector_db.enhanced_search(f"market analysis for {product_data.get('category')}", k=3)

            pricing_agents = [agent for aid, agent in self.agents.items() if "pricing" in aid]
            tasks = [agent.analyze_pricing(product_data, {"context": market_context}) for agent in pricing_agents]
            pricing_results = await asyncio.gather(*tasks)

            aggregated_recommendation = await self.aggregate_enhanced_recommendations(pricing_results, product_data)

            workflow_time = (datetime.now() - workflow_start).total_seconds()
            self.observability_metrics["successful_workflows"] += 1
            total_workflows = self.observability_metrics["successful_workflows"]
            self.observability_metrics["average_workflow_time"] = ((self.observability_metrics["average_workflow_time"] * (total_workflows-1)) + workflow_time) / total_workflows

            workflow_result = {
                "workflow_type": "observable_pricing",
                "aggregated_recommendation": aggregated_recommendation,
                "workflow_duration": workflow_time
            }
            self.workflow_history.append(workflow_result)
            print(f"✅ Observable pricing workflow completed in {workflow_time:.2f}s")
            return workflow_result
        except Exception as e:
            print(f"❌ Observable workflow error: {e}")
            return {"error": str(e)}

    @traceable
    async def aggregate_enhanced_recommendations(self, results: List[Dict], context_data: Dict) -> Dict:
        """Aggregate recommendations from multiple agents using an LLM."""
        llm = ChatOpenAI(model="gpt-4", temperature=0)
        prompt = f"""
        Aggregate these pricing recommendations from multiple AI agents:
        CONTEXT: {context_data}
        AGENT ANALYSES: {json.dumps(results, indent=2)}
        Provide a single, consolidated strategy.
        """
        response = await llm.ainvoke(prompt)
        return {"consolidated_strategy": response.content}

    def get_comprehensive_observability(self) -> Dict:
        """Get comprehensive observability data from all system components."""
        agent_statuses = {aid: agent.get_enhanced_status() for aid, agent in self.agents.items()}
        return {
            "timestamp": datetime.now().isoformat(),
            "orchestrator_metrics": self.observability_metrics,
            "agent_army_status": {
                "total_agents": len(self.agents),
                "agent_breakdown": {"pricing_agents": len([a for a in self.agents if "pricing" in a])}
            },
            "a2a_communication_metrics": a2a_communication.get_communication_metrics(),
            "vector_db_performance": observable_vector_db.get_performance_metrics(),
            "workflow_history_summary": {
                "total_workflows": len(self.workflow_history),
                "success_rate": self.observability_metrics["successful_workflows"] / max(1, self.observability_metrics["total_workflows"])
            }
        }


# Step 10: Generate Sample Data
def generate_sample_data():
    """Generate sample data for the demonstration."""
    return {
        "products": [
            {"product_id": "PROD_001", "name": "Wireless Bluetooth Headphones", "category": "electronics", "cost_price": 45.0, "current_price": 99.99},
            {"product_id": "PROD_002", "name": "Smartphone Protective Case", "category": "accessories", "cost_price": 4.50, "current_price": 19.99}
        ]
    }

print("📊 Generating sample data...")
sample_data = generate_sample_data()

# Step 11: Main Execution Function
async def main():
    """Initializes and runs the entire demo."""
    if OPENAI_API_KEY == "YOUR_OPENAI_API_KEY":
        print("\n\n❌ Cannot run demo. Please set your OpenAI API key at the top of the script.")
        return

    # Initialize Enhanced Orchestrator
    print("🎯 Initializing Observable Retail Agent Army Orchestrator...")
    observable_orchestrator = ObservableRetailOrchestrator()
    await observable_orchestrator.setup_enhanced_agent_army()


    # Run Enhanced Demo with Full Observability
    print("\n" + "="*70)
    print("🔍 OBSERVABLE RETAIL AI AGENT ARMY - ENHANCED DEMO")
    print("="*70)

    # Run pricing workflows
    for product in sample_data['products']:
        await observable_orchestrator.run_observable_pricing_workflow(product)

    # Show final dashboard data
    final_observability = observable_orchestrator.get_comprehensive_observability()
    print("\n3. 📈 FINAL OBSERVABILITY DASHBOARD DATA")
    print(json.dumps(final_observability, indent=2, default=str))

    # Visualization and final summary
    print("\n" + "="*70)
    print("🎊 DEMO COMPLETED!")
    print("="*70)
    print("🔍 OBSERVABILITY FEATURES DEMONSTRATED:")
    print("  ✅ MCP (Model Context Protocol) Integration")
    print("  ✅ A2A (Agent-to-Agent) Communication Layer")
    print("  ✅ LangSmith AI Observability & Tracing")
    print("  ✅ Real-time Performance Monitoring")

# Step 12: Run the Demo
# In a Jupyter/Colab notebook, this final block should be in its own cell.
# The `asyncio.run()` function starts the event loop and runs the `main` coroutine.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as e:
        print(f"An error occurred during the demo run: {e}")



✅ Key packages already installed.
📚 Libraries imported successfully!
🔑 Setting up API Keys and Observability...
✅ LangSmith client initialized successfully!
✅ OpenAI API key verified successfully!
🛠️ Initializing MCP Server...
🔄 Initializing A2A Communication Layer...
🛢️ Initializing Observable Vector Database...
✅ Observable Vector database initialized
📊 Generating sample data...
🎯 Initializing Observable Retail Agent Army Orchestrator...
✅ Agent registered: pricing_alpha_enhanced (enhanced_pricing_strategist)
✅ Agent registered: pricing_beta_enhanced (enhanced_pricing_strategist)
✅ Enhanced Agent Army Initialized and Registered: 2 observable agents

🔍 OBSERVABLE RETAIL AI AGENT ARMY - ENHANCED DEMO
🚀 Starting OBSERVABLE pricing workflow for Wireless Bluetooth Headphones...
❌ VectorDB Search Error: Expected where to have exactly one operator, got {} in query.
🔍 VectorDB Search: 'pricing strategy for electronics' → 3 results
🔍 VectorDB Search: 'pricing strategy for electronics' → 3 res

In [6]:
# Step 1: Install all required packages
# Note: In a local environment, you would run these commands in your terminal.
# In a Jupyter/Colab notebook, the '!' prefix executes them as shell commands.
try:
    # Check for essential packages
    import langchain, chromadb, sentence_transformers, pandas, langsmith, mcp, nest_asyncio
    print("✅ Key packages already installed.")
except ImportError:
    print("⏳ Installing required packages...")
    import subprocess
    import sys
    # Install the full list of packages, including nest_asyncio to handle event loop issues.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "langchain", "langchain-openai", "langchain-community", "chromadb", "sentence-transformers", "lightgbm", "pandas", "numpy", "matplotlib", "scikit-learn", "aiohttp", "langsmith", "guardrails-ai", "nest_asyncio"])
    print("✅ All packages installed successfully!")


# Step 2: Import libraries
import os
import asyncio
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import uuid
import nest_asyncio

# Apply the patch to allow nested event loops, which is common in notebooks.
nest_asyncio.apply()

# LangChain imports
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.schema import Document
from langchain_community.vectorstores import Chroma
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser

# LangSmith for AI Observability
import langsmith
from langsmith import Client, traceable

# MCP (Model Context Protocol) conceptual import
import mcp

print("📚 Libraries imported successfully!")


# Step 3: Configure API Keys and Observability
print("🔑 Setting up API Keys and Observability...")

# It's highly recommended to use environment variables or a secrets management tool.
# For this example, we'll use placeholders.
# IMPORTANT: Replace these with your actual keys.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY', "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")
LANGSMITH_API_KEY = os.environ.get('LANGSMITH_API_KEY', "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")
LANGSMITH_PROJECT = os.environ.get('LANGSMITH_PROJECT', "retail-agents-MCP")


# Set environment variables for LangChain and LangSmith
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
os.environ["LANGSMITH_API_KEY"] = LANGSMITH_API_KEY
os.environ["LANGSMITH_PROJECT"] = LANGSMITH_PROJECT
os.environ["LANGCHAIN_TRACING_V2"] = "true" # Enables LangSmith tracing

# Initialize LangSmith client
langsmith_client = None
try:
    if LANGSMITH_API_KEY != "YOUR_LANGSMITH_API_KEY":
        langsmith_client = Client()
        print("✅ LangSmith client initialized successfully!")
    else:
        print("⚠️ LangSmith API key not set. Skipping client initialization.")
except Exception as e:
    print(f"⚠️ LangSmith initialization warning: {e}")

# Test the OpenAI API key
try:
    if OPENAI_API_KEY != "YOUR_OPENAI_API_KEY":
        test_llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
        test_response = test_llm.invoke("Test")
        print("✅ OpenAI API key verified successfully!")
    else:
         print("❌ OpenAI API key is not set. Please replace 'YOUR_OPENAI_API_KEY'.")
except Exception as e:
    print(f"❌ OpenAI API key error: {e}")


# Step 4: MCP Server Implementation for Retail
class RetailMCPServer:
    """
    A simulated Model Context Protocol (MCP) server that provides specialized tools
    for retail analysis. Each tool is traceable for observability.
    """
    def __init__(self):
        self.tools = {
            "price_optimizer": self.price_optimization_tool,
            "demand_forecaster": self.demand_forecasting_tool,
            "inventory_optimizer": self.inventory_optimization_tool,
            "customer_analyzer": self.customer_analytics_tool,
            "competitor_monitor": self.competitive_intelligence_tool,
            "market_analyzer": self.market_analysis_tool
        }
        self.session_id = str(uuid.uuid4())

    @traceable
    async def price_optimization_tool(self, product_data: Dict, market_conditions: Dict) -> Dict:
        """MCP Tool: Advanced price optimization using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Perform advanced price optimization analysis:

            PRODUCT: {product_data}
            MARKET: {market_conditions}

            Calculate:
            1. Price elasticity estimation
            2. Optimal price point with confidence intervals
            3. Competitive positioning strategy
            4. Revenue maximization approach
            5. Risk assessment

            Return structured JSON analysis.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "price_optimizer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "analysis": response.content,
                "recommendations": {
                    "optimal_price_range": {"min": 45.0, "max": 125.0, "recommended": 89.99},
                    "confidence_score": 0.87,
                    "expected_revenue_impact": "+15-25%",
                    "risk_level": "medium"
                }
            }
        except Exception as e:
            return {"error": str(e), "tool": "price_optimizer"}

    @traceable
    async def demand_forecasting_tool(self, historical_data: List, market_signals: Dict) -> Dict:
        """MCP Tool: Demand forecasting with market signals."""
        try:
            forecast_data = {
                "next_30_days": np.random.normal(100, 20, 30).tolist(),
                "confidence_interval": {"lower": 80, "upper": 120},
                "seasonality_factor": 1.15,
                "trend_direction": "upward"
            }
            return {
                "tool": "demand_forecaster",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "forecast": forecast_data,
                "market_signals_used": list(market_signals.keys()),
                "data_points_analyzed": len(historical_data)
            }
        except Exception as e:
            return {"error": str(e), "tool": "demand_forecaster"}

    @traceable
    async def inventory_optimization_tool(self, current_stock: Dict, demand_forecast: Dict) -> Dict:
        """MCP Tool: Inventory optimization based on demand forecasting."""
        try:
            optimization_recommendations = {
                "safety_stock_levels": {
                    "high_demand": current_stock.get('current_stock', 0) * 0.3,
                    "medium_demand": current_stock.get('current_stock', 0) * 0.2,
                    "low_demand": current_stock.get('current_stock', 0) * 0.1
                },
                "reorder_points": {
                    "urgent": current_stock.get('reorder_point', 0) * 0.8,
                    "standard": current_stock.get('reorder_point', 0),
                    "relaxed": current_stock.get('reorder_point', 0) * 1.2
                },
                "cost_optimization": {
                    "expected_savings": "15-20%",
                    "stockout_risk_reduction": "40-60%",
                    "carrying_cost_optimization": "10-15%"
                }
            }
            return {
                "tool": "inventory_optimizer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "optimization": optimization_recommendations,
                "demand_forecast_incorporated": True
            }
        except Exception as e:
            return {"error": str(e), "tool": "inventory_optimizer"}

    @traceable
    async def customer_analytics_tool(self, customer_data: Dict, purchase_history: List) -> Dict:
        """MCP Tool: Advanced customer analytics using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Analyze customer behavior and generate insights:

            CUSTOMER: {customer_data}
            PURCHASE HISTORY: {purchase_history[:5]}

            Provide:
            1. Customer lifetime value prediction
            2. Churn risk assessment
            3. Personalization opportunities
            4. Retention strategy
            5. Cross-selling recommendations

            Return structured analysis.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "customer_analyzer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "customer_insights": response.content,
                "metrics": {
                    "lifetime_value_score": self._calculate_ltv(purchase_history),
                    "churn_risk": "low" if len(purchase_history) > 3 else "medium",
                    "engagement_level": "high" if len(purchase_history) > 5 else "medium"
                }
            }
        except Exception as e:
            return {"error": str(e), "tool": "customer_analyzer"}

    @traceable
    async def competitive_intelligence_tool(self, product_data: Dict, market_scope: str = "local") -> Dict:
        """MCP Tool: Competitive market intelligence."""
        try:
            competitor_analysis = {
                "main_competitors": ["Competitor A", "Competitor B", "Competitor C"],
                "price_comparison": {
                    "our_price": product_data.get('current_price', 0),
                    "competitor_avg": product_data.get('current_price', 0) * 0.9,
                    "market_range": {
                        "min": product_data.get('current_price', 0) * 0.7,
                        "max": product_data.get('current_price', 0) * 1.3
                    }
                },
                "competitive_positioning": "market_leader" if product_data.get('current_price', 0) > 100 else "value_player",
                "recommendations": [
                    "Monitor competitor pricing weekly",
                    "Differentiate through value-added services",
                    "Consider bundle pricing strategies"
                ]
            }
            return {
                "tool": "competitor_monitor",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "analysis": competitor_analysis,
                "market_scope": market_scope
            }
        except Exception as e:
            return {"error": str(e), "tool": "competitor_monitor"}

    @traceable
    async def market_analysis_tool(self, product_category: str, timeframe: str = "30d") -> Dict:
        """MCP Tool: Comprehensive market analysis using an LLM."""
        try:
            llm = ChatOpenAI(model="gpt-4", temperature=0.1)
            prompt = f"""
            Analyze market conditions for {product_category} over {timeframe}:

            Provide:
            1. Market trends and growth projections
            2. Consumer behavior insights
            3. Regulatory considerations
            4. Technology impacts
            5. Strategic recommendations

            Focus on actionable insights for retail pricing and inventory.
            """
            response = await llm.ainvoke(prompt)
            return {
                "tool": "market_analyzer",
                "session_id": self.session_id,
                "timestamp": datetime.now().isoformat(),
                "market_analysis": response.content,
                "timeframe": timeframe,
                "category": product_category
            }
        except Exception as e:
            return {"error": str(e), "tool": "market_analyzer"}

    def _calculate_ltv(self, purchase_history: List) -> float:
        """Helper function to calculate customer lifetime value."""
        if not purchase_history:
            return 0.0
        total_revenue = sum(p.get('revenue', 0) for p in purchase_history)
        purchase_count = len(purchase_history)
        avg_order_value = total_revenue / purchase_count if purchase_count > 0 else 0
        return min(round(avg_order_value * purchase_count * 0.1, 2), 1000.0)

print("🛠️ Initializing MCP Server...")
mcp_server = RetailMCPServer()


# Step 5: A2A (Agent-to-Agent) Communication Layer
class A2ACommunicationLayer:
    """
    Manages communication between agents, including direct messages, broadcasts,
    and consensus requests, with full tracing.
    """
    def __init__(self):
        self.agent_registries = {}
        self.conversation_history = {}
        self.performance_metrics = {}

    @traceable
    async def register_agent(self, agent_id: str, agent_type: str, capabilities: List[str]):
        """Register an agent in the A2A network."""
        self.agent_registries[agent_id] = {
            "agent_type": agent_type,
            "capabilities": capabilities,
            "status": "active",
            "last_heartbeat": datetime.now().isoformat()
        }
        self.performance_metrics[agent_id] = {
            "messages_sent": 0, "messages_received": 0,
            "response_time_avg": 0.0, "error_rate": 0.0
        }
        print(f"✅ Agent registered: {agent_id} ({agent_type})")

    @traceable
    async def send_message(self, from_agent: str, to_agent: str, message: Dict, message_type: str = "direct"):
        """Send a message between agents with tracing."""
        try:
            message_id = str(uuid.uuid4())
            timestamp = datetime.now().isoformat()

            message_package = {
                "message_id": message_id, "from_agent": from_agent,
                "to_agent": to_agent, "message_type": message_type,
                "content": message, "timestamp": timestamp
            }

            if from_agent not in self.conversation_history:
                self.conversation_history[from_agent] = []
            self.conversation_history[from_agent].append(message_package)

            self.performance_metrics[from_agent]["messages_sent"] += 1
            if to_agent in self.performance_metrics:
                self.performance_metrics[to_agent]["messages_received"] += 1

            print(f"📨 A2A Message: {from_agent} → {to_agent} ({message_type})")
            return message_id
        except Exception as e:
            print(f"❌ A2A Message Error: {e}")
            return None

    @traceable
    async def broadcast_message(self, from_agent: str, message: Dict, agent_filter: List[str] = None):
        """Broadcast a message to multiple agents."""
        try:
            target_agents = agent_filter if agent_filter else list(self.agent_registries.keys())
            target_agents = [agent for agent in target_agents if agent != from_agent]

            tasks = [self.send_message(from_agent, agent_id, message, "broadcast") for agent_id in target_agents]
            message_ids = await asyncio.gather(*tasks)
            return message_ids
        except Exception as e:
            print(f"❌ A2A Broadcast Error: {e}")
            return []

    def get_communication_metrics(self) -> Dict:
        """Get A2A communication performance metrics."""
        total_messages = sum(m["messages_sent"] for m in self.performance_metrics.values())
        active_agents = len([a for a, i in self.agent_registries.items() if i["status"] == "active"])
        return {
            "total_agents_registered": len(self.agent_registries),
            "active_agents": active_agents,
            "total_messages_exchanged": total_messages,
            "agent_performance": self.performance_metrics,
            "system_health": "optimal" if active_agents > 0 else "degraded"
        }

print("🔄 Initializing A2A Communication Layer...")
a2a_communication = A2ACommunicationLayer()


# Step 6: Enhanced Vector Database with Observability
class ObservableVectorDB:
    """
    A wrapper for a Chroma vector database that adds observability and performance tracking.
    """
    def __init__(self):
        self.embeddings = OpenAIEmbeddings()
        self.vector_store = None
        self.query_metrics = {"total_queries": 0, "successful_queries": 0, "average_response_time": 0.0}
        self.setup_sample_data()

    @traceable
    def setup_sample_data(self):
        """Initialize with comprehensive retail sample data."""
        sample_docs = [
            Document(page_content="Premium pricing strategy works best for electronics with 40-60% markup", metadata={"category": "pricing", "product_type": "electronics"}),
            Document(page_content="Competitive pricing essential for smartphone accessories market", metadata={"category": "pricing", "product_type": "accessories"}),
            Document(page_content="Safety stock levels should cover 2-week demand for fast-moving goods", metadata={"category": "inventory", "metric": "safety_stock"}),
            Document(page_content="Holiday season demand increases 60-80% for electronics", metadata={"category": "trends", "season": "Q4"}),
            Document(page_content="Price sensitivity is highest in the budget segment", metadata={"category": "customer", "insight": "pricing"})
        ]
        self.vector_store = Chroma.from_documents(documents=sample_docs, embedding=self.embeddings, persist_directory="./chroma_retail_db")
        print("✅ Observable Vector database initialized")

    @traceable
    async def enhanced_search(self, query: str, category: str = None, k: int = 3, agent_id: str = "unknown") -> List[Dict]:
        """Enhanced semantic search with observability."""
        start_time = datetime.now()
        self.query_metrics["total_queries"] += 1
        try:
            filter_dict = {"category": category} if category else {}
            results = self.vector_store.similarity_search_with_score(query, k=k, filter=filter_dict)

            response_time = (datetime.now() - start_time).total_seconds()
            self.query_metrics["successful_queries"] += 1
            total_queries = self.query_metrics["successful_queries"]
            self.query_metrics["average_response_time"] = ((self.query_metrics["average_response_time"] * (total_queries - 1)) + response_time) / total_queries

            formatted_results = [{"content": doc.page_content, "metadata": doc.metadata, "similarity_score": float(score)} for doc, score in results]
            print(f"🔍 VectorDB Search: '{query}' → {len(formatted_results)} results")
            return formatted_results
        except Exception as e:
            print(f"❌ VectorDB Search Error: {e}")
            return []

    def get_performance_metrics(self) -> Dict:
        """Get vector database performance metrics."""
        success_rate = self.query_metrics["successful_queries"] / max(1, self.query_metrics["total_queries"])
        return {**self.query_metrics, "success_rate": success_rate, "health_status": "optimal"}

print("🛢️ Initializing Observable Vector Database...")
observable_vector_db = ObservableVectorDB()


# Step 7: Enhanced Base Agent with MCP and A2A Integration
class EnhancedRetailAgent:
    """
    A base class for retail AI agents, integrating MCP tools, A2A communication,
    and an observable vector database.
    """
    def __init__(self, agent_id: str, agent_type: str, model: str = "gpt-4"):
        self.agent_id = agent_id
        self.agent_type = agent_type
        self.llm = ChatOpenAI(model=model, temperature=0.1)
        self.mcp_server = mcp_server
        self.a2a = a2a_communication
        self.vector_db = observable_vector_db
        self.history = []
        self.performance_metrics = {
            "tasks_completed": 0, "mcp_tools_used": 0,
            "a2a_messages_sent": 0, "vector_db_queries": 0,
            "avg_response_time": 0.0, "success_rate": 1.0
        }

    async def register(self):
        """Register agent with the A2A communication layer."""
        await self.a2a.register_agent(self.agent_id, self.agent_type, self._get_agent_capabilities())

    def _get_agent_capabilities(self) -> List[str]:
        """Define agent capabilities for A2A registration."""
        caps = ["analysis", "decision_support", "collaboration"]
        if "pricing" in self.agent_type: caps.extend(["price_optimization", "competitive_analysis"])
        elif "inventory" in self.agent_type: caps.extend(["demand_forecasting", "stock_optimization"])
        return caps

    @traceable
    async def use_mcp_tool(self, tool_name: str, tool_params: Dict) -> Dict:
        """Use MCP tools with built-in observability and A2A notifications."""
        if tool_name in self.mcp_server.tools:
            self.performance_metrics["mcp_tools_used"] += 1
            result = await self.mcp_server.tools[tool_name](**tool_params)

            # Notify other agents about tool usage
            await self.a2a.broadcast_message(self.agent_id, {
                "type": "mcp_tool_used", "tool": tool_name,
                "summary": str(result)[:100] + "..."
            })
            return result
        return {"error": f"Tool {tool_name} not found"}

    def get_enhanced_status(self) -> Dict:
        """Get comprehensive agent status with observability metrics."""
        return {
            "agent_id": self.agent_id,
            "agent_type": self.agent_type,
            "performance_metrics": self.performance_metrics,
            "a2a_registered": self.agent_id in self.a2a.agent_registries,
            "health_status": "optimal"
        }


# Step 8: Enhanced Specialized Agents
class EnhancedPricingAgent(EnhancedRetailAgent):
    """A specialized agent for pricing strategy, using all system components."""
    def __init__(self, agent_id: str):
        super().__init__(agent_id, "enhanced_pricing_strategist", "gpt-4")

    @traceable
    async def analyze_pricing(self, product_data: Dict, market_conditions: Dict) -> Dict:
        """Enhanced pricing analysis workflow."""
        start_time = datetime.now()
        try:
            # 1. Get context from Vector DB
            self.performance_metrics["vector_db_queries"] += 1
            context = await self.vector_db.enhanced_search(
                f"pricing strategy for {product_data.get('category')}",
                category="pricing", agent_id=self.agent_id
            )

            # 2. Use MCP tools for advanced analysis
            mcp_analysis = await self.use_mcp_tool("price_optimizer", {
                "product_data": product_data,
                "market_conditions": {**market_conditions, "vector_context": context}
            })

            # 3. Generate final recommendation
            final_recommendation = await self._generate_final_recommendation(
                product_data, mcp_analysis, context
            )

            return {
                "recommendation": final_recommendation,
                "mcp_analysis_used": mcp_analysis,
                "context_used": context,
                "processing_time": (datetime.now() - start_time).total_seconds()
            }
        except Exception as e:
            return {"error": str(e)}

    async def _generate_final_recommendation(self, product_data: Dict, mcp_analysis: Dict, context: List) -> str:
        """Synthesize a final recommendation using an LLM."""
        prompt = f"""
        Synthesize a final pricing recommendation based on the following data:
        PRODUCT: {product_data}
        MCP ANALYSIS: {mcp_analysis}
        VECTOR DB CONTEXT: {context}

        Provide a comprehensive pricing strategy for executive decision-making.
        """
        response = await self.llm.ainvoke(prompt)
        return response.content


# Step 9: Enhanced Orchestrator with Full Observability
class ObservableRetailOrchestrator:
    """
    Orchestrates the agent army, runs workflows, and provides a centralized
    view of system-wide observability.
    """
    def __init__(self):
        self.agents = {}
        self.workflow_history = []
        self.observability_metrics = {
            "total_workflows": 0, "successful_workflows": 0, "average_workflow_time": 0.0
        }

    @traceable
    async def setup_enhanced_agent_army(self):
        """Initialize the enhanced agent army and register them."""
        self.agents = {
            "pricing_alpha_enhanced": EnhancedPricingAgent("pricing_alpha_enhanced"),
            "pricing_beta_enhanced": EnhancedPricingAgent("pricing_beta_enhanced"),
        }
        # Register agents explicitly
        for agent in self.agents.values():
            await agent.register()

        print(f"✅ Enhanced Agent Army Initialized and Registered: {len(self.agents)} observable agents")

    @traceable
    async def run_observable_pricing_workflow(self, product_data: Dict) -> Dict:
        """Run a pricing workflow with full observability."""
        workflow_start = datetime.now()
        self.observability_metrics["total_workflows"] += 1
        try:
            print(f"🚀 Starting OBSERVABLE pricing workflow for {product_data.get('name')}...")
            market_context = await observable_vector_db.enhanced_search(
                f"market analysis for {product_data.get('category')}",
                category="trends",
                k=3
            )

            pricing_agents = [agent for aid, agent in self.agents.items() if "pricing" in aid]
            tasks = [agent.analyze_pricing(product_data, {"context": market_context}) for agent in pricing_agents]
            pricing_results = await asyncio.gather(*tasks)

            aggregated_recommendation = await self.aggregate_enhanced_recommendations(pricing_results, product_data)

            workflow_time = (datetime.now() - workflow_start).total_seconds()
            self.observability_metrics["successful_workflows"] += 1
            total_workflows = self.observability_metrics["successful_workflows"]
            self.observability_metrics["average_workflow_time"] = ((self.observability_metrics["average_workflow_time"] * (total_workflows-1)) + workflow_time) / total_workflows

            workflow_result = {
                "workflow_type": "observable_pricing",
                "aggregated_recommendation": aggregated_recommendation,
                "workflow_duration": workflow_time
            }
            self.workflow_history.append(workflow_result)
            print(f"✅ Observable pricing workflow completed in {workflow_time:.2f}s")
            return workflow_result
        except Exception as e:
            print(f"❌ Observable workflow error: {e}")
            return {"error": str(e)}

    @traceable
    async def aggregate_enhanced_recommendations(self, results: List[Dict], context_data: Dict) -> Dict:
        """Aggregate recommendations from multiple agents using an LLM."""
        llm = ChatOpenAI(model="gpt-4", temperature=0)
        prompt = f"""
        Aggregate these pricing recommendations from multiple AI agents:
        CONTEXT: {context_data}
        AGENT ANALYSES: {json.dumps(results, indent=2)}
        Provide a single, consolidated strategy.
        """
        response = await llm.ainvoke(prompt)
        return {"consolidated_strategy": response.content}

    def get_comprehensive_observability(self) -> Dict:
        """Get comprehensive observability data from all system components."""
        agent_statuses = {aid: agent.get_enhanced_status() for aid, agent in self.agents.items()}
        return {
            "timestamp": datetime.now().isoformat(),
            "orchestrator_metrics": self.observability_metrics,
            "agent_army_status": {
                "total_agents": len(self.agents),
                "agent_breakdown": {"pricing_agents": len([a for a in self.agents if "pricing" in a])}
            },
            "a2a_communication_metrics": a2a_communication.get_communication_metrics(),
            "vector_db_performance": observable_vector_db.get_performance_metrics(),
            "workflow_history_summary": {
                "total_workflows": len(self.workflow_history),
                "success_rate": self.observability_metrics["successful_workflows"] / max(1, self.observability_metrics["total_workflows"])
            }
        }


# Step 10: Generate Sample Data
def generate_sample_data():
    """Generate sample data for the demonstration."""
    return {
        "products": [
            {"product_id": "PROD_001", "name": "Wireless Bluetooth Headphones", "category": "electronics", "cost_price": 45.0, "current_price": 99.99},
            {"product_id": "PROD_002", "name": "Smartphone Protective Case", "category": "accessories", "cost_price": 4.50, "current_price": 19.99}
        ]
    }

print("📊 Generating sample data...")
sample_data = generate_sample_data()

# Step 11: Main Execution Function
async def main():
    """Initializes and runs the entire demo."""
    if OPENAI_API_KEY == "YOUR_OPENAI_API_KEY":
        print("\n\n❌ Cannot run demo. Please set your OpenAI API key at the top of the script.")
        return

    # Initialize Enhanced Orchestrator
    print("🎯 Initializing Observable Retail Agent Army Orchestrator...")
    observable_orchestrator = ObservableRetailOrchestrator()
    await observable_orchestrator.setup_enhanced_agent_army()


    # Run Enhanced Demo with Full Observability
    print("\n" + "="*70)
    print("🔍 OBSERVABLE RETAIL AI AGENT ARMY - ENHANCED DEMO")
    print("="*70)

    # Run pricing workflows
    for product in sample_data['products']:
        await observable_orchestrator.run_observable_pricing_workflow(product)

    # Show final dashboard data
    final_observability = observable_orchestrator.get_comprehensive_observability()
    print("\n3. 📈 FINAL OBSERVABILITY DASHBOARD DATA")
    print(json.dumps(final_observability, indent=2, default=str))

    # Visualization and final summary
    print("\n" + "="*70)
    print("🎊 DEMO COMPLETED!")
    print("="*70)
    print("🔍 OBSERVABILITY FEATURES DEMONSTRATED:")
    print("  ✅ MCP (Model Context Protocol) Integration")
    print("  ✅ A2A (Agent-to-Agent) Communication Layer")
    print("  ✅ LangSmith AI Observability & Tracing")
    print("  ✅ Real-time Performance Monitoring")

# Step 12: Run the Demo
# This is the standard way to run an async function from a script.
# The `nest_asyncio.apply()` at the top handles notebook environments.
if __name__ == "__main__":
    # A simple check to avoid running the demo if keys are not set.
    if OPENAI_API_KEY == "YOUR_OPENAI_API_KEY" or LANGSMITH_API_KEY == "YOUR_LANGSMITH_API_KEY":
        print("\n🛑 DEMO HALTED: Please provide your API keys in Step 3.")
    else:
        asyncio.run(main())



✅ Key packages already installed.
📚 Libraries imported successfully!
🔑 Setting up API Keys and Observability...
✅ LangSmith client initialized successfully!
✅ OpenAI API key verified successfully!
🛠️ Initializing MCP Server...
🔄 Initializing A2A Communication Layer...
🛢️ Initializing Observable Vector Database...
✅ Observable Vector database initialized
📊 Generating sample data...
🎯 Initializing Observable Retail Agent Army Orchestrator...
✅ Agent registered: pricing_alpha_enhanced (enhanced_pricing_strategist)
✅ Agent registered: pricing_beta_enhanced (enhanced_pricing_strategist)
✅ Enhanced Agent Army Initialized and Registered: 2 observable agents

🔍 OBSERVABLE RETAIL AI AGENT ARMY - ENHANCED DEMO
🚀 Starting OBSERVABLE pricing workflow for Wireless Bluetooth Headphones...
🔍 VectorDB Search: 'market analysis for electronics' → 3 results
🔍 VectorDB Search: 'pricing strategy for electronics' → 3 results
🔍 VectorDB Search: 'pricing strategy for electronics' → 3 results
📨 A2A Message: pr