In [None]:
import os
from glob import glob
from typing import List, Optional

import pandas as pd
import tiktoken
from openai.embeddings_utils import get_embedding, cosine_similarity
from tree_sitter import Language, Parser, Node


class CppParser:
    """Extract function definitions from C++ source files with tree-sitter.

    Args:
        language_path: Path to the compiled tree-sitter grammar library that
            is handed to ``Language(language_path, 'cpp')``.
        code_root: Root directory of the codebase; stored on the instance but
            not otherwise used by this class.
    """

    # Node types that spell a function's own name. Free functions use
    # `identifier`; methods defined inside a class body use `field_identifier`,
    # so both must be accepted or in-class methods get the wrong name.
    _NAME_NODE_TYPES = ('identifier', 'field_identifier')

    def __init__(self, language_path: str, code_root: str):
        self.code_root = code_root
        self.language = Language(language_path, 'cpp')
        self.parser = Parser()
        self.parser.set_language(self.language)

    @staticmethod
    def _first_node_of_type(node, node_types):
        """Return the first node in pre-order (left to right) whose type is in
        ``node_types``, or None when the subtree contains no such node.

        Implemented with an explicit stack so deeply nested syntax trees
        cannot exhaust Python's recursion limit.
        """
        pending = [node]
        while pending:
            current = pending.pop()
            if current.type in node_types:
                return current
            # Reversed so that the left-most child is popped (visited) first.
            pending.extend(reversed(current.children))
        return None

    def get_function_name(self, node: Node) -> Optional[str]:
        """Return the text of the first name-like node under ``node``.

        Returns:
            The decoded name, or None when the subtree contains neither an
            `identifier` nor a `field_identifier` node.
        """
        name_node = self._first_node_of_type(node, self._NAME_NODE_TYPES)
        if name_node is None:
            return None
        return name_node.text.decode("utf-8")

    def find_identifier(self, node: Node):
        """Return the first `identifier` node under ``node`` (pre-order), or None."""
        return self._first_node_of_type(node, ('identifier',))

    def get_function_code(self, node, source_code):
        """
        Extract function code from a tree-sitter node of type 'function_definition'

        Args:
            node: Any node exposing ``start_byte`` / ``end_byte``.
            source_code: The UTF-8 encoded bytes that were given to the parser.

        Returns:
            The decoded text spanned by the node.
        """
        return source_code[node.start_byte:node.end_byte].decode('utf-8')

    def get_functions(self, filepath):
        """Lazily yield one record per `function_definition` node in a file.

        Each record is a dict with the keys ``function_name``, ``filepath``
        (absolute) and ``code``. Records are produced in source (pre-order)
        order; definitions nested inside other definitions are reported too.
        The enclosing class and access specifier are not recorded.
        """
        filepath = os.path.abspath(filepath)  # Ensure the filepath is an absolute path
        # errors="replace" keeps a single badly-encoded file from aborting a
        # scan over the whole codebase; undecodable bytes become U+FFFD.
        with open(filepath, "r", encoding="utf-8", errors="replace") as f:
            source_code = f.read().encode("utf-8")
        tree = self.parser.parse(source_code)

        pending = [tree.root_node]
        while pending:
            node = pending.pop()
            if node.type == 'function_definition':
                yield {
                    "function_name": self.get_function_name(node),
                    "filepath": filepath,
                    "code": self.get_function_code(node, source_code),
                }
            # Reversed so children are visited in source order.
            pending.extend(reversed(node.children))

In [None]:
import openai
import time
from tenacity import retry, stop_after_attempt, wait_random_exponential

# Set your OpenAI API key: read it from the OPENAI_API_KEY environment variable
# so the secret never has to be written into (or committed with) the notebook.
openai.api_key = os.environ.get("OPENAI_API_KEY")

# Retry wrapper: randomized exponential wait (bounded by min=1, max=60), at most 6 attempts
@retry(stop=stop_after_attempt(6), wait=wait_random_exponential(min=1, max=60))
def request_with_backoff(func, **kwargs):
    """Call ``func(**kwargs)`` under the retry policy declared in the decorator.

    Args:
        func: The callable to invoke (the helpers below pass an OpenAI
            ``*.create`` method).
        **kwargs: Keyword arguments forwarded to ``func`` unchanged.

    Returns:
        Whatever ``func`` returns.
    """
    outcome = func(**kwargs)
    return outcome

# Function for Completions endpoint
def completions_with_backoff_and_batching(prompts, model="curie", max_tokens=20, rate_limit_per_minute=3000):
    """Complete each prompt in turn, pacing requests to a per-minute limit.

    Args:
        prompts: Iterable of prompt strings.
        model: Model name passed to ``openai.Completion.create``.
        max_tokens: Completion length limit passed to the endpoint.
        rate_limit_per_minute: Requests allowed per minute; the function
            sleeps ``60 / rate_limit_per_minute`` seconds before every request.

    Returns:
        A list with one string per prompt: the prompt followed by the text of
        the first returned choice.
    """
    pause = 60.0 / rate_limit_per_minute

    def complete(prompt):
        # Sleep first so even the initial request respects the pacing.
        time.sleep(pause)
        reply = request_with_backoff(
            openai.Completion.create,
            model=model,
            prompt=prompt,
            max_tokens=max_tokens,
        )
        return prompt + reply.choices[0].text

    return [complete(prompt) for prompt in prompts]

# Function for Embeddings endpoint
def embeddings_with_backoff_and_batching(texts, rate_limit_per_minute=3000, model="text-embedding-ada-002"):
    """Embed each text in turn, pacing requests to a per-minute limit.

    Args:
        texts: Iterable of strings to embed.
        rate_limit_per_minute: Requests allowed per minute; the function
            sleeps ``60 / rate_limit_per_minute`` seconds before every request.
        model: Embedding model name. Defaults to the embedding engine this
            notebook uses elsewhere.

    Returns:
        A list with one embedding vector per input text, in input order.
    """
    delay = 60.0 / rate_limit_per_minute
    embeddings = []

    for text in texts:
        time.sleep(delay)
        # The Embeddings endpoint takes the text as `input` (not `text`) and
        # returns the vector under data[0].embedding.
        response = request_with_backoff(openai.Embedding.create, model=model, input=text)
        embeddings.append(response["data"][0]["embedding"])

    return embeddings

# Function for Code endpoint
def code_with_backoff_and_batching(prompts, model="curie", max_tokens=20, rate_limit_per_minute=20):
    """Generate a code completion for each prompt, pacing requests to a per-minute limit.

    Args:
        prompts: Iterable of prompt strings.
        model: Model name passed to the Completions endpoint.
        max_tokens: Completion length limit passed to the endpoint.
        rate_limit_per_minute: Requests allowed per minute; the function
            sleeps ``60 / rate_limit_per_minute`` seconds before every request.

    Returns:
        A list with one string per prompt: the prompt followed by the text of
        the first returned choice.
    """
    delay = 60.0 / rate_limit_per_minute
    code_responses = []

    for prompt in prompts:
        time.sleep(delay)
        # The openai package has no `Code` resource; code generation goes
        # through the regular Completions endpoint with a code-capable model.
        response = request_with_backoff(openai.Completion.create, model=model, prompt=prompt, max_tokens=max_tokens)
        code_responses.append(prompt + response.choices[0].text)

    return code_responses

# Function for Edit endpoint
def edit_with_backoff_and_batching(texts, model="curie", rate_limit_per_minute=20,
                                   instruction="Fix the spelling mistakes"):
    """Apply an edit instruction to each text, pacing requests to a per-minute limit.

    Args:
        texts: Iterable of strings to edit.
        model: Model name passed to ``openai.Edit.create``.
            NOTE(review): the Edits endpoint only accepts edit-capable models
            (e.g. "text-davinci-edit-001"); the "curie" default is kept for
            interface compatibility but should be overridden by callers.
        rate_limit_per_minute: Requests allowed per minute; the function
            sleeps ``60 / rate_limit_per_minute`` seconds before every request.
        instruction: The instruction telling the model how to edit each text;
            the endpoint requires one.

    Returns:
        A list with the text of the first returned choice for each input.
    """
    delay = 60.0 / rate_limit_per_minute
    edited_responses = []

    for text in texts:
        time.sleep(delay)
        # The Edits endpoint takes the text as `input` plus an `instruction`,
        # and returns the result under choices[0].text.
        response = request_with_backoff(openai.Edit.create, model=model, input=text, instruction=instruction)
        edited_responses.append(response.choices[0].text)

    return edited_responses


In [None]:
from typing import List
import tiktoken

class TokenHandler:
    """Tokenize and count tokens with the tiktoken encoding of a given model.

    Args:
        model: Model name resolved to an encoding via
            ``tiktoken.encoding_for_model``.
    """

    def __init__(self, model: str):
        self.model = model
        self.encoding = tiktoken.encoding_for_model(model)

    def tokenize(self, text: str) -> List[int]:
        """Return the token ids that ``text`` encodes to."""
        return self.encoding.encode(text)

    def count_tokens(self, text: str) -> int:
        """Return the number of tokens in ``text``."""
        return len(self.tokenize(text))

    def count_tokens_for_messages(self, messages: List[dict]) -> int:
        """Return the total token count of the ``"content"`` of every message.

        Only the content strings are counted; no per-message overhead is added.
        """
        return sum(self.count_tokens(message["content"]) for message in messages)

In [None]:
from typing import Dict

class CostAnalyzer:
    """Estimate the cost of processing a fixed number of tokens per model/version.

    Args:
        token_count: Number of tokens to price.

    The ``models`` table maps model name -> version -> price; a cost is the
    price multiplied by ``token_count / 1000``.
    """

    def __init__(self, token_count: int):
        self.token_count = token_count
        self.models = {
            'Ada': {'v1': 0.0040, 'v2': 0.0004},
            'Babbage': {'v1': 0.0050},
            'Curie': {'v1': 0.0200},
            'Davinci': {'v1': 0.2000},
        }

    def calculate_cost(self, model: str, version: str) -> float:
        """Return the cost of ``token_count`` tokens for one model/version.

        Raises:
            KeyError: If the model or version is not in the price table.
        """
        price = self.models[model][version]
        thousands_of_tokens = self.token_count / 1000
        return thousands_of_tokens * price

    def _iter_costs(self):
        """Yield ``(model, version, cost)`` for every entry in the price table, in table order."""
        for model_name, version_prices in self.models.items():
            for version_name in version_prices:
                yield model_name, version_name, self.calculate_cost(model_name, version_name)

    def print_costs(self):
        """Print the token count followed by one cost line per model/version."""
        print(f"Number of tokens: {self.token_count}\n")
        print("MODEL        VERSION       COST")
        print("----------------------------------------")

        for model_name, version_name, amount in self._iter_costs():
            print(f"{model_name}\t\t{version_name}\t$ {amount:.8f}")

    def calculate_all_costs(self) -> Dict[str, float]:
        """Return every cost keyed by ``"<model>_<version>"``."""
        return {
            f"{model_name}_{version_name}": amount
            for model_name, version_name, amount in self._iter_costs()
        }

In [None]:
from typing import List, Union, Dict
from openai.embeddings_utils import get_embedding, cosine_similarity
import tiktoken

class EmbeddingHandler:
    """Create, cache and compare text embeddings.

    Args:
        model: Model name; stored on the instance but not used by the methods here.
        engine: Engine name passed to ``get_embedding`` for every request.
    """

    def __init__(self, model: str, engine: str):
        self.model = model
        self.engine = engine
        self.embeddings = {}  # Cache: text -> embedding

    def create_embedding(self, text: str) -> Union[List[float], None]:
        """Request an embedding for ``text`` and cache it.

        Returns:
            The embedding, or None if the request failed (the error is printed).
        """
        try:
            embedding = get_embedding(text, engine=self.engine)
            self.embeddings[text] = embedding
            return embedding
        except Exception as e:
            # Deliberately best-effort: one failed request must not abort a
            # bulk `Series.apply` over the whole codebase.
            print(f"Error creating embedding: {e}")
            return None

    def get_embedding(self, text: str) -> Union[List[float], None]:
        """Return the cached embedding for ``text``, creating it on a cache miss.

        Returns:
            The embedding, or None if it had to be created and creation failed.
        """
        if text in self.embeddings:
            return self.embeddings[text]
        return self.create_embedding(text)

    def cosine_similarity(self, embedding1: List[float], embedding2: List[float]) -> float:
        """Return the cosine similarity of two embeddings."""
        return cosine_similarity(embedding1, embedding2)

    def search_similar(self, df, query: str, n: int = 3, pprint: bool = True, n_lines: int = 7) -> pd.DataFrame:
        """Rank the rows of ``df`` by similarity of their embedding to ``query``.

        Args:
            df: Frame with ``code_embedding``, ``filepath``, ``function_name``
                and ``code`` columns. A ``similarities`` column is written to
                it in place.
            query: Text to embed and compare against.
            n: Number of top rows to return.
            pprint: When true, print a header and a code excerpt for each hit.
            n_lines: Number of code lines to print per hit.

        Returns:
            The ``n`` most similar rows, best first; an empty frame if the
            query could not be embedded.
        """
        query_embedding = self.get_embedding(query)

        if query_embedding is None:
            print("Error creating query embedding.")
            return pd.DataFrame()

        def score(embedding):
            # Rows whose embedding failed to be created hold None; score them
            # NaN (sorted last) instead of crashing the whole search.
            if embedding is None:
                return float("nan")
            return self.cosine_similarity(embedding, query_embedding)

        df['similarities'] = df.code_embedding.apply(score)
        res = df.sort_values('similarities', ascending=False).head(n)

        if pprint:
            for _, row in res.iterrows():
                print(row.filepath + ":" + row.function_name + "  score=" + str(round(row.similarities, 3)))
                print("\n".join(row.code.split("\n")[:n_lines]))
                print('-' * 70)

        return res

    @staticmethod
    def get_token_count(text: str) -> int:
        """Return the number of cl100k_base tokens in ``text``.

        Returns 0 (after printing the error) when the text cannot be tokenized.
        """
        tokenizer = tiktoken.get_encoding("cl100k_base")
        try:
            # disallowed_special=() makes source code that happens to contain a
            # special-token spelling (e.g. "<|endoftext|>") count as plain text
            # instead of raising.
            return len(tokenizer.encode(text, disallowed_special=()))
        except (TypeError, ValueError) as e:
            print(f"Error tokenizing text: {e}")
            return 0

In [None]:
# Set the paths. The defaults are machine-specific; override them with the
# TREE_SITTER_CPP_LIB / CODE_ROOT environment variables instead of editing the notebook.
language_path = os.environ.get(
    "TREE_SITTER_CPP_LIB",
    'C:\\Users\\newhi\\source\\repos\\Tree-Sitter\\tree-sitter-cpp\\parser.dll',
)  # Path to your compiled tree-sitter C++ grammar library (.dll here; .so/.dylib on other platforms)
code_root = os.environ.get("CODE_ROOT", "D:\\TestCodeParse")  # Path to the codebase to parse
engine = "text-embedding-ada-002" # Or any other engine you want to use
model = "ada" # Or any other model you want to use

# Instantiate the classes
cpp_parser = CppParser(language_path, code_root)
embedding_handler = EmbeddingHandler(model, engine)

In [None]:
# Parse the codebase: collect one record per function from every .cpp file under code_root
cpp_pattern = os.path.join(code_root, "**/*.cpp")
functions_data = []
for filepath in glob(cpp_pattern, recursive=True):
    # get_functions yields the records lazily; extend drains them in order
    functions_data.extend(cpp_parser.get_functions(filepath))
df = pd.DataFrame(functions_data)

In [None]:
# Show the parsed functions; df is built from functions_data, one row per record
df

In [None]:
# Analyze the cost: total the token counts of every function body, then price them
code_snippets = (record['code'] for record in functions_data)
n_tokens = sum(map(embedding_handler.get_token_count, code_snippets))
cost_analyzer = CostAnalyzer(n_tokens)
cost_analyzer.print_costs()

In [None]:
# Create embeddings if the cost is okay (only an answer of 'y' / 'Y' proceeds)
decision = input("Do you want to proceed with creating embeddings? (y/n): ")
if decision.lower() != 'y':
    print("Embeddings not created.")
else:
    # One embedding lookup per function body, stored alongside the code
    df['code_embedding'] = df['code'].apply(embedding_handler.get_embedding)
