# Calling Aura Agents via REST API

In Lab 2, you built a powerful GraphRAG agent using Neo4j's visual agent builder. That agent can answer questions about manufacturing product development data, use custom Cypher queries, and perform semantic searches.

In this notebook, you'll learn to call that same agent programmatically via its REST API. This enables:

- **Application Integration**: Embed the agent in web apps or microservices
- **Automation**: Include agent calls in data pipelines
- **Batch Processing**: Ask multiple questions programmatically
- **Custom UIs**: Build your own chat interfaces

---

## Prerequisites

Before running this notebook, you need:

1. **An Aura Agent** with external endpoint enabled (from Lab 2)
2. **API Credentials** (Client ID and Secret) from your Neo4j profile
3. **The Agent Endpoint URL** (copied from the Aura console)

See the README for instructions on getting these credentials.

## 1. Configuration

Add your credentials to `CONFIG.txt`; the cell below loads them from `../CONFIG.txt`. You can find these values in the Neo4j Aura console:

- **Client ID/Secret**: Profile icon → Settings → API keys
- **Agent Endpoint**: Click on your agent → Copy endpoint

In [None]:
# Load configuration from CONFIG.txt
from dotenv import load_dotenv
import os

load_dotenv("../CONFIG.txt")

NEO4J_CLIENT_ID = os.getenv("NEO4J_CLIENT_ID")
NEO4J_CLIENT_SECRET = os.getenv("NEO4J_CLIENT_SECRET")
NEO4J_AGENT_ENDPOINT = os.getenv("NEO4J_AGENT_ENDPOINT")

# Validate configuration
errors = []
if not NEO4J_CLIENT_ID or "your-client-id" in NEO4J_CLIENT_ID:
    errors.append("Set NEO4J_CLIENT_ID in CONFIG.txt")
if not NEO4J_CLIENT_SECRET or "your-client-secret" in NEO4J_CLIENT_SECRET:
    errors.append("Set NEO4J_CLIENT_SECRET in CONFIG.txt")
if not NEO4J_AGENT_ENDPOINT or "your-agent-endpoint" in NEO4J_AGENT_ENDPOINT:
    errors.append("Set NEO4J_AGENT_ENDPOINT in CONFIG.txt")

if errors:
    print("ERROR: Configuration incomplete!")
    for e in errors:
        print(f"  - {e}")
else:
    print("Configuration OK!")
    print(f"  Endpoint: {NEO4J_AGENT_ENDPOINT[:60]}...")

## 2. Install Dependencies

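We need three packages:
- `httpx`: HTTP client with both synchronous and asynchronous APIs
- `pydantic`: Data validation and parsing
- `python-dotenv`: Loads `CONFIG.txt` in the configuration cell above (if that cell failed with `ModuleNotFoundError`, re-run it after this install)

In [None]:
%pip install httpx pydantic python-dotenv -q

In [None]:
import importlib.metadata

packages = ["httpx", "pydantic", "python-dotenv"]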

print("Required packages:")
print("-" * 40)
for pkg in packages:
    try:
        version = importlib.metadata.version(pkg)
        print(f"{pkg:20} {version}")
    except importlib.metadata.PackageNotFoundError:
        print(f"{pkg:20} NOT INSTALLED")

## 3. Define Response Models

We use Pydantic models to parse the API responses into type-safe Python objects. This makes it easy to access the agent's text response, thinking, tool usage, and token metrics.

In [None]:
from datetime import datetime, timedelta, timezone
from typing import Any
from pydantic import BaseModel, Field


class TokenResponse(BaseModel):
    """OAuth2 token response from Neo4j Aura API."""
    access_token: str
    token_type: str = "bearer"
    expires_in: int = 3600


class CachedToken(BaseModel):
    """Cached OAuth2 token with expiration tracking."""
    access_token: str
    expires_at: datetime

    def is_expired(self, buffer_seconds: int = 60) -> bool:
        """Check if token is expired or about to expire."""
        return datetime.now(timezone.utc) >= self.expires_at.replace(
            tzinfo=timezone.utc
        ) - timedelta(seconds=buffer_seconds)


class AgentUsage(BaseModel):
    """Token usage metrics from agent invocation."""
    request_tokens: int | None = None
    response_tokens: int | None = None
    total_tokens: int | None = None


class ToolUse(BaseModel):
    """Details about a tool used by the agent."""
    tool_use_id: str | None = None
    type: str | None = None
    output: Any | None = None


class AgentResponse(BaseModel):
    """Response from invoking an Aura Agent."""
    text: str | None = Field(default=None, description="Formatted response text")
    thinking: str | list[str] | None = Field(default=None, description="Agent reasoning steps")
    tool_uses: list[ToolUse] | None = Field(default=None, description="Tools used during invocation")
    status: str | None = Field(default=None, description="Request status")
    usage: AgentUsage | None = Field(default=None, description="Token usage metrics")
    raw_response: dict[str, Any] | None = Field(default=None, description="Raw JSON response")

    @classmethod
    def from_api_response(cls, data: dict[str, Any]) -> "AgentResponse":
        """Parse API response into AgentResponse model."""
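        text_parts: list[str] = []
        thinking_parts: list[str] = []
        tool_uses: list[ToolUse] = []

        # Parse the content array; a response may contain several blocks of each type
        content = data.get("content") or []
        for item in content:
            item_type = item.get("type")
            if item_type == "text" and item.get("text"):
                text_parts.append(item["text"])
            elif item_type == "thinking" and item.get("thinking"):
                thinking_parts.append(item["thinking"])
            elif item_type == "tool_use":
                # For tool calls, `output` holds the arguments the agent passed to the tool
                tool_uses.append(
                    ToolUse(
                        tool_use_id=item.get("id"),
                        type=item.get("name"),
                        output=item.get("input"),
                    )
                )
            elif item_type == "tool_result":
                tool_uses.append(
                    ToolUse(
                        tool_use_id=item.get("tool_use_id"),
                        type="tool_result",
                        output=item.get("content"),
                    )
                )

        # Keep a single thinking block as a string; several become a list
        thinking: str | list[str] | None = None
        if len(thinking_parts) == 1:
            thinking = thinking_parts[0]
        elif thinking_parts:
            thinking = thinking_parts

        # Guard against a missing or null "usage" field
        usage_data = data.get("usage")
        usage = AgentUsage(**usage_data) if isinstance(usage_data, dict) else None

        return cls(
            text="\n\n".join(text_parts) if text_parts else None,
            thinking=thinking,
            tool_uses=tool_uses or None,
            status=data.get("status"),
            usage=usage,
            raw_response=data,
        )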


print("Models defined successfully!")

## 4. Build the Aura Agent Client

The client handles:
- **OAuth2 Authentication**: Gets access tokens using client credentials
- **Token Caching**: Reuses a token until 60 seconds before it expires (tokens are valid for 1 hour)
- **Token Refresh**: On a 401 response, discards the cached token and retries the request once with a new one
- **Response Parsing**: Converts JSON to typed Python objects
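The caching rule can be sketched with two small functions. This is a minimal, hypothetical restatement of `CachedToken.is_expired` that takes `now` as a parameter so the behavior can be checked without waiting an hour; `compute_expiry` and `needs_refresh` are illustrative names, not part of the client.

```python
from datetime import datetime, timedelta, timezone


def compute_expiry(issued_at: datetime, expires_in: int) -> datetime:
    """Absolute expiry time from the token response's `expires_in` (seconds)."""
    return issued_at + timedelta(seconds=expires_in)


def needs_refresh(expires_at: datetime, now: datetime, buffer_seconds: int = 60) -> bool:
    """True once `now` is within `buffer_seconds` of expiry, or past it."""
    return now >= expires_at - timedelta(seconds=buffer_seconds)


issued = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
expires_at = compute_expiry(issued, 3600)  # a 1-hour token

print(needs_refresh(expires_at, issued + timedelta(minutes=30)))  # False: still fresh
print(needs_refresh(expires_at, issued + timedelta(minutes=59, seconds=30)))  # True: inside the 60 s buffer
```

Refreshing slightly early avoids sending a token that expires while the request is in flight.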

### Authentication Flow

```
1. POST /oauth/token with Basic Auth (client_id:client_secret)
2. Receive access_token (valid for 1 hour)
3. Include Bearer token in subsequent requests
4. Request a new token shortly before expiry, or after a 401 response
```
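For reference, here is a stdlib-only sketch of what steps 1 and 3 send. It assumes the standard OAuth2 client-credentials encoding (HTTP Basic auth over `client_id:client_secret`, plus a form-encoded `grant_type`), which is what passing `auth=(client_id, client_secret)` and `data={...}` to httpx produces for ASCII credentials. `build_token_request` and `bearer_header` are hypothetical helpers; the client below does not use them.

```python
import base64
from urllib.parse import urlencode


def build_token_request(client_id: str, client_secret: str) -> tuple[dict[str, str], str]:
    """Build headers and form body for an OAuth2 client-credentials token request (step 1)."""
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    headers = {
        # HTTP Basic auth: base64("client_id:client_secret")
        "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
        "Content-Type": "application/x-www-form-urlencoded",
    }
    body = urlencode({"grant_type": "client_credentials"})
    return headers, body


def bearer_header(access_token: str) -> dict[str, str]:
    """Header attached to each agent request once a token is available (step 3)."""
    return {"Authorization": f"Bearer {access_token}"}


headers, body = build_token_request("my-client-id", "my-secret")
print(headers["Authorization"])
print(body)  # grant_type=client_credentials
```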

In [None]:
import logging
from urllib.parse import urlparse
import httpx

logger = logging.getLogger(__name__)


class AuraAgentError(Exception):
    """Base exception for Aura Agent errors."""
    pass


class AuthenticationError(AuraAgentError):
    """Raised when authentication fails."""
    pass


class InvocationError(AuraAgentError):
    """Raised when agent invocation fails."""
    pass


class AuraAgentClient:
    """Client for invoking Neo4j Aura Agents via REST API.
    
    Handles OAuth2 authentication, token caching, and response parsing.
    Supports both synchronous and asynchronous invocation.
    """
    
    DEFAULT_TOKEN_URL = "https://api.neo4j.io/oauth/token"
    DEFAULT_TIMEOUT = 60

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        endpoint_url: str,
        token_url: str | None = None,
        timeout: int | None = None,
    ):
        """Initialize the Aura Agent client.
        
        Args:
            client_id: Neo4j Aura API client ID
            client_secret: Neo4j Aura API client secret  
            endpoint_url: Full URL to the agent invoke endpoint
            token_url: Optional custom OAuth2 token URL
            timeout: Optional request timeout in seconds
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.endpoint_url = endpoint_url
        self.token_url = token_url or self.DEFAULT_TOKEN_URL
        self.timeout = timeout or self.DEFAULT_TIMEOUT
        self._cached_token: CachedToken | None = None
        self._validate_endpoint_url()

    def _validate_endpoint_url(self) -> None:
        """Validate the endpoint URL format."""
        parsed = urlparse(self.endpoint_url)
        if not parsed.scheme or not parsed.netloc:
            raise ValueError(f"Invalid endpoint URL: {self.endpoint_url}")
        if not self.endpoint_url.endswith("/invoke"):
            logger.warning(f"Endpoint URL should end with '/invoke'. Got: {self.endpoint_url}")

    def _get_auth_header(self) -> tuple[str, str]:
        """Get HTTP Basic auth credentials for token request."""
        return (self.client_id, self.client_secret)

    def _parse_token_response(self, data: dict[str, Any]) -> CachedToken:
        """Parse OAuth2 token response and create cached token."""
        token = TokenResponse(**data)
        expires_at = datetime.now(timezone.utc) + timedelta(seconds=token.expires_in)
        return CachedToken(access_token=token.access_token, expires_at=expires_at)

    def _get_token_sync(self, client: httpx.Client) -> str:
        """Get a valid access token, refreshing if necessary."""
        if self._cached_token and not self._cached_token.is_expired():
            return self._cached_token.access_token

        logger.debug("Requesting new OAuth2 token")
        response = client.post(
            self.token_url,
            auth=self._get_auth_header(),
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={"grant_type": "client_credentials"},
        )

        if response.status_code != 200:
            raise AuthenticationError(
                f"Failed to obtain access token: {response.status_code} - {response.text}"
            )

        self._cached_token = self._parse_token_response(response.json())
        logger.debug("Successfully obtained new token")
        return self._cached_token.access_token

    async def _get_token_async(self, client: httpx.AsyncClient) -> str:
        """Get a valid access token, refreshing if necessary (async)."""
        if self._cached_token and not self._cached_token.is_expired():
            return self._cached_token.access_token

        logger.debug("Requesting new OAuth2 token (async)")
        response = await client.post(
            self.token_url,
            auth=self._get_auth_header(),
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={"grant_type": "client_credentials"},
        )

        if response.status_code != 200:
            raise AuthenticationError(
                f"Failed to obtain access token: {response.status_code} - {response.text}"
            )

        self._cached_token = self._parse_token_response(response.json())
        return self._cached_token.access_token

    def invoke(self, question: str) -> AgentResponse:
        """Invoke the Aura Agent with a question (synchronous).
        
        Args:
            question: Natural language question to ask the agent
            
        Returns:
            AgentResponse containing the agent's answer and metadata
        """
        with httpx.Client(timeout=self.timeout) as client:
            token = self._get_token_sync(client)

            response = client.post(
                self.endpoint_url,
                headers={
                    "Content-Type": "application/json",
                    "Accept": "application/json",
                    "Authorization": f"Bearer {token}",
                },
                json={"input": question},
            )

            # Retry with fresh token on 401
            if response.status_code == 401:
                logger.debug("Token expired, refreshing...")
                self._cached_token = None
                token = self._get_token_sync(client)
                response = client.post(
                    self.endpoint_url,
                    headers={
                        "Content-Type": "application/json",
                        "Accept": "application/json",
                        "Authorization": f"Bearer {token}",
                    },
                    json={"input": question},
                )

            if response.status_code != 200:
                raise InvocationError(
                    f"Agent invocation failed: {response.status_code} - {response.text}"
                )

            return AgentResponse.from_api_response(response.json())

    async def invoke_async(self, question: str) -> AgentResponse:
        """Invoke the Aura Agent with a question (asynchronous).
        
        Args:
            question: Natural language question to ask the agent
            
        Returns:
            AgentResponse containing the agent's answer and metadata
        """
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            token = await self._get_token_async(client)

            response = await client.post(
                self.endpoint_url,
                headers={
                    "Content-Type": "application/json",
                    "Accept": "application/json",
                    "Authorization": f"Bearer {token}",
                },
                json={"input": question},
            )

            # Retry with fresh token on 401
            if response.status_code == 401:
                self._cached_token = None
                token = await self._get_token_async(client)
                response = await client.post(
                    self.endpoint_url,
                    headers={
                        "Content-Type": "application/json",
                        "Accept": "application/json",
                        "Authorization": f"Bearer {token}",
                    },
                    json={"input": question},
                )

            if response.status_code != 200:
                raise InvocationError(
                    f"Agent invocation failed: {response.status_code} - {response.text}"
                )

            return AgentResponse.from_api_response(response.json())

    def clear_token_cache(self) -> None:
        """Clear the cached OAuth2 token, forcing a refresh on next request."""
        self._cached_token = None

    def __repr__(self) -> str:
        return f"AuraAgentClient(endpoint_url='{self.endpoint_url[:50]}...')"


print("AuraAgentClient class defined successfully!")
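One consequence of this design: concurrent `invoke_async` calls that start with an empty cache each request their own token. That works, but if you prefer a single token fetch shared by all callers, an `asyncio.Lock` with a re-check inside it is a common pattern. The sketch below is hypothetical (`SingleFlightToken` and `fake_fetch` are illustrative stand-ins, and it ignores token expiry for brevity). In a notebook, use `await demo_single_flight()` instead of `asyncio.run(...)`.

```python
import asyncio
from collections.abc import Awaitable, Callable


class SingleFlightToken:
    """Hypothetical helper: share one in-flight token fetch among concurrent callers."""

    def __init__(self, fetch: Callable[[], Awaitable[str]]):
        self._fetch = fetch  # stands in for the POST to the token URL
        self._token: str | None = None
        self._lock = asyncio.Lock()

    async def get(self) -> str:
        if self._token is not None:
            return self._token
        async with self._lock:
            # Re-check: another task may have fetched while we waited for the lock
            if self._token is None:
                self._token = await self._fetch()
            return self._token


async def demo_single_flight() -> tuple[list[str], int]:
    calls = 0

    async def fake_fetch() -> str:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)  # simulate network latency
        return f"token-{calls}"

    cache = SingleFlightToken(fake_fetch)
    tokens = await asyncio.gather(*(cache.get() for _ in range(5)))
    return list(tokens), calls


tokens, calls = asyncio.run(demo_single_flight())
print(tokens, calls)  # five identical tokens, one fetch
```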

## 5. Create the Client

Now we create an instance of the client using your credentials.

In [None]:
# Create the client
client = AuraAgentClient(
    client_id=NEO4J_CLIENT_ID,
    client_secret=NEO4J_CLIENT_SECRET,
    endpoint_url=NEO4J_AGENT_ENDPOINT,
)

print(f"Client created: {client}")

## 6. Test the Connection

Let's test the connection by asking the agent what tools it has available. This is the same agent you built in Lab 2!

In [None]:
# Ask the agent what tools it has
response = client.invoke("What tools do you have available? List each tool and what it does.")

print("Agent Tools:")
print("=" * 50)
print(response.text)

## 7. Query the Agent

Now let's ask some questions about the manufacturing data. These are the same types of questions you tested in Lab 2.

### Helper Function

In [None]:
def ask_agent(question: str, show_thinking: bool = False, show_tools: bool = False):
    """Ask the agent a question and display the response."""
    print(f"Question: {question}")
    print("-" * 60)
    
    response = client.invoke(question)
    
    print(f"\nAnswer:\n{response.text}")
    
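    if show_thinking and response.thinking:
        print("\n--- Agent Thinking ---")
        # thinking may be a single string or a list of reasoning steps
        thinking = response.thinking
        print("\n".join(thinking) if isinstance(thinking, list) else thinking)

    if show_tools and response.tool_uses:
        print("\n--- Tools Used ---")
        for tool in response.tool_uses:
            print(f"  - {tool.type}")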
    
    if response.usage:
        print(f"\n[Tokens: {response.usage.total_tokens}]")
    
    return response

### Product Overview

Ask about a specific product or component. A question like this should lead the agent to use its Cypher template tool:

In [None]:
response = ask_agent(
    "Tell me about the R2D2 product and its technology domains",
    show_tools=True
)

### Component Analysis

Explore requirements across components:

In [None]:
response = ask_agent(
    "What requirements do the HVB_3900 and PDU_1500 components have in common?",
    show_tools=True
)

### Semantic Search

Ask an open-ended, topic-based question. The agent should answer it with its similarity search tool:

In [None]:
response = ask_agent(
    "What do the requirements say about thermal management and cooling?",
    show_tools=True
)

### Structured Query

Ask a fact-based question that requires a database query:

In [None]:
response = ask_agent(
    "Which component has the most requirements?",
    show_tools=True
)

## 8. Explore Agent Thinking

The agent exposes its reasoning process. Let's see how it decides which tools to use:

In [None]:
response = ask_agent(
    "What defects have been found in the Electric Powertrain domain and what are their severities?",
    show_thinking=True,
    show_tools=True
)

## 9. View Raw Response

For debugging, you can access the complete raw API response:

In [None]:
import json

if response.raw_response:
    print("Raw API Response:")
    print(json.dumps(response.raw_response, indent=2))
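When the raw JSON is long, a quick tally of content block types shows at a glance whether the agent called tools. This sketch assumes the `content` array shape that `from_api_response` parses in section 3; the sample payload and the `example_tool` name are hypothetical.

```python
from collections import Counter
from typing import Any


def content_block_counts(raw: dict[str, Any]) -> Counter:
    """Count content blocks by their "type" field (blocks without one count under None)."""
    return Counter(block.get("type") for block in raw.get("content") or [])


# Hypothetical raw payload with one block of each type handled in section 3
raw = {
    "content": [
        {"type": "thinking", "thinking": "placeholder reasoning"},
        {"type": "tool_use", "id": "t1", "name": "example_tool", "input": {}},
        {"type": "tool_result", "tool_use_id": "t1", "content": []},
        {"type": "text", "text": "placeholder answer"},
    ]
}
print(content_block_counts(raw))
```

In the notebook you would call `content_block_counts(response.raw_response)` on a real response.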

## 10. Async and Concurrent Requests

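Each agent call spends most of its time waiting on the network, so sending several requests concurrently with `invoke_async` reduces the total wall-clock time compared with calling `invoke` in a loop: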

In [None]:
import asyncio

async def ask_multiple_questions():
    """Ask multiple questions concurrently."""
    questions = [
        "What components are in the Electric Powertrain domain?",
        "What components are in the Chassis domain?",
        "What components are in the Body domain?",
    ]
    
    print(f"Asking {len(questions)} questions concurrently...")
    print("-" * 50)
    
    # Create async tasks for all questions
    tasks = [client.invoke_async(q) for q in questions]
    
    # Wait for all to complete
    responses = await asyncio.gather(*tasks)
    
    # Display results
    for question, response in zip(questions, responses):
        print(f"\nQ: {question}")
        # Show first 200 chars of response
        text = response.text or "No response"
        print(f"A: {text[:200]}..." if len(text) > 200 else f"A: {text}")
    
    return responses

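# Jupyter already runs an event loop, so top-level await works here.
# In a plain Python script, use asyncio.run(ask_multiple_questions()) instead.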
responses = await ask_multiple_questions()
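With many questions, firing every request at once may get throttled by the service or overload it. A common remedy is to cap the number of in-flight requests with `asyncio.Semaphore`. In this sketch `gather_bounded` is a hypothetical helper and `fake_invoke` stands in for `client.invoke_async`, so no network is involved. In a notebook, use `await demo_bounded()` instead of `asyncio.run(...)`.

```python
import asyncio
from collections.abc import Awaitable, Callable, Iterable
from typing import TypeVar

T = TypeVar("T")


async def gather_bounded(factories: Iterable[Callable[[], Awaitable[T]]], limit: int) -> list[T]:
    """Run coroutine factories concurrently, at most `limit` at a time, results in input order."""
    semaphore = asyncio.Semaphore(limit)

    async def run(factory: Callable[[], Awaitable[T]]) -> T:
        async with semaphore:
            return await factory()

    return list(await asyncio.gather(*(run(f) for f in factories)))


async def demo_bounded(limit: int = 2) -> tuple[list[int], int]:
    active = 0
    peak = 0

    async def fake_invoke(i: int) -> int:
        # Stand-in for client.invoke_async(question)
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        return i * i

    results = await gather_bounded([lambda i=i: fake_invoke(i) for i in range(6)], limit)
    return results, peak


results, peak = asyncio.run(demo_bounded())
print(results, peak)  # [0, 1, 4, 9, 16, 25] 2
```

With the real client and a list of question strings `questions`, the call would look like `await gather_bounded([lambda q=q: client.invoke_async(q) for q in questions], limit=2)`. The `q=q` default argument binds each question at lambda creation time.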

## 11. Try Your Own Questions

Enter your own questions to explore the manufacturing data:

In [None]:
# Enter your question here
my_question = "What changes have been proposed that affect battery requirements?"

response = ask_agent(my_question, show_thinking=True, show_tools=True)

In [None]:
# Try another question
my_question = "What test cases exist for requirements in the high-voltage battery component?"

response = ask_agent(my_question, show_tools=True)

## Summary

In this lab, you learned how to:

1. **Authenticate** with the Neo4j Aura API using OAuth2 client credentials
2. **Build a reusable client** that handles token caching and refresh
3. **Invoke your Aura Agent** programmatically via REST API
4. **Parse responses** into type-safe Python objects
5. **Access agent thinking** to understand how it reasons about tool selection
6. **Make concurrent requests** using async for better performance

### What's Next?

You can now:
- Integrate this client into your own applications
- Build custom chat interfaces
- Create automated workflows that query your manufacturing knowledge graph
- Combine with other APIs and data sources

**Congratulations!** You have completed all labs in the workshop.

---

## Resources

- [Neo4j Aura Agents Documentation](https://neo4j.com/developer/genai-ecosystem/aura-agent/)
- [Neo4j API Authentication](https://neo4j.com/docs/aura/platform/api/authentication/)
- [Build a GraphRAG Agent in Minutes](https://neo4j.com/blog/genai/build-context-aware-graphrag-agent/)