## Imports

In [83]:
from typing import Annotated
from langchain_experimental.utilities import PythonREPL
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage,ToolMessage
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.document_loaders.generic import GenericLoader
from langchain_community.document_loaders.parsers import LanguageParser
from langchain_text_splitters import Language
from langchain_together import ChatTogether,Together
from together import Together as TogetherOG
from langchain_community.chat_models.ollama import ChatOllama

from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal, List, Union
import ast
import os
import re
import glob


from IPython.display import Image, display
from dotenv import load_dotenv
load_dotenv("./.env")

True

## Custom Engines

In [2]:
class LangchainJSONEngine:
    """Ask an OpenAI chat model for output that follows a given schema.

    The constructor builds a two-message chat prompt (system + human) and pipes
    it into ``ChatOpenAI(...).with_structured_output(sampleBaseModel)``; the
    resulting runnable is stored as ``self.micro_agent``.
    """

    def __init__(self, sampleBaseModel: BaseModel, systemPromptText: str=None, humanPromptText: str=None):
        """
        sampleBaseModel: schema passed to ``with_structured_output``.
        systemPromptText: text of the system message; a generic assistant
            prompt is used when None.
        humanPromptText: only stored on the instance. The chat prompt built
            below always uses its own fixed human message with a ``{query}``
            placeholder, so this value does not influence the model call.
        """
        self.llm = llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
        self.structured_llm = llm.with_structured_output(sampleBaseModel)
        
        if systemPromptText is None:
            self.systemPromptText = """
            You are an AI assistant. You are helping a user with a task. The user is asking you questions and you are answering them.
            """
        else:
            self.systemPromptText = systemPromptText

        # Always expose the same attribute name. Previously the default branch
        # wrote ``HumanPromptText`` and the other branch ``humanPromptText``, so
        # reading either attribute could raise AttributeError depending on how
        # the engine was constructed.
        if humanPromptText is None:
            self.humanPromptText = """
            Human: {query}
            """
        else:
            self.humanPromptText = humanPromptText
        # Legacy alias for the misspelled attribute name.
        self.HumanPromptText = self.humanPromptText

        self.prompt = ChatPromptTemplate.from_messages(
            [("system", self.systemPromptText), ("human", "Query:\n\n {query}")]
            )
        
        self.micro_agent = self.prompt | self.structured_llm

    def run(self, query: str):
        """Fill ``{query}`` in the prompt, invoke the chain and return its result."""
        result = self.micro_agent.invoke({
            "query": query
        }) 
        return result
    

class LangchainSimpleEngine:
    """Single-turn chat engine with optional tool calling.

    ``run`` sends a system + human message to the model. If the reply requests
    tool calls, every requested tool is executed and the model is invoked a
    second time with the tool results appended to the conversation.
    """

    def __init__(self, tools:List[tool]=None, systemPromptText: str=None, humanPromptText: str=None):
        """
        tools: tools to bind to the model; None or an empty list means the
            plain model is used.
        systemPromptText: text of the system message; a generic assistant
            prompt is used when None.
        humanPromptText: ignored (a notice is printed when it is supplied).
        """
        self.llm = llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
        # Copy into a fresh list so no default object is shared between instances.
        self.tools = list(tools) if tools else []
        
        if len(self.tools) == 0:
            self.llm_with_tools = llm
        else:
            self.llm_with_tools = llm.bind_tools(self.tools)
            
        if systemPromptText is None:
            self.systemPromptText = """
            You are an AI assistant. You are helping a user with a task. The user is asking you questions and you are answering them.
            """
        else:
            self.systemPromptText = systemPromptText

        if humanPromptText is not None: 
            print("Skipping human prompt text ...")

    def run(self, query: str):
        """Answer ``query``, executing any tool calls the model asks for.

        Returns the model's first reply when it requests no tools, otherwise
        the reply produced after the tool outputs were fed back.
        Raises ValueError when the model requests a tool that was not bound.
        """
        messages = [
            SystemMessage(self.systemPromptText),
            HumanMessage(content=query)
        ]
        level1_result = self.llm_with_tools.invoke(messages)
        if len(level1_result.tool_calls) == 0:
            print("No tools to run ...")
            return level1_result

        print("Running tools ...")
        # The assistant message that carries the tool calls has to come before
        # the ToolMessages that answer it.
        messages.append(level1_result)

        # Entries of ``tool_calls`` are read as plain mappings ("name", "args",
        # "id"), so the matching tool object is looked up by name.
        tools_by_name = {
            getattr(candidate, "name", None) or getattr(candidate, "__name__", None): candidate
            for candidate in self.tools
        }
        for tool_call in level1_result.tool_calls:
            selected_tool = tools_by_name.get(tool_call["name"])
            if selected_tool is None:
                raise ValueError(f"Model requested unknown tool: {tool_call['name']}")
            tool_output = selected_tool.invoke(tool_call["args"])
            messages.append(ToolMessage(str(tool_output), tool_call_id=tool_call["id"]))

        level2_result = self.llm_with_tools.invoke(messages)
        return level2_result

## Code Breakdown

In [3]:
def get_functions_and_classes(filepath):
    """
    Read a Python file and collect the source text of its classes and functions.

    filepath: path of the Python source file to read.

    returns: a tuple ``(file_content, code_snippets)`` where ``file_content`` is
    the full text of the file and ``code_snippets`` is a dict with two lists:
        "classes":   source text of every ``class`` statement
        "functions": source text of every ``def`` statement (methods and nested
                     functions included; ``async def`` is not collected)
    Each snippet starts at the ``class``/``def`` line and keeps the original
    indentation and line endings. Snippets appear in ``ast.walk`` order.
    """
    with open(filepath, "r") as file:
        file_content = file.read()
    
    # Parse the file content into an Abstract Syntax Tree (AST)
    tree = ast.parse(file_content)

    # Split once up front; the original re-split the whole file for every match.
    source_lines = file_content.splitlines(keepends=True)

    def source_of(node):
        # lineno is 1-based, end_lineno is inclusive
        return "".join(source_lines[node.lineno - 1:node.end_lineno])
    
    # Dict of lists holding the source code of classes and functions
    code_snippets = {
        "classes": [],
        "functions": []
    }

    # Walk through the AST and find all functions and classes
    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef):
            code_snippets["classes"].append(source_of(node))
        elif isinstance(node, ast.FunctionDef):
            code_snippets["functions"].append(source_of(node))
    return file_content , code_snippets


## Code LLAMA Utils

In [4]:
def code_llama_prompt_formatter(query: str, system_prompt: str=None):
    """Build a single prompt string using ``[INST]`` / ``<<SYS>>`` markers.

    query: the user request; it is embedded as ``User: {query}``.
    system_prompt: text placed between the ``<<SYS>>`` markers; a default
        coding-assistant instruction is used when None.

    returns: ``[INST]<<SYS>>\\n{system}\\n<</SYS>>\\n\\nUser: {query}[/INST]``
    """
    if system_prompt is None:
        # Same text as the historical default, including its line break and indentation.
        system_prompt = (
            "You are helpful coding assistant. User is asking you to write a function or class for a specific task. \n"
            "        Write the function or class in the programming language specified in the query."
        )

    system_section = "<<SYS>>\n" + system_prompt + "\n<</SYS>>\n\n"
    pieces = ["[INST]", system_section, f"User: {query}", "[/INST]"]
    return "".join(pieces)

In [5]:
def extract_and_clean_code(s):
    """Return the code of the first ``` fenced block in ``s``, cleaned up.

    Cleaning removes every ``import logging`` occurrence and strips
    surrounding whitespace. A language tag that directly follows the opening
    fence (e.g. the ``python`` in a "```python" fence) is dropped so it does not end up in
    the returned code.

    When ``s`` contains no fenced block, the whole text is cleaned and
    returned instead of raising IndexError.
    """
    # Extract text between ``` and ```
    fenced_blocks = re.findall(r'```(.*?)```', s, re.DOTALL)

    if fenced_blocks:
        candidate = fenced_blocks[0]
        # Only text on the same line as the opening fence counts as a language
        # tag; code that starts on the next line is left untouched.
        candidate = re.sub(r'\A[A-Za-z][\w+#.-]*[ \t]*\r?\n', '', candidate, count=1)
    else:
        candidate = s

    # Remove "import logging" from the extracted code
    return re.sub(r'\bimport logging\b', '', candidate).strip()

In [6]:
# Create Code LLama Instruct Engine
class CodeLLamaInstructEngine:
    """Sends formatted prompts to a ``codellama/CodeLlama-*-Instruct-hf`` model
    through the Together completions client."""

    def __init__(self, systemPromptText: str=None, paramcount: int=7):
        """
        systemPromptText: system prompt used for every request; a generic
            "write a function or class" prompt is used when None.
        paramcount: model size selector; must be one of ``AVL_PARAMS``.

        Raises ValueError for an unsupported ``paramcount``.
        """
        self.AVL_PARAMS = [7,13,34]
        is_supported = paramcount in self.AVL_PARAMS
        if not is_supported:
            raise ValueError(f"Invalid paramcount. Choose from {self.AVL_PARAMS}")

        self.paramcount = paramcount
        # API key is read from the TOGETHER_API_KEY environment variable.
        self.client = TogetherOG(api_key=os.environ.get('TOGETHER_API_KEY'))
        self.model = f"codellama/CodeLlama-{self.paramcount}b-Instruct-hf"

        fallback_system_prompt = """
            You are an AI assistant. You are helping a user with a task. The user is asking you to write a function or class for a specific task.
            """
        self.systemPromptText = fallback_system_prompt if systemPromptText is None else systemPromptText
        
    def run(self, query: str, clean_code: bool=True):
        """Send ``query`` to the model and return its answer.

        With ``clean_code`` True the reply is passed through
        ``extract_and_clean_code``; otherwise the raw completion text is returned.
        """
        completion = self.client.completions.create(
            model=self.model,
            prompt=code_llama_prompt_formatter(
                system_prompt=self.systemPromptText,
                query=query
            ),
        )
        raw_text = completion.choices[0].text
        return extract_and_clean_code(raw_text) if clean_code else raw_text

## Log Embedding Workflow  

In [7]:
def replace_code_in_file(file_content, old_code, new_code, handle_indentation=True):
    """Replace every occurrence of ``old_code`` in ``file_content`` with ``new_code``.

    file_content: text to search in.
    old_code: exact text to be replaced.
    new_code: replacement text.
    handle_indentation: when True, every line of ``new_code`` is prefixed with
        the leading whitespace of the first non-blank line of ``old_code``
        before replacing; when False the replacement is used verbatim.

    returns: the updated text.
    """
    if not handle_indentation:
        return file_content.replace(old_code, new_code)

    # Take the actual whitespace prefix (tabs stay tabs) of the first line that
    # has content, so leading newlines are not mistaken for indentation.
    first_code_line = next((line for line in old_code.splitlines() if line.strip()), "")
    indent = first_code_line[:len(first_code_line) - len(first_code_line.lstrip())]

    # keepends=True preserves the line endings of new_code, in particular a
    # trailing newline; dropping it would glue the next file line onto the
    # end of the replacement.
    indented_new_code = "".join(indent + line for line in new_code.splitlines(keepends=True))
    return file_content.replace(old_code, indented_new_code)

def read_code(filepath):
    """Open ``filepath`` in text mode and return its complete content as a string."""
    with open(filepath, "r") as source_file:
        contents = source_file.read()
    return contents
    

def write_code(filepath, content):
    """Write ``content`` to ``filepath`` in text mode, replacing any existing content."""
    with open(filepath, "w") as target_file:
        target_file.write(content)
    
def hash(s):
    """Return the sum of the code points of all characters in ``s``.

    Note: this definition shadows the built-in ``hash`` for the rest of the
    notebook. The value does not depend on character order, so different
    strings made of the same characters produce the same number.
    """
    return sum(map(ord, s))

In [75]:
class LogEmbedderWorkflow:
    """Adds START/END ``logging.info`` markers to the functions of a Python file.

    For every function snippet a marker pair ``<START{hash}>`` / ``<END{hash}>``
    is generated, where ``{hash}`` is ``hash(snippet)``. The markers are inserted
    by ``add_logging_to_function``; the two CodeLlama engines created in the
    constructor are kept on the instance but ``run`` does not call them.
    """

    def __init__(self,paramcount:int=7):
        """paramcount: model size forwarded to both ``CodeLLamaInstructEngine`` instances."""
        self.embedder_code_llama_engine = CodeLLamaInstructEngine(
                systemPromptText="""
                You are an AI assistant. You will be given python function code snippet and a log message.
                Firstly you need to add logging at the beginning of the function body.
                Then you need to add logging BEFORE all 'return statements' in the function.
                If there is NO return statement, add logging at the end of the function.

                Don't add any logging after the return statement, always add BEFORE the return statement.
                Don't modify the function code snippet. Don't write 'import logging'. Just add logging.
                Also maintain the indentation of the code snippet.
                """,
                paramcount=paramcount
            )
        self.rectify_code_llama_engine = CodeLLamaInstructEngine(
                systemPromptText="""
                You are an AI assistant. You will be given python function code snippet with logging added.
                
                You have to check if there is any logging after return statement in the function that is unreachable.
                You have to rellocate such logging before each return statement in the function.

                Don't modify the rest of the function code snippet.
                """,
                paramcount=paramcount)

    def create_user_prompt(self, function_code_snippet, start_log_message, end_log_message):
        """Deprecated: build the LLM prompt asking for log statements to be added.

        Emits a DeprecationWarning and returns the prompt text. (Using
        ``DeprecationWarning`` itself as a decorator replaced the method with an
        exception instance, which made it impossible to call.)
        """
        import warnings
        warnings.warn(
            "create_user_prompt is deprecated; add_logging_to_function inserts the log statements directly.",
            DeprecationWarning,
            stacklevel=2,
        )
        return f"""
        The function where the logging should be added is:
        ```
        {function_code_snippet}
        ```

        The log message to be added at beggining of the function body is:
        ```
        {start_log_message}
        ```

        The log message to be added before all return statements is (if there is no return statement, then add at the end of the function):
        ```
        {end_log_message}
        ```

        DON'T add any logging AFTER the return statement , always ADD BEFORE the return statement.
        """
    
    def create_rectify_user_prompt(self, function_code_snippet):
        """Deprecated: build the LLM prompt asking for unreachable logging to be moved.

        Emits a DeprecationWarning and returns the prompt text.
        """
        import warnings
        warnings.warn(
            "create_rectify_user_prompt is deprecated; add_logging_to_function inserts the log statements directly.",
            DeprecationWarning,
            stacklevel=2,
        )
        return f"""
        The function where the logging should be rectified is:
        ```
        {function_code_snippet}
        ```

        You have to check if there is any logging after return statement in the function that is unreachable.
        You have to rellocate such logging before each return statement in the function.
        """

    @staticmethod
    def _parse_function_snippet(function_code_snippet):
        """Parse a snippet; return ``(function_node, line_shift)`` or ``(None, 0)``.

        ``line_shift`` is the number of wrapper lines placed in front of the
        snippet, i.e. how much the AST line numbers are ahead of the snippet's.
        """
        # An indented snippet (method, nested function) is not valid at module
        # level, so it is retried as the body of an ``if True:`` wrapper.
        for wrapper, line_shift in (("", 0), ("if True:\n", 1)):
            try:
                module = ast.parse(wrapper + function_code_snippet)
            except (SyntaxError, ValueError):
                continue
            statements = module.body[0].body if line_shift else module.body
            if statements and isinstance(statements[0], (ast.FunctionDef, ast.AsyncFunctionDef)):
                return statements[0], line_shift
        return None, 0

    @staticmethod
    def _own_returns(function_node):
        """Yield the ``return`` statements of the function itself, skipping nested scopes."""
        nested_scopes = (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda, ast.ClassDef)
        pending = list(function_node.body)
        while pending:
            node = pending.pop()
            if isinstance(node, nested_scopes):
                continue
            if isinstance(node, ast.Return):
                yield node
            pending.extend(ast.iter_child_nodes(node))
    
    def add_logging_to_function(self,function_code_snippet, start_log_message_snippet, end_log_message_snippet):
        """Insert the start/end log statements into a function's source text.

        function_code_snippet: source of one function, beginning at its ``def`` line.
        start_log_message_snippet: statement inserted as the first statement of
            the body (after the docstring, if the function has one).
        end_log_message_snippet: statement inserted before every ``return`` that
            belongs to this function and starts its own line, and after the last
            statement when that statement is neither ``return`` nor ``raise``.

        returns: the instrumented source. The snippet is returned unchanged when
        it cannot be parsed as a function or when its body is on the same line
        as the ``def`` header (there is no line to insert in front of).
        """
        function_node, line_shift = self._parse_function_snippet(function_code_snippet)
        if function_node is None:
            return function_code_snippet

        # Split on "\n" only so line indexes agree with the parser's line numbers.
        ends_with_newline = function_code_snippet.endswith("\n")
        lines = function_code_snippet.split("\n")
        if ends_with_newline:
            lines.pop()

        def indent_of(line_index):
            text = lines[line_index]
            return text[:len(text) - len(text.lstrip())]

        def begins_its_line(node):
            # True when only whitespace precedes the statement on its line.
            return len(indent_of(node.lineno - 1 - line_shift)) == node.col_offset

        body = function_node.body
        first_statement = body[0]
        if not begins_its_line(first_statement):
            return function_code_snippet

        body_indent = indent_of(first_statement.lineno - 1 - line_shift)
        insertions = {}  # line index -> statements to place in front of that line

        def insert_before(line_index, text):
            insertions.setdefault(line_index, []).append(text)

        # Add logging at the beginning of the function body; a docstring must
        # stay the first statement to remain the function's docstring.
        has_docstring = (
            isinstance(first_statement, ast.Expr)
            and isinstance(first_statement.value, ast.Constant)
            and isinstance(first_statement.value.value, str)
        )
        if has_docstring:
            start_index = first_statement.end_lineno - line_shift
        else:
            start_index = first_statement.lineno - 1 - line_shift
        insert_before(start_index, body_indent + start_log_message_snippet)

        # Add logging before all return statements
        for return_node in self._own_returns(function_node):
            if begins_its_line(return_node):
                return_index = return_node.lineno - 1 - line_shift
                insert_before(return_index, indent_of(return_index) + end_log_message_snippet)

        # Add logging at the end when execution can run past the last statement
        last_statement = body[-1]
        if not isinstance(last_statement, (ast.Return, ast.Raise)):
            insert_before(last_statement.end_lineno - line_shift, body_indent + end_log_message_snippet)

        rebuilt_lines = []
        for line_index, text in enumerate(lines):
            rebuilt_lines.extend(insertions.get(line_index, ()))
            rebuilt_lines.append(text)
        rebuilt_lines.extend(insertions.get(len(lines), ()))

        reconstructed_function_code_snippet = "\n".join(rebuilt_lines)
        if ends_with_newline:
            # Keep the final newline so the following file line is not glued on.
            reconstructed_function_code_snippet += "\n"
        return reconstructed_function_code_snippet
    
    def modify_functions(self,function_code_snippets):
        """Instrument every snippet.

        returns: ``(old_and_new_code_snippets, hash_values)`` where the dict maps
        each original snippet to its instrumented version and the list holds
        ``hash(snippet)`` for every snippet, in input order.
        """
        old_and_new_code_snippets = {}
        hash_values = []
        for function in function_code_snippets:
            print(f"Log: Modifying function: {function.splitlines()[0]}")
            hash_value = hash(function)
            logging_start = f"logging.info('<START{hash_value}>')"
            logging_end = f"logging.info('<END{hash_value}>')"

            # Logging is inserted programmatically; no LLM call is made here.
            response = self.add_logging_to_function(function, logging_start, logging_end)

            old_and_new_code_snippets[function] = response
            hash_values.append(hash_value)

        return old_and_new_code_snippets,hash_values

    def hash_value_to_line_number(self, file_content, old_and_new_code_snippets, relative_filepath):
        """Map ``hash(snippet)`` to ``[start, end, relative_filepath]``.

        ``start`` is the number of newlines in ``file_content`` before the first
        occurrence of the snippet; ``end`` is the number of newlines up to the
        end of that occurrence.
        """
        hash_to_lineno = {}
        for old_code_snip in old_and_new_code_snippets:
            hash_value = hash(old_code_snip)
            # Starting index where old_code_snip is present in the file_content
            start_index = file_content.find(old_code_snip)
            # Ending index where old_code_snip is present in the file_content
            end_index = start_index + len(old_code_snip)
            
            # Find the line number of the starting index
            start_index_lineno = file_content[:start_index].count("\n")
            # Find the line number of the ending index
            end_index_lineno = file_content[:end_index].count("\n")
            
            hash_to_lineno[hash_value] = [start_index_lineno, end_index_lineno, relative_filepath]
        return hash_to_lineno

    def run(self, read_filepath, write_filepath, logging_filepath):
        """
        read_filepath: str: The path of the original file to read
        write_filepath: str: The path of the file to write the modified content
        logging_filepath: str: The path of the logging file to write the logging content when the code is executed

        returns: ``(old_and_new_code_snippets, hash_to_lineno)``; line numbers
        refer to the original, unmodified file.
        """
        # repr() yields a valid Python string literal even when the path
        # contains quotes or backslashes.
        logging_header = f"import logging\nlogging.basicConfig(filename={str(logging_filepath)!r}, level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')"
        # Read the content of the file
        file_content, code_snippets = get_functions_and_classes(read_filepath)
        # Modify the functions
        old_and_new_code_snippets,hash_values = self.modify_functions(code_snippets["functions"])

        # Hash value to line number mapping
        hash_to_lineno = self.hash_value_to_line_number(file_content, old_and_new_code_snippets, read_filepath)

        # Add logging header to the file content
        file_content = f"{logging_header}\n{file_content}"
        # Replace the old code with the new code in the file content
        for old_code_snip, new_code_snip in old_and_new_code_snippets.items():
            file_content = replace_code_in_file(file_content, old_code_snip, new_code_snip,handle_indentation=False) # handle_indentation is False because indentation is not altered in the modified code

        # Write the modified content to the file
        write_code(write_filepath, file_content)
        print("Log: Done")

        return old_and_new_code_snippets,hash_to_lineno

## Analyzing Input Parameters

In [112]:
#  A function to read body inside if __name__ == '__main__': block

def read_main_block(filepath):
    """
    Return the source text of the ``if __name__ == "__main__":`` statement of a file.

    The whole ``if`` statement is returned, starting at its ``if`` line. An
    empty string is returned when no such statement exists. If several match,
    the last one visited by ``ast.walk`` wins.
    """
    with open(filepath, "r") as file:
        file_content = file.read()
    tree = ast.parse(file_content)
    source_lines = file_content.splitlines(keepends=True)
    main_block = ""
    for node in ast.walk(tree):
        if not isinstance(node, ast.If):
            continue
        test = node.test
        # Only a single "==" comparison qualifies; "!=" and chained comparisons do not.
        if not (isinstance(test, ast.Compare) and len(test.ops) == 1 and isinstance(test.ops[0], ast.Eq)):
            continue
        left, right = test.left, test.comparators[0]
        # ast.Constant replaces the deprecated ast.Str node class.
        if (isinstance(left, ast.Name) and left.id == "__name__"
                and isinstance(right, ast.Constant) and right.value == "__main__"):
            main_block = "".join(source_lines[node.lineno - 1:node.end_lineno])
    return main_block

def retireve_all_parse_args(filepath):
    """
    It takes a file path and returns the arguments of the parser in the file.
    Only ``parser.add_argument(...)`` calls inside the ``__main__`` block are
    considered, and the parser variable has to be named ``parser``.

    returns: A List of dictionaries containing the arguments of the parser
    [
        {
            "name":"arg1",
            "type":"int",
            "default":None,
            "required":True,
            "help":"This is arg1"
        },
        ...
    ]
    "name" is taken from the first ``--long`` flag (or the first flag/positional
    name when there is none), without leading dashes. "type" is the source text
    of the ``type=`` keyword or None. "required" is True unless the call passes
    ``required=...`` explicitly. Values that are not literals are reported as
    their source text.
    """
    main_block = read_main_block(filepath)
    tree = ast.parse(main_block)

    def literal_or_source(value_node):
        # Literal values are returned as Python objects; anything else
        # (names, calls, ...) falls back to its source text.
        try:
            return ast.literal_eval(value_node)
        except (ValueError, TypeError, SyntaxError):
            return ast.get_source_segment(main_block, value_node)

    parse_args = []
    for node in ast.walk(tree):
        if not (isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute)):
            continue
        func = node.func
        if not (isinstance(func.value, ast.Name) and func.value.id == "parser" and func.attr == "add_argument"):
            continue

        flags = [arg.value for arg in node.args if isinstance(arg, ast.Constant) and isinstance(arg.value, str)]
        if not flags:
            continue
        flag = next((candidate for candidate in flags if candidate.startswith("--")), flags[0])
        arg_name = flag.split("--")[1] if "--" in flag else flag.lstrip("-")

        arg_type = None
        arg_default = None
        arg_required = True
        arg_help = None
        # Keywords are matched by name, so their order in the call does not matter.
        for keyword in node.keywords:
            if keyword.arg == "type":
                if isinstance(keyword.value, ast.Name):
                    arg_type = keyword.value.id
                else:
                    arg_type = ast.get_source_segment(main_block, keyword.value)
            if keyword.arg == "default":
                arg_default = literal_or_source(keyword.value)
            if keyword.arg == "required":
                arg_required = literal_or_source(keyword.value)
            if keyword.arg == "help":
                arg_help = literal_or_source(keyword.value)
        parse_args.append({
            "name":arg_name,
            "type":arg_type,
            "default":arg_default,
            "required":arg_required,
            "help":arg_help
        })
    return parse_args

## Wrapping Up

In [120]:
def process_project_folder(folderpath):
    """
    It takes a folder path and processes all the python files in the folder.

    Every ``*.py`` file directly inside ``folderpath`` is instrumented with
    ``LogEmbedderWorkflow.run`` and written to ``./interim_projects/<project>/``;
    the instrumented code is configured to log to ``./function_logs/<project>.log``.
    ``<project>`` is the last component of ``folderpath``.

    returns: ``(hash_to_lineno_fullproj, parse_args_fullproj)`` - the merged
    hash-to-location mapping of all files, and a dict mapping every processed
    file path to the result of ``retireve_all_parse_args`` for that file.
    """
    # Extract project name from folder path (last path component, regardless
    # of how deep the path is or whether it ends with a separator)
    project_name = os.path.basename(os.path.normpath(folderpath))
    # Creating a folder to store the modified files (parents included)
    try:
        os.makedirs(f"./interim_projects/{project_name}")
    except FileExistsError:
        print("Interim folder already exists")
    # The log file's directory is created up front so it exists when the
    # instrumented code configures logging.
    os.makedirs("./function_logs", exist_ok=True)

    # Creating log embedding workflow object
    log_embedder = LogEmbedderWorkflow(paramcount=7)
                                
    # Dictionary to store hash value to line number mapping for all the files in the project
    hash_to_lineno_fullproj = {}
    # Dictionary to store the arguments of the parser in all the files in the project
    parse_args_fullproj = {}
    
    # Get all the python files in the folder; sorted for a reproducible order
    python_files = sorted(glob.glob(f"{folderpath}/*.py"))
    for file in python_files:
        read_filepath = file
        write_filepath = f"./interim_projects/{project_name}/{os.path.basename(file)}"
        logging_filepath = f"./function_logs/{project_name}.log"
        print(f"Read File: {read_filepath} | Write File: {write_filepath} | Logging File: {logging_filepath}")
        # Process the file
        old_and_new_code_snippets,hash_to_lineno = log_embedder.run(read_filepath, write_filepath, logging_filepath)
        hash_to_lineno_fullproj.update(hash_to_lineno)

        parse_args = retireve_all_parse_args(read_filepath)
        parse_args_fullproj[read_filepath] = parse_args

        print("Log: Done")
    
    return hash_to_lineno_fullproj,parse_args_fullproj

## Testing Area

In [121]:
# _log_embedder_workflow = LogEmbedderWorkflow(paramcount=7)

In [122]:
# _old_and_new_code_snippets,_hash_to_lineno = _log_embedder_workflow.run(
#     read_filepath="./target_project/prog3.py",
#     write_filepath="./target_project/prog3_mod.py",
#     logging_filepath="./function_logs/hello_world.log"
# )

In [123]:
# _parse_args = retireve_all_parse_args("./target_project/prog3_mod.py")
# for arg in _parse_args:
#     print(arg)

In [124]:
# Instrument every *.py file directly under ./target_projects/proj1 and keep both
# results: the hash -> location mapping and the parser.add_argument details per file.
_hash_to_lineno_fullproj, _parse_args_fullproj = process_project_folder("./target_projects/proj1")

Interim folder already exists
Read File: ./target_projects/proj1/lib1.py | Write File: ./interim_projects/proj1/lib1.py | Logging File: ./function_logs/proj1.log
Log: Modifying function: def cpu_bound(p):
Log: Modifying function: def memory_bound(p):
Log: Modifying function: def io_bound(p):
Log: Modifying function: def power_bound():
Log: Done
Log: Done
Read File: ./target_projects/proj1/lib2.py | Write File: ./interim_projects/proj1/lib2.py | Logging File: ./function_logs/proj1.log
Log: Modifying function: def bubble_sort(arr):
Log: Modifying function: def selection_sort(arr):
Log: Modifying function: def python_sort(arr):
Log: Modifying function: def generate_random_list(power_of_ten):
Log: Done
Log: Done
Read File: ./target_projects/proj1/main.py | Write File: ./interim_projects/proj1/main.py | Logging File: ./function_logs/proj1.log
Log: Done
Log: Done


In [125]:
# Display the collected mapping: function hash -> [start line, end line, source file].
_hash_to_lineno_fullproj

{17049: [4, 14, './target_projects/proj1/lib1.py'],
 19555: [16, 26, './target_projects/proj1/lib1.py'],
 41611: [28, 47, './target_projects/proj1/lib1.py'],
 9996: [49, 54, './target_projects/proj1/lib1.py'],
 13634: [3, 10, './target_projects/proj1/lib2.py'],
 19055: [12, 21, './target_projects/proj1/lib2.py'],
 4247: [23, 26, './target_projects/proj1/lib2.py'],
 9412: [28, 30, './target_projects/proj1/lib2.py']}

In [119]:
# Display, for every processed file, the arguments found in its __main__ block.
_parse_args_fullproj

{'./target_projects/proj1/lib1.py': [],
 './target_projects/proj1/lib2.py': [],
 './target_projects/proj1/main.py': [{'name': 'cpu_power',
   'type': 'int',
   'default': None,
   'required': True,
   'help': 'Power of CPU bound task'},
  {'name': 'memory_power',
   'type': 'int',
   'default': None,
   'required': True,
   'help': 'Power of memory bound task'},
  {'name': 'io_power',
   'type': 'int',
   'default': None,
   'required': True,
   'help': 'Power of I/O bound task'},
  {'name': 'power_of_10',
   'type': 'int',
   'default': None,
   'required': True,
   'help': 'Power of 10 for sorting list'}]}