In [None]:
import os
import json
from dotenv import load_dotenv

load_dotenv()

from langchain.embeddings import (
    HuggingFaceEmbeddings,
    OpenAIEmbeddings,
    HuggingFaceInstructEmbeddings,
)
from langchain.embeddings.base import Embeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.document_loaders import TextLoader
from langchain.docstore.document import Document

from tqdm import tqdm

from approaches.azureretriver import AzureRetrieveApproach

os.environ["TOKENIZERS_PARALLELISM"] = "false"
AZURE_SEARCH_SERVICE = os.environ["AZURE_SEARCH_SERVICE"]
AZURE_SEARCH_TINY_INDEX = os.environ["AZURE_SEARCH_TINY_INDEX"]
AZURE_SEARCH_BIGGER_INDEX = os.environ["AZURE_SEARCH_BIGGER_INDEX"]

In [None]:
import inspect
import importlib.util
import os


def read_tags_file(file_path: str) -> list[dict]:
    with open(file_path, "r", errors="ignore") as file:
        lines = file.readlines()

    tags = []
    for line in lines:
        if line.startswith("!"):  # Skip metadata lines
            continue
        parts = line.split("\t")
        if len(parts) >= 4:
            tag_name = parts[0]
            file_name = parts[1]
            pattern = parts[2]
            tags.append(dict(tag_name=tag_name, file_name=file_name, pattern=pattern))

    return tags


def get_embeddings(text: str, normalize=True) -> list:
    embeddings.encode_kwargs = {"normalize_embeddings": normalize}
    return embeddings.embed_query(text)


def get_source_code(function_name, function_path, ctags_root_path):
    spec = importlib.util.spec_from_file_location(
        function_name, os.path.join(ctags_root_path, function_path)
    )
    foo = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(foo)
    return inspect.getsource(foo)


def create_code_file_text(metadatas: list[dict]):
    code_file_text = ""
    for i, metadata in enumerate(metadatas):
        code_file_text += f"==== File {i+1}/{len(metadata)} ====\n"
        code_file_text += f'File path: {metadata["file_name"]}\n'
        code_file_text += f'Tag name: {metadata["tag_name"]}\n'
        code_string = metadata.get("code")
        code_file_text += f"Code: {code_string}\n"
        code_file_text += "\n"
    return code_file_text

In [None]:
template = """You are a hupful bot that fuilfill the human' program task:

The following is releative code:
{code_file_text}

User: {user_prompt}
Ai:
"""

In [None]:
embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2"
)  # vector length 384
embedding_dimension = embeddings.client.get_sentence_embedding_dimension()

ctags_path = "./repo/aladdin/tags"
ctags_root_path = os.path.dirname(ctags_path)
assert os.path.isfile(ctags_path), "Please run `zsh download_example_rpo.sh` first"

azure_retriever = AzureRetrieveApproach()

In [None]:
import ast


class TagFinder(ast.NodeVisitor):
    def __init__(self, source_code: str, tag_name: str):
        self.source_code = source_code
        self.tag_name = tag_name
        self.found_code = None

    def visit_Assign(self, node: ast.Assign) -> None:
        for target in node.targets:
            if isinstance(target, ast.Name) and target.id == self.tag_name:
                self.found_code = ast.get_source_segment(self.source_code, node)


def find_tag_in_source(file_path: str, tag_name: str) -> str:
    with open(file_path, "r") as file:
        source_code = file.read()

    finder = TagFinder(source_code, tag_name)
    finder.visit(ast.parse(source_code))

    return finder.found_code

In [None]:
import ast
import tokenize
from io import BytesIO
from typing import List, Dict, Union, Optional, Any, Type, Callable
from typing_extensions import TypedDict
from rich import print

from pathlib import Path
from typing import Iterable, Iterator, List


def list_all_files(directory: str) -> Iterator[Path]:
    """Yield all files from the given directory."""
    return Path(directory).rglob("*")


def filter_files_by_type(
    files: Iterator[Path], file_types: Iterable[str]
) -> Iterable[Path]:
    """Filter files by the given file types."""
    return (file for file in files if file.suffix in file_types)


def get_file_paths(directory: str, file_types: List[str]) -> List[Path]:
    """Get file paths from the specified directory, filtered by the provided file types."""
    return list(filter_files_by_type(list_all_files(directory), file_types))


class DependencyData(TypedDict):
    modules: List[str]
    functions_called: List[str]
    global_vars: List[str]
    classes_used: List[str]
    function_codes: Dict[str, str]
    class_codes: Dict[str, str]


def extract_from_node(extractor: Type, node: ast.AST) -> List[str]:
    return extractor.extract(node)


class ImportExtractor:
    @staticmethod
    def extract(node: ast.AST) -> List[str]:
        if isinstance(node, ast.Import):
            return [n.name for n in node.names]
        elif isinstance(node, ast.ImportFrom):
            return [f"{node.module}.{n.name}" for n in node.names]
        return []


class FunctionCallExtractor:
    @staticmethod
    def extract(node: ast.AST) -> List[str]:
        if isinstance(node, ast.Call):
            if isinstance(node.func, ast.Name):
                return [node.func.id]
            elif isinstance(node.func, ast.Attribute):
                return [node.func.attr]
        return []


class GlobalVarExtractor:
    @staticmethod
    def extract(node: ast.AST) -> List[str]:
        return node.names if isinstance(node, ast.Global) else []


class ClassUsageExtractor:
    @staticmethod
    def extract(node: ast.AST) -> List[str]:
        return [node.name] if isinstance(node, ast.ClassDef) else []


def extract_code_segment(
    code: str, node: Union[ast.FunctionDef, ast.ClassDef]
) -> Optional[str]:
    # Use the line numbers provided by the ast node
    start_line = node.lineno - 1  # AST line numbers are 1-based, lists are 0-based
    end_line = node.end_lineno if hasattr(node, "end_lineno") else start_line
    return "\n".join(code.splitlines()[start_line : end_line + 1])


def analyze_code(code: str) -> DependencyData:
    tree = ast.parse(code)

    data: DependencyData = {
        "modules": [],
        "functions_called": [],
        "global_vars": [],
        "classes_used": [],
        "function_codes": {},
        "class_codes": {},
    }

    extractors = {
        "modules": ImportExtractor,
        "functions_called": FunctionCallExtractor,
        "global_vars": GlobalVarExtractor,
        "classes_used": ClassUsageExtractor,
    }

    for key, extractor in extractors.items():
        for node in ast.walk(tree):
            data[key].extend(extract_from_node(extractor, node))

    for node in tree.body:  # Only top-level nodes
        if isinstance(node, ast.FunctionDef) and not node.name.startswith("_"):
            data["function_codes"][node.name] = extract_code_segment(code, node)
        elif isinstance(node, ast.ClassDef):
            data["class_codes"][node.name] = extract_code_segment(code, node)

    return data

In [None]:
from langchain.chat_models import AzureChatOpenAI
from langchain.schema import HumanMessage, AIMessage

llm = AzureChatOpenAI(
    azure_deployment=os.environ.get("DEPLOYMENT_NAME"),
    temperature=0.5,
)

SUMMARY_TEMPLATE = """\
Please summary the following code in 50 tokens:
```
{code}
```

summary:
"""


def summary_code(llm: AzureChatOpenAI, code: str) -> str:
    if not code:
        return ""
    message = HumanMessage(content=SUMMARY_TEMPLATE.format(code=code))
    return llm([message]).content

In [8]:
import time
import re
from utils.tagreader import read_tags_file

azure_retriever.create_index(
    indedx_name="poc_aladdin-2023-12-03", embedding_dimension=embedding_dimension
)

ctags_root_path = os.path.dirname(ctags_path)
tags = read_tags_file(ctags_path, [".py"])

print(f"Total tags: {len(tags)}")

# tags = tags[:10]

documents = []
idx = 0
for tag in tqdm(tags):
    if tag["file_name"].endswith(".py"):
        file_path = os.path.normpath(os.path.join(ctags_root_path, tag["file_name"]))
        with open(file_path, "r") as source:
            code = source.read()
        data = analyze_code(code)

        for name, val in data["class_codes"].items():
            if tag["tag_name"] in val:
                tag["code"] = val
            else:
                tag["code"] = ""

        # if tag['tag_kind'] == 'f' or tag['tag_kind'] == 'c':
        #     code = tag.get('code')
        #     tag['summary'] = summary_code(code)
        # else:
        #     tag['summary'] = ''

    documents.append(
        dict(
            id=str(idx),
            title=f"{tag['file_name']} | {tag['tag_name']}",
            metadata=json.dumps(tag),
            content=tag["summary"],
            category="code",
            titleVector=get_embeddings(f"{tag['file_name']} | {tag['tag_name']}"),
            contentVector=get_embeddings(tag["summary"]),
        )
    )
    idx += 1

poc_aladdin-2023-12-03 created


  0%|          | 0/1990 [00:00<?, ?it/s]


KeyError: 'summary'

In [None]:
import time
import re
from utils.tagreader import read_tags_file

azure_retriever.create_index(
    indedx_name="poc_aladdin-2023-12-03", embedding_dimension=embedding_dimension
)

ctags_root_path = os.path.dirname(ctags_path)
tags = read_tags_file(ctags_path, [".py"])

print(f"Total tags: {len(tags)}")

# tags = tags[:10]

documents = []
idx = 0
for tag in tqdm(tags):
    if tag["file_name"].endswith(".py"):
        file_path = os.path.normpath(os.path.join(ctags_root_path, tag["file_name"]))
        with open(file_path, "r") as source:
            code = source.read()
        data = analyze_code(code)

        for name, val in data["class_codes"].items():
            if tag["tag_name"] in val:
                tag["code"] = val
            else:
                tag["code"] = ""

        # if tag['tag_kind'] == 'f' or tag['tag_kind'] == 'c':
        #     code = tag.get('code')
        #     tag['summary'] = summary_code(code)
        # else:
        #     tag['summary'] = ''

    documents.append(
        dict(
            id=str(idx),
            title=f"{tag['file_name']} | {tag['tag_name']}",
            metadata=json.dumps(tag),
            content=tag["summary"],
            category="code",
            titleVector=get_embeddings(f"{tag['file_name']} | {tag['tag_name']}"),
            contentVector=get_embeddings(tag["summary"]),
        )
    )
    idx += 1

poc_aladdin-2023-12-03 created


  0%|          | 0/1990 [00:00<?, ?it/s]


KeyError: 'summary'

In [None]:
azure_retriever.create_index(
    indedx_name="poc_aladdin-2023-12-03", embedding_dimension=embedding_dimension
)
azure_retriever.batch_update(documents=documents, index_name="poc_aladdin-2023-12-03")

In [None]:
text = "How Do I moidfy search app pattern?"
results = azure_retriever.search(
    index_name="poc_aladdin-2023-12-03",
    vector=get_embeddings(text),
    fields="contentVector",
    top=10,
)

for result in results:
    print(f"Title: {result['title']}")
    print(f"Score: {result['@search.score']}")
    print(f"Metadata: {result['metadata']}")
    print(f"Content: {result['content']}")
    print(f"Category: {result['category']}\n")

In [None]:
text = "How Do I moidfy regex of searchapp_explainer routing pattern?"
results = azure_retriever.search(
    index_name="poc_aladdin-2023-12-03",
    vector=get_embeddings(text),
    fields="contentVector",
    top=10,
)

for result in results:
    print(f"Title: {result['title']}")
    print(f"Score: {result['@search.score']}")
    print(f"Metadata: {result['metadata']}")
    print(f"Content: {result['content']}")
    print(f"Category: {result['category']}\n")

In [None]:
from langchain.chat_models import AzureChatOpenAI
from langchain.schema import HumanMessage, AIMessage


def ask(user_prompt: str) -> str:
    # find docs similar to user_prompt
    results = azure_retriever.hybrid_reranking_search(
        index_name="poc_aladdin-2023-12-03",
        text=user_prompt,
        vector=get_embeddings(user_prompt),
        fields="contentVector",
        top=5,
    )

    for result in results:
        print(f"Title: {result['title']}")
        # print(f"Score: {result['@search.score']}")
        print(f"Metadata: {result['metadata']}")
        # print(f"Content: {result['content']}")
        # print(f"Category: {result['category']}\n")
    metadatas = []
    for result in results:
        result_dict = json.loads(result["metadata"])
        # if result_dict['file_name'].endswith(".py"):
        metadatas.append(result_dict)
    # metadatas = metadatas[:2]

    # TODO: Should handle token length limit here
    print(f"{metadatas = }")
    user_prompt = template.format(
        code_file_text=create_code_file_text(metadatas), user_prompt=user_prompt
    )
    print(f"💬 {user_prompt = }")

    # call openai api here
    message = HumanMessage(content=user_prompt)
    return llm([message]).content


llm = AzureChatOpenAI(
    azure_deployment=os.environ.get("DEPLOYMENT_NAME"),
    temperature=0.5,
)

In [None]:
user_question = "How to add new plugin for companion?"
result = ask(user_question)

print("=" * 20)
print(f"👩‍💻 : {user_question}")
print("=" * 20)
print(f"🤖 : {result}")

In [None]:
user_question = "How to modify event bus?"
result = ask(user_question)

print("=" * 20)
print(f"👩‍💻 : {user_question}")
print("=" * 20)
print(f"🤖 : {result}")

In [None]:
user_question = "How to add fake case for SEARCH_APP_PROMPTS"
result = ask(user_question)

print("=" * 20)
print(f"👩‍💻 : {user_question}")
print("=" * 20)
print(f"🤖 : {result}")