# SageMaker Coding Agent

**Single-File Version** - All code in one notebook for easy deployment.

A secure AI coding assistant powered by AWS Bedrock Claude.

## Setup
1. Run the first cell to install dependencies
2. Run all cells in order
3. Use the chat widget at the bottom

In [None]:
# Install dependencies
!pip install -q boto3 ipywidgets Pillow

In [None]:
# ============================================================
# CONFIGURATION
# ============================================================

import os
import json
from dataclasses import dataclass, asdict

@dataclass
class Config:
    region: str = "ap-southeast-2"
    model_id: str = "anthropic.claude-3-5-sonnet-20241022-v2:0"
    workspace: str = "."
    max_turns: int = 30
    max_tokens: int = 4096
    mock_mode: bool = False  # Set True to test without Bedrock

# Create config
CONFIG = Config()
print(f"Config: region={CONFIG.region}, model={CONFIG.model_id[:30]}..., mock={CONFIG.mock_mode}")

In [None]:
# ============================================================
# SECURITY MODULE
# ============================================================

import re
import hashlib
from typing import List, Dict, Tuple, Optional, Any, Set, Callable
from datetime import datetime

class Security:
    """Security controls for workspace boundary and command filtering."""
    
    DANGEROUS_COMMANDS = [
        r'\brm\s+-rf\s+/',
        r'\bsudo\s+',
        r'\bcurl\s+.*\|\s*bash',
        r'\bwget\s+.*\|\s*sh',
        r'\bdd\s+if=',
        r'\bmkfs',
        r'\bchmod\s+777',
    ]
    
    SECRET_PATTERNS = [
        (r'(?i)(api[_-]?key|apikey)\s*[=:]\s*["\']?[\w-]{20,}', "API Key"),
        (r'(?i)(secret|password|passwd)\s*[=:]\s*["\']?[^\s"\']{ 8,}', "Password"),
        (r'(?i)(aws[_-]?access[_-]?key[_-]?id)\s*[=:]\s*["\']?[A-Z0-9]{20}', "AWS Key"),
        (r'-----BEGIN (RSA |DSA |EC )?PRIVATE KEY-----', "Private Key"),
    ]
    
    def __init__(self, workspace: str):
        self.workspace = os.path.abspath(workspace)
    
    def validate_path(self, path: str) -> Tuple[bool, str]:
        """Check if path is within workspace."""
        abs_path = os.path.abspath(path)
        if not abs_path.startswith(self.workspace):
            return False, f"Path outside workspace: {path}"
        return True, ""
    
    def validate_command(self, cmd: str) -> Tuple[bool, str]:
        """Check if command is safe."""
        for pattern in self.DANGEROUS_COMMANDS:
            if re.search(pattern, cmd):
                return False, f"Dangerous command blocked"
        return True, ""
    
    def scan_secrets(self, content: str) -> List[str]:
        """Scan for potential secrets."""
        findings = []
        for pattern, name in self.SECRET_PATTERNS:
            if re.search(pattern, content):
                findings.append(name)
        return findings

SECURITY = Security(CONFIG.workspace)
print(f"Security initialized. Workspace: {SECURITY.workspace}")

In [None]:
# ============================================================
# BEDROCK CLIENT
# ============================================================

import boto3
from dataclasses import dataclass, field

@dataclass
class ToolCall:
    id: str
    name: str
    input: dict

@dataclass 
class Response:
    text: str
    tool_calls: List[ToolCall]
    stop_reason: str
    usage: dict

class BedrockClient:
    def __init__(self, model_id: str, region: str, mock_mode: bool = False):
        self.model_id = model_id
        self.region = region
        self.mock_mode = mock_mode
        if not mock_mode:
            self.client = boto3.client("bedrock-runtime", region_name=region)
        else:
            self.client = None
            print("[MOCK MODE] No API calls will be made")
    
    def _mock_response(self, messages, tools):
        last = messages[-1]["content"] if messages else ""
        if isinstance(last, str):
            low = last.lower()
            if "list" in low and "file" in low:
                return Response("I'll list files.", [ToolCall("m1", "list_dir", {"path": "."})], "tool_use", {})
            if "read" in low:
                return Response("I'll read that.", [ToolCall("m2", "read_file", {"file_path": "config.py"})], "tool_use", {})
        return Response(f"[MOCK] Received: {str(last)[:100]}", [], "end_turn", {})
    
    def chat(self, messages, system, tools=None, max_tokens=4096):
        if self.mock_mode:
            return self._mock_response(messages, tools)
        
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "system": system,
            "messages": messages,
            "temperature": 0.0,
        }
        if tools:
            body["tools"] = tools
        
        response = self.client.invoke_model(
            modelId=self.model_id,
            body=json.dumps(body),
            contentType="application/json"
        )
        result = json.loads(response["body"].read())
        return self._parse(result)
    
    def _parse(self, result):
        text = ""
        tool_calls = []
        for block in result.get("content", []):
            if block.get("type") == "text":
                text += block.get("text", "")
            elif block.get("type") == "tool_use":
                tool_calls.append(ToolCall(
                    id=block.get("id", ""),
                    name=block.get("name", ""),
                    input=block.get("input", {})
                ))
        return Response(text, tool_calls, result.get("stop_reason", ""), result.get("usage", {}))

CLIENT = BedrockClient(CONFIG.model_id, CONFIG.region, CONFIG.mock_mode)
print(f"Bedrock client ready. Mock mode: {CONFIG.mock_mode}")

In [None]:
# ============================================================
# TOOLS
# ============================================================

import subprocess
import glob as glob_module

# Global todo list
_TODOS = []

def tool_read_file(args):
    """Read a file."""
    path = args["file_path"]
    ok, msg = SECURITY.validate_path(path)
    if not ok:
        return f"Error: {msg}"
    try:
        with open(path, 'r', encoding='utf-8', errors='replace') as f:
            lines = f.readlines()[:500]  # Limit lines
        result = "".join(f"{i+1:4}| {line}" for i, line in enumerate(lines))
        return result[:10000]  # Truncate
    except Exception as e:
        return f"Error: {e}"

def tool_write_file(args):
    """Write a file."""
    path = args["file_path"]
    content = args["content"]
    ok, msg = SECURITY.validate_path(path)
    if not ok:
        return f"Error: {msg}"
    try:
        os.makedirs(os.path.dirname(path) or '.', exist_ok=True)
        with open(path, 'w', encoding='utf-8') as f:
            f.write(content)
        return f"Written {len(content)} chars to {path}"
    except Exception as e:
        return f"Error: {e}"

def tool_edit_file(args):
    """Edit a file by replacing text."""
    path = args["file_path"]
    old = args["old_string"]
    new = args["new_string"]
    ok, msg = SECURITY.validate_path(path)
    if not ok:
        return f"Error: {msg}"
    try:
        with open(path, 'r', encoding='utf-8') as f:
            content = f.read()
        if old not in content:
            return f"Error: old_string not found in file"
        new_content = content.replace(old, new, 1)
        with open(path, 'w', encoding='utf-8') as f:
            f.write(new_content)
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"

def tool_glob(args):
    """Find files by pattern."""
    pattern = args["pattern"]
    matches = glob_module.glob(pattern, recursive=True)[:100]
    return "\n".join(matches) if matches else "No matches found"

def tool_grep(args):
    """Search file contents."""
    pattern = args["pattern"]
    path = args.get("path", ".")
    results = []
    for filepath in glob_module.glob(f"{path}/**/*", recursive=True):
        if os.path.isfile(filepath):
            try:
                with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                    for i, line in enumerate(f, 1):
                        if re.search(pattern, line):
                            results.append(f"{filepath}:{i}: {line.strip()[:100]}")
                            if len(results) >= 50:
                                return "\n".join(results) + "\n[truncated]"
            except:
                pass
    return "\n".join(results) if results else "No matches"

def tool_list_dir(args):
    """List directory contents."""
    path = args.get("path", ".")
    try:
        entries = os.listdir(path)[:100]
        return "\n".join(entries)
    except Exception as e:
        return f"Error: {e}"

def tool_bash(args):
    """Run bash command."""
    cmd = args["command"]
    ok, msg = SECURITY.validate_command(cmd)
    if not ok:
        return f"Blocked: {msg}"
    try:
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=60)
        output = result.stdout + result.stderr
        return output[:5000] if output else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Command timed out"
    except Exception as e:
        return f"Error: {e}"

def tool_todo_write(args):
    """Update todo list."""
    global _TODOS
    _TODOS = args["todos"]
    lines = ["Todo List:"]
    for t in _TODOS:
        icon = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
        lines.append(f"  {icon} {t['content']}")
    return "\n".join(lines)

def tool_todo_read(args):
    """Read todo list."""
    if not _TODOS:
        return "No todos"
    lines = ["Todos:"]
    for t in _TODOS:
        icon = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
        lines.append(f"  {icon} {t['content']}")
    return "\n".join(lines)

# Tool definitions for Bedrock
TOOLS = {
    "read_file": (tool_read_file, False, "Read a file", {"type": "object", "properties": {"file_path": {"type": "string"}}, "required": ["file_path"]}),
    "write_file": (tool_write_file, True, "Write a file", {"type": "object", "properties": {"file_path": {"type": "string"}, "content": {"type": "string"}}, "required": ["file_path", "content"]}),
    "edit_file": (tool_edit_file, True, "Edit file by replacing text", {"type": "object", "properties": {"file_path": {"type": "string"}, "old_string": {"type": "string"}, "new_string": {"type": "string"}}, "required": ["file_path", "old_string", "new_string"]}),
    "glob": (tool_glob, False, "Find files by pattern", {"type": "object", "properties": {"pattern": {"type": "string"}}, "required": ["pattern"]}),
    "grep": (tool_grep, False, "Search file contents", {"type": "object", "properties": {"pattern": {"type": "string"}, "path": {"type": "string"}}, "required": ["pattern"]}),
    "list_dir": (tool_list_dir, False, "List directory", {"type": "object", "properties": {"path": {"type": "string"}}, "required": []}),
    "bash": (tool_bash, True, "Run shell command", {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}),
    "todo_write": (tool_todo_write, False, "Update todo list", {"type": "object", "properties": {"todos": {"type": "array", "items": {"type": "object"}}}, "required": ["todos"]}),
    "todo_read": (tool_todo_read, False, "Read todo list", {"type": "object", "properties": {}, "required": []}),
}

def get_tool_definitions():
    return [{"name": k, "description": v[2], "input_schema": v[3]} for k, v in TOOLS.items()]

print(f"Loaded {len(TOOLS)} tools: {', '.join(TOOLS.keys())}")

In [None]:
# ============================================================
# SYSTEM PROMPT
# ============================================================

SYSTEM_PROMPT = """You are SageMaker Coding Agent, an AI assistant for software engineering.

# Style
- Be concise. Use markdown. No emojis unless asked.
- Prefer editing existing files over creating new ones.

# Rules
- ALWAYS read a file before editing (edit_file fails otherwise)
- old_string in edit_file must be EXACT match
- Use specialized tools over bash: read_file (not cat), glob (not find)
- Use todo_write to track multi-step tasks

# Security
- Workspace boundary enforced
- Dangerous commands blocked
- Write operations need approval
"""

print("System prompt ready")

In [None]:
# ============================================================
# AGENT LOOP
# ============================================================

from collections import deque

class Agent:
    def __init__(self, client, on_approval=None):
        self.client = client
        self.messages = []
        self.on_approval = on_approval
        self.tool_history = deque(maxlen=10)
    
    def run(self, user_message, output_fn=print):
        self.messages.append({"role": "user", "content": user_message})
        
        for turn in range(CONFIG.max_turns):
            # Trim history if too long
            if len(self.messages) > 20:
                self.messages = [{"role": "user", "content": "[Earlier messages trimmed]"}] + self.messages[-20:]
            
            try:
                response = self.client.chat(self.messages, SYSTEM_PROMPT, get_tool_definitions(), CONFIG.max_tokens)
            except Exception as e:
                output_fn(f"Error: {e}")
                return str(e)
            
            if response.text:
                output_fn(response.text)
            
            if not response.tool_calls:
                return response.text
            
            # Check doom loop
            for tc in response.tool_calls:
                key = (tc.name, str(tc.input))
                if sum(1 for h in self.tool_history if h == key) >= 3:
                    output_fn("[Warning: Repetitive calls detected, stopping]")
                    return response.text
                self.tool_history.append(key)
            
            # Build assistant message
            assistant_content = []
            if response.text:
                assistant_content.append({"type": "text", "text": response.text})
            for tc in response.tool_calls:
                assistant_content.append({"type": "tool_use", "id": tc.id, "name": tc.name, "input": tc.input})
            self.messages.append({"role": "assistant", "content": assistant_content})
            
            # Execute tools
            tool_results = []
            for tc in response.tool_calls:
                tool_info = TOOLS.get(tc.name)
                if not tool_info:
                    tool_results.append({"type": "tool_result", "tool_use_id": tc.id, "content": f"Unknown tool: {tc.name}"})
                    continue
                
                func, needs_approval, _, _ = tool_info
                
                # Check approval
                if needs_approval and self.on_approval:
                    if not self.on_approval(tc.name, tc.input):
                        tool_results.append({"type": "tool_result", "tool_use_id": tc.id, "content": "User denied permission"})
                        continue
                
                output_fn(f"[Calling {tc.name}...]")
                result = func(tc.input)
                # Truncate result
                if len(result) > 5000:
                    result = result[:5000] + f"\n[Truncated - {len(result)} chars total]"
                tool_results.append({"type": "tool_result", "tool_use_id": tc.id, "content": result})
            
            self.messages.append({"role": "user", "content": tool_results})
        
        output_fn(f"[Reached max turns ({CONFIG.max_turns})]")
        return response.text if response else ""
    
    def reset(self):
        self.messages = []
        self.tool_history.clear()

print("Agent class ready")

In [None]:
# ============================================================
# CHAT INTERFACE
# ============================================================

import ipywidgets as widgets
from IPython.display import display, HTML, clear_output

# UI Components
chat_output = widgets.Output(layout=widgets.Layout(height='400px', overflow_y='auto'))
input_box = widgets.Textarea(placeholder='Type your message...', layout=widgets.Layout(width='100%', height='80px'))
send_btn = widgets.Button(description='Send', button_style='primary')
clear_btn = widgets.Button(description='Clear')
status = widgets.HTML(value='<b>Ready</b>')

# Approval dialog
approval_output = widgets.Output()
approve_btn = widgets.Button(description='Approve', button_style='success')
deny_btn = widgets.Button(description='Deny', button_style='danger')
approval_box = widgets.VBox([approval_output, widgets.HBox([approve_btn, deny_btn])])
approval_box.layout.display = 'none'

pending_approval = {"result": None}

def add_message(role, content):
    with chat_output:
        ts = datetime.now().strftime('%H:%M:%S')
        if role == 'user':
            display(HTML(f'<div style="background:#e3f2fd;padding:10px;margin:5px;border-radius:5px;"><b>[{ts}] You:</b><br>{content}</div>'))
        elif role == 'assistant':
            display(HTML(f'<div style="background:#f5f5f5;padding:10px;margin:5px;border-radius:5px;"><b>[{ts}] Agent:</b><br><pre style="white-space:pre-wrap;">{content}</pre></div>'))
        elif role == 'tool':
            display(HTML(f'<div style="background:#fff3e0;padding:8px;margin:3px;border-radius:5px;font-size:12px;"><b>{content}</b></div>'))

def request_approval(tool_name, tool_input):
    pending_approval["result"] = None
    with approval_output:
        clear_output()
        display(HTML(f'<h4>Approval Required</h4><p><b>Tool:</b> {tool_name}</p><pre>{str(tool_input)[:300]}</pre>'))
    approval_box.layout.display = 'block'
    send_btn.disabled = True
    
    import time
    while pending_approval["result"] is None:
        time.sleep(0.1)
    
    approval_box.layout.display = 'none'
    send_btn.disabled = False
    return pending_approval["result"]

def on_approve(b):
    pending_approval["result"] = True
    add_message('tool', 'Approved')

def on_deny(b):
    pending_approval["result"] = False
    add_message('tool', 'Denied')

approve_btn.on_click(on_approve)
deny_btn.on_click(on_deny)

# Create agent
agent = Agent(CLIENT, on_approval=request_approval)

def on_send(b):
    msg = input_box.value.strip()
    if not msg:
        return
    input_box.value = ''
    add_message('user', msg)
    status.value = '<b>Processing...</b>'
    
    def output_fn(text):
        if text.startswith('[Calling'):
            add_message('tool', text)
        else:
            add_message('assistant', text)
    
    agent.run(msg, output_fn)
    status.value = '<b>Ready</b>'

def on_clear(b):
    agent.reset()
    chat_output.clear_output()
    with chat_output:
        display(HTML('<p><i>Chat cleared</i></p>'))

send_btn.on_click(on_send)
clear_btn.on_click(on_clear)

# Layout
header = widgets.HTML('<h2>SageMaker Coding Agent</h2>')
buttons = widgets.HBox([send_btn, clear_btn, status])
ui = widgets.VBox([header, chat_output, approval_box, input_box, buttons])

display(ui)

---

## Usage

Type in the input box and click **Send**:

```
"List all Python files"
"Read config.py"
"Create a hello.py file"
"Run git status"
```

Click **Approve** or **Deny** for write operations.

---

## Configuration

Edit the `Config` class in cell 2:

- `region`: AWS region (default: ap-southeast-2)
- `model_id`: Bedrock model ID
- `mock_mode`: Set `True` to test without Bedrock

---

## Troubleshooting

**"Access denied"**: Enable Claude models in Bedrock console

**"Throttling"**: Hit token quota, wait or request increase

**Test without Bedrock**: Set `mock_mode = True` in Config