In [None]:
# Install required dependencies
!pip install requests pandas ipython

print("‚úÖ Dependencies installed!")

In [None]:
# Announce the dependency check before any (possibly slow) pip work starts
print(
    "üì¶ Installing required dependencies...",
    "This may take a moment if packages need to be downloaded.\n",
    sep="\n",
)

import sys
import subprocess
import importlib

def install_package(package):
    """Install a package with pip, using the interpreter that runs this kernel.

    Args:
        package: Requirement string passed verbatim to ``pip install``.

    Returns:
        True if pip exited with status 0, False if it exited with an error.
    """
    try:
        # subprocess.run is required here: ``capture_output`` is only accepted
        # by run(). check_call() forwards it to Popen, which raises TypeError
        # before pip is ever started.
        subprocess.run(
            [sys.executable, "-m", "pip", "install", package],
            capture_output=True,
            text=True,
            check=True,
        )
        return True
    except subprocess.CalledProcessError as e:
        print(f"‚ùå Failed to install {package}: {e}")
        # pip's own diagnostics were captured, so show them instead of dropping them
        if e.stderr:
            print(e.stderr.strip())
        return False

def check_package(package_name, import_name=None):
    """Report whether a package can be imported and print its version.

    Args:
        package_name: Name shown in the printed status line.
        import_name: Module name to import; falls back to ``package_name``
            when omitted.

    Returns:
        True when the import succeeds, False when it raises ImportError.
    """
    module_to_import = import_name if import_name is not None else package_name

    try:
        loaded = importlib.import_module(module_to_import)
        # Not every module exposes __version__, so fall back to a placeholder
        detected = getattr(loaded, '__version__', 'unknown')
        print(f"‚úÖ {package_name}: {detected}")
    except ImportError:
        print(f"‚ùå {package_name}: not installed")
        return False
    return True

# Each entry pairs the name handed to pip with the module name used for import
required_packages = [
    ("requests", "requests"),
    ("pandas", "pandas"),
    ("ipython", "IPython"),
]

print("üîç Checking existing packages...")
missing_packages = []

for package_name, import_name in required_packages:
    if check_package(package_name, import_name):
        continue
    missing_packages.append(package_name)

if not missing_packages:
    print("\n‚úÖ All required packages are already installed!")
else:
    print(f"\nüì• Installing missing packages: {', '.join(missing_packages)}")
    for package in missing_packages:
        print(f"Installing {package}...")
        if not install_package(package):
            print(f"‚ùå Failed to install {package}")
        else:
            print(f"‚úÖ {package} installed successfully")

# Closing summary, emitted as one print call with one line per argument
print(
    "\nüéØ Dependency check complete!",
    "=" * 50,
    "Required packages for this notebook:",
    "  ‚Ä¢ requests: HTTP client for API calls",
    "  ‚Ä¢ pandas: Data analysis and manipulation",
    "  ‚Ä¢ IPython: Enhanced display utilities",
    "\nIf you encounter any import errors, please install manually:",
    "  pip install requests pandas ipython",
    "=" * 50,
    sep="\n",
)

In [None]:
# Import required libraries
import requests
import json
import hashlib
import base64
import secrets
import urllib.parse
from typing import Dict, Any, Optional
import pandas as pd
from IPython.display import display, HTML, JSON
import webbrowser
from datetime import datetime

# Configuration: address of the MCP server this notebook talks to
BASE_URL = "http://localhost:8000"
MCP_ENDPOINT = BASE_URL + "/mcp"

print(
    "‚úÖ Libraries imported successfully",
    f"üåê MCP Server URL: {BASE_URL}",
    sep="\n",
)

## Step 1: OAuth Authentication Setup

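Run the cells below and enter your Extend API credentials when prompted. The notebook then completes the OAuth flow (authorization code with PKCE) against the MCP server and obtains a Bearer token for the remaining steps.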

In [None]:
# OAuth and PKCE utilities
def generate_pkce_pair():
    """Create a PKCE ``(code_verifier, code_challenge)`` pair.

    The verifier is 32 random bytes, URL-safe base64 encoded with the ``=``
    padding removed. The challenge is the SHA-256 digest of the verifier's
    UTF-8 bytes, encoded the same way.

    Returns:
        Tuple of two strings: ``(code_verifier, code_challenge)``.
    """
    random_bytes = secrets.token_bytes(32)
    code_verifier = base64.urlsafe_b64encode(random_bytes).rstrip(b'=').decode('utf-8')

    verifier_digest = hashlib.sha256(code_verifier.encode('utf-8')).digest()
    code_challenge = base64.urlsafe_b64encode(verifier_digest).rstrip(b'=').decode('utf-8')

    return code_verifier, code_challenge

def complete_oauth_flow(user_email: str, api_key: str, api_secret: str, timeout: float = 30.0) -> str:
    """Complete the full OAuth flow and return the Bearer token.

    The flow has two HTTP steps against ``BASE_URL``:

    1. POST the login form to ``/callback`` (redirects are not followed) and
       read the authorization code from the ``Location`` header of the 302.
    2. POST that code together with the PKCE verifier to ``/token``.

    Args:
        user_email: Email submitted in the login form.
        api_key: Extend API key submitted in the login form.
        api_secret: Extend API secret submitted in the login form.
        timeout: Seconds to wait for each HTTP request. requests has no
            default timeout, so without it an unresponsive server would hang
            the notebook indefinitely.

    Returns:
        The ``access_token`` string from the token response.

    Raises:
        Exception: If a step returns an unexpected status, the redirect
            carries an OAuth error or a mismatching ``state``, or no
            code/token is present.
        requests.RequestException: On connection failure or timeout.
    """

    # Generate PKCE parameters
    code_verifier, code_challenge = generate_pkce_pair()
    client_id = "notebook_test_client"
    redirect_uri = "http://localhost:3000/callback"
    state = secrets.token_urlsafe(16)

    print(f"üîê Generated PKCE challenge for user: {user_email}")

    # Step 1: Submit OAuth form (simulating the login page submission)
    callback_data = {
        'user_email': user_email,
        'extend_api_key': api_key,
        'extend_api_secret': api_secret,
        'client_id': client_id,
        'redirect_uri': redirect_uri,
        'code_challenge': code_challenge,
        'code_challenge_method': 'S256',
        'state': state
    }

    print("üìù Submitting OAuth authorization...")
    callback_response = requests.post(
        f"{BASE_URL}/callback",
        data=callback_data,
        allow_redirects=False,  # the code is in the redirect target, so do not follow it
        timeout=timeout,
    )

    if callback_response.status_code != 302:
        raise Exception(f"OAuth callback failed: {callback_response.status_code} - {callback_response.text}")

    # Extract authorization code from redirect
    location = callback_response.headers.get('Location')
    parsed_url = urllib.parse.urlparse(location or "")
    query_params = urllib.parse.parse_qs(parsed_url.query)

    # Report an OAuth error from the server explicitly instead of as "no code"
    if 'error' in query_params:
        error_name = query_params['error'][0]
        error_detail = query_params.get('error_description', [''])[0]
        raise Exception(f"OAuth authorization rejected: {error_name} {error_detail}".rstrip())

    # If the server echoes a state value it must be the one sent above
    returned_state = query_params.get('state', [None])[0]
    if returned_state is not None and returned_state != state:
        raise Exception("OAuth state mismatch in redirect; refusing to continue")

    auth_code = query_params.get('code', [None])[0]

    if not auth_code:
        raise Exception(f"No authorization code in redirect: {location}")

    print(f"‚úÖ Got authorization code: {auth_code[:20]}...")

    # Step 2: Exchange authorization code for access token
    token_data = {
        'grant_type': 'authorization_code',
        'code': auth_code,
        'code_verifier': code_verifier,
        'client_id': client_id,
        'redirect_uri': redirect_uri
    }

    print("üîÑ Exchanging code for access token...")
    token_response = requests.post(f"{BASE_URL}/token", data=token_data, timeout=timeout)

    if token_response.status_code != 200:
        raise Exception(f"Token exchange failed: {token_response.status_code} - {token_response.text}")

    token_info = token_response.json()
    access_token = token_info.get('access_token')

    if not access_token:
        raise Exception(f"No access token in response: {token_info}")

    print("üéâ Successfully obtained Bearer token!")
    print(f"Token expires in: {token_info.get('expires_in', 'unknown')} seconds")

    return access_token

print("‚úÖ OAuth utilities ready")

In [None]:
# Collect the three credentials that the OAuth flow needs
import getpass

print(
    "Please enter your Extend API credentials:",
    "(You can paste your credentials into these prompts)",
    "",
    sep="\n",
)

# The email uses a plain prompt; key and secret are read through getpass
USER_EMAIL = input("üìß Enter your email: ").strip()
API_KEY = getpass.getpass("üîë Enter your Extend API key (paste-friendly): ").strip()
API_SECRET = getpass.getpass("üîí Enter your Extend API secret (paste-friendly): ").strip()

# Any blank answer (after stripping whitespace) aborts the cell
if not (USER_EMAIL and API_KEY and API_SECRET):
    raise ValueError("All credentials are required!")

print(f"\n‚úÖ Credentials captured for: {USER_EMAIL}")
print("üîÑ Ready to proceed with OAuth flow!")

In [None]:
# Complete OAuth flow and get Bearer token
try:
    BEARER_TOKEN = complete_oauth_flow(USER_EMAIL, API_KEY, API_SECRET)
except Exception as e:
    print(f"‚ùå OAuth flow failed: {str(e)}")
    raise
else:
    # Cell output is saved with the notebook, so show only a short prefix and
    # the length. Forty characters can be most of an opaque token.
    print(f"\nüîë Bearer Token obtained: {BEARER_TOKEN[:8]}... ({len(BEARER_TOKEN)} characters)")
    print("\n‚úÖ Ready to make MCP requests!")

## Step 2: MCP Client Utilities

A small JSON-RPC client class (`MCPClient`) for sending MCP requests and pretty-printing the responses.

In [None]:
# MCP client utilities
class MCPClient:
    """Minimal JSON-RPC 2.0 client for the server's ``/mcp`` endpoint.

    Every request is POSTed to ``{base_url}/mcp`` with the bearer token in
    the ``Authorization`` header. Request ids start at 1 and increase by one
    per request.
    """

    def __init__(self, base_url: str, bearer_token: str, timeout: float = 30.0):
        """Store connection settings; no network traffic happens here.

        Args:
            base_url: Server root URL (a trailing slash is tolerated).
            bearer_token: Token sent as ``Authorization: Bearer <token>``.
            timeout: Seconds to wait for each HTTP request.
        """
        self.base_url = base_url
        self.bearer_token = bearer_token
        self.timeout = timeout
        self.request_id = 1

    def _make_request(self, method: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """Make an MCP JSON-RPC request.

        Args:
            method: JSON-RPC method name, e.g. ``"tools/list"``.
            params: Optional params object; omitted from the payload when
                empty or None.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Exception: If the HTTP status is not 200.
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.bearer_token}"
        }

        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "id": self.request_id
        }

        if params:
            payload["params"] = params

        # The id is consumed even if the request fails below
        self.request_id += 1

        response = requests.post(
            f"{self.base_url.rstrip('/')}/mcp",
            json=payload,
            headers=headers,
            timeout=self.timeout,  # requests never times out unless told to
        )

        if response.status_code != 200:
            raise Exception(f"Request failed: {response.status_code} - {response.text}")

        return response.json()

    def initialize(self) -> Dict[str, Any]:
        """Initialize MCP connection."""
        return self._make_request("initialize")

    def list_tools(self) -> Dict[str, Any]:
        """List available tools."""
        return self._make_request("tools/list")

    def call_tool(self, tool_name: str, arguments: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Call a specific tool.

        Args:
            tool_name: Name of the tool to invoke.
            arguments: Tool arguments; left out of the request when empty or
                None.

        Returns:
            The decoded JSON-RPC response.
        """
        params = {"name": tool_name}
        if arguments:
            params["arguments"] = arguments
        return self._make_request("tools/call", params)

    def display_response(self, response: Dict[str, Any], title: str = "Response"):
        """Pretty print MCP response.

        Errors are printed as one line. Tool results (a ``content`` list) have
        each text item rendered as interactive JSON when it parses to a dict
        or list, and printed verbatim otherwise. Other dict results are
        rendered as JSON; non-dict results are printed.
        """
        print(f"\nüìã {title}")
        print("=" * 50)

        if "error" in response:
            error = response["error"]
            if isinstance(error, dict):
                # Fall back to placeholders rather than raising KeyError on a
                # partially filled error object
                print(f"‚ùå Error {error.get('code', 'unknown')}: {error.get('message', 'unknown error')}")
            else:
                print(f"‚ùå Error: {error}")
            return

        if "result" not in response:
            return

        result = response["result"]

        if not isinstance(result, dict):
            print(result)
            return

        if "content" not in result:
            # Direct result (initialize, tools/list)
            display(JSON(result))
            return

        # Tool execution result
        for content in result["content"]:
            if content.get("type") != "text":
                continue
            text = content.get("text", "")
            try:
                # Try to parse as JSON for pretty printing
                data = json.loads(text)
            except (TypeError, ValueError):
                data = None
            # IPython's JSON display only accepts dict/list payloads, so
            # anything else is shown as the raw text
            if isinstance(data, (dict, list)):
                display(JSON(data))
            else:
                print(text)

# Build the client used by every cell below, from the configured URL and the OAuth token
mcp = MCPClient(base_url=BASE_URL, bearer_token=BEARER_TOKEN)
print("‚úÖ MCP client ready")

## Step 3: Test MCP Connection

Let's verify the MCP connection is working properly.

In [None]:
# Send the "initialize" request and show whatever the server answers
print("üîÑ Testing MCP initialize...")
init_response = mcp.initialize()
mcp.display_response(response=init_response, title="Initialize Response")

In [None]:
# Ask the server which tools it exposes
print("üîÑ Listing available tools...")
tools_response = mcp.list_tools()
mcp.display_response(tools_response, "Available Tools")

# Keep just the tool names; later cells read `available_tools`
if not ("result" in tools_response and "tools" in tools_response["result"]):
    available_tools = []
    print("‚ö†Ô∏è No tools found")
else:
    available_tools = [entry["name"] for entry in tools_response["result"]["tools"]]
    print(f"\nüìù Tool Names: {', '.join(available_tools)}")

In [None]:
# List the five most recently created virtual cards
import re

print("üí≥ Testing get_virtual_cards...")
vc_response = mcp.call_tool(
    "get_virtual_cards",
    {"per_page": 5, "sort_field": "createdAt", "sort_direction": "DESC"},
)
mcp.display_response(vc_response, "Virtual Cards")

# Pull the first thing that looks like a card ID ("vc_" + alphanumerics)
# out of the first content item's text
virtual_card_id = None
if "result" in vc_response and "content" in vc_response["result"]:
    try:
        content_text = vc_response["result"]["content"][0]["text"]
        first_card_match = re.search(r'vc_[a-zA-Z0-9]+', content_text)
        if first_card_match:
            virtual_card_id = first_card_match.group(0)
            print(f"\nüìù Found virtual card ID for testing: {virtual_card_id}")
    except Exception as e:
        print(f"‚ö†Ô∏è Could not extract virtual card ID: {e}")

if not virtual_card_id:
    print("‚ö†Ô∏è No virtual card ID found for detail testing")

## Step 4: Virtual Cards Testing

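Test virtual card management tools. The preceding cell lists virtual cards and captures the first card ID; the next cell fetches that card's details.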

In [None]:
# Fetch details for the card ID captured by the listing cell, if there is one
if not virtual_card_id:
    print("‚ö†Ô∏è No virtual card ID available for detail testing")
else:
    print(f"üîç Testing get_virtual_card_detail for ID: {virtual_card_id}")
    detail_response = mcp.call_tool(
        "get_virtual_card_detail",
        {"virtual_card_id": virtual_card_id},
    )
    mcp.display_response(detail_response, "Virtual Card Detail")

In [None]:
# List five transactions, sorted by "-date" (most recent first)
print("üí∞ Testing get_transactions...")
txn_response = mcp.call_tool(
    "get_transactions",
    {"per_page": 5, "sort_field": "-date"},
)
mcp.display_response(txn_response, "Recent Transactions")

# Try several ID shapes in priority order; the first pattern that matches wins
transaction_id = None
if "result" in txn_response and "content" in txn_response["result"]:
    try:
        content_text = txn_response["result"]["content"][0]["text"]
        id_patterns = [
            r'txn_[a-zA-Z0-9]+',      # txn_XXXXX
            r'tr_[a-zA-Z0-9]+',       # tr_XXXXX
            r'ID:\s*([a-zA-Z0-9_]+)', # ID: XXXXX
            r'Transaction ID:\s*([a-zA-Z0-9_]+)' # Transaction ID: XXXXX
        ]

        for pattern in id_patterns:
            hit = re.search(pattern, content_text)
            if hit is None:
                continue
            # Patterns with a capture group yield the group, the others the whole match
            transaction_id = hit.group(hit.lastindex or 0)
            print(f"\nüìù Found transaction ID for testing: {transaction_id}")
            break

    except Exception as e:
        print(f"‚ö†Ô∏è Could not extract transaction ID: {e}")

if not transaction_id:
    print("‚ö†Ô∏è No transaction ID found for detail testing")

## Step 5: Transactions Testing

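Test transaction management tools. The preceding cell lists recent transactions and captures the first transaction ID; the next cell fetches that transaction's details.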

In [None]:
# Fetch details for the transaction ID captured by the listing cell, if there is one
if not transaction_id:
    print("‚ö†Ô∏è No transaction ID available for detail testing")
else:
    print(f"üîç Testing get_transaction_detail for ID: {transaction_id}")
    detail_response = mcp.call_tool(
        "get_transaction_detail",
        {"transaction_id": transaction_id},
    )
    mcp.display_response(detail_response, "Transaction Detail")

In [None]:
# Get expense categories
print("üìä Testing get_expense_categories...")
categories_response = mcp.call_tool("get_expense_categories", {
    "active": True,
    "sort_field": "name",
    "sort_direction": "ASC"
})
mcp.display_response(categories_response, "Expense Categories")

# Store first category ID for label testing.
# The first content item's text is parsed as JSON and expected to hold an
# "expenseCategories" list whose entries carry an "id".
category_id = None
if "result" in categories_response and "content" in categories_response["result"]:
    try:
        content_text = categories_response["result"]["content"][0]["text"]
        cat_data = json.loads(content_text)
        if "expenseCategories" in cat_data and cat_data["expenseCategories"]:
            category_id = cat_data["expenseCategories"][0]["id"]
            print(f"\nüìù Found category ID for testing: {category_id}")
    except Exception as e:
        # Report the failure like the virtual card and transaction cells do,
        # instead of swallowing it silently
        print(f"‚ö†Ô∏è Could not extract category ID: {e}")

if not category_id:
    print("‚ö†Ô∏è No category ID found for labels testing")

## Step 6: Expense Categories Testing

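Test expense category management tools. The preceding cell lists expense categories and captures the first category ID; the next cell fetches that category's labels.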

In [None]:
# Fetch labels for the category ID captured by the listing cell, if there is one
if not category_id:
    print("‚ö†Ô∏è No category ID available for labels testing")
else:
    print(f"üè∑Ô∏è Testing get_expense_category_labels for category: {category_id}")
    labels_response = mcp.call_tool(
        "get_expense_category_labels",
        {"category_id": category_id, "active": True, "per_page": 10},
    )
    mcp.display_response(labels_response, "Category Labels")

## Step 7: Custom Tool Testing

Test any specific tool with custom parameters.

In [None]:
# Numbered menu of the tool names collected by the tools/list cell
print("Available tools for custom testing:")
for i, tool in enumerate(available_tools, start=1):
    numbered_entry = f"{i:2d}. {tool}"
    print(numbered_entry)

print("\nModify the cell below to test specific tools with custom parameters.")

In [None]:
# Example call: request the first three credit cards
print("üí≥ Testing get_credit_cards...")
cc_response = mcp.call_tool("get_credit_cards", {"per_page": 3})
mcp.display_response(response=cc_response, title="Credit Cards")

In [None]:
# Example call: same listing tool as before, filtered to status ACTIVE
print("üîç Testing virtual cards with ACTIVE status filter...")
active_vc_response = mcp.call_tool(
    "get_virtual_cards",
    {
        "per_page": 10,
        "status": "ACTIVE",
        "sort_field": "createdAt",
        "sort_direction": "DESC",
    },
)
mcp.display_response(response=active_vc_response, title="Active Virtual Cards")

## Step 8: Error Testing

Test error scenarios and edge cases.

In [None]:
# Call a tool name that is not expected to exist and show the server's answer
print("‚ùå Testing invalid tool name...")
invalid_response = mcp.call_tool(tool_name="nonexistent_tool", arguments={})
mcp.display_response(response=invalid_response, title="Invalid Tool Test")

In [None]:
# Pass a string where the other cells pass an integer page size
print("‚ùå Testing invalid parameters...")
invalid_params_response = mcp.call_tool(
    "get_virtual_cards",
    {"per_page": "invalid_number"},
)
mcp.display_response(response=invalid_params_response, title="Invalid Parameters Test")

## Step 9: Summary and Cleanup

Testing summary and token cleanup (optional).

In [None]:
# Display testing summary
def _has_result(response) -> bool:
    """Return True when a JSON-RPC response dict has a result and no error."""
    return isinstance(response, dict) and "result" in response and "error" not in response


def _reported_error(response) -> bool:
    """Return True when the server signalled a failure for this call.

    A failure is either a JSON-RPC ``error`` member or a result flagged
    ``isError``. NOTE(review): the ``isError`` flag follows the MCP tool-result
    convention; confirm this server uses it.
    """
    if not isinstance(response, dict):
        return False
    if "error" in response:
        return True
    result = response.get("result")
    return isinstance(result, dict) and bool(result.get("isError"))


# Responses are looked up by name so that a skipped cell shows up as a failed
# check rather than a NameError. Each status is derived from what the server
# actually returned instead of being hard-coded.
_cell_vars = globals()
test_results = [
    ("OAuth flow", bool(_cell_vars.get("BEARER_TOKEN"))),
    ("MCP initialize", _has_result(_cell_vars.get("init_response"))),
    ("Tools list", _has_result(_cell_vars.get("tools_response"))),
    (
        "Tool execution",
        _has_result(_cell_vars.get("vc_response"))
        and not _reported_error(_cell_vars.get("vc_response")),
    ),
    # The invalid-tool call passes this check when the server rejects it
    ("Error handling", _reported_error(_cell_vars.get("invalid_response"))),
]

print("\nüéâ OAuth + MCP Testing Complete!")
print("=" * 50)
print(f"‚úÖ User: {USER_EMAIL}")
# Only a short prefix is shown: cell output is saved with the notebook
print(f"‚úÖ OAuth Bearer Token: {BEARER_TOKEN[:8]}...")
print(f"‚úÖ MCP Server: {BASE_URL}")
print(f"‚úÖ Available Tools: {len(available_tools)}")
print(f"‚úÖ Testing completed at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

print("\nüìã Test Results:")
for check_name, passed in test_results:
    print(f"  - {check_name}: {'‚úÖ Working' if passed else '‚ùå Failed'}")

if all(passed for _, passed in test_results):
    print("\nüöÄ Ready for production deployment!")
else:
    print("\n‚ö†Ô∏è Some checks failed - review the output above before deploying.")

In [None]:
# Optional: Revoke token for security.
# Set REVOKE_TOKEN_AFTER_TESTING to True to have this cell POST the Bearer
# token to the server's /revoke endpoint. It is off by default, so running the
# notebook top to bottom leaves the token valid.
REVOKE_TOKEN_AFTER_TESTING = False

if REVOKE_TOKEN_AFTER_TESTING:
    print("üîí Revoking Bearer token...")
    revoke_response = requests.post(
        f"{BASE_URL}/revoke",
        data={"token": BEARER_TOKEN},
        timeout=30,  # requests never times out unless told to
    )
    if revoke_response.status_code == 200:
        print("‚úÖ Token revoked successfully")
    else:
        print(f"‚ö†Ô∏è Token revocation failed: {revoke_response.status_code}")

print("\n‚úÖ Testing notebook complete!")