In [None]:
# Complete Improved Travel Assistant with All Methods
# Includes the three requested methods: run_comprehensive_test, quick_airbnb_search, comprehensive_travel_planning

from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerStdio
import asyncio
import os
from datetime import datetime
import json
from pathlib import Path
import subprocess
import time
import traceback
from typing import Dict, Union

class TravelAssistant:
    """Travel-planning helper that drives a pydantic-ai ``Agent`` over MCP servers.

    The class configures two MCP servers launched through ``npx`` (Airbnb
    search and filesystem access), provides diagnostics for the environment
    and for each server, and exposes three ready-made entry points:
    ``run_comprehensive_test``, ``quick_airbnb_search`` and
    ``comprehensive_travel_planning``.

    Attributes:
        api_key: Anthropic API key resolved at construction time.
        servers: Mapping of server name to its ``MCPServerStdio`` definition.
        agent: The most recently created agent, or ``None`` before
            ``create_agent`` has run.
        working_servers: Servers that passed the last ``test_all_servers``
            call (empty until that method has been run).
    """

    # Model identifier shared by every agent this class creates.
    MODEL_NAME = 'claude-3-7-sonnet-20250219'

    # Geographic knowledge base (static reference data; none of the methods
    # in this class read it).
    LOCATION_CONTEXT = {
        'Stockholm': {
            'region': 'Sweden',
            'airports': ['Arlanda Airport (ARN)', 'Bromma Airport (BMA)'],
            'major_locations': ['Gamla Stan', 'Norrmalm', 'Södermalm', 'Central Station'],
            'nearby_cities': ['Uppsala', 'Norrköping', 'Eskilstuna'],
            'country_code': 'SE',
            'currency': 'SEK'
        },
        'Paris': {
            'region': 'France',
            'airports': ['Charles de Gaulle (CDG)', 'Orly (ORY)'],
            'major_locations': ['Eiffel Tower', 'Louvre', 'Champs-Élysées', 'Marais'],
            'nearby_cities': ['Versailles', 'Fontainebleau', 'Reims'],
            'country_code': 'FR',
            'currency': 'EUR'
        },
        'Gothenburg': {
            'region': 'Sweden',
            'airports': ['Gothenburg Landvetter Airport (GOT)'],
            'major_locations': ['Centrum', 'Haga', 'Linné', 'Majorna'],
            'nearby_cities': ['Borås', 'Trollhättan', 'Uddevalla'],
            'nearby_landmarks': ['Tjolöholm\'s castle (35km south)'],
            'country_code': 'SE',
            'currency': 'SEK'
        }
    }

    def __init__(self):
        """Resolve the API key and build the MCP server definitions."""
        self.api_key = self._setup_api_key()
        self.servers = self._initialize_servers()
        self.agent = None
        self.working_servers = []
        # Distinguishes "servers never probed" from "probed and all failed";
        # both states leave working_servers empty.
        self._servers_tested = False

    def _setup_api_key(self) -> str:
        """Resolve the Anthropic API key and export it to the environment.

        Lookup order: the ``ANTHROPIC_API_KEY`` environment variable, then the
        ``api_key`` entry of ``~/.anthropic/config.json``, then an interactive
        prompt. The resolved value is written back to
        ``os.environ["ANTHROPIC_API_KEY"]``.

        Returns:
            The resolved key (an empty string if nothing was entered at the
            prompt).
        """
        api_key = os.environ.get("ANTHROPIC_API_KEY")

        if not api_key:
            api_key = self._read_api_key_from_config(
                Path.home() / ".anthropic" / "config.json"
            )

        if not api_key:
            api_key = input("Please enter your Anthropic API key: ").strip()

        os.environ["ANTHROPIC_API_KEY"] = api_key
        return api_key

    @staticmethod
    def _read_api_key_from_config(config_path: Path) -> str:
        """Return the ``api_key`` string stored in *config_path*, or ``""``."""
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
        except FileNotFoundError:
            return ""
        except (OSError, ValueError) as e:
            # An unreadable or malformed config must not block the
            # interactive prompt that follows in _setup_api_key.
            print(f"Could not read {config_path}: {e}")
            return ""

        value = config.get('api_key') if isinstance(config, dict) else None
        # Only a string can be stored in os.environ; ignore anything else.
        return value.strip() if isinstance(value, str) else ""

    def _initialize_servers(self) -> Dict[str, "MCPServerStdio"]:
        """Build the MCP server definitions, keyed by short name.

        Returns:
            A dict with an ``'airbnb'`` and a ``'filesystem'`` entry, both
            configured to be launched through ``npx``.
        """
        return {
            'airbnb': MCPServerStdio(
                command='npx',
                args=['-y', '@openbnb/mcp-server-airbnb', '--ignore-robots-txt'],
                env=os.environ
            ),
            'filesystem': MCPServerStdio(
                command='npx',
                args=['-y', '@modelcontextprotocol/server-filesystem', './'],
                env=os.environ
            )
        }

    @staticmethod
    async def _run_agent(agent: "Agent", prompt: str):
        """Run *prompt* on *agent* inside its MCP-server context and return the run result."""
        async with agent.run_mcp_servers():
            return await agent.run(prompt)

    async def test_prerequisites(self) -> Dict[str, bool]:
        """Check the API key, a plain agent round-trip, and ``npx`` availability.

        Returns:
            A dict with the boolean entries ``'api_key'``, ``'basic_agent'``
            and ``'npx'``.
        """
        results = {}

        # Test API key
        results['api_key'] = bool(self.api_key)

        # Test basic agent (no MCP servers attached)
        try:
            basic_agent = Agent(self.MODEL_NAME)
            await basic_agent.run("Test")
            results['basic_agent'] = True
        except Exception as e:
            results['basic_agent'] = False
            print(f"Basic agent test failed: {e}")

        # Test NPX
        try:
            subprocess.run(['npx', '--version'], check=True, capture_output=True)
            results['npx'] = True
        except (OSError, subprocess.SubprocessError):
            # OSError: executable missing / not runnable;
            # SubprocessError: non-zero exit status.
            results['npx'] = False
            print("NPX not found - please install Node.js and npm")

        return results

    async def test_server(self, name: str, server: "MCPServerStdio", timeout: int = 20) -> Dict[str, Union[bool, str]]:
        """Probe a single MCP server by running a trivial prompt through it.

        Args:
            name: Label used in progress and result messages.
            server: The MCP server to attach to a throwaway agent.
            timeout: Seconds to wait before reporting a timeout.

        Returns:
            A dict with ``'success'`` (bool), ``'elapsed_time'`` (formatted
            string) and ``'message'`` (human-readable outcome).
        """
        print(f"Testing {name} server...")
        # Monotonic clock: the measured duration is unaffected by wall-clock
        # adjustments.
        start_time = time.monotonic()

        try:
            agent = Agent(self.MODEL_NAME, mcp_servers=[server])
            await asyncio.wait_for(self._run_agent(agent, "Test"), timeout=timeout)
            elapsed = time.monotonic() - start_time

            return {
                'success': True,
                'elapsed_time': f"{elapsed:.1f}s",
                'message': f"{name} server working"
            }
        except asyncio.TimeoutError:
            return {
                'success': False,
                'elapsed_time': f"{timeout}s (timeout)",
                'message': f"{name} server timed out"
            }
        except Exception as e:
            return {
                'success': False,
                'elapsed_time': f"{time.monotonic() - start_time:.1f}s",
                'message': f"{name} server error: {str(e)}"
            }

    async def test_all_servers(self) -> Dict[str, Dict]:
        """Probe every configured server and record which ones work.

        Side effect: replaces ``self.working_servers`` with the servers whose
        probe succeeded.

        Returns:
            Mapping of server name to the ``test_server`` result dict.
        """
        results = {}

        for name, server in self.servers.items():
            results[name] = await self.test_server(name, server)

        # Identify working servers
        self.working_servers = [
            server for name, server in self.servers.items()
            if results[name]['success']
        ]
        self._servers_tested = True

        return results

    def create_agent(self, use_all_servers: bool = False) -> "Agent":
        """Create the main agent, store it on ``self.agent`` and return it.

        Args:
            use_all_servers: Attach every configured server when true;
                otherwise attach only ``self.working_servers``.

        Returns:
            The newly created agent.
        """
        servers_to_use = self.servers.values() if use_all_servers else self.working_servers

        # Add custom time tool
        async def get_time():
            """Return the current local date and time as 'YYYY-MM-DD HH:MM:SS'."""
            return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        self.agent = Agent(
            self.MODEL_NAME,
            system_prompt="""You're a travel agent with access to Airbnb listings and file operations.
            When dealing with Airbnb, provide comprehensive information including:
            - Name and title of listing
            - Location and neighborhood
            - Price per night
            - Number of bedrooms, beds, and guests
            - Rating and review count
            - Distance and travel time to specified locations
            Always format results as markdown tables when requested.""",
            mcp_servers=list(servers_to_use),
            tools=[get_time]
        )

        return self.agent

    async def run_with_fallback(self, prompt: str, timeout: int = 60) -> str:
        """Run *prompt* on the main agent, retrying with an Airbnb-only agent on timeout.

        Args:
            prompt: The user request to send to the agent.
            timeout: Seconds allowed for each attempt (main and fallback).

        Returns:
            The agent's output, or a message describing the failure. Errors
            are reported in the returned string rather than raised.
        """
        if not self.agent:
            # If the servers were never probed, working_servers is empty only
            # because nothing has been tested yet; attach every configured
            # server instead of creating an agent without any.
            self.create_agent(use_all_servers=not self._servers_tested)

        try:
            print("Running with all working servers...")
            result = await asyncio.wait_for(
                self._run_agent(self.agent, prompt), timeout=timeout
            )
            return result.output

        except asyncio.TimeoutError:
            print("Timeout with full configuration, trying minimal setup...")
            return await self._run_minimal_fallback(prompt, timeout)

        except Exception as e:
            return f"Error: {str(e)}\n{traceback.format_exc()}"

    async def _run_minimal_fallback(self, prompt: str, timeout: int) -> str:
        """Retry *prompt* with an agent that only has the Airbnb server attached."""
        airbnb = self.servers.get('airbnb')
        # Compare by identity: matching on the server's string form depends on
        # its repr and can match unrelated text (e.g. environment values).
        airbnb_usable = airbnb is not None and (
            not self._servers_tested
            or any(server is airbnb for server in self.working_servers)
        )
        if not airbnb_usable:
            return "No working servers available for fallback"

        try:
            minimal_agent = Agent(
                self.MODEL_NAME,
                system_prompt="You're a travel agent who helps find Airbnb accommodations.",
                mcp_servers=[airbnb]
            )
            # Bound the retry as well, so a hung server cannot block forever.
            result = await asyncio.wait_for(
                self._run_agent(minimal_agent, prompt), timeout=timeout
            )
            return result.output
        except asyncio.TimeoutError:
            return f"Minimal setup also failed: timed out after {timeout}s"
        except Exception as e:
            return f"Minimal setup also failed: {str(e)}"

    async def save_results(self, content: str, filename: str = "travel_results.md"):
        """Write *content* to *filename* as UTF-8 text.

        Args:
            content: Text to write; the file is overwritten.
            filename: Destination path.

        Returns:
            A status message; failures are reported in the message instead of
            being raised.
        """
        try:
            # Explicit UTF-8: results routinely contain non-ASCII characters,
            # which the platform default encoding may not be able to represent.
            with open(filename, "w", encoding="utf-8") as f:
                f.write(content)
            return f"Results saved to {filename}"
        except Exception as e:
            # Deliberately broad: saving is best-effort and callers only print
            # the returned status.
            return f"Failed to save file: {str(e)}"

    # Method 1: run_comprehensive_test
    async def run_comprehensive_test(self):
        """Run all diagnostics and print a report.

        Checks the prerequisites first and stops if any fails; otherwise
        probes each server and, when at least one works, runs a small
        end-to-end search through ``run_with_fallback``.
        """
        print("Travel Assistant Comprehensive Diagnostic Test")
        print("=" * 50)

        # Test prerequisites
        prereq_results = await self.test_prerequisites()
        print("\nPrerequisites:")
        for item, status in prereq_results.items():
            print(f"  {item}: {'✓' if status else '✗'}")

        if not all(prereq_results.values()):
            print("\nCannot proceed - fix prerequisites first")
            return

        # Test individual servers
        print("\nTesting Individual Servers:")
        server_results = await self.test_all_servers()
        for name, result in server_results.items():
            status = '✓' if result['success'] else '✗'
            print(f"  {name}: {status} ({result['elapsed_time']}) - {result['message']}")

        # Test combined functionality
        if not self.working_servers:
            print("\nNo working servers available for combined testing")
            return

        print(f"\nTesting Combined Functionality ({len(self.working_servers)} working servers):")
        self.create_agent()

        test_prompt = "Find 1 Airbnb in Stockholm for 2 people, check-in tomorrow, checkout in 3 days"
        result = await self.run_with_fallback(test_prompt, timeout=30)
        # Truncate long output, but label the line in both cases.
        preview = f"{result[:200]}..." if len(result) > 200 else result
        print(f"Result preview: {preview}")

    # Method 2: quick_airbnb_search
    async def quick_airbnb_search(self):
        """Run a fixed Paris Airbnb search, print it and save it to ``paris_airbnbs.md``."""
        prompt = """Find 3 Airbnbs in Paris for 2 people:
        - Check-in: December 20
        - Check-out: December 23
        - Near Eiffel Tower
        Format as markdown table with: name, price, beds, rating"""

        result = await self.run_with_fallback(prompt)
        print(result)

        # Save results
        save_result = await self.save_results(result, "paris_airbnbs.md")
        print(save_result)

    # Method 3: comprehensive_travel_planning
    async def comprehensive_travel_planning(self):
        """Run the diagnostics, then plan a fixed Gothenburg trip.

        The plan is printed and saved to ``gothenburg_travel_plan.md``.
        Planning is skipped when the diagnostics leave no usable server.
        """
        # First, test all systems
        await self.run_comprehensive_test()

        # working_servers stays empty when the prerequisites failed (servers
        # were never probed) or when every server probe failed; in both cases
        # the diagnostic already reported the problem.
        if not self.working_servers:
            print("\nSkipping trip planning - no working servers available")
            return

        # Now plan a trip
        prompt = """Plan a trip to Gothenburg, Sweden:
        1. Find 5 Airbnbs for 2 people, staying July 10-15
        2. Focus on city center locations
        3. Budget: 1000 SEK per night maximum
        4. Provide: name, price, beds, rating, distance to city center
        5. Save results as markdown table
        6. Also estimate transport costs from airport"""

        result = await self.run_with_fallback(prompt)
        print(result)

        # Save comprehensive results
        save_result = await self.save_results(result, "gothenburg_travel_plan.md")
        print(save_result)

In [None]:
# For Jupyter usage - can be run directly
# NOTE: the calls below use top-level `await`, so this cell must run in an
# environment that supports it (such as a Jupyter/IPython cell).
# Constructing TravelAssistant resolves the Anthropic API key and may prompt
# for it interactively if it is not found in the environment or config file.
assistant = TravelAssistant()
await assistant.run_comprehensive_test()

# Alternative entry points - uncomment one pair to run it instead.
# Fixed Paris Airbnb search; prints the result and saves it to paris_airbnbs.md:
#assistant = TravelAssistant()
#await assistant.quick_airbnb_search()

# Diagnostics followed by the Gothenburg trip plan (saved to gothenburg_travel_plan.md):
#assistant = TravelAssistant()
#await assistant.comprehensive_travel_planning()