# LangChain Integration with Authority Nanos

This notebook demonstrates how to integrate Authority Nanos with LangChain, a popular framework for building LLM applications.

## Why Integrate with LangChain?

LangChain provides:
- Easy-to-use abstractions for LLM interactions
- Built-in tool/function calling support
- Agent frameworks (ReAct, Plan-and-Execute, etc.)
- Memory and context management

Authority Nanos provides:
- Policy-controlled tool execution
- Capability-based authorization
- Audit logging
- Secure state management

Together, they enable **secure, production-ready AI agents**.

## This Tutorial

We'll cover:
1. Setting up LangChain with Authority Nanos
2. Running LLM calls through the kernel
3. Creating policy-controlled tools
4. Building a secure LangChain agent

All examples work in **simulation mode** - no real kernel or LLM API keys required!

In [None]:
# Install dependencies (uncomment if needed)
# %pip install authority-nanos langchain langchain-core

import json
from typing import Any, Dict, List, Optional, Type
from authority_nanos import (
    AuthorityKernel,
    OperationDeniedError,
    AuthorityKernelError
)

print("Imports successful!")

## 1. Setting Up LangChain with Authority Nanos

First, let's create a custom LLM wrapper that routes calls through the Authority Kernel.

Note: In this tutorial, we'll create mock implementations that work in simulation mode. For production, you would use the real LangChain classes.

In [None]:
# Mock LangChain-style interfaces for simulation
# In production, you would import from langchain

class BaseLLM:
    """Mock base LLM class (simplified LangChain interface)."""
    
    def invoke(self, prompt: str) -> str:
        raise NotImplementedError
    
    def __call__(self, prompt: str) -> str:
        return self.invoke(prompt)


class BaseTool:
    """Mock base Tool class (simplified LangChain interface)."""
    name: str = "base_tool"
    description: str = "A base tool"
    
    def run(self, input_str: str) -> str:
        raise NotImplementedError
    
    def __call__(self, input_str: str) -> str:
        return self.run(input_str)


print("Mock LangChain interfaces defined!")

### Authority Nanos LLM Wrapper

This wrapper routes LLM calls through the Authority Kernel for policy enforcement:

In [None]:
class AuthorityLLM(BaseLLM):
    """LangChain-compatible LLM that routes through Authority Kernel.
    
    This wrapper ensures all LLM calls:
    - Are subject to policy enforcement
    - Are logged for auditing
    - Respect budget limits
    """
    
    def __init__(self, kernel: AuthorityKernel, model: str = "gpt-4"):
        self.ak = kernel
        self.model = model
    
    def invoke(self, prompt: str) -> str:
        """Invoke the LLM through Authority Kernel."""
        # Build request
        request = json.dumps({
            "model": self.model,
            "messages": [
                {"role": "user", "content": prompt}
            ],
            "max_tokens": 1000
        }).encode()
        
        # Send through kernel
        response = self.ak.inference(request)
        result = json.loads(response.decode())
        
        # Extract response text
        if "choices" in result:
            return result["choices"][0]["message"]["content"]
        
        return str(result)


# Test the LLM wrapper
with AuthorityKernel(simulate=True) as ak:
    llm = AuthorityLLM(ak, model="gpt-4")
    
    response = llm.invoke("What is the capital of France?")
    print(f"LLM Response: {response}")
    print()
    print("Note: This is a simulated response. Real responses require LLM configuration.")

## 2. Creating Policy-Controlled Tools

LangChain tools can be wrapped to enforce Authority Kernel policies:

In [None]:
class AuthorityTool(BaseTool):
    """Base class for Authority Kernel-controlled tools.
    
    Subclass this to create tools that:
    - Check authorization before execution
    - Log all invocations
    - Handle denials gracefully
    """
    
    def __init__(self, kernel: AuthorityKernel):
        self.ak = kernel
    
    def _check_auth(self, operation: str, target: str) -> bool:
        """Check authorization for an operation."""
        return self.ak.authorize(operation, target)
    
    def _get_denial_reason(self) -> str:
        """Get the reason for the last denial."""
        denial = self.ak.get_last_denial()
        return denial.reason if denial else "Unknown reason"


class FileReadTool(AuthorityTool):
    """Tool for reading files with authorization."""
    name = "file_read"
    description = "Read the contents of a file. Input should be a file path."
    
    def run(self, path: str) -> str:
        path = path.strip()
        
        if not self._check_auth("read", path):
            return f"DENIED: Cannot read {path}. Reason: {self._get_denial_reason()}"
        
        try:
            data = self.ak.file_read(path)
            return data.decode()
        except Exception as e:
            return f"ERROR: {e}"


class FileWriteTool(AuthorityTool):
    """Tool for writing files with authorization."""
    name = "file_write"
    description = "Write content to a file. Input format: 'path|content'"
    
    def run(self, input_str: str) -> str:
        if "|" not in input_str:
            return "ERROR: Input must be 'path|content'"
        
        path, content = input_str.split("|", 1)
        path = path.strip()
        
        if not self._check_auth("write", path):
            return f"DENIED: Cannot write to {path}. Reason: {self._get_denial_reason()}"
        
        try:
            self.ak.file_write(path, content.encode())
            return f"SUCCESS: Wrote {len(content)} bytes to {path}"
        except Exception as e:
            return f"ERROR: {e}"


class CalculatorTool(AuthorityTool):
    """Tool for calculations (no special auth needed)."""
    name = "calculator"
    description = "Perform arithmetic calculations. Input should be a math expression."
    
    def run(self, expression: str) -> str:
        # Safe subset of eval
        allowed = set("0123456789+-*/. ()")
        expression = expression.strip()
        
        if not all(c in allowed for c in expression):
            return "ERROR: Invalid characters in expression"
        
        try:
            result = eval(expression)
            return str(result)
        except Exception as e:
            return f"ERROR: {e}"


class WebSearchTool(AuthorityTool):
    """Tool for web searches with authorization."""
    name = "web_search"
    description = "Search the web for information. Input should be a search query."
    
    def run(self, query: str) -> str:
        query = query.strip()
        url = f"https://search.example.com/q={query}"
        
        if not self._check_auth("http.get", url):
            return f"DENIED: Cannot search for '{query}'. Reason: {self._get_denial_reason()}"
        
        # Simulated search results
        return json.dumps({
            "query": query,
            "results": [
                {"title": f"Result 1 for {query}", "snippet": "..."},
                {"title": f"Result 2 for {query}", "snippet": "..."},
            ],
            "simulated": True
        })


print("Authority-controlled tools defined!")

### Testing the Tools

In [None]:
with AuthorityKernel(simulate=True) as ak:
    # Set up policy
    ak.deny_target("/etc/shadow")
    ak.deny_target("/etc/passwd")
    
    # Create tools
    file_read = FileReadTool(ak)
    file_write = FileWriteTool(ak)
    calculator = CalculatorTool(ak)
    web_search = WebSearchTool(ak)
    
    print("=== Tool Tests ===")
    print()
    
    # Test calculator
    print(f"Calculator: 15 * 7 + 3 = {calculator('15 * 7 + 3')}")
    print()
    
    # Test file read (allowed)
    print(f"File Read (allowed): {file_read('/etc/config.json')[:50]}...")
    print()
    
    # Test file read (denied)
    print(f"File Read (denied): {file_read('/etc/shadow')}")
    print()
    
    # Test web search
    print(f"Web Search: {web_search('Authority Nanos')}")

## 3. Building a LangChain-Style Agent

Now let's build a ReAct-style agent that uses Authority Nanos:

In [None]:
class AuthorityAgent:
    """A LangChain-style agent backed by Authority Nanos.
    
    Implements a ReAct-style loop:
    1. Think (LLM reasoning)
    2. Act (execute tool)
    3. Observe (process result)
    4. Repeat until done
    """
    
    def __init__(self, kernel: AuthorityKernel, model: str = "gpt-4"):
        self.ak = kernel
        self.llm = AuthorityLLM(kernel, model)
        self.tools: Dict[str, AuthorityTool] = {}
        self.max_iterations = 5
        self.verbose = True
    
    def add_tool(self, tool: AuthorityTool):
        """Register a tool with the agent."""
        self.tools[tool.name] = tool
    
    def run(self, task: str) -> str:
        """Run the agent on a task."""
        if self.verbose:
            print(f"\n{'='*60}")
            print(f"Agent Task: {task}")
            print(f"{'='*60}")
        
        # Build initial prompt
        prompt = self._build_prompt(task)
        scratchpad = ""
        
        for i in range(self.max_iterations):
            if self.verbose:
                print(f"\n--- Iteration {i+1} ---")
            
            # Think
            full_prompt = prompt + scratchpad
            response = self.llm.invoke(full_prompt)
            
            if self.verbose:
                print(f"LLM: {response[:200]}..." if len(response) > 200 else f"LLM: {response}")
            
            # Check for final answer
            if "FINAL ANSWER:" in response:
                answer = response.split("FINAL ANSWER:")[1].strip()
                if self.verbose:
                    print(f"\nFinal Answer: {answer}")
                return answer
            
            # Parse action
            action = self._parse_action(response)
            
            if action:
                tool_name, tool_input = action
                
                if tool_name in self.tools:
                    if self.verbose:
                        print(f"Action: {tool_name}({tool_input})")
                    
                    # Execute tool
                    result = self.tools[tool_name].run(tool_input)
                    
                    if self.verbose:
                        print(f"Observation: {result[:200]}..." if len(result) > 200 else f"Observation: {result}")
                    
                    # Add to scratchpad
                    scratchpad += f"\nAction: {tool_name}({tool_input})\nObservation: {result}\n"
                else:
                    scratchpad += f"\nAction: {tool_name}\nObservation: Tool not found\n"
            else:
                # No action found - agent might be stuck
                scratchpad += f"\nThought: {response}\n"
        
        return "Max iterations reached without answer"
    
    def _build_prompt(self, task: str) -> str:
        """Build the agent prompt."""
        tools_desc = "\n".join(
            f"- {name}: {tool.description}"
            for name, tool in self.tools.items()
        )
        
        return f"""You are a helpful AI assistant with access to tools.

Available Tools:
{tools_desc}

To use a tool, respond with:
ACTION: tool_name(input)

When you have the final answer, respond with:
FINAL ANSWER: your answer here

Task: {task}

Begin!
"""
    
    def _parse_action(self, response: str) -> Optional[tuple]:
        """Parse an action from the response."""
        if "ACTION:" not in response:
            return None
        
        try:
            action_part = response.split("ACTION:")[1].strip()
            
            if "(" in action_part and ")" in action_part:
                tool_name = action_part.split("(")[0].strip()
                tool_input = action_part.split("(")[1].split(")")[0].strip()
                return (tool_name, tool_input)
        except Exception:
            pass
        
        return None


print("AuthorityAgent class defined!")

### Running the Agent

In [None]:
with AuthorityKernel(simulate=True) as ak:
    # Create agent
    agent = AuthorityAgent(ak)
    
    # Add tools
    agent.add_tool(FileReadTool(ak))
    agent.add_tool(FileWriteTool(ak))
    agent.add_tool(CalculatorTool(ak))
    agent.add_tool(WebSearchTool(ak))
    
    print("Tools registered:")
    for name in agent.tools:
        print(f"  - {name}")
    
    # Run a task
    result = agent.run("Calculate 42 * 17 and tell me the result")
    print(f"\n\nResult: {result}")

## 4. Policy-Controlled Agent

Let's see how the agent handles policy restrictions:

In [None]:
with AuthorityKernel(simulate=True) as ak:
    print("=== Policy-Controlled Agent Demo ===")
    print()
    
    # Configure restrictive policy
    ak.deny_target("/etc/passwd")
    ak.deny_target("/etc/shadow")
    ak.deny_target("/root/*")
    ak.deny_operation("write")  # No write operations allowed
    
    print("Policy configured:")
    print("  - Denied targets: /etc/passwd, /etc/shadow, /root/*")
    print("  - Denied operations: write")
    print()
    
    # Create tools and test directly
    file_read = FileReadTool(ak)
    file_write = FileWriteTool(ak)
    
    print("Testing tools with policy:")
    print()
    
    # Allowed read
    print(f"Read /etc/config.json: ALLOWED")
    result = file_read("/etc/config.json")
    print(f"  Result: {result[:50]}...")
    print()
    
    # Denied read
    print(f"Read /etc/passwd: DENIED")
    result = file_read("/etc/passwd")
    print(f"  Result: {result}")
    print()
    
    # Denied write
    print(f"Write /tmp/test.txt: DENIED (write operation denied)")
    result = file_write("/tmp/test.txt|Hello World")
    print(f"  Result: {result}")

## 5. Agent with Persistent Memory

Use the typed heap for conversation memory:

In [None]:
class AgentWithMemory(AuthorityAgent):
    """Agent with persistent memory in typed heap."""
    
    def __init__(self, kernel: AuthorityKernel, agent_id: str, model: str = "gpt-4"):
        super().__init__(kernel, model)
        self.agent_id = agent_id
        self._init_memory()
    
    def _init_memory(self):
        """Initialize agent memory in typed heap."""
        memory_data = json.dumps({
            "agent_id": self.agent_id,
            "conversations": [],
            "facts": [],
            "task_count": 0
        }).encode()
        
        self.memory_handle = self.ak.alloc("agent_memory", memory_data)
    
    def get_memory(self) -> Dict:
        """Retrieve current memory state."""
        data = self.ak.read(self.memory_handle)
        return json.loads(data.decode())
    
    def add_fact(self, fact: str):
        """Add a fact to agent memory."""
        patch = json.dumps([
            {"op": "add", "path": "/facts/-", "value": fact}
        ]).encode()
        self.ak.write(self.memory_handle, patch)
    
    def run(self, task: str) -> str:
        """Run task and update memory."""
        # Increment task count
        memory = self.get_memory()
        patch = json.dumps([
            {"op": "replace", "path": "/task_count", "value": memory["task_count"] + 1},
            {"op": "add", "path": "/conversations/-", "value": {
                "task": task,
                "timestamp": "2024-01-01T12:00:00Z"
            }}
        ]).encode()
        self.ak.write(self.memory_handle, patch)
        
        # Run the task
        result = super().run(task)
        
        return result


# Demo agent with memory
with AuthorityKernel(simulate=True) as ak:
    print("=== Agent with Memory Demo ===")
    print()
    
    agent = AgentWithMemory(ak, "agent-mem-001")
    agent.add_tool(CalculatorTool(ak))
    agent.verbose = False  # Reduce output
    
    # Run some tasks
    agent.add_fact("User prefers metric units")
    agent.add_fact("Current location: San Francisco")
    
    agent.run("Calculate 100 + 200")
    agent.run("What is 50 * 4?")
    
    # Show memory state
    print("\nAgent Memory State:")
    print(json.dumps(agent.get_memory(), indent=2))

## 6. Audit Logging

All agent actions are logged:

In [None]:
with AuthorityKernel(simulate=True) as ak:
    print("=== Agent Audit Log ===")
    print()
    
    # Set up agent with restrictions
    ak.deny_target("/secrets/*")
    
    agent = AuthorityAgent(ak)
    agent.add_tool(FileReadTool(ak))
    agent.add_tool(CalculatorTool(ak))
    agent.verbose = False
    
    # Execute some operations
    file_read = FileReadTool(ak)
    file_read("/etc/config.json")  # Allowed
    file_read("/secrets/api_key.txt")  # Denied
    
    calc = CalculatorTool(ak)
    calc("2 + 2")
    
    # Get audit logs
    logs = ak.audit_logs()
    
    print(f"Total audit entries: {len(logs)}")
    print()
    print("Authorization events:")
    print("-" * 70)
    
    for log_bytes in logs:
        entry = json.loads(log_bytes.decode())
        event = entry.get("event", "")
        
        if "authorize" in event:
            details = entry.get("details", {})
            status = "ALLOWED" if details.get("authorized") else "DENIED"
            op = details.get("operation", "?")[:10]
            target = details.get("target", "?")[:30]
            time = entry.get("timestamp", "?")[:19]
            print(f"{time} | {op:10} | {target:30} | {status}")

## 7. Production Considerations

When moving to production:

### Replace Simulation with Real Kernel

```python
# Development (simulation)
ak = AuthorityKernel(simulate=True)

# Production (real kernel)
ak = AuthorityKernel(simulate=False)
```

### Configure Real LLM

In production, configure the kernel with your LLM provider credentials.

### Define Production Policies

```toml
# policy.toml
[agent.production_agent]
description = "Production LangChain Agent"

[[agent.production_agent.allow]]
operation = "read"
target = "/data/public/*"

[[agent.production_agent.allow]]
operation = "http.get"
target = "https://api.openai.com/*"

[[agent.production_agent.deny]]
operation = "*"
target = "*"
```

### Enable Comprehensive Logging

Configure audit log export to your logging infrastructure.

## Summary

In this notebook, you learned how to:

1. **Wrap LLM calls** through Authority Kernel for policy enforcement
2. **Create policy-controlled tools** that check authorization
3. **Build a LangChain-style agent** with Authority Nanos backend
4. **Handle policy denials** gracefully in agent workflows
5. **Persist agent memory** in the typed heap
6. **Audit agent actions** for compliance and debugging

## Key Benefits

| Feature | Benefit |
|---------|--------|
| Policy-controlled tools | Prevent unauthorized actions |
| Kernel-routed LLM | Enforce inference policies |
| Typed heap memory | Secure, versioned state |
| Audit logging | Full visibility into agent behavior |

## Next Steps

- Review the Security documentation for production deployment
- Explore the Policy documentation for complex policy configurations
- Check out the API reference for all available features