In [1]:
# Refresh the AWS credentials stored under the `default` profile (run once, no daemon)
# so the boto3 session created in the next cell can pick them up.
# NOTE(review): the account ID and the Admin role are hard-coded here — confirm this is
# intended before sharing the notebook.
!ada credentials update --account=491718866258 --provider=isengard --role=Admin --profile=default --once

2025/03/12 18:35:48 Refreshing aws credentials for default
2025/03/12 18:35:52 Successfully refreshed aws credentials for default


In [2]:
import os
import logging
import boto3

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Configuration
AWS_REGION = "us-east-1"
BEDROCK_MODEL = "anthropic.claude-3-sonnet-20240229-v1:0"
# Root of the codebase to explore. The CODEBASE_PATH environment variable takes
# precedence so the notebook is not tied to one machine; the original location is
# kept as the fallback so existing runs behave exactly as before.
CODEBASE_PATH = os.environ.get("CODEBASE_PATH", "/Users/wuzhiche/Workspace/ATE")
BATCH_SIZE = 8

# Use boto3 to load credentials from ~/.aws/credentials default profile
session = boto3.Session(profile_name="default")
credentials = session.get_credentials()
if credentials:
    # Only the first four characters of the key ID are logged, never the secret.
    logger.info(f"Loaded credentials from ~/.aws/credentials: Access Key ID = {credentials.access_key[:4]}...")
else:
    logger.warning("No credentials found in ~/.aws/credentials. Configure via AWS CLI ('aws configure').")


2025-03-12 18:35:52,552 - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2025-03-12 18:35:52,552 - INFO - Loaded credentials from ~/.aws/credentials: Access Key ID = ASIA...


In [3]:
import operator
from typing import List, TypedDict, Annotated
from pydantic import BaseModel, Field
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langgraph.graph.message import add_messages

# Argument schema for the `open_files` tool; it is handed to the tool as `args_schema`
# where the tools are created. The docstring and the Field description below are
# runtime metadata of the model (not just comments), so they are left untouched.
class OpenFilesSchema(BaseModel):
    """Schema for the open_files tool."""
    # Paths are expected relative to the codebase root, as stated in the description.
    file_paths: List[str] = Field(description="List of file paths to open, relative to the codebase root.")

class ChatState(TypedDict):
    """State dictionary carried through the chat graph.

    Both fields are `Annotated` with a second argument (`add_messages`,
    `operator.add`); presumably the graph framework uses these as reducers to
    merge a node's partial update into the stored state — confirm against the
    LangGraph documentation.
    """
    # Conversation history: human, AI and tool messages in order.
    messages: Annotated[List[HumanMessage | AIMessage | ToolMessage], add_messages]
    # Paths requested through the open_files tool; `operator.add` on two lists concatenates them.
    all_files_opened: Annotated[List[str], operator.add]


In [4]:
from langchain.tools import Tool, StructuredTool
from langchain_core.utils.function_calling import convert_to_openai_tool
from typing import List

def create_tools(filesystem):
    """Create and return the tools used by the chatbot.

    Args:
        filesystem: Object exposing ``get_file_structure()`` and
            ``read_files(file_paths, max_chars=...)``.

    Returns:
        A ``(tools, openai_tools)`` tuple: the tool objects, in the order
        ``[get_file_structure, open_files]``, and the same tools converted to the
        OpenAI tool-schema dictionaries used for binding them to the model.
    """

    def get_file_structure() -> str:
        """Retrieve the file structure of the codebase."""
        return filesystem.get_file_structure()

    def open_files(file_paths: List[str]) -> str:
        """Open and return the contents of the specified files (up to 30000 chars each)."""
        return filesystem.read_files(file_paths, max_chars=30000)

    tools = [
        # Registered as a StructuredTool over a zero-argument function: the plain
        # single-input `Tool` wrapper advertises one string parameter (`__arg1`)
        # that the underlying function cannot accept.
        StructuredTool.from_function(
            func=get_file_structure,
            name="get_file_structure",
            description="Retrieve the file structure of the codebase."
        ),
        StructuredTool.from_function(
            func=open_files,
            name="open_files",
            description="Open and retrieve contents of files from the codebase.",
            args_schema=OpenFilesSchema
        )
    ]

    return tools, [convert_to_openai_tool(t) for t in tools]


In [5]:
import os
import logging
from typing import List
from pathlib import Path

logger = logging.getLogger(__name__)

class FileSystem:
    """Read-only view over the text files of a codebase directory.

    Attributes:
        path: Root directory of the codebase.
        files: Sorted list of text-file paths relative to ``path``.
    """

    # Lower-cased suffixes that count as text files when indexing the codebase.
    TEXT_EXTENSIONS = frozenset(
        {'.java', '.py', '.txt', '.js', '.cpp', '.h', '.yml', '.yaml', '.properties'}
    )

    def __init__(self, codebase_path: str):
        """Index the text files found under ``codebase_path``."""
        self.path = Path(codebase_path)
        self.files = self._list_files()

    def _list_files(self) -> List[str]:
        """List all text files in the codebase, sorted for a deterministic order."""
        return sorted(
            str(f.relative_to(self.path))
            for f in self.path.rglob("*")
            if f.is_file() and f.suffix.lower() in self.TEXT_EXTENSIONS
        )

    def get_file_structure(self) -> str:
        """Generate a nested file structure with total size.

        The header reports the size of every file under the root (not only the
        indexed text files) in KB, followed by one entry per directory and file.
        """
        total_size = sum(f.stat().st_size for f in self.path.rglob("*") if f.is_file()) / 1024
        lines = [f"{self.path.name} ({total_size:.1f}KB, {len(self.files)} files)"]

        # Group file names by their parent directory ("" for files at the root).
        packages = {}
        for file in self.files:
            parts = file.split(os.sep)
            pkg = "/".join(parts[:-1]) if len(parts) > 1 else ""
            packages.setdefault(pkg, []).append(parts[-1])

        for pkg, files in sorted(packages.items()):
            if pkg:
                lines.append(f"├── {pkg} ({len(files)} files)")
            for file in sorted(files):
                lines.append(f"│   ├── {file}" if pkg else f"├── {file}")
        return "\n".join(lines).strip()

    @staticmethod
    def _normalize_paths(file_paths) -> list:
        """Coerce the tool argument (list, tuple, single path or stringified list) into a list."""
        if isinstance(file_paths, (list, tuple)):
            return list(file_paths)
        if not isinstance(file_paths, str):
            return []

        text = file_paths.replace('\n', '').strip()
        if text.startswith('[') and text.endswith(']'):
            import ast
            try:
                parsed = ast.literal_eval(text)
            except (ValueError, SyntaxError) as e:
                logger.error("Failed to parse stringified file list: %s", str(e))
                return []
            logger.debug("Converted stringified files list: %s", parsed)
            return list(parsed) if isinstance(parsed, (list, tuple)) else [parsed]
        return [text]

    @staticmethod
    def _candidate_paths(raw: str) -> List[str]:
        """Return the spellings of ``raw`` worth trying, most literal first.

        Surrounding quotes/brackets are removed. If the text starts with a list
        marker such as ``"1. "`` or ``"2) "``, the remainder is offered as a second
        candidate; digits inside the path itself are never touched.
        """
        cleaned = raw.strip().strip("'\"[]").strip()
        candidates = [cleaned]
        marker, sep, rest = cleaned.partition(" ")
        if sep and marker[:-1].isdigit() and marker[-1] in ".)":
            candidates.append(rest.strip())
        return [c for c in candidates if c and c != '/']

    def _resolve_inside_root(self, candidate: str) -> "Path | None":
        """Resolve ``candidate`` against the root; return it only if it is a file inside the root."""
        try:
            root = self.path.resolve()
            target = (root / candidate).resolve()
        except (OSError, ValueError):
            return None
        # Absolute paths and ".." segments may point outside the codebase: refuse them.
        if not target.is_relative_to(root):
            return None
        return target if target.is_file() else None

    def read_files(self, file_paths: List[str], max_chars: int = 30000) -> str:
        """Read content of selected files, trimmed to max_chars, with path cleaning.

        Args:
            file_paths: Paths relative to the codebase root. A single path string or
                a stringified list (``"['a.py', 'b.py']"``) is accepted as well.
            max_chars: Maximum characters kept per file; longer content is cut and
                suffixed with ``"..."``.

        Returns:
            ``"<path>:\\n<content>"`` sections joined by blank lines, or
            ``"No valid file contents retrieved."`` when nothing could be read.
            Unreadable, missing or out-of-root paths are logged and skipped.
        """
        contents = {}

        for fp in self._normalize_paths(file_paths):
            if not isinstance(fp, str):
                logger.warning(f"Skipping invalid file path: {fp}")
                continue
            candidates = self._candidate_paths(fp)
            if not candidates:
                logger.warning(f"Skipping invalid file path: {fp}")
                continue

            match = None
            for candidate in candidates:
                full_path = self._resolve_inside_root(candidate)
                if full_path is not None:
                    match = (candidate, full_path)
                    break
            if match is None:
                logger.warning(f"File not found or not a file: {candidates[0]}")
                continue

            fp_clean, full_path = match
            try:
                with open(full_path, "r", encoding="utf-8") as f:
                    content = f.read()
            except (OSError, UnicodeDecodeError) as e:
                logger.error(f"Error reading file {fp_clean}: {str(e)}")
                continue
            contents[fp_clean] = content[:max_chars] + ("..." if len(content) > max_chars else "")

        if not contents:
            return "No valid file contents retrieved."
        return "\n\n".join(f"{fp}:\n{cont}" for fp, cont in contents.items())


In [6]:
from typing import Dict, Any
import logging
from langchain_aws import ChatBedrock
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from config import session, AWS_REGION, BEDROCK_MODEL

logger = logging.getLogger(__name__)


def route_tools(state: ChatState):
    """Pick the next graph step: "tools" when the latest message requests tool calls, otherwise END."""
    history = state["messages"]
    # An empty history or a message without (non-empty) tool_calls ends the run.
    pending_calls = getattr(history[-1], "tool_calls", None) if history else None
    return "tools" if pending_calls else END

class CodeExplorerChatbot:
    """Tool-using chat agent over a codebase, built as a two-node graph.

    The ``agent`` node calls the Bedrock chat model (with the file tools bound);
    the ``tools`` node executes any tool calls the model requested and hands the
    results back to the agent. The compiled graph is available as ``self.app`` and
    is compiled with an in-memory checkpointer.
    """

    def __init__(self, codebase_path: str):
        """Index ``codebase_path``, create the model and tools, and compile the graph."""
        self.fs = FileSystem(codebase_path)
        self.checkpointer = MemorySaver()
        self.llm = ChatBedrock(
            model_id=BEDROCK_MODEL,
            region_name=session.region_name or AWS_REGION,
            # NOTE(review): 200000 looks like a context-window size rather than an
            # output-token limit — confirm the intended max_tokens for this model.
            model_kwargs={"temperature": 0.7, "max_tokens": 200000}
        )
        self.tools, openai_tools = create_tools(self.fs)
        self.llm_with_tools = self.llm.bind_tools(openai_tools)
        self._initialize_workflow()

    def _initialize_workflow(self):
        """Build and compile the agent/tools graph into ``self.app``."""
        # Look tools up by name so the graph does not depend on their list order.
        tools_by_name = {tool.name: tool for tool in self.tools}

        async def agent(state: ChatState, config) -> ChatState:
            """Ask the model for the next message given the conversation so far."""
            response = await self.llm_with_tools.ainvoke(state["messages"], config)
            return {
                "messages": [response]
            }

        def execute_tools(state: ChatState) -> ChatState:
            """Run every tool call of the last message and return one ToolMessage per call."""
            messages = []
            last_message = state["messages"][-1]
            all_files_opened = []

            for tool_call in last_message.tool_calls:
                name = tool_call["name"]
                if name == "open_files":
                    # A missing or null argument is treated as "no files".
                    file_paths = tool_call["args"].get("file_paths") or []
                    result = tools_by_name["open_files"].func(file_paths)
                    if isinstance(file_paths, str):
                        # The model may send a single string; record it whole rather
                        # than letting list.extend split it into characters.
                        all_files_opened.append(file_paths)
                    else:
                        all_files_opened.extend(file_paths)
                elif name == "get_file_structure":
                    # Arguments sent by the model are ignored: the function takes none.
                    result = tools_by_name["get_file_structure"].func()
                else:
                    result = f"Unknown tool: {tool_call['name']}"
                messages.append(ToolMessage(
                    content=result,
                    tool_call_id=tool_call["id"]
                ))

            return {
                "messages": messages,
                "all_files_opened": all_files_opened
            }

        workflow = StateGraph(ChatState)
        workflow.add_node("agent", agent)
        workflow.add_node("tools", execute_tools)
        workflow.set_entry_point("agent")

        # After the agent speaks, either run the requested tools or finish.
        workflow.add_conditional_edges(
            "agent",
            route_tools,
            {"tools": "tools", END: END}
        )

        # Tool results always go back to the agent for the next turn.
        workflow.add_edge("tools", "agent")
        self.app = workflow.compile(
            checkpointer=self.checkpointer
        )

2025-03-12 18:35:53,447 - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2025-03-12 18:35:53,447 - INFO - Loaded credentials from ~/.aws/credentials: Access Key ID = ASIA...


In [7]:
# Build the chatbot over the configured codebase; later cells use `a.app` to stream and inspect state.
a = CodeExplorerChatbot(CODEBASE_PATH)

2025-03-12 18:35:53,495 - INFO - Found credentials in shared credentials file: ~/.aws/credentials


In [8]:
async for message_chunk, metadata in a.app.astream(
    {"messages": [HumanMessage(content="hello")]},
    {"configurable": {"thread_id": "1"}},
    stream_mode="messages",
):
    if message_chunk.content:
        print(message_chunk.content, flush=True)

[{'type': 'text', 'text': 'Hello', 'index': 0}]
[{'type': 'text', 'text': '!', 'index': 0}]


In [10]:
async for message_chunk, metadata in a.app.astream(
    {"messages": [HumanMessage(content="what's the file structure")]},
    {"configurable": {"thread_id": "1"}},
    stream_mode="messages",
):
    if message_chunk.content:
        print(message_chunk.content, flush=True)

[{'type': 'text', 'text': 'To', 'index': 0}]
[{'type': 'text', 'text': ' get', 'index': 0}]
[{'type': 'text', 'text': ' the', 'index': 0}]
[{'type': 'text', 'text': ' file', 'index': 0}]
[{'type': 'text', 'text': ' structure', 'index': 0}]
[{'type': 'text', 'text': ' of', 'index': 0}]
[{'type': 'text', 'text': ' the', 'index': 0}]
[{'type': 'text', 'text': ' c', 'index': 0}]
[{'type': 'text', 'text': 'ode', 'index': 0}]
[{'type': 'text', 'text': 'base', 'index': 0}]
[{'type': 'text', 'text': ',', 'index': 0}]
[{'type': 'text', 'text': ' we', 'index': 0}]
[{'type': 'text', 'text': ' can', 'index': 0}]
[{'type': 'text', 'text': ' invoke', 'index': 0}]
[{'type': 'text', 'text': ' the', 'index': 0}]
[{'type': 'text', 'text': ' `', 'index': 0}]
[{'type': 'text', 'text': 'get', 'index': 0}]
[{'type': 'text', 'text': '_', 'index': 0}]
[{'type': 'text', 'text': 'file', 'index': 0}]
[{'type': 'text', 'text': '_', 'index': 0}]
[{'type': 'text', 'text': 'structure', 'index': 0}]
[{'type': 'text',

In [11]:
# Inspect the checkpointed state (messages and opened files) stored for thread "1".
a.app.get_state({"configurable": {"thread_id": "1"}})

StateSnapshot(values={'messages': [HumanMessage(content='hello', additional_kwargs={}, response_metadata={}, id='e8335f01-a693-4451-886e-aeaca7721892'), AIMessage(content=[{'type': 'text', 'text': 'Hello!', 'index': 0}], additional_kwargs={}, response_metadata={'stop_reason': 'end_turn', 'stop_sequence': None}, id='run-f1711496-aca9-4c11-be41-e8c185f644e2', usage_metadata={'input_tokens': 330, 'output_tokens': 5, 'total_tokens': 335}), HumanMessage(content="what's the file structure", additional_kwargs={}, response_metadata={}, id='b84726d4-4bd8-4bb5-a512-a454ebb02980'), AIMessage(content=[{'type': 'text', 'text': 'To get the file structure of the codebase, we can invoke the `get_file_structure` tool:', 'index': 0}, {'type': 'tool_use', 'id': 'toolu_bdrk_016MGJDYt5JJhd1BdASLJHLd', 'name': 'get_file_structure', 'input': {}, 'index': 1, 'partial_json': '{"__arg1": "."}'}], additional_kwargs={}, response_metadata={'stop_reason': 'tool_use', 'stop_sequence': None}, id='run-51103cc0-807e-43