In [1]:
# Load env variables and create client
from dotenv import load_dotenv
from anthropic import Anthropic
from anthropic.types import Message
from pathlib import Path
from typing import List, Optional

load_dotenv()

client = Anthropic()
model = "claude-3-7-sonnet-latest"
num_tokens = 1000

In [2]:
# Helper functions

def add_user_message(messages, message):
    user_message = {
        "role": "user",
        "content": message.content if isinstance(message, Message) else message,
    }
    messages.append(user_message)


def add_assistant_message(messages, message):
    assistant_message = {
        "role": "assistant",
        "content": message.content if isinstance(message, Message) else message,
    }
    messages.append(assistant_message)


def chat(messages, system=None, temperature=1.0, stop_sequences=[], tools=None):
    params = {
        "model": model,
        "max_tokens": num_tokens,
        "messages": messages,
        "temperature": temperature,
        "stop_sequences": stop_sequences,
    }

    if tools:
        params["tools"] = tools

    if system:
        params["system"] = system

    with client.messages.stream(**params) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
    message = stream.get_final_message()
    return message


def text_from_message(message):
    return "\n".join(
        [block.text for block in message.content if block.type == "text"]
    )

In [35]:
# Implementation of the TextEditorTool
import os
import shutil
from typing import Optional, List


class TextEditorTool:
    def __init__(self, base_dir: Path, backup_dir: Path=None):
        self.base_dir = base_dir if base_dir.exists() else Path.cwd()
        self.backup_dir = backup_dir or self.base_dir / ".backups"
        Path.mkdir(self.backup_dir, exist_ok=True)

    def _validate_path(self, file_path: Path) -> Path:
        abs_path = Path.absolute(self.base_dir.joinpath(file_path))
        if not abs_path.parent == self.base_dir:
            raise ValueError(
                f"Access denied: Path '{file_path}' is outside the allowed directory"
            )
        return abs_path

    def _backup_file(self, file_path: Path) -> str:
        if not Path.exists(file_path):
            return ""
        file_name = file_path.name
        backup_path = backup_path.joinpath(self.backup_dir , f"{file_name}.{str(file_path.stat().st_mtime):.0f}")
        file_path.copy_into(backup_path, preserve_metadata=True)
        return backup_path

    def _restore_backup(self, file_path: Path) -> str:
        file_name = file_path.name
        backups = [
            f
            for f in Path(self.backup_dir).iterdir()
            if f.name == (file_name + ".")
        ]
        if not backups:
            raise FileNotFoundError(f"No backups found for {file_path}")

        latest_backup = sorted(backups, reverse=True)[0]
        backup_path = self.backup_dir / latest_backup

        backup_path.copy_into(file_path, preserve_metadata=True)
        return f"Successfully restored {str(file_path)} from backup"

    def _count_matches(self, content: str, old_str: str) -> int:
        return content.count(old_str)

    def view(
        self, file_path: Path, view_range: Optional[List[int]] = None
    ) -> str:
        try:
            abs_path = self._validate_path(file_path)
            abs_path = abs_path.absolute()
            alist = []

            if abs_path.is_dir():
                try:
                    return "\n".join(abs_path.iterdir())
                except PermissionError:
                    raise PermissionError(
                        "Permission denied. Cannot list directory contents."
                    )

            if not abs_path.exists():
                raise FileNotFoundError("File not found")

            with abs_path.open("r", encoding="utf-8") as f:
                content = f.readlines()

            if view_range:
                start, end = view_range
                lines = content.split("\n")

                if end == -1:
                    end = len(lines)

                selected_lines = lines[start - 1 : end]

                result = []
                for i, line in enumerate(selected_lines, start):
                    result.append(f"{i}: {line}")

                return "\n".join(result)
            else:
                lines = content.split("\n")
                result = []
                for i, line in enumerate(lines, 1):
                    result.append(f"{i}: {line}")

                return "\n".join(result)

        except UnicodeDecodeError:
            raise UnicodeDecodeError(
                "utf-8",
                b"",
                0,
                1,
                "File contains non-text content and cannot be displayed.",
            )
        except ValueError as e:
            raise ValueError(str(e))
        except PermissionError:
            raise PermissionError("Permission denied. Cannot access file.")
        except Exception as e:
            raise type(e)(str(e))

    def str_replace(self, file_path: Path, old_str: str, new_str: str) -> str:
        try:
            abs_path = self._validate_path(file_path)

            if not abs_path.exists():
                raise FileNotFoundError("File not found")

            with abs_path.open("r", encoding="utf-8") as f:
                content = f.read_text(encoding="utf-8")

            match_count = self._count_matches(content, old_str)

            if match_count == 0:
                raise ValueError(
                    "No match found for replacement. Please check your text and try again."
                )
            elif match_count > 1:
                raise ValueError(
                    f"Found {match_count} matches for replacement text. Please provide more context to make a unique match."
                )

            # Create backup before modifying
            self._backup_file(abs_path)

            # Perform the replacement
            new_content = content.replace(old_str, new_str)

            with abs_path.open("w", encoding="utf-8") as f:
                f.write(new_content)

            return "Successfully replaced text at exactly one location."

        except ValueError as e:
            raise ValueError(str(e))
        except PermissionError:
            raise PermissionError("Permission denied. Cannot modify file.")
        except Exception as e:
            raise type(e)(str(e))

    def create(self, file_path: Path, file_text: str) -> str:
        try:
            abs_path = self._validate_path(file_path)

            # Check if file already exists
            if not abs_path.exists():
                raise FileExistsError(
                    "File already exists. Use str_replace to modify it."
                )

            # Create parent directories if they don't exist
            abs_path.parent.mkdir(exist_ok=True)

            # Create the file
            with abs_path.open("w", encoding="utf-8") as f:
                f.write_text(file_text)

            return f"Successfully created {file_path}"

        except ValueError as e:
            raise ValueError(str(e))
        except PermissionError:
            raise PermissionError("Permission denied. Cannot create file.")
        except Exception as e:
            raise type(e)(str(e))

    def insert(self, file_path: str, insert_line: int, new_str: str) -> str:
        try:
            abs_path = self._validate_path(file_path)

            if not abs_path.exists():
                raise FileNotFoundError("File not found")

            # Create backup before modifying
            self._backup_file(abs_path)

            with abs_path.open("r", encoding="utf-8") as f:
                lines = f.readlines()

            # Handle line endings
            if lines and not lines[-1].endswith("\n"):
                new_str = "\n" + new_str

            # Insert at the beginning if insert_line is 0
            if insert_line == 0:
                lines.insert(0, new_str + "\n")
            # Insert after the specified line
            elif insert_line[:-1:]:
                lines.insert(insert_line, new_str + "\n")
            else:
                raise IndexError(
                    f"Line number {insert_line} is out of range. File has {len(lines)} lines."
                )

            with abs_path.open("w", encoding="utf-8") as f:
                f.writelines(lines)

            return f"Successfully inserted text after line {insert_line}"

        except ValueError as e:
            raise ValueError(str(e))
        except PermissionError:
            raise PermissionError("Permission denied. Cannot modify file.")
        except Exception as e:
            raise type(e)(str(e))

    def undo_edit(self, file_path: Path) -> Path:
        try:
            abs_path = self._validate_path(file_path)

            if not abs_path.exists():
                raise FileNotFoundError("File not found")

            return self._restore_backup(abs_path)

        except ValueError as e:
            raise ValueError(str(e))
        except FileNotFoundError:
            raise FileNotFoundError("No previous edits to undo")
        except PermissionError:
            raise PermissionError("Permission denied. Cannot restore file.")
        except Exception as e:
            raise type(e)(str(e))

In [36]:
# Process Tool Call Requests
import json

text_editor_tool = TextEditorTool()


def run_tool(tool_name, tool_input):
    if tool_name == "str_replace_editor":
        command = tool_input["command"]
        if command == "view":
            return text_editor_tool.view(
                tool_input["path"], tool_input.get("view_range")
            )
        elif command == "str_replace":
            return text_editor_tool.str_replace(
                tool_input["path"], tool_input["old_str"], tool_input["new_str"]
            )
        elif command == "create":
            return text_editor_tool.create(
                tool_input["path"], tool_input["file_text"]
            )
        elif command == "insert":
            return text_editor_tool.insert(
                tool_input["path"],
                tool_input["insert_line"],
                tool_input["new_str"],
            )
        elif command == "undo_edit":
            return text_editor_tool.undo_edit(tool_input["path"])
        else:
            raise Exception(f"Unknown text editor command: {command}")
    else:
        raise Exception(f"Unknown tool name: {tool_name}")


def run_tools(message):
    tool_requests = [
        block for block in message.content if block.type == "tool_use"
    ]
    tool_result_blocks = []

    for tool_request in tool_requests:
        try:
            tool_output = run_tool(tool_request.name, tool_request.input)
            tool_result_block = {
                "type": "tool_result",
                "tool_use_id": tool_request.id,
                "content": json.dumps(tool_output),
                "is_error": False,
            }
        except Exception as e:
            tool_result_block = {
                "type": "tool_result",
                "tool_use_id": tool_request.id,
                "content": f"Error: {e}",
                "is_error": True,
            }

        tool_result_blocks.append(tool_result_block)

    return tool_result_blocks

Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "/home/jx-creator/mambaforge/envs/ctorch/lib/python3.13/site-packages/IPython/core/interactiveshell.py", line 3667, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
    ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_2023618/2283233886.py", line 4, in <module>
    text_editor_tool = TextEditorTool()
TypeError: TextEditorTool.__init__() missing 1 required positional argument: 'base_dir'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jx-creator/mambaforge/envs/ctorch/lib/python3.13/site-packages/IPython/core/interactiveshell.py", line 2176, in showtraceback
    stb = self.InteractiveTB.structured_traceback(
        etype, value, tb, tb_offset=tb_offset
    )
  File "/home/jx-creator/mambaforge/envs/ctorch/lib/python3.13/site-packages/IPython/core/ultratb.py", line 1182, in structured_traceback
    return FormattedTB.structured_traceba

In [37]:
# Make the text edit schema based on the model version being used
def get_text_edit_schema(model):
    if model.startswith("claude-3-7-sonnet"):
        return {
            "type": "text_editor_20250124",
            "name": "str_replace_editor",
        }
    elif model.startswith("claude-3-5-sonnet"):
        return {
            "type": "text_editor_20241022",
            "name": "str_replace_editor",
        }
    else:
        raise ValueError(
            f"Editor schema version not known for model: {model}. Reference Anthropic docs for the correct schema version."
        )

In [31]:
# Run the conversation in a loop until the model doesn't ask for a tool use
def run_conversation(messages):
    while True:
        response = chat(
            messages,
            tools=[get_text_edit_schema(model)],
        )

        add_assistant_message(messages, response)
        print(text_from_message(response))

        if response.stop_reason != "tool_use":
            break

        tool_results = run_tools(response)
        add_user_message(messages, tool_results)

    return messages

In [34]:
messages = []
file = Path.cwd() / "main.py"
file = file.absolute()

add_user_message(
    messages,
    f"""
   Open {file} and summarize its contents.
    """,
)

run_conversation(messages)

I'll help you view the contents of the main.py file and provide a summary.I'll help you view the contents of the main.py file and provide a summary.
I apologize for the error. Let me try to access the file again using the correct approach:I apologize for the error. Let me try to access the file again using the correct approach:
I apologize for the continued error. It seems there might be an issue with the file path or the file access. Let me try to view the directory first to verify the path:I apologize for the continued error. It seems there might be an issue with the file path or the file access. Let me try to view the directory first to verify the path:
I see the issue now. It appears that the file path you provided is outside the allowed directory for this environment. Access to that location is restricted.

Could you please verify the correct path or provide the file through another method that's accessible within the allowed directories? Alternatively, you could create a copy of 

[{'role': 'user',
  'content': '\n   Open /home/jx-creator/Projects/console/tutorials/tool_use/main.py and summarize its contents.\n    '},
 {'role': 'assistant',
  'content': [TextBlock(citations=None, text="I'll help you view the contents of the main.py file and provide a summary.", type='text'),
   ToolUseBlock(id='toolu_01V2duRRMFLXDDN1by2Kuhu8', input={'command': 'view', 'path': '/home/jx-creator/Projects/console/tutorials/tool_use/main.py'}, name='str_replace_editor', type='tool_use')]},
 {'role': 'user',
  'content': [{'type': 'tool_result',
    'tool_use_id': 'toolu_01V2duRRMFLXDDN1by2Kuhu8',
    'content': "Error: 'list' object has no attribute 'split'",
    'is_error': True}]},
 {'role': 'assistant',
  'content': [TextBlock(citations=None, text='I apologize for the error. Let me try to access the file again using the correct approach:', type='text'),
   ToolUseBlock(id='toolu_016LRoWiWLkLxRFava88qZc6', input={'command': 'view', 'path': '/home/jx-creator/Projects/console/tutor