In [1]:
import os
import sys
import socket
import time
import threading
import argparse
import json
from python_a2a import OpenAIA2AServer, run_server, A2AServer, AgentCard, AgentSkill
from python_a2a.langchain import to_langchain_agent, to_langchain_tool
from python_a2a.mcp import FastMCP
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.agents import initialize_agent, Tool
from langchain.agents import AgentType
import yfinance as yf
import pandas as pd
import json
 # Import required libraries
import json
import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import urljoin

In [None]:
def find_available_port(start_port=5000, max_tries=20):
    """Return the first TCP port on localhost that can currently be bound.

    Probes ``start_port``, ``start_port + 1``, ... (``max_tries`` ports in
    total) by binding a throwaway socket to each candidate.

    Args:
        start_port: First port number to probe.
        max_tries: Number of consecutive ports to probe.

    Returns:
        The first port that could be bound. If every probe fails, a warning is
        printed and ``start_port + 1000`` is returned without being probed.
    """
    for port in range(start_port, start_port + max_tries):
        try:
            # The context manager closes the probe socket on every path; the
            # earlier version leaked the socket whenever bind() raised.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.bind(('localhost', port))
            return port
        except OSError:
            # Port is already in use, try the next one
            continue

    # If we get here, no ports were available
    print(f"⚠️  Could not find an available port in range {start_port}-{start_port + max_tries - 1}")
    return start_port + 1000  # Try a port well outside the normal range

def run_server_in_thread(server_func, server, **kwargs):
    """Launch ``server_func(server, **kwargs)`` on a daemon thread.

    After starting the thread this function blocks for two seconds to give
    the server a chance to come up, then returns the thread object.
    """
    worker = threading.Thread(
        target=server_func,
        args=(server,),
        kwargs=kwargs,
        daemon=True,
    )
    worker.start()
    # Fixed grace period only; nothing here verifies the server is listening.
    time.sleep(2)
    return worker

In [None]:
def parse_arguments():
    """Parse the command-line options for this example.

    Recognised options:
        --a2a-port INT       Port for the A2A server (None means auto-select).
        --mcp-port INT       Port for the MCP server (None means auto-select).
        --model STR          OpenAI model to use (default "gpt-4o").
        --temperature FLOAT  Temperature for generation (default 0.0).

    Arguments that are not recognised are ignored instead of aborting, so the
    function also works inside a Jupyter kernel.

    Returns:
        argparse.Namespace with attributes ``a2a_port``, ``mcp_port``,
        ``model`` and ``temperature``.
    """
    parser = argparse.ArgumentParser(description="A2A + MCP + LangChain Stock Market Integration Example")
    parser.add_argument(
        "--a2a-port", type=int, default=None,
        help="Port to run the A2A server on (default: auto-select)"
    )
    parser.add_argument(
        "--mcp-port", type=int, default=None,
        help="Port to run the MCP server on (default: auto-select)"
    )
    parser.add_argument(
        "--model", type=str, default="gpt-4o",
        help="OpenAI model to use (default: gpt-4o)"
    )
    parser.add_argument(
        "--temperature", type=float, default=0.0,
        help="Temperature for generation (default: 0.0)"
    )
    # Inside a notebook, sys.argv holds the kernel's own arguments
    # (e.g. "-f <connection-file>"). parse_args() would exit with status 2 on
    # those; parse_known_args() keeps our options and ignores the rest.
    args, _unrecognized = parser.parse_known_args()
    return args

In [None]:
def main():
    """Entry point: validate the API key, pick ports and build the A2A pieces.

    Returns 1 when ``check_api_key()`` reports failure. Otherwise the function
    resolves the A2A and MCP ports, builds the agent card and the
    OpenAI-backed A2A server, and then falls off the end of this cell.

    NOTE(review): later cells use ``args``, ``a2a_port``, ``mcp_port``,
    ``agent_card`` and ``openai_server`` outside this function body; they look
    like a continuation of ``main`` that lost its indentation. Confirm.
    """
    # Bail out early when no usable API key is configured.
    api_key_ok = check_api_key()
    if not api_key_ok:
        return 1

    args = parse_arguments()

    # Explicit ports win; otherwise scan separate ranges to avoid conflicts.
    a2a_port = args.a2a_port if args.a2a_port else find_available_port(5000, 20)
    mcp_port = args.mcp_port if args.mcp_port else find_available_port(7000, 20)

    print(f"🔍 A2A server port: {a2a_port}")
    print(f"🔍 MCP server port: {mcp_port}")

    # Step 1: Create the OpenAI-powered A2A server for Stock Market Expertise
    print("\n📝 Step 1: Creating OpenAI-Powered A2A Server")

    # (name, description, examples) for every skill advertised on the card.
    skill_specs = [
        (
            "Market Analysis",
            "Analyzing market trends, stock performance, and market indicators",
            ["What's the current market sentiment?", "How do interest rates affect tech stocks?"],
        ),
        (
            "Investment Strategies",
            "Providing information about various investment approaches and strategies",
            ["What is dollar cost averaging?", "How should I diversify my portfolio?"],
        ),
        (
            "Company Analysis",
            "Analyzing specific companies, their fundamentals, and performance metrics",
            ["What are key metrics to evaluate a tech company?", "How to interpret P/E ratios?"],
        ),
    ]
    skills = [
        AgentSkill(name=skill_name, description=skill_description, examples=skill_examples)
        for skill_name, skill_description, skill_examples in skill_specs
    ]

    # Agent Card describing the OpenAI-powered agent
    agent_card = AgentCard(
        name="Stock Market Expert",
        description=f"An A2A agent specialized in stock market analysis and financial information",
        url=f"http://localhost:{a2a_port}",
        version="1.0.0",
        skills=skills,
    )

    # The OpenAI-backed server that produces the actual answers
    expert_prompt = "You are a stock market and financial analysis expert. Provide accurate, concise information about stocks, market trends, investment strategies, and financial metrics. Focus on educational content and avoid making specific investment recommendations or predictions."
    openai_server = OpenAIA2AServer(
        api_key=os.environ["OPENAI_API_KEY"],
        model=args.model,
        temperature=args.temperature,
        system_prompt=expert_prompt,
    )

In [None]:
# Standard A2A server wrapper around the OpenAI-backed server
class StockMarketExpert(A2AServer):
    """A2A server that answers every message through a wrapped OpenAI server."""

    def __init__(self, openai_server, agent_card):
        """Register ``agent_card`` with the base class and keep ``openai_server``."""
        super().__init__(agent_card=agent_card)
        self.openai_server = openai_server

    def handle_message(self, message):
        """Forward ``message`` to the wrapped server and return its reply."""
        delegate = self.openai_server
        reply = delegate.handle_message(message)
        return reply

# Create the wrapped agent
# NOTE(review): `openai_server`, `agent_card` and `a2a_port` are assigned
# inside main() in the previous cell; at module level they are presumably
# undefined unless this cell is really a continuation of main() - confirm.
stock_agent = StockMarketExpert(openai_server, agent_card)

# Start the A2A server in a background thread
a2a_server_url = f"http://localhost:{a2a_port}"
print(f"\nStarting A2A server on {a2a_server_url}...")

# The `port` default is captured once, when this def is executed.
def run_a2a_server(server, host="0.0.0.0", port=a2a_port):
    """Run the A2A server by delegating to run_server(server, host=..., port=...).

    Used as the thread target below. The default host "0.0.0.0" differs from
    the "localhost" URL printed above.
    """
    run_server(server, host=host, port=port)

# run_server_in_thread starts the thread and sleeps briefly before returning.
a2a_thread = run_server_in_thread(run_a2a_server, stock_agent)

# Step 2: Create MCP Server with Finance Tools
print("\n📝 Step 2: Creating MCP Server with Finance Tools")

# Create MCP server with tools
# The tool functions in the following cells register themselves on this
# object through the @mcp_server.tool decorator.
mcp_server = FastMCP(
    name="Finance Tools",
    description="Advanced tools for stock market analysis"
)

In [None]:
# Stock Data Fetcher Tool - Enhanced

# Company names recognised in free-text input, mapped to their ticker symbols.
_COMMON_STOCK_NAMES = {
    'apple': 'AAPL', 'microsoft': 'MSFT', 'google': 'GOOGL',
    'amazon': 'AMZN', 'tesla': 'TSLA', 'nvidia': 'NVDA'
}


def _extract_tickers(text):
    """Return the ticker symbols mentioned in ``text``, de-duplicated, in order.

    Comma-separated input is split on commas; any other input contributes
    every 1-5 letter word. Candidates that are known company names are
    translated to their ticker, and known names found anywhere in the text
    are added as well (names longer than five letters never match the word
    pattern, so they can only be found this way).
    """
    if ',' in text:
        candidates = [part.strip() for part in text.split(',') if part.strip()]
    else:
        candidates = re.findall(r'\b[A-Za-z]{1,5}\b', text)

    tickers = [
        _COMMON_STOCK_NAMES.get(candidate.lower(), candidate.upper())
        for candidate in candidates
    ]

    lowered = text.lower()
    tickers.extend(
        ticker for name, ticker in _COMMON_STOCK_NAMES.items() if name in lowered
    )

    # dict.fromkeys keeps first-seen order and avoids fetching a ticker twice.
    return list(dict.fromkeys(tickers))


@mcp_server.tool(
    name="stock_data",
    description="Fetch stock data for one or more ticker symbols"
)
def stock_data(input_str=None, **kwargs):
    """Fetch a price summary for each ticker mentioned in the input.

    Args:
        input_str: Ticker symbols (e.g. "AAPL" or "AAPL, MSFT") or free text.
            Text containing "year"/"1y" selects a one-year period, text
            containing "week"/"1w" a five-day period; the default is one month.
        **kwargs: If an ``input`` key is present it overrides ``input_str``.

    Returns:
        A dict ``{"text": ...}``. On success the text is a JSON object keyed
        by ticker symbol whose values are either a summary dict or
        ``{"error": ...}`` for that ticker. On failure the text is an error
        message (with a traceback for unexpected exceptions).
    """
    try:
        # Handle both positional and keyword arguments
        if 'input' in kwargs:
            input_str = kwargs['input']

        # Make sure we have a string
        if input_str is None:
            return {"text": "Error: No ticker symbol provided"}

        input_str = str(input_str).strip()

        tickers = _extract_tickers(input_str)
        if not tickers:
            return {"text": "No valid ticker symbols found in input"}

        # Default parameters
        period = "1mo"
        interval = "1d"

        # Very simple parameter extraction
        if "year" in input_str or "1y" in input_str:
            period = "1y"
        elif "week" in input_str or "1w" in input_str:
            # "1wk" is an interval value, not a period; "5d" is the period
            # that covers one trading week.
            period = "5d"

        # Process each ticker
        results = {}

        for ticker_symbol in tickers:
            try:
                # Fetch data
                ticker = yf.Ticker(ticker_symbol)
                hist = ticker.history(period=period, interval=interval)

                if hist.empty:
                    results[ticker_symbol] = {"error": f"No data found for {ticker_symbol}"}
                    continue

                # Company metadata; missing fields fall back to placeholders
                info = ticker.info

                # Price movement between the first and last row of the window
                latest_close = float(hist.iloc[-1]['Close'])
                earliest_close = float(hist.iloc[0]['Close'])
                price_change = latest_close - earliest_close
                percent_change = (price_change / earliest_close) * 100

                results[ticker_symbol] = {
                    "ticker": ticker_symbol,
                    "company_name": info.get('shortName', ticker_symbol),
                    "sector": info.get('sector', 'Unknown'),
                    "industry": info.get('industry', 'Unknown'),
                    "latest_price": latest_close,
                    "price_change": float(price_change),
                    "percent_change": float(percent_change),
                    "52_week_high": info.get('fiftyTwoWeekHigh', 'Unknown'),
                    "52_week_low": info.get('fiftyTwoWeekLow', 'Unknown'),
                    "average_volume": info.get('averageVolume', 'Unknown'),
                    "market_cap": info.get('marketCap', 'Unknown'),
                    "pe_ratio": info.get('trailingPE', 'Unknown'),
                    "period": period,
                    "interval": interval,
                    "data_points": len(hist)
                }

            except Exception as e:
                # One bad ticker must not discard the results of the others.
                results[ticker_symbol] = {"error": f"Error processing {ticker_symbol}: {str(e)}"}

        return {"text": json.dumps(results)}
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        return {"text": f"Error: {str(e)}\nDetails: {error_details}"}

In [None]:
# Web Scraper Tool - Enhanced

def _finviz_news_items(soup, page_url, max_rows=5):
    """Collect title/link/date dicts from the first rows of the news table."""
    news_table = soup.find('table', {'id': 'news-table'})
    if not news_table:
        return []

    news_items = []
    for row in news_table.find_all('tr')[:max_rows]:
        cells = row.find_all('td')
        if len(cells) < 2:
            continue
        date_cell, title_cell = cells[0], cells[1]

        link = title_cell.find('a')
        # An anchor without href used to raise KeyError and abort the whole
        # request; skip just that row instead.
        href = link.get('href') if link else None
        if not href:
            continue

        # Fix relative URLs
        if not href.startswith('http'):
            href = urljoin(page_url, href)

        news_items.append({
            "title": link.text.strip(),
            "link": href,
            "date": date_cell.text.strip()
        })
    return news_items


def _finviz_snapshot(soup):
    """Read the snapshot table as key/value pairs (cells alternate key, value)."""
    company_details = {}
    snapshot_table = soup.find('table', {'class': 'snapshot-table2'})
    if not snapshot_table:
        return company_details

    for row in snapshot_table.find_all('tr'):
        cells = row.find_all('td')
        # A trailing unpaired cell is dropped, as before.
        for key_cell, value_cell in zip(cells[0::2], cells[1::2]):
            key = key_cell.text.strip()
            value = value_cell.text.strip()
            if key and value:
                company_details[key] = value
    return company_details


@mcp_server.tool(
    name="web_scraper",
    description="Get financial news and information"
)
def web_scraper(input_str=None, **kwargs):
    """Return financial news or basic information for a ticker, URL or topic.

    Args:
        input_str: A 1-5 letter ticker symbol, a URL starting with "http", or
            any other text, which is treated as a topic.
        **kwargs: If an ``input`` key is present it overrides ``input_str``.

    Returns:
        A dict ``{"text": ...}``. For a ticker the text is JSON with
        ``ticker``, ``news_items`` and ``company_details`` scraped from the
        Finviz quote page. For a URL or topic it is JSON with a short
        message. Failures, including HTTP errors and timeouts, are reported
        as an error string with a traceback.
    """
    try:
        # Handle both positional and keyword arguments
        if 'input' in kwargs:
            input_str = kwargs['input']

        # Make sure we have a string
        if input_str is None:
            return {"text": "Error: No input provided"}

        input_str = str(input_str).strip()

        # Determine if this is a ticker, URL, or topic
        if re.match(r'^[A-Za-z]{1,5}$', input_str):
            # It's a ticker symbol - get stock news
            ticker = input_str.upper()

            # Use a simple approach with Finviz
            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/91.0.4472.124 Safari/537.36'}
            url = f"https://finviz.com/quote.ashx?t={ticker.lower()}"

            # Without a timeout a stalled connection would block the tool
            # call forever.
            response = requests.get(url, headers=headers, timeout=10)
            # Report HTTP errors instead of parsing an error page as
            # "no news found".
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            return {"text": json.dumps({
                "ticker": ticker,
                "news_items": _finviz_news_items(soup, url),
                "company_details": _finviz_snapshot(soup)
            })}

        elif input_str.startswith('http'):
            # It's a URL - just return basic info
            return {"text": json.dumps({
                "url": input_str,
                "message": "URL processing is simplified in this version."
            })}
        else:
            # Treat as a topic
            topic = input_str.replace("topic:", "").strip()

            return {"text": json.dumps({
                "topic": topic,
                "message": "Please use web search for detailed topic information."
            })}

    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        return {"text": f"Error: {str(e)}\nDetails: {error_details}"}

In [None]:
# Start the MCP server in a background thread
    mcp_server_url = f"http://localhost:{mcp_port}"
    print(f"\nStarting MCP server on {mcp_server_url}...")

    # The `port` default is captured once, when this def is executed; the
    # retry further down therefore passes the new port explicitly.
    def run_mcp_server(server, host="0.0.0.0", port=mcp_port):
        """Run the MCP server (used as the background thread target)."""
        server.run(host=host, port=port)

    mcp_thread = run_server_in_thread(run_mcp_server, mcp_server)

    # Wait a bit longer for the MCP server to start
    print("\nWaiting for servers to initialize...")
    time.sleep(5)

    # Check if MCP server is actually running
    mcp_server_running = False
    try:
        # The timeout keeps a half-open server from hanging this check.
        response = requests.get(f"{mcp_server_url}/tools", timeout=5)
        mcp_server_running = response.status_code == 200
    except requests.RequestException:
        # Connection refused / timed out: treat as "not running". Only
        # request errors are caught, so Ctrl+C still interrupts the program.
        mcp_server_running = False

    if not mcp_server_running:
        print(f"❌ MCP server failed to start on port {mcp_port}.")
        print("Let's try a different port...")

        # Try a different port
        mcp_port = find_available_port(8000, 20)
        mcp_server_url = f"http://localhost:{mcp_port}"
        print(f"\nRetrying MCP server on {mcp_server_url}...")

        # Start the MCP server in a background thread with new port
        mcp_thread = run_server_in_thread(run_mcp_server, mcp_server, port=mcp_port)
        time.sleep(5)
    
    # Step 3: Convert A2A agent to LangChain
    # NOTE(review): this cell is indented like a function body and uses
    # `return`, but no enclosing `def` is visible in the cell - presumably it
    # continues main(); confirm.
    print("\n📝 Step 3: Converting A2A Agent to LangChain")
    
    try:
        langchain_agent = to_langchain_agent(a2a_server_url)
        print("✅ Successfully converted A2A agent to LangChain")
    except Exception as e:
        # Without the A2A agent nothing below can work, so this is fatal.
        print(f"❌ Error converting A2A agent to LangChain: {e}")
        return 1
    
    # Step 4: Convert MCP tools to LangChain tools
    print("\n📝 Step 4: Converting MCP Tools to LangChain")
    
    try:
        stock_data_tool = to_langchain_tool(mcp_server_url, "stock_data")
        web_scraper_tool = to_langchain_tool(mcp_server_url, "web_scraper")
        print("✅ Successfully converted MCP tools to LangChain")
    except Exception as e:
        # Unlike step 3 this failure is not fatal: both tools are set to None
        # and later code checks them before use.
        print(f"❌ Error converting MCP tools to LangChain: {e}")
        print("\nContinuing with only the A2A agent...")
        stock_data_tool = None
        web_scraper_tool = None
    
    # Step 5: Test the components individually
    print("\n📝 Step 5: Testing Individual Components")
    
    # Test A2A agent via LangChain
    try:
        print("\nTesting A2A-based LangChain agent:")
        result = langchain_agent.invoke("What are some key metrics to evaluate a company's stock?")
        # assumes invoke() returns a mapping with an 'output' key - TODO confirm
        print(f"A2A Agent Response: {result.get('output', '')}")
    except Exception as e:
        # A failed smoke test is only reported; execution continues.
        print(f"❌ Error using A2A-based LangChain agent: {e}")
    
    # Test MCP tools via LangChain if available
    if stock_data_tool and web_scraper_tool:
        try:
            print("\nTesting MCP-based LangChain tools:")
            
            print("\n1. Stock Data Tool:")
            stock_result = stock_data_tool.invoke("AAPL")
            # assumes the tool result is sliceable (presumably a str) - TODO confirm
            print(f"Stock Data Tool Response: {stock_result[:500]}...")  # Truncate for display
            
            print("\n2. Web Scraper Tool:")
            web_result = web_scraper_tool.invoke("AAPL")
            print(f"Web Scraper Tool Response: {web_result[:500]}...")  # Truncate for display
            
        except Exception as e:
            print(f"❌ Error using MCP-based LangChain tools: {e}")
            import traceback
            traceback.print_exc()

In [None]:
# Step 6: Creating a Meta-Agent with available components
# NOTE(review): this cell reads `args`, `langchain_agent`, `stock_data_tool`
# and `web_scraper_tool`, none of which are assigned at module level in the
# visible cells - presumably it continues main() and lost its indentation;
# confirm.
print("\n📝 Step 6: Creating Meta-Agent with Available Tools")

try:
    # Create an LLM for the meta-agent
    llm = ChatOpenAI(model=args.model, temperature=args.temperature)
    
    # Create wrapper functions for the LangChain tools
    # Each wrapper turns an exception into an error string so that a failing
    # tool call is returned to the agent as text instead of raising.
    def ask_stock_expert(query):
        """Ask the stock market expert agent a question."""
        try:
            result = langchain_agent.invoke(query)
            # assumes invoke() returns a mapping with an 'output' key - TODO confirm
            return result.get('output', 'No response')
        except Exception as e:
            return f"Error querying stock expert: {str(e)}"
    
    # Create tools list starting with the A2A agent
    tools = [
        Tool(
            name="StockExpert",
            func=ask_stock_expert,
            description="Ask the stock market expert questions about investing, market trends, financial concepts, etc."
        )
    ]
    
    # Add MCP tools if available
    # Both names are None when the tool conversion in step 4 failed.
    if stock_data_tool:
        def get_stock_data(ticker_info):
            """Get stock data for analysis."""
            try:
                # Ensure ticker_info is a string
                if ticker_info is None:
                    return "Error: No ticker symbol provided"
                if not isinstance(ticker_info, str):
                    ticker_info = str(ticker_info)
                
                result = stock_data_tool.invoke(ticker_info)
                return result
            except Exception as e:
                return f"Error getting stock data: {str(e)}"
                
        tools.append(Tool(
            name="StockData",
            func=get_stock_data,
            description="Get historical stock data. Input can be one or more ticker symbols (e.g., 'AAPL' or 'AAPL, MSFT')"
        ))
        
    if web_scraper_tool:
        def get_financial_news(query):
            """Get financial news and information."""
            try:
                # Ensure query is a string
                if query is None:
                    return "Error: No query provided"
                if not isinstance(query, str):
                    query = str(query)
                
                result = web_scraper_tool.invoke(query)
                return result
            except Exception as e:
                return f"Error getting financial news: {str(e)}"
                
        tools.append(Tool(
            name="FinancialNews",
            func=get_financial_news,
            description="Get financial news. Input can be a ticker symbol, URL, or topic."
        ))
    
    # Create the meta-agent
    meta_agent = initialize_agent(
        tools, 
        llm, 
        agent=AgentType.OPENAI_FUNCTIONS,
        verbose=True,
        handle_parsing_errors=True
    )
    #  Test the meta-agent
    print("\nTesting Meta-Agent with available tools:")
    
    test_query = "What are the current stock prices of Apple and Nvidia?"
    
    print(f"\nQuery: {test_query}")
    meta_result = meta_agent.invoke(test_query)
    # The raw invoke() result is printed as-is (no 'output' extraction here).
    print(f"\nMeta-Agent Response: {meta_result}")
    
except Exception as e:
    # Any failure while building or running the meta-agent is reported with a
    # traceback; the code after this block still runs.
    print(f"❌ Error creating or using meta-agent: {e}")
    import traceback
    traceback.print_exc()

# Keep the servers running until user interrupts
# NOTE(review): the success message below is printed unconditionally, even
# when the meta-agent step above reported an error.
print("\n✅ Integration successful!")
print("Press Ctrl+C to stop the servers and exit")

# Idle in one-second steps; Ctrl+C (KeyboardInterrupt) ends the wait.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("\nExiting...")

# NOTE(review): `return` at column 0 is only valid inside a function -
# presumably this cell is the tail of main() and lost its indentation; confirm.
return 0


In [None]:

# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    try:
        exit_status = main()
    except KeyboardInterrupt:
        # Ctrl+C during main() counts as a clean shutdown.
        print("\nProgram interrupted by user")
        exit_status = 0
    sys.exit(exit_status)