# Embeddings and "tools" test

This document tests different ways of using embeddings and tools to give the LLM better context about the project's code structure and existing functionality, with the goal of improving the new code and features it generates.

In [None]:
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document


embeddings_model = OpenAIEmbeddings()
chroma_db = Chroma(
    collection_name="codebase",
    embedding_function=embeddings_model,
    persist_directory="./chroma_db",
)

## Pure embeddings

Simple embeddings can improve the LLM's context and make it "aware" of existing code snippets that match a request. On their own, however, they add little value: they are mostly useful for identifying repeated or identical functionality in the code.
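
As a rough illustration of how a request is matched against stored snippets, the sketch below ranks snippet descriptions by cosine similarity. The bag-of-words `toy_embedding` function and the descriptions are assumptions made for this example; they are only a stand-in for the learned vectors that a real embedding model would return.

```python
import math
from collections import Counter


def toy_embedding(text: str) -> Counter:
    """Bag-of-words vector; a crude stand-in for a learned embedding."""
    return Counter(text.lower().replace("_", " ").split())


def cosine_similarity(a: Counter, b: Counter) -> float:
    """Cosine of the angle between two sparse word-count vectors."""
    dot = sum(a[token] * b[token] for token in a)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


# Hypothetical snippet descriptions, keyed by function name
snippets = {
    "get_project_files": "list python files in a directory",
    "get_lines_code": "get specific lines of code from a file",
    "store_embeddings": "store embeddings of content with metadata",
}

query = toy_embedding("read lines from a file")
ranked = sorted(
    snippets,
    key=lambda name: cosine_similarity(query, toy_embedding(snippets[name])),
    reverse=True,
)
print(ranked[0])
```

The ranking only reflects how close the query is to each stored text, which is why retrieval of this kind finds similar code but says nothing about the rest of the file it came from.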

When a new feature has to be added to existing code (such as a new method in a class), we observed that relying solely on embeddings as the code source makes it very hard for the LLM to integrate the feature while keeping the rest of the file intact, because the file often has to be rewritten in full. This leads to incorrect code and/or the loss of existing functionality.

Two approaches were tested here:

   * Creating embeddings for each existing function in the code.
   * Creating embeddings by splitting the document into parts (e.g., 100 characters each).


### Creating embeddings for each existing function in code

To create embeddings for each function, we first need to go through each Python file and extract every function along with its source code.
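
Before the full parser, here is a minimal sketch of the idea using only the standard `ast` module. The helper name `extract_functions` and the sample source are assumptions made for this example; it is not the parser used in the rest of this document.

```python
import ast

SOURCE = '''\
def add(a: int, b: int) -> int:
    """Return the sum of a and b."""
    return a + b


class Greeter:
    def greet(self, name: str) -> str:
        return f"Hello, {name}"
'''


def extract_functions(source: str) -> list[dict]:
    """Return name, line span and source text of every function definition."""
    tree = ast.parse(source)
    found = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            found.append(
                {
                    "name": node.name,
                    "start_line": node.lineno,  # 1-based
                    "end_line": node.end_lineno,  # inclusive
                    "code": ast.get_source_segment(source, node),
                }
            )
    return found


functions = extract_functions(SOURCE)
for item in functions:
    print(item["name"], item["start_line"], item["end_line"])
```

Note that `ast.walk` also returns methods and nested functions, so this sketch does not distinguish them from module-level functions.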

The following class is used to parse Python code and extract various aspects of it, such as imports, classes, functions, etc., into a JSON structure:

In [None]:
import ast

class PythonFileParser(ast.NodeVisitor):
    """Parse a Python file and return its structure."""

    def __init__(self, file_name):
        self.file_name = file_name
        self.imports = []
        self.classes = []
        self.functions = []
        self.constants = []
        self.main_block = False
        self.global_statements = []
        self.comments = []

    def _create_func_dict(self, func_node):
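        # Pre-process the function's return annotation
        annotation = func_node.returns
        if annotation is None:
            returns = None

        elif isinstance(annotation, ast.Constant):
            returns = annotation.value

        elif isinstance(annotation, ast.BinOp):
            # Union written as `A | B`; ast.unparse (Python 3.9+) also copes
            # with operands that are not plain names, e.g. `list[int] | None`
            returns = [ast.unparse(annotation.left), ast.unparse(annotation.right)]

        else:
            # Plain names yield e.g. "str"; subscripts and attributes such as
            # `list[str]` or `np.ndarray` have no `.id`, so unparse them
            returns = ast.unparse(annotation)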

        return {
            "name": func_node.name,
            "start_line": func_node.lineno,
            "end_line": func_node.end_lineno,
            "parameters": [arg.arg for arg in func_node.args.args],
            "returns": returns,
        }

    def visit_Import(self, node):
        self.imports.append(
            {
                "module": node.names[0].name,
                "alias": None,
                "start_line": node.lineno,
                "end_line": node.end_lineno,
            }
        )
        self.generic_visit(node)

    def visit_ImportFrom(self, node):
        self.imports.append(
            {
                "module": node.module,
                "alias": node.names[0].name,
                "start_line": node.lineno,
                "end_line": node.end_lineno,
            }
        )
        self.generic_visit(node)

    def visit_ClassDef(self, node):
        tmp_class = {
            "name": node.name,
            "start_line": node.lineno,
            "end_line": node.end_lineno,
            "methods": [],
        }
        for func in node.body:
            if isinstance(func, ast.FunctionDef):
                method_dict = self._create_func_dict(func)
                tmp_class["methods"].append(method_dict)
                func.is_method = True
        self.classes.append(tmp_class)
        self.generic_visit(node)

    def visit_FunctionDef(self, node):
        if not hasattr(node, "is_method"):
            self.functions.append(self._create_func_dict(node))
        self.generic_visit(node)

    def visit_Assign(self, node):
        if isinstance(node.targets[0], ast.Name):
            value = self._simplify_value(node.value)
            const_dict = {
                "name": node.targets[0].id,
                "start_line": node.lineno,
                "end_line": node.end_lineno,
            }
            if value is not None:
                const_dict["value"] = value
            self.constants.append(const_dict)
        self.generic_visit(node)

    def visit_If(self, node):
        if (
            isinstance(node.test, ast.Compare)
            and isinstance(node.test.left, ast.Name)
            and node.test.left.id == "__name__"
            and any(isinstance(op, ast.Eq) for op in node.test.ops)
            and any(
                isinstance(cmp, ast.Constant) and cmp.value == "__main__"
                for cmp in node.test.comparators
            )
        ):
            self.main_block = True
        self.generic_visit(node)

    def visit_Expr(self, node):
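        # ast.Str was removed in Python 3.12; string literals are ast.Constant nodes
        if isinstance(node.value, ast.Constant) and isinstance(node.value.value, str):
            self.comments.append(
                {
                    "type": "docstring",
                    "content": node.value.value,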
                    "start_line": node.lineno,
                    "end_line": node.end_lineno,
                }
            )
        self.generic_visit(node)

    def visit(self, node):
        if (
            isinstance(node, ast.Expr)
            and isinstance(node.value, ast.Constant)
            and isinstance(node.value.value, str)
        ):
            self.comments.append(
                {
                    "type": "docstring",
                    "content": node.value.value,
                    "start_line": node.lineno,
                    "end_line": node.end_lineno,
                }
            )
        else:
            super().visit(node)

    def visit_Module(self, node):
        for n in node.body:
            if (
                isinstance(n, ast.Expr)
                and isinstance(n.value, ast.Constant)
                and isinstance(n.value.value, str)
            ):
                self.comments.append(
                    {
                        "type": "docstring",
                        "content": n.value.value,
                        "start_line": n.lineno,
                        "end_line": n.end_lineno,
                    }
                )
            else:
                self.visit(n)

    def visit_Global(self, node):
        self.global_statements.append(
            {
                "type": "global",
                "identifiers": node.names,
                "start_line": node.lineno,
                "end_line": node.end_lineno,
            }
        )
        self.generic_visit(node)

    def get_structure(self):
        return {
            "file_name": self.file_name,
            "imports": self.imports,
            "classes": self.classes,
            "functions": self.functions,
            "constants": self.constants,
            "main_block": self.main_block,
            "global_statements": self.global_statements,
            "comments": self.comments,
        }

    def _simplify_value(self, value):
        # ast.Constant covers strings, numbers, booleans and None; the older
        # ast.Str, ast.Num and ast.NameConstant aliases were removed in Python 3.12
        if isinstance(value, ast.Constant):
            return value.value
        return None

The parser can be invoked through the parse_python_file function:

In [None]:
import os

def parse_python_file(file_path: str) -> str | dict:
    """Parses a Python file and returns its structure."""
    if os.path.isdir(file_path):
        return "IS A DIRECTORY"

    with open(file_path, "r") as source:
        file_content = source.read()
    parser = PythonFileParser(file_name=file_path)
    tree = ast.parse(file_content, file_path)
    parser.visit(tree)
    return parser.get_structure()

#### parse_python_file call result

Below is the (simplified) result of executing the parse_python_file function on a Python file with imports, classes, and other functions.

```json
{
  "file_name": "./utilities.py",
  "imports": [
    {
      "module": "ast",
      "alias": null,
      "start_line": 1,
      "end_line": 1
    },
    {
      "module": "cProfile",
      "alias": null,
      "start_line": 2,
      "end_line": 2
    }
  ],
  "classes": [
    {
      "name": "PythonFileParser",
      "start_line": 57,
      "end_line": 230,
      "methods": [
        {
          "name": "__init__",
          "start_line": 60,
          "end_line": 68,
          "parameters": ["self", "file_name"],
          "returns": null
        },
        {
          "name": "_create_func_dict",
          "start_line": 70,
          "end_line": 90,
          "parameters": ["self", "func_node"],
          "returns": null
        }
      ]
    }
  ],
  "functions": [
    {
      "name": "get_project_files",
      "start_line": 15,
      "end_line": 29,
      "parameters": ["directory_path"],
      "returns": "str"
    },
    {
      "name": "get_lines_code",
      "start_line": 32,
      "end_line": 42,
      "parameters": ["file_path", "start_line", "end_line"],
      "returns": "str"
    }
  ],
  "constants": [
    {
      "name": "result",
      "start_line": 17,
      "end_line": 17,
      "value": ""
    }
  ],
  "comments": [
    {
      "type": "docstring",
      "content": "List all Python (.py) files and directories that may contain Python files.",
      "start_line": 16,
      "end_line": 16
    }
  ]
}
```

The following function uses the start and end line numbers from the structure above to retrieve the corresponding code from the Python file.

In [None]:
def get_lines_code(file_path: str, start_line: int, end_line: int) -> str:
    """Get specific lines of code from a file."""
    if not os.path.exists(file_path):
        return f"{file_path} FILE NOT FOUND"

    elif os.path.isdir(file_path):
        return "IS A DIRECTORY"

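    with open(file_path, "r") as f:
        lines = f.readlines()
    # AST line numbers are 1-based and end_line is inclusive, so shift the start
    # index by one; rstrip keeps the indentation that Python code depends on
    return "\n".join(line.rstrip() for line in lines[start_line - 1 : end_line])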
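
Line numbers reported by `ast` are 1-based and `end_lineno` is inclusive, whereas Python list slices are 0-based with an exclusive end. A minimal sketch of the conversion, using a hypothetical helper `slice_lines` that works on a string instead of a file:

```python
def slice_lines(text: str, start_line: int, end_line: int) -> str:
    """Return lines start_line..end_line (1-based, inclusive) of text."""
    lines = text.splitlines()
    # Line 1 lives at index 0; the slice end is exclusive, so end_line stays as-is
    return "\n".join(lines[start_line - 1 : end_line])


sample = "import os\n\ndef f():\n    return 1\n\nx = f()\n"
print(slice_lines(sample, 3, 4))
```

Here lines 3 to 4 are exactly the two lines of the function `f`, with the indentation of its body preserved.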

To generate embeddings for every file in the project, we also need a function that lists the files in a directory:

In [None]:
def get_project_files(directory_path: str = ".") -> str:
    """List all Python (.py) files and directories that may contain Python files."""
    result = ""
    try:
        for file in os.listdir(directory_path):
            full_path = os.path.join(directory_path, file)
            if os.path.isdir(full_path):
                result += f"{file}/\n"
            elif file.endswith(".py"):
                result += f"{file}\n"
    except NotADirectoryError:
        result = "NOT A DIRECTORY"
    except FileNotFoundError:
        result = "FILE NOT FOUND"
    return result
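
The function above lists a single directory level and reports sub-directories by name. If every Python file in the tree is needed, one possible approach is a recursive walk with `pathlib`. The helper name `iter_python_files` and the temporary directory layout are assumptions made for this sketch.

```python
import tempfile
from pathlib import Path


def iter_python_files(root: str = ".") -> list[str]:
    """Return every .py file below root as sorted, POSIX-style relative paths."""
    base = Path(root)
    return sorted(
        path.relative_to(base).as_posix()
        for path in base.rglob("*.py")
        if path.is_file()
    )


# Demonstration on a throwaway directory tree
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "pkg").mkdir()
    (Path(tmp) / "main.py").write_text("print('hi')\n")
    (Path(tmp) / "pkg" / "utils.py").write_text("X = 1\n")
    (Path(tmp) / "README.md").write_text("# notes\n")
    found = iter_python_files(tmp)

print(found)
```

Returning a list of paths rather than a newline-separated string also avoids having to split and filter the result before passing it to the parser.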

Now that we have the necessary functions, we can create embeddings for each existing function in each Python file:

In [None]:
def store_embeddings(content: str, metadata: dict | list[dict] | None = None) -> None:
    """Store embeddings of content with associated metadata."""
    if metadata and not isinstance(metadata, list):
        metadata = [metadata]

    # chroma_db was created with embedding_function=embeddings_model, so
    # add_texts is expected to compute the vector itself; embedding the text
    # here as well would only duplicate the embedding request
    chroma_db.add_texts([content], metadatas=metadata)
    
def analyze_and_store_code(file_path: str):
    """Analyze a Python file and store its embeddings, including functions and class methods."""
    structure = parse_python_file(file_path)

    # Store embeddings for standalone functions
    for func in structure["functions"]:
        code_snippet = get_lines_code(
            file_path, func["start_line"], func["end_line"]
        )
        metadata = {"name": func["name"], "type": "function", "file": file_path}
        store_embeddings(code_snippet, metadata)

    # Store embeddings for classes and their methods
    for cls in structure.get("classes", []):
        class_snippet = get_lines_code(
            file_path, cls["start_line"], cls["end_line"]
        )
        class_metadata = {"name": cls["name"], "type": "class", "file": file_path}

        # Store the class code snippet
        store_embeddings(class_snippet, class_metadata)

        # Store methods within the class
        for method in cls.get("methods", []):
            method_snippet = get_lines_code(
                file_path, method["start_line"], method["end_line"]
            )
            method_metadata = {
                "name": method["name"],
                "type": "method",
                "class": cls["name"],
                "file": file_path,
            }
            store_embeddings(method_snippet, method_metadata)
            
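# Preprocess: Store embeddings for all project files
for file in get_project_files().strip().split("\n"):
    # get_project_files also lists sub-directories ("name/") and may return an
    # error message, so only pass actual Python files to the parser
    if file.endswith(".py"):
        analyze_and_store_code(file)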

#### Results:

Although the results are useful for spotting repeated functions or functions with similar behaviour, this method needs further development. Moreover, when used alone, it does not solve the problems described above for embeddings-only approaches. Two proposed strategies to increase the value of the embeddings are:

* Adding docstrings and type hints to give the LLM more context and improve the embedding results.
* Including metadata such as the file and line numbers where the code lives, a brief explanation of the code, its objectives, and example outputs.

We believe that adding this data improves the results because the LLM does not always query the embeddings by a function's exact name or by code snippets; it also searches with natural-language expressions or phrases.
Although this may improve the embeddings' results and usefulness, they still need to be complemented with other "tools" that the LLM can call when it deems appropriate.
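
One way these two strategies could be combined is sketched below: the text that gets embedded starts with the signature and docstring, and the file and line numbers travel as metadata. The helper name `build_embedding_text`, the record layout, and the sample function are assumptions made for this example; `ast.unparse` requires Python 3.9 or later.

```python
import ast


def build_embedding_text(source: str, file_path: str) -> list[dict]:
    """Pair each function's enriched text with metadata for a vector store."""
    records = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.FunctionDef):
            continue
        signature = f"def {node.name}({ast.unparse(node.args)})"
        if node.returns is not None:
            signature += f" -> {ast.unparse(node.returns)}"
        docstring = ast.get_docstring(node) or "(no docstring)"
        code = ast.get_source_segment(source, node)
        records.append(
            {
                # Text that gets embedded: natural language first, then the code
                "text": f"{signature}\n{docstring}\n\n{code}",
                "metadata": {
                    "name": node.name,
                    "file": file_path,
                    "start_line": node.lineno,
                    "end_line": node.end_lineno,
                },
            }
        )
    return records


example = '''\
def get_lines_code(file_path: str, start_line: int, end_line: int) -> str:
    """Get specific lines of code from a file."""
    ...
'''
record = build_embedding_text(example, "utilities.py")[0]
print(record["text"])
print(record["metadata"])
```

Placing the docstring ahead of the code gives a phrase-based query something to match even when it shares no identifiers with the function.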

Note that both docstrings and metadata can (or should) be generated by the LLM itself during an initial pass of interpreting and exploring the project.
Another issue with this method is keeping the store up to date: old embeddings must be removed and replaced with fresh ones as the project's code evolves.
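
A possible way to handle the update problem is to give every snippet a stable ID and remember a hash of the code that was embedded, so that a later run can tell which entries to add, refresh, or delete. The sketch below works on plain dictionaries; the ID scheme and the helper names `snippet_id`, `content_hash`, and `plan_sync` are assumptions made for this example, not part of the tested setup.

```python
import hashlib


def snippet_id(file_path: str, name: str) -> str:
    """Stable ID for a snippet, independent of its content."""
    return f"{file_path}::{name}"


def content_hash(code: str) -> str:
    """Fingerprint of the code that was embedded."""
    return hashlib.sha256(code.encode("utf-8")).hexdigest()


def plan_sync(stored: dict[str, str], current: dict[str, str]) -> dict[str, list[str]]:
    """Compare stored hashes with the current code and decide what to change.

    stored  maps snippet ID -> hash of the code that was embedded
    current maps snippet ID -> code as it exists now
    """
    plan = {"add": [], "update": [], "delete": []}
    for key, code in current.items():
        if key not in stored:
            plan["add"].append(key)
        elif stored[key] != content_hash(code):
            plan["update"].append(key)
    plan["delete"] = [key for key in stored if key not in current]
    return plan


old_code = {
    snippet_id("utilities.py", "get_lines_code"): "def get_lines_code(): ...",
    snippet_id("utilities.py", "old_helper"): "def old_helper(): ...",
}
stored_hashes = {key: content_hash(code) for key, code in old_code.items()}

new_code = {
    snippet_id("utilities.py", "get_lines_code"): "def get_lines_code(path): ...",
    snippet_id("utilities.py", "get_project_files"): "def get_project_files(): ...",
}
plan = plan_sync(stored_hashes, new_code)
print(plan)
```

Assuming the vector store accepts explicit IDs when adding and deleting entries, the IDs under "update" and "delete" would be removed first, and those under "add" and "update" embedded again.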

### Creating embeddings by splitting the document into parts

The second method tested is similar to the previous one, but it does not interpret the Python code. Instead, the file is split into parts of a fixed size, and embeddings are computed and stored for each part.

In [None]:
def load_file(python_file_path):
    with open(python_file_path, "r") as python_file:
        source = python_file.read()

    # Split documents text
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=100,
        chunk_overlap=100,
        length_function=len,
        add_start_index=True,
    )
    chunks = text_splitter.split_documents(
        [Document(source, metadata={"source": python_file_path})]
    )

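    # Save to chroma database. add_documents writes to the existing "codebase"
    # collection; from_documents is a classmethod that would build a separate
    # Chroma instance with the default collection name
    chroma_db.add_documents(chunks)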

    print(f"{len(chunks)} chunks persisted")

Now that we have the necessary functions, we can create embeddings for each existing Python file:

In [None]:
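for file in get_project_files().strip().split("\n"):
    # Skip sub-directories ("name/") and error messages returned by get_project_files
    if file.endswith(".py"):
        load_file(file)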

#### Results:

Although this method seems to solve the problem of losing code when the LLM is asked to add new functionality and ends up rewriting a file, it is not reliable because:

* It depends on the chunk size: a whole file may fit in a single embedding, or it may be split into several parts.
* It depends on the size of the code file: a query may not return every chunk of the file, so the complete code is not recovered and the code-loss problem remains.
* It depends on the terms the LLM uses to query the embeddings: chunks from the file being modified may not appear in the results at all.

It is also hard to enrich the embeddings with docstrings or code descriptions, because the file is split into arbitrary pieces and the snippets lose their flow and logic.
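
The loss of structure can be seen with a deliberately simplified splitter. The sketch below slices a string into fixed-size pieces; it is a stand-in written for this example (the name `fixed_size_chunks` and the sample function are assumptions) and is cruder than a separator-aware splitter, but size-based chunking in general can still cut through a function.

```python
def fixed_size_chunks(text: str, size: int) -> list[str]:
    """Split text into consecutive pieces of at most `size` characters."""
    return [text[i : i + size] for i in range(0, len(text), size)]


source = (
    "def get_total(prices):\n"
    "    total = 0\n"
    "    for price in prices:\n"
    "        total += price\n"
    "    return total\n"
)

chunks = fixed_size_chunks(source, 40)
for number, chunk in enumerate(chunks, start=1):
    print(f"--- chunk {number} ---")
    print(chunk)
```

No single chunk contains both the `def` line and the `return` statement, so a query that retrieves only one of them gives the LLM an incomplete function.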

**This method should definitely be avoided if we want to make the most of the embeddings.**