# Code Generation

## How to Develop an AI Engineer

![Intro](../imgs/intro.png)

## LangGraph Architecture

![langgraph.png](../imgs/langgraph.png)

# Set up the .env file

```bash
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=XXX
LANGCHAIN_ENDPOINT=https://api.smith.langchain.com
ANTHROPIC_API_KEY=XXX
```

In [1]:
%load_ext dotenv

# Langgraph

In [2]:
# common
import os
from typing import Any, Dict, TypedDict

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        keys: A dictionary where each key is a string; the values are
            unconstrained.
    """

    # `Any` is the typing construct; the builtin function `any` is not a type.
    keys: Dict[str, Any]

# ANSI escape sequences used to colorize console output; ENDC resets styling.
COLOR = dict(
    HEADER="\033[95m",
    BLUE="\033[94m",
    GREEN="\033[92m",
    RED="\033[91m",
    ENDC="\033[0m",
)

In [3]:
# nodes

import sys, os
from operator import itemgetter
import subprocess
import tempfile

from langchain.output_parsers.openai_tools import PydanticToolsParser
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_anthropic.experimental import ChatAnthropicTools
from langchain import hub

# Anthropic credential taken from the environment; None when the variable is unset.
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY")

class Nodes:
    """
    Node callables for the code-generation graph.

    Each node method takes the current graph state and returns a new state
    dict of the form ``{"keys": {...}}``. ``node_map`` maps the graph node
    names to the bound methods.
    """

    def __init__(self, context: str, debug: bool = False):
        """
        Args:
            context (str): Text injected as the "context" prompt variable
            debug (bool): Stored on the instance; not read by the node methods
        """
        self.context = context
        self.debug = debug
        self.model = (
            "claude-3-opus-20240229"
        )
        self.node_map = {
            "generate": self.generate,
            "check_code_execution": self.check_code_execution,
            "finish": self.finish,
        }

    def generate(self, state: GraphState) -> GraphState:
        """
        Generate a code solution for the input question, with optional
        feedback from a previous failed execution check

        When the incoming state carries an "error" key, the previous
        generation and that error are passed to the prompt; otherwise both
        are sent as empty strings.

        Args:
            state (dict): The current graph state

        Returns:
            state (dict): State holding "generation" (the parsed tool output),
            "question" and "iterations" (incremented by one). The "error" key
            is not carried over.
        """

        ## State
        state_dict = state["keys"]
        question = state_dict["question"]
        iterations = state_dict["iterations"]

        ## Data model
        # The class name doubles as the tool name referenced by tool_choice below.
        class code(BaseModel):
            """Code output"""

            prefix: str = Field(
                description="Description of the problem and approach"
            )
            code: str = Field(
                description="Code block not including import statements"
            )

        llm_with_tool = ChatAnthropicTools(
            model=self.model
            ).bind_tools(
            tools=[code],
            tool_choice="code",
        ).with_config(
            run_name="code"
        )

        # Parser
        parser_tool = PydanticToolsParser(tools=[code])

        ## Prompt
        prompt = hub.pull("neuronslab/aws_cdk_engineer")

        # Chain
        chain = (
            {
                "context": lambda _: self.context,
                "question": itemgetter("question"),
                "generation": itemgetter("generation"),
                "error": itemgetter("error"),
            }
            | prompt
            | llm_with_tool
            | parser_tool
        )

        ## Generation
        if "error" in state_dict:
            print("---RE-GENERATE SOLUTION w/ ERROR FEEDBACK---")

            previous_solution = state_dict["generation"]

            code_solution = chain.invoke(
                {
                    "question": question,
                    "generation": str(previous_solution[0]),
                    "error": state_dict["error"],
                }
            )

        else:
            print("---GENERATE SOLUTION---")

            code_solution = chain.invoke(
                {
                    "question": question,
                    "generation": "",
                    "error": ""
                }
            )

        return {
            "keys": {
                "generation": code_solution,
                "question": question,
                "iterations": iterations + 1,
            }
        }

    def check_code_execution(self, state: GraphState) -> GraphState:
        """
        Check code block execution

        The generated code is written to a temporary file and run through
        ``cdk synth --validation --app "python3 <file>"``. A non-zero return
        code counts as a failure.

        Args:
            state (dict): The current graph state

        Returns:
            state (dict): State with "error" added: the string "None" on
            success, otherwise the captured error text. "prefix" and "code"
            are copied out of the first generation entry.
        """

        ## State
        print("---CHECKING CODE EXECUTION---")
        state_dict = state["keys"]
        question = state_dict["question"]
        code_solution = state_dict["generation"]
        prefix = code_solution[0].prefix
        code_block = code_solution[0].code
        iterations = state_dict["iterations"]

        print(
            f"{COLOR['GREEN']}{code_block}{COLOR['ENDC']}",
            sep="\n",
        )

        # The context manager removes the temp file even if the subprocess
        # call raises.
        with tempfile.NamedTemporaryFile() as tmp:
            tmp.write(code_block.encode())
            tmp.flush()
            # Decode the captured streams so the feedback is readable text
            # rather than a bytes repr, and so it can be concatenated with
            # earlier error strings below.
            result = subprocess.run(
                f'cdk synth --validation --app "python3 {tmp.name}"',
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding="utf-8",
                errors="replace",
            )

        output, error = result.stdout, result.stderr
        if result.returncode:
            print("---CODE BLOCK CHECK: FAILED---")
            error = f"Execution error: {error}"
            print(f"Error: {error}", file=sys.stderr)
            if "error" in state_dict:
                # Append this run to errors already present in the state.
                error_prev_runs = state_dict["error"]
                error = (
                    error_prev_runs
                    + "\n --- Most recent run output and error --- \n"
                    " ------ output ------ \n"
                    + output
                    + "\n ------ error ------ \n"
                    + error
                )
        else:
            print("---CODE BLOCK CHECK: SUCCESS---")
            # No errors occurred; the routers compare against this exact string
            error = "None"

        return {
            "keys": {
                "generation": code_solution,
                "question": question,
                "error": error,
                "prefix": prefix,
                "iterations": iterations,
                "code": code_block,
            }
        }

    def finish(self, state: GraphState) -> dict:
        """
        Finish the graph

        Args:
            state (dict): The current graph state

        Returns:
            dict: State whose only key is "response", holding the result of
            extract_response(state)
        """

        print("---FINISHING---")

        response = extract_response(state)

        return {"keys": {"response": response}}


def extract_response(state: GraphState) -> dict:
    """
    Extract the response from the graph state

    Args:
        state (dict): The current graph state; the first entry of
            state["keys"]["generation"] must expose `prefix` and `code`

    Returns:
        dict: {"prefix": ..., "code": ...} taken from that first entry
    """

    solution = state["keys"]["generation"][0]

    return {
        "prefix": solution.prefix,
        "code": solution.code,
    }


In [4]:
# Edges

"""Defines functions that transition our agent from one state to another."""


def enrich(graph):
    """Wire the transitions between nodes and return the same graph."""

    # Generation is always followed by the execution check.
    graph.add_edge("generate", "check_code_execution")

    # After the check, decide_to_finish names the next node.
    routes = {
        "finish": "finish",
        "generate": "generate",
    }
    graph.add_conditional_edges("check_code_execution", decide_to_finish, routes)

    return graph


def decide_to_check_code_exec(state: GraphState) -> str:
    """
    Route to the code-execution check, or back to answer generation.

    Args:
        state (dict): The current graph state

    Returns:
        str: "check_code_execution" when the recorded error is the string
            "None", otherwise "generate"
    """

    print("---DECIDE TO TEST CODE EXECUTION---")

    if state["keys"]["error"] != "None":
        # An error was recorded, so ask for a new solution
        print("---DECISION: RE-TRY SOLUTION---")
        return "generate"

    # No error recorded, so move on to the execution check
    print("---DECISION: TEST CODE EXECUTION---")
    return "check_code_execution"


def decide_to_finish(state: GraphState, max_iterations: int = 3) -> str:
    """
    Determines whether to finish or re-try code generation.

    Args:
        state (dict): The current graph state; reads "error" and
            "iterations" from state["keys"]
        max_iterations (int): Iteration count at which the graph finishes
            even if the last check failed (defaults to 3)

    Returns:
        str: Next node to call, "finish" or "generate"
    """

    print("---DECIDE TO FINISH---")
    state_dict = state["keys"]
    error = state_dict["error"]
    iterations = state_dict["iterations"]

    # "None" is the string check_code_execution stores on success
    if error == "None" or iterations >= max_iterations:
        print("---DECISION: FINISH---")
        return "finish"

    print("---DECISION: RE-TRY SOLUTION---")
    return "generate"


In [5]:
# Agent

def construct_graph(debug=False):
    """Build the uncompiled state graph: nodes, edges, entry and finish points."""
    from langgraph.graph import StateGraph

    context = "" # retrieval.retrieve_docs(debug=debug)
    nodes = Nodes(context, debug=debug)

    graph = StateGraph(GraphState)

    # register every node under its name from the node map
    for name, node in nodes.node_map.items():
        graph.add_node(name, node)

    # add the transitions between the registered nodes
    graph = enrich(graph)

    # the graph starts at "generate" and ends at "finish"
    graph.set_entry_point(key="generate")
    graph.set_finish_point(key="finish")

    return graph

In [6]:
# App
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

# FastAPI application; serve() registers the chain routes on it and returns it.
# NOTE(review): the title/description mention LangChain/LCEL, while the prompt
# pulled in Nodes.generate is "neuronslab/aws_cdk_engineer" — confirm this
# metadata is still accurate.
web_app = FastAPI(
    title="CodeLangChain Server",
    version="1.0",
    description="Answers questions about LangChain Expression Language (LCEL).",
)


# Set all CORS enabled origins
# NOTE(review): every origin, method and header is allowed with credentials
# enabled — confirm this is intended outside local development.
web_app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"],
)


def serve():
    """Mount the compiled graph on web_app under /codelangchain and return web_app."""
    from langchain_core.runnables import RunnableLambda
    from langserve import add_routes

    def inp(question: str) -> dict:
        # wrap the raw question in the initial graph state
        return {"keys": {"question": question, "iterations": 0}}

    def out(state: dict) -> str:
        # final state: the finish node stored the response under "keys"
        if "keys" in state:
            return state["keys"]["response"]
        # state keyed by the "generate" node name: pull the response from it
        if "generate" in state:
            return extract_response(state["generate"])
        return str(state)

    compiled = construct_graph(debug=False).compile()
    pipeline = RunnableLambda(inp) | compiled | RunnableLambda(out)

    add_routes(web_app, pipeline, path="/codelangchain")

    return web_app


In [7]:
def go(question: str = "Create VPC with 2 Public subnets", debug: bool = False):
    """Build and compile the code generation graph, run it once, and return the final state."""
    initial_state = {"keys": {"question": question, "iterations": 0}}
    runnable = construct_graph(debug=debug).compile()

    return runnable.invoke(initial_state, config={"recursion_limit": 50})

In [8]:
# Example run: the multi-line request below is passed to go() as the question;
# go() returns the final graph state.
question = """Create a EC2 instance with latest version of Amazon Linux.
Install the latest version of nginx, enable it and start the service.
Instance is in public subnet.
Find existing VPC vpc-0fed7b21fa59b0985, in us-east-1 in 433559402488 aws account for the instance.
Open port 80 and 443 in the security group to 0.0.0.0/0.
Secure the instance with EBS encryption.
"""
result = go(question=question)

  warn_beta(


---GENERATE SOLUTION---
---CHECKING CODE EXECUTION---
[92mfrom aws_cdk import (
    aws_ec2 as ec2,
    aws_iam as iam,
    Stack,
    Tags,
    App
)
from constructs import Construct

class Ec2Stack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)
        
        # Find existing VPC
        vpc = ec2.Vpc.from_lookup(self, "VPC",
            vpc_id="vpc-0fed7b21fa59b0985"
        )

        # Create EC2 instance role and policies 
        role = iam.Role(self, "ec2Role", 
            assumed_by=iam.ServicePrincipal("ec2.amazonaws.com")
        )
        role.add_managed_policy(iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSSMManagedInstanceCore"))

        # AMI
        amzn_linux = ec2.MachineImage.latest_amazon_linux(
            generation=ec2.AmazonLinuxGeneration.AMAZON_LINUX_2
        )
        
        # Instance
        instance = ec2.Instance(self, "Instance",
            instance_type=ec2.Inst

In [10]:
# parse http response and extract json

import json

def parse_sse_and_extract_json(sse_content):
    """
    Return the JSON payload of the first "event: data" entry in an SSE body.

    Events are separated by blank lines. Within an event, a line starting
    with "event: data" only counts when the line right after it starts with
    "data:". Returns None when no such pair exists or when the first
    matching payload is not valid JSON (the decode error is printed).
    """
    for block in sse_content.strip().split("\n\n"):
        rows = [row for row in block.split("\n") if row]  # drop empty lines
        # Walk adjacent line pairs: (candidate event line, line after it)
        for current, following in zip(rows, rows[1:]):
            if not current.startswith("event: data"):
                continue
            if not following.startswith("data:"):
                continue
            payload = following[len("data:"):].strip()
            try:
                return json.loads(payload)
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON: {e}")
                return None
    return None

In [11]:
import requests

# POST a question to the /codelangchain/stream route and read the whole body.
# Assumes a server exposing the app returned by serve() is listening on
# localhost:8000 — it is not started anywhere in the visible code.
response = requests.post(
    "http://localhost:8000/codelangchain/stream",
    json={
        "input": "Create VPC with 2 Public subnets",
        "config": {},
        "kwargs": {}
    }
)
# NOTE(review): parse_sse_and_extract_json returns None when no JSON data event
# is found, in which case the ["code"] lookup raises TypeError.
code = parse_sse_and_extract_json(response.text)["code"]
print(code)

from aws_cdk import (
    Stack,
    aws_ec2 as ec2,
    App
)
from constructs import Construct

class VpcStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        vpc = ec2.Vpc(
            self, "MyVpc", 
            max_azs=2, # Use 2 AZs
            cidr="10.0.0.0/16", 
            
            # Configure 2 public subnets 
            subnet_configuration=[
                ec2.SubnetConfiguration(
                    name="PublicSubnet1",
                    subnet_type=ec2.SubnetType.PUBLIC,
                    cidr_mask=24
                ),
                ec2.SubnetConfiguration(
                    name="PublicSubnet2",  
                    subnet_type=ec2.SubnetType.PUBLIC,
                    cidr_mask=24
                )
            ]
        )

# Create the CDK app and VPC stack        
app = App()
VpcStack(app, "VpcStack")

# Generate the CloudFormation template
app.syn

In [31]:
import tempfile
# Write the generated code to a named temp file and flush it to disk.
# The handle is left open here, presumably so the file still exists when the
# path is passed to `cdk synth` in a later cell — confirm.
tmp = tempfile.NamedTemporaryFile()
tmp.write(code.encode())
tmp.flush()

In [19]:
tmp.name

'/var/folders/z8/mdmwx43d3dncqq27d29dhhjw0000gn/T/tmpl6ycro4b'

In [23]:
!cdk synth --validation --app "python3 /var/folders/z8/mdmwx43d3dncqq27d29dhhjw0000gn/T/tmpl6ycro4b"


  Use ipAddresses instead
  This API will be removed in the next major release.
Resources:
  MyVpcF9F0CA6F:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: 10.0.0.0/16
      EnableDnsHostnames: true
      EnableDnsSupport: true
      InstanceTenancy: default
      Tags:
        - Key: Name
          Value: VpcStack/MyVpc
    Metadata:
      aws:cdk:path: VpcStack/MyVpc/Resource
  MyVpcPublicSubnet1Subnet1SubnetAB9C6797:
    Type: AWS::EC2::Subnet
    Properties:
      AvailabilityZone:
        Fn::Select:
          - 0
          - Fn::GetAZs: ""
      CidrBlock: 10.0.0.0/24
      MapPublicIpOnLaunch: true
      Tags:
        - Key: aws-cdk:subnet-name
          Value: PublicSubnet1
        - Key: aws-cdk:subnet-type
          Value: Public
        - Key: Name
          Value: VpcStack/MyVpc/PublicSubnet1Subnet1
      VpcId:
        Ref: MyVpcF9F0CA6F
    Metadata:
      aws:cdk:path: VpcStack/MyVpc/PublicSubnet1Subnet1/Subnet
  MyVpcPublicSubnet1Subnet1RouteTableE4064ACD:
    

# Architecture Diagram to prompt

In [19]:
import boto3
import json
import base64

# Module-level Bedrock runtime client; describe_image calls invoke_model on it.
bedrock = boto3.client('bedrock-runtime')

def _guess_image_media_type(image):
    """Return the MIME type matching the image's leading bytes ("image/jpeg" if unrecognized)."""
    if image.startswith(b"\x89PNG\r\n\x1a\n"):
        return "image/png"
    if image.startswith((b"GIF87a", b"GIF89a")):
        return "image/gif"
    if image[:4] == b"RIFF" and image[8:12] == b"WEBP":
        return "image/webp"
    # Fallback keeps the media type that was previously always sent.
    return "image/jpeg"


def describe_image(image, media_type=None):
    """
    Describe an image using the Bedrock API

    Args:
        image (bytes): Raw image file content
        media_type (str | None): MIME type sent with the image. When None it
            is detected from the leading bytes (PNG, GIF, WebP), falling back
            to "image/jpeg".

    Returns:
        str: The text of the first content entry in the model response
    """
    if media_type is None:
        # The declared type should match the actual bytes (e.g. a .png file
        # must not be labelled as JPEG).
        media_type = _guess_image_media_type(image)

    # Convert to base64 encoding
    image_base64 = base64.b64encode(image).decode('utf-8')

    instructions = """Your task is to analyze the diagram to capture all of the AWS resources, configurations and components it depicts,
    along with the relationships between them. The end goal is to gather the necessary information to
    create tasks for Cloud Engineer to create the infrastructure-as-code."""

    # Body for Claude v3 Sonnet: one user message with the image, then the task text
    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 10000,
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": media_type,
                            "data": image_base64
                        }
                    },
                    {
                        "type": "text",
                        "text": instructions
                    }
                ]
            }
        ]
    })
    # Parameters for Claude V3 sonnet
    model_id = 'anthropic.claude-3-sonnet-20240229-v1:0'
    accept = 'application/json'
    content_type = 'application/json'

    # Invoke Bedrock API
    response = bedrock.invoke_model(body=body, modelId=model_id, accept=accept, contentType=content_type)

    # Parse the response body
    response_body = json.loads(response.get('body').read())

    # Extract text
    return response_body['content'][0]['text']

![DemoDiagram.png](../imgs/DemoDiagram.png)

In [21]:
# Read the diagram as raw bytes; the context manager closes the file handle.
with open("../imgs/langgraph.png", "rb") as image_file:
    image = image_file.read()
architecture = describe_image(image)
print(architecture)

{'id': 'msg_019outdYwr9MghyE6o2zdEXM', 'type': 'message', 'role': 'assistant', 'content': [{'type': 'text', 'text': "The diagram appears to depict a code generation and execution system. Let me break it down for you:\n\nComponents:\n1. Node: This seems to be a component that takes a question as input and generates an answer in the form of preamble code and actual code.\n2. Code Execution Test Node: This component takes the generated code and tests its execution, determining if it fails or not.\n3. Tools: The diagram shows three tools - Code Samples, API Docs, and Google API, which are likely used as references for code generation.\n\nWorkflow:\n1. A question is provided as input to the Node component.\n2. The Node component generates an answer consisting of a preamble and code, possibly using the Code Samples, API Docs, and Google API as references.\n3. The generated code is then passed to the Code Execution Test Node for testing.\n4. If the code execution fails, it loops back to the N

In [24]:
# Combine the explicit requirements with the diagram description produced by
# describe_image, and use the result as the question for go().
message = """Install the latest version of nginx, enable it and start the service on the EC2 instance
Install the instance in the existing VPC vpc-0fed7b21fa59b0985, in us-east-1 in 433559402488 aws account for the instance.
Secure the instance with EBS encryption."""

go(question=message+"\n\n"+architecture)

---GENERATE SOLUTION---
---CHECKING CODE EXECUTION---
[92mfrom aws_cdk import (
    Stack,
    aws_ec2 as ec2,
    App
)
from constructs import Construct

class Ec2Stack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        
        # Get the existing VPC 
        vpc = ec2.Vpc.from_lookup(self, "VPC",
            vpc_id="vpc-0fed7b21fa59b0985"
        )
        
        # AMI
        amzn_linux = ec2.MachineImage.latest_amazon_linux(
            generation=ec2.AmazonLinuxGeneration.AMAZON_LINUX_2,
            edition=ec2.AmazonLinuxEdition.STANDARD,
            virtualization=ec2.AmazonLinuxVirt.HVM,
            storage=ec2.AmazonLinuxStorage.GENERAL_PURPOSE
        )

        # Create security group for EC2 
        sec_group = ec2.SecurityGroup(
            self, "SecurityGroup",
            vpc=vpc,
            allow_all_outbound=True
        )
        
        # Add inbound SSH and H

Error: Execution error: b'jsii.errors.JavaScriptError: \n  @jsii/kernel.RuntimeError: Error: Cannot retrieve value from context provider vpc-provider since account/region are not specified at the stack level. Configure "env" with an account and region when you define your stack.See https://docs.aws.amazon.com/cdk/latest/guide/environments.html for more details.\n      at Kernel._Kernel_ensureSync (/private/var/folders/z8/mdmwx43d3dncqq27d29dhhjw0000gn/T/tmpjlf_ehpj/lib/program.js:10491:23)\n      at Kernel.sinvoke (/private/var/folders/z8/mdmwx43d3dncqq27d29dhhjw0000gn/T/tmpjlf_ehpj/lib/program.js:9876:102)\n      at KernelHost.processRequest (/private/var/folders/z8/mdmwx43d3dncqq27d29dhhjw0000gn/T/tmpjlf_ehpj/lib/program.js:11696:36)\n      at KernelHost.run (/private/var/folders/z8/mdmwx43d3dncqq27d29dhhjw0000gn/T/tmpjlf_ehpj/lib/program.js:11656:22)\n      at Immediate._onImmediate (/private/var/folders/z8/mdmwx43d3dncqq27d29dhhjw0000gn/T/tmpjlf_ehpj/lib/program.js:11657:46)\n     

---RE-GENERATE SOLUTION w/ ERROR FEEDBACK---
---CHECKING CODE EXECUTION---
[92mfrom aws_cdk import (
    Stack,
    aws_ec2 as ec2,
    App
)
from constructs import Construct

class Ec2Stack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        
        # Get the existing VPC 
        vpc = ec2.Vpc.from_lookup(self, "VPC",
            vpc_id="vpc-0fed7b21fa59b0985"
        )
        
        # AMI
        amzn_linux = ec2.MachineImage.latest_amazon_linux(
            generation=ec2.AmazonLinuxGeneration.AMAZON_LINUX_2,
            edition=ec2.AmazonLinuxEdition.STANDARD,
            virtualization=ec2.AmazonLinuxVirt.HVM,
            storage=ec2.AmazonLinuxStorage.GENERAL_PURPOSE
        )

        # Create security group for EC2 
        sec_group = ec2.SecurityGroup(
            self, "SecurityGroup",
            vpc=vpc,
            allow_all_outbound=True
        )
        
        # 

{'keys': {'response': {'prefix': '\nThis code creates an EC2 instance in the existing VPC vpc-0fed7b21fa59b0985 in the us-east-1 region and 433559402488 AWS account. \n\nIt installs the latest version of nginx on the instance, enables it, and starts the nginx service. \n\nThe EBS volume attached to the instance is encrypted for security.\n',
   'code': 'from aws_cdk import (\n    Stack,\n    aws_ec2 as ec2,\n    App\n)\nfrom constructs import Construct\n\nclass Ec2Stack(Stack):\n\n    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:\n        super().__init__(scope, construct_id, **kwargs)\n        \n        # Get the existing VPC \n        vpc = ec2.Vpc.from_lookup(self, "VPC",\n            vpc_id="vpc-0fed7b21fa59b0985"\n        )\n        \n        # AMI\n        amzn_linux = ec2.MachineImage.latest_amazon_linux(\n            generation=ec2.AmazonLinuxGeneration.AMAZON_LINUX_2,\n            edition=ec2.AmazonLinuxEdition.STANDARD,\n            virtualization