In [None]:
import openai
import pinecone
import os
# Initialise the Pinecone client and configure the OpenAI key. All credential,
# environment and index-name strings are blank here and must be filled in
# before any of the cells below can reach either service.
pinecone.init(api_key="", environment="")
# Module-level index handle; later cells call index.query(...) on it.
index = pinecone.Index(index_name='')
# NOTE(review): this assignment overwrites any OPENAI_API_KEY that is already
# set in the process environment with the literal below.
os.environ['OPENAI_API_KEY'] = ""
openai.api_key = ""

In [None]:
from pygments import lex
from pygments.lexers import GoLexer

def get_functions_and_imports(filename):
    """Lex a Go source file and collect function-name and string-literal tokens.

    Args:
        filename: Path of the Go file to read.

    Returns:
        A ``(functions, imports)`` tuple of lists. ``functions`` holds the text
        of every token whose type name contains ``Name.Function``. ``imports``
        holds the text (double quotes removed) of every token whose type name
        contains ``Literal.String`` and whose text starts with a double quote;
        the filter does not look at whether the literal sits inside an import
        declaration.
    """
    with open(filename, 'r') as source_file:
        go_source = source_file.read()

    function_names = []
    quoted_literals = []
    for token_type, text in lex(go_source, GoLexer()):
        type_name = str(token_type)
        if "Name.Function" in type_name:
            function_names.append(text)
        if "Literal.String" in type_name and text.startswith('"'):
            quoted_literals.append(text.replace('"', ''))

    return function_names, quoted_literals

# Usage: run the extractor on a Go file in the working directory and print
# both returned lists.
filename = "./company.go"
functions, imports = get_functions_and_imports(filename)
print("Functions:", functions)
print("Imports:", imports)


In [None]:

def pre_process_query(user_query):
    """Build a chat prompt that augments ``user_query`` with context from Pinecone.

    The query is embedded with the ``text-embedding-ada-002`` model, the ten
    nearest matches are fetched from the ``vicky`` namespace of the
    module-level ``index``, and each match's metadata is rendered into a
    context string appended to the question.

    Args:
        user_query: The user's question; surrounding whitespace is stripped
            before it is embedded.

    Returns:
        The prompt string: a fixed instruction, the original question and the
        rendered context.

    Raises:
        KeyError: If a match lacks a metadata field read for its type.
    """
    response = openai.Embedding.create(
        input=user_query.strip(),
        model="text-embedding-ada-002",
    )
    description_embeddings = response["data"][0]["embedding"]

    pinecone_result = index.query(vector=description_embeddings,
                                include_metadata=True,
                                top_k=10,
                                namespace="vicky")
    print(pinecone_result)

    # The accumulator has to exist before the first `+=` (and before the final
    # print when no match contributes anything).
    context = ""

    for match in pinecone_result.matches:
        # `match_type` rather than `type`, to avoid shadowing the builtin.
        match_type = match.metadata["type"]
        summary = match.metadata["summary"]
        package = match.metadata["package"]
        if match_type in {"function", "type", "struct", "method", "interface"}:
            filename = match.metadata["filename"]
            code = match.metadata["code"]
            context += f"{{ 'filename': '{filename}', 'type': {match_type}, 'package': '{package}', 'summary of the parent file': '{summary}', 'code': '{code}' }},"
        elif match_type == "file":
            filename = match.metadata["filename"]

            context += f"{{ 'filename': '{filename}', 'type': {match_type}, 'package': '{package}', 'summary of this file': '{summary}'  }},"
        elif match_type == "package":
            context += f"{{ 'type': {match_type}, 'package': '{package}', 'description of package': '{summary}'  }},"
    print(context)
    prompt_with_context = f"Answer this questions using the context from codebase (unless the question is a follow up on your previous response). {user_query} \n Context: {context}"

    return prompt_with_context
# Try the prompt builder on a sample question.
pre_process_query("explain the formula for calculating dilluted number of shares for individual security?")


In [None]:
# Test query: embed a sample question and inspect the raw Pinecone matches,
# restricted to entries whose `isTest` metadata equals False.

QUERY = "explain the formula for calculating dilluted number of shares for individual security?"

response = openai.Embedding.create(input=QUERY, model="text-embedding-ada-002")
embeddings_vector = response["data"][0]["embedding"]

pinecone_result = index.query(
    vector=embeddings_vector,
    include_metadata=True,
    filter={"isTest": {"$eq": False}},
    top_k=10,
    namespace='vicky',
)

print(pinecone_result)


In [None]:
import os
import openai
import tiktoken
import time
from json import JSONDecodeError
import openai
import pinecone
import json
import os
# Initialise Pinecone and rebind the module-level `index` for this cell; the
# key, environment and index name are blank placeholders that must be filled in.
pinecone.init(api_key="", environment="")
index = pinecone.Index(index_name='')

def get_tokens(string):
  """Return how many tokens ``string`` occupies under the ``cl100k_base`` encoding."""
  return len(tiktoken.get_encoding("cl100k_base").encode(string))


# Package names the model may pick from: used as the `enum` of the
# `package_names` items in the function schema and as the default package list
# in query_to_func_args.
# NOTE(review): 'cals' may be a typo for 'calcs' (the description table below
# has a "numbers/calcs" entry) — confirm against the package names in the index.
ALL_PACKAGES = ['numbers', 'cals']

# Function schema passed as `functions=` to the chat completion in
# create_chat_completion. It declares one function, search_code_in_vector_db,
# with three arguments: isTest (boolean), package_names (array of strings
# limited to ALL_PACKAGES) and cleaned_query (string). Only isTest and
# cleaned_query are required.
preprocess_functions = [{
  "name": "search_code_in_vector_db",
  "description": """This function searches vector db for code snippets that can be used to answer user's query. It's arguments are
   'isTest' - which must is True, if the query mentions test files, and False otherwise. 
   'package_names' is optional parameter consisting of the packages that are mentioned in user's query. If no packages are mentioned in the query it should be null.
   'cleaned_query' Is a parameter that is used to create vector embeddings. It should be a modified original query, cleaned from all words that are not relevant to the query.
    E.g. If original query is 'explain the formula for calculating fully diluted (as its written in the code)', then cleaned query should be 'formula for calculating fully diluted' """,
  "parameters": {
    "type": "object",
    "properties": {
      "isTest": {
        "type": "boolean",
        "description": "This argument must be set to True, if in providing answer to user's query requires searching in test files, and False otherwise."
      },
      "package_names": {
        "type": "array",
        "description": "This parameters is the array of the packages that are mentioned in user's query. It should only be filled out if packages are specifically mentioned in user's query, otherwise it should be left null",
        'items': {
          'type': 'string',
          "description": "This represents individual package name that is mentioned in user's query. If no packages are mentioned in the query it should be null.", 
          "enum": ALL_PACKAGES
        }
      },
      "cleaned_query": {
        "type": "string",
        "description": """This is the query that has been cleaned of all the packages and other unnecessary words. 
        Since this query will be embedded, it needs to be cleaned of all the unnecessary words and have maximum meaning as embedding vector ."""
    },
  },
  'required': ['isTest', 'cleaned_query'],
  }
}]

# System message for the query pre-processing chat completion (sent as the
# "system" role in create_chat_completion).
preprocess_system_prompt = """You are a code search query processor. You will take queries from users that are interacting with a code search bot and process them to call
    a search_code_in_vector_db function that will call vector db to find code snippets that can be used to answer user's query."""

def create_chat_completion(user_input):
    """Send ``user_input`` to gpt-4 with the query pre-processing function schema.

    The request carries ``preprocess_system_prompt`` as the system message, the
    stripped user input as the user message, ``preprocess_functions`` as the
    available functions, and names ``search_code_in_vector_db`` in
    ``function_call``.

    Args:
        user_input: Raw user query; surrounding whitespace is stripped.

    Returns:
        Whatever ``openai.ChatCompletion.create`` returns for the request.
    """
    system_message = {"role": "system", "content": preprocess_system_prompt}
    user_message = {"role": "user", "content": user_input.strip()}
    return openai.ChatCompletion.create(
        model="gpt-4",
        messages=[system_message, user_message],
        functions=preprocess_functions,
        function_call={"name": "search_code_in_vector_db"},
    )
# Short descriptions of internal packages, keyed by import name.
# process_interal_packages appends the matching description to each import
# name it finds in this table.
internal_import_descriptions = {
    "usecases": "Business and Workflow logic",
    "domain": "Simple marshalling/unmarshalling with the database + some helper methods and deprecated calculations",
    "caplogic": "Graphql layer on top of usecases",
    "api": "The HTTP layer of the API, contains rproxies, middlewares etc, routing etc.",
    "hasura": "Deprecated package that houses clients and query/mutations.",
    "postgres": "Package that contains our postgres client and some queries/helpers.",
    "authx": "Package that contains authorization logic",
    "qmail": "Queueing mail system package, this is our entrypoint to sendgrid",
    "excel": "For rendering data into excel",
    "numbers": "For creating mini-exports with correct numerical values",
    "numbers/calcs": "For calculating every number in the system in one place",
    "e2e": "End-to-end test suite, for tests that exercise the stack at the top level",
}

def process_interal_packages(internal_imports):
    """Annotate internal import names with their known descriptions.

    Args:
        internal_imports: Iterable of internal import names.

    Returns:
        A list with one entry per input, in the same order: ``"name (description)"``
        when the name is a key of ``internal_import_descriptions``, otherwise
        the name unchanged.
    """
    return [
        f"{name} ({internal_import_descriptions[name]})"
        if name in internal_import_descriptions
        else name
        for name in internal_imports
    ]

def process_matches(matches):
    """Render vector-db matches as a context string and log them to ``context_data.json``.

    Each match is rendered according to ``metadata["type"]``: code-like types
    (function, type, struct, method, interface), ``file`` and ``package`` each
    have their own layout; any other type contributes nothing to the string and
    an empty dict to the JSON log.

    Side effect: ``context_data.json`` in the working directory is rewritten
    with this call's records followed by whatever the file already contained.

    Args:
        matches: Iterable of objects exposing a ``metadata`` mapping.

    Returns:
        The concatenated context string (empty when nothing was rendered).
    """
    code_like_types = {"function", "type", "struct", "method", "interface"}
    fragments = []
    records = []

    for match in matches:
        meta = match.metadata
        kind = meta["type"]
        summary = meta["summary"]
        package = meta["package"]
        record = {}

        if kind in code_like_types:
            filename = meta["filename"]
            code = meta["code"]
            internal_imports = process_interal_packages(meta["internal_imports"])
            external_imports = meta["external_imports"]

            fragments.append(
                f"{{ 'type': {kind}, 'code': '{code}', 'interal imports': {internal_imports}, 'external imports':  {external_imports} , 'package': '{package}', 'origin file': {filename}, 'origin file description': '{summary}' }},"
            )
            record = {
                'type': kind,
                'code': code,
                'interal import pkgs': internal_imports,
                'external import pkgs': external_imports,
                'package': package,
                'origin file': filename,
                'origin file description': summary,
            }

        elif kind == "file":
            filename = meta["filename"]
            internal_imports = process_interal_packages(meta["internal_imports"])
            external_imports = meta["external_imports"]
            function_names = meta["function_names"]

            fragments.append(
                f"{{ 'filename': '{filename}', 'type': {kind}, 'package': '{package}',  'interal imports': {internal_imports}, 'external imports':  {external_imports} , 'functions in file': {function_names}, 'file description': '{summary}'  }},"
            )
            record = {
                'type': kind,
                'filename': filename,
                'package': package,
                'internal imports': internal_imports,
                'external imports': external_imports,
                'functions in file': function_names,
                'file description': summary,
            }

        elif kind == "package":
            fragments.append(
                f"{{ 'type': {kind}, 'package': '{package}', 'description of package': '{summary}'  }},"
            )
            record = {
                'type': kind,
                'package': package,
                'package description': summary,
            }

        records.append(record)

    # Keep earlier runs' records: new entries first, previous file content after.
    if os.path.exists('context_data.json'):
        with open('context_data.json', 'r') as previous_file:
            records.extend(json.load(previous_file))

    with open('context_data.json', 'w') as output_file:
        json.dump(records, output_file, indent=4)

    return "".join(fragments)


def index_query(vector, top_k, filter, namespace):
    """Query the module-level ``index`` with metadata included in the results.

    Args:
        vector: Query vector.
        top_k: Number of matches to request.
        filter: Metadata filter forwarded unchanged.
        namespace: Namespace to search.

    Returns:
        The value returned by ``index.query``.
    """
    return index.query(
        vector=vector,
        include_metadata=True,
        top_k=top_k,
        filter=filter,
        namespace=namespace,
    )



def query_to_func_args(user_input):
    """Ask the chat model to turn a raw user query into vector-search arguments.

    ``create_chat_completion`` is called up to six times. An attempt is
    repeated when the reply carries no function call, or (after a one second
    pause) when the function-call arguments are not valid JSON.

    Args:
        user_input: The raw user query.

    Returns:
        A tuple ``(isTest, cleaned_query, packages)``. Any field the model
        omits or leaves falsy keeps its default: ``False``, the original
        ``user_input`` and ``ALL_PACKAGES`` respectively. The same defaults are
        returned when no attempt yields usable arguments, so callers can always
        unpack three values.
    """
    isTest, cleaned_query, packages = False, user_input, ALL_PACKAGES

    for _ in range(6):
        try:
            first_response = create_chat_completion(user_input)
            response_message = first_response["choices"][0]["message"]

            function_call = response_message.get("function_call")
            if not function_call:
                # No function call in this reply; ask again.
                continue

            arguments = json.loads(function_call["arguments"])

            if arguments.get("isTest"):
                isTest = arguments["isTest"]

            if arguments.get("package_names"):
                packages = arguments["package_names"]

            if arguments.get("cleaned_query"):
                cleaned_query = arguments["cleaned_query"]

            return isTest, cleaned_query, packages

        except JSONDecodeError as e:
            print(f'JSONDecodeError occurred: {e}')
            time.sleep(1)
            continue

    # Every attempt failed. Fall back to the defaults instead of handing back
    # the raw API response, which the caller's three-way unpacking cannot use.
    return isTest, cleaned_query, packages

def get_context_from_vectordb(embeddings_vector, filter):
  """Fetch the closest low-scope matches and render them as prompt context.

  Queries the ``vicky`` namespace of the module-level ``index`` for the eight
  nearest matches, restricted to entries whose ``scope`` is ``low`` in addition
  to the caller's filter. For every match, the code of each function listed in
  its ``functions_called`` metadata is appended when ``functions.json`` has a
  non-empty entry under that name.

  Args:
    embeddings_vector: Query embedding passed to Pinecone as the search vector.
    filter: Metadata filter dict. It is copied before the scope restriction is
      added, so the caller's dict is left untouched; ``None`` is treated as an
      empty filter.

  Returns:
    The context string: one numbered entry per match, each followed by the
    code of the called functions that were found.

  Raises:
    KeyError: If a match is missing one of the metadata fields read below.
  """
  # Work on a copy so the scope restriction does not leak into the caller's dict.
  query_filter = dict(filter or {})
  query_filter["scope"] = {"$in": ["low"]}

  with open("functions.json", "r") as f:
    functions_list = json.load(f)

  pinecone_res = index.query(vector=embeddings_vector,
                      include_metadata=True,
                      top_k=8,
                      filter=query_filter,
                      namespace="vicky")

  context = ""
  for i, match in enumerate(pinecone_res.matches):
    # `match_type` rather than `type`, to avoid shadowing the builtin.
    match_type = match.metadata["type"]
    package = match.metadata["package"]
    filename = match.metadata["filename"]
    code = match.metadata["code"]
    internal_imports = process_interal_packages(match.metadata["internal_imports"])
    external_imports = match.metadata["external_imports"]

    context += f"## {i+1}) Relevant {match_type} ##: "
    context += f"{{'code': '{code}', 'interal file imports': {internal_imports}, 'external file imports':  {external_imports} , 'package': '{package}', 'origin file': {filename}}},\n"

    functions_called = match.metadata["functions_called"]

    if functions_called:
      context += "#Code of functions called by #: "
      for function_call in functions_called:
        # Only a failed lookup is treated as "not found"; other errors propagate.
        try:
          called_code = functions_list[function_call]
        except (LookupError, TypeError):
          print('no func found')
          continue
        if called_code:
          context += f"{{'code': '{called_code}'}}, \n"
    # Prints the context accumulated so far after each match.
    print(context)
  return context



def gpt_pre_process_query(user_query):
    """Turn a user question into a prompt enriched with code context.

    The question is first reduced to search arguments by
    ``query_to_func_args``; the cleaned query is embedded with
    ``text-embedding-ada-002`` and the embedding is used to fetch context via
    ``get_context_from_vectordb``. The extracted package names are printed but
    do not take part in the search filter.

    Args:
        user_query: The user's question.

    Returns:
        The prompt string: a fixed instruction, the original question and the
        retrieved context.
    """
    is_test, search_query, package_names = query_to_func_args(user_query)

    embedding_response = openai.Embedding.create(
        input=search_query,
        model="text-embedding-ada-002",
    )
    query_embedding = embedding_response["data"][0]["embedding"]

    print(package_names)

    # Non-test questions exclude test entries; test questions apply no filter.
    metadata_filter = {} if is_test else {"isTest": {"$eq": False}}

    retrieved_context = get_context_from_vectordb(query_embedding, metadata_filter)

    return f"Answer this questions using the context from codebase (unless the question is a follow up on your previous response). {user_query} \n Context: {retrieved_context}"


# System message used by handle_user_query for the answering chat completion.
system_prompt = "You are helpful codebot that gets user query and a context from a golang BE codebase that must be used to answer user's query. You must analyze the code and provide a clear answer to the query."

def handle_user_query(user_input, messages_history):
  """Send the conversation plus ``user_input`` to gpt-4-32k and return the reply text.

  The message list starts with ``system_prompt``, continues with the history
  entries and ends with ``user_input`` as a user message. A history entry whose
  content exceeds 5000 tokens is cut to its first 10000 characters.

  Args:
    user_input: Content of the final user message.
    messages_history: Iterable of ``(role, content)`` pairs.

  Returns:
    The content of the first choice's message, or ``None`` if all six attempts
    raise ``JSONDecodeError``.
  """
  conversation = [{"role": "system", "content": system_prompt}]

  for role, content in messages_history:
    over_budget = get_tokens(content) > 5000
    conversation.append({
      "role": role,
      "content": content[:10000] if over_budget else content,
    })

  conversation.append({"role": "user", "content": user_input})

  for _ in range(6):
    try:
      completion = openai.ChatCompletion.create(
        model="gpt-4-32k",
        messages=conversation,
      )
    except JSONDecodeError as e:
      print(f'JSONDecodeError occurred: {e}')
      time.sleep(1)
    else:
      return completion["choices"][0]["message"]['content']

  return None


# End-to-end run: build the context-enriched prompt for a sample question,
# then ask the answering model with an empty message history and print the reply.
QUERY = "explain the formula for calculating fully diluted (as it's written in the code)"
query_with_context = gpt_pre_process_query(QUERY)

response = handle_user_query(query_with_context, [])
print(response)


In [None]:
# Spot-check: load functions.json and display the entry stored under 'isZero'
# (presumably a function-name -> source-code mapping; verify against the file).
with open("functions.json", "r") as f:
    functions_list = json.load(f)   

functions_list['isZero']

In [None]:
# Rewrite functions.json in place with every top-level key lower-cased.
# Keys that differ only by case collapse into one entry (the later one wins).
# NOTE(review): other cells look keys up with their original casing
# (functions_list['isZero'], functions_list[function_call]); once this cell has
# rewritten the file those lookups would miss — confirm the intended casing.
with open("functions.json", 'r' ) as f:
    data = json.load(f)

lowercased_data = {k.lower(): v for k, v in data.items()}

with open("functions.json", 'w' ) as f:
    f.write(json.dumps(lowercased_data, indent=4))