### Reference
This notebook follows the OpenAI Cookbook example "Code search using embeddings":
https://cookbook.openai.com/examples/code_search_using_embeddings

In [21]:
from dotenv import dotenv_values
from openai import OpenAI
import json

# Load key/value settings from the .env file at ../.env (relative to the working directory).
envs = dotenv_values("../.env")
# API client used by get_embedding; indexing envs fails right here if OPENAI_API_KEY is not defined.
openai = OpenAI(api_key = envs["OPENAI_API_KEY"])

from scipy.spatial.distance import cosine
def cosine_similarity(a, b):
    """
    Return the cosine similarity of vectors a and b.

    SciPy's `cosine` yields the cosine *distance*, so the similarity is
    its complement with respect to 1.
    """
    distance = cosine(a, b)
    return 1 - distance

import tiktoken

def get_embedding(text, encoding="cl100k_base", model="text-embedding-3-small"):
    """
    Return the embedding vector for `text`.

    The text is tokenized with the tiktoken encoding named by `encoding`,
    the token ids are sent to the embeddings endpoint for `model`, and the
    embedding of the first (only) returned item is handed back.
    """
    tokenizer = tiktoken.get_encoding(encoding)
    token_ids = tokenizer.encode(text)
    response = openai.embeddings.create(input=token_ids, model=model)
    return response.data[0].embedding

### Helpers: extract function definitions from Python files

In [23]:
import pandas as pd
from pathlib import Path

# Line prefixes that mark the start of a function definition at column 0.
DEF_PREFIXES = ['def ', 'async def ']
# Separator used to split file contents into lines and to join them back.
NEWLINE = '\n'

def get_function_name(code):
    """
    Extract function name from a line beginning with 'def' or 'async def'.

    The name is the text between the matching prefix and the first '(' in
    `code`, with surrounding whitespace removed (so 'def foo (x):' yields
    'foo', not 'foo ').

    Returns:
        The function name, or None when `code` starts with none of
        DEF_PREFIXES.

    Raises:
        ValueError: if `code` starts with a prefix but contains no '('.
    """
    for prefix in DEF_PREFIXES:
        if code.startswith(prefix):
            # Strip so extra spaces around the name do not leak into the result.
            return code[len(prefix): code.index('(')].strip()
    return None


def get_until_no_space(all_lines, i):
    """
    Return the block of lines that starts at index i, joined with NEWLINE.

    Line i is always included. Each following line is added while it is
    empty or begins with a space, a tab or ')'; the first line that does
    neither ends the block and is not included.
    """
    continuation_starts = (' ', '\t', ')')
    collected = [all_lines[i]]
    for idx in range(i + 1, len(all_lines)):
        line = all_lines[idx]
        # A non-empty line that is not indented (and is not a closing paren)
        # lies outside the definition.
        if line and not line.startswith(continuation_starts):
            break
        collected.append(line)
    return NEWLINE.join(collected)


def get_functions(filepath):
    """
    Get all functions in a Python file.

    Only definitions whose line starts with one of DEF_PREFIXES at column 0
    are found; indented definitions (e.g. methods) do not match.

    Args:
        filepath: path of the Python source file to scan.

    Yields:
        dict with keys 'code' (the definition text as returned by
        get_until_no_space), 'function_name' and 'filepath'.
    """
    # Python source is UTF-8 by default, so decode explicitly instead of
    # relying on the platform's locale encoding. The whole file is read up
    # front so the handle is closed before the first item is yielded.
    with open(filepath, 'r', encoding='utf-8') as file:
        all_lines = file.read().replace('\r', NEWLINE).split(NEWLINE)

    def_prefixes = tuple(DEF_PREFIXES)
    for i, line in enumerate(all_lines):
        if line.startswith(def_prefixes):
            code = get_until_no_space(all_lines, i)
            yield {
                'code': code,
                'function_name': get_function_name(code),
                'filepath': filepath,
            }


def extract_functions_from_repo(code_root):
    """
    Extract all .py functions from the repository.

    Args:
        code_root: root directory to search recursively for '*.py' files;
            a pathlib.Path or a plain string path.

    Returns:
        A list of the dicts produced by get_functions for every file found,
        or None when no .py file exists under code_root.
    """
    # Accept plain strings as well as Path objects.
    code_root = Path(code_root)
    code_files = list(code_root.glob('**/*.py'))

    num_files = len(code_files)
    print(f'Total number of .py files: {num_files}')

    if num_files == 0:
        print('Verify openai-python repo exists and code_root is set correctly.')
        return None

    all_funcs = [
        func
        for code_file in code_files
        for func in get_functions(str(code_file))
    ]

    num_funcs = len(all_funcs)
    print(f'Total number of functions extracted: {num_funcs}')

    return all_funcs

In [24]:
# Absolute path to a local checkout of the 'openai-python' repository.
# NOTE(review): this path is machine-specific — change it to wherever the repository is cloned.
code_root = "/Volumes/t7/mac/gitProjects/openai-basics/data/sample_code_as_data/openai-python"

# Extract all functions from the repository
all_funcs = extract_functions_from_repo(Path(code_root))

# Keep only the first three functions for the rest of the notebook.
# This raises TypeError if extract_functions_from_repo returned None (no .py files found).
all_funcs = all_funcs[:3]
all_funcs

Total number of .py files: 332
Total number of functions extracted: 348


[{'code': 'def test_pydantic_v1(session: nox.Session) -> None:\n    session.install("-r", "requirements-dev.lock")\n    session.install("pydantic<2")\n\n    session.run("pytest", "--showlocals", "--ignore=tests/functional", *session.posargs)\n',
  'function_name': 'test_pydantic_v1',
  'filepath': '/Volumes/t7/mac/gitProjects/openai-basics/data/sample_code_as_data/openai-python/noxfile.py'},
 {'code': 'def event_loop() -> Iterator[asyncio.AbstractEventLoop]:\n    loop = asyncio.new_event_loop()\n    yield loop\n    loop.close()\n\n',
  'function_name': 'event_loop',
  'filepath': '/Volumes/t7/mac/gitProjects/openai-basics/data/sample_code_as_data/openai-python/tests/conftest.py'},
 {'code': 'def client(request: FixtureRequest) -> Iterator[OpenAI]:\n    strict = getattr(request, "param", True)\n    if not isinstance(strict, bool):\n        raise TypeError(f"Unexpected fixture parameter type {type(strict)}, expected {bool}")\n\n    with OpenAI(base_url=base_url, api_key=api_key, _strict_

In [34]:
# One row per extracted function; get_embedding is called once per row on the 'code' text.
df = pd.DataFrame(all_funcs)
df["code_embedding"] = df["code"].apply(get_embedding)
df

Unnamed: 0,code,function_name,filepath,code_embedding
0,def test_pydantic_v1(session: nox.Session) -> ...,test_pydantic_v1,/Volumes/t7/mac/gitProjects/openai-basics/data...,"[0.015467033721506596, 0.008634207770228386, 0..."
1,def event_loop() -> Iterator[asyncio.AbstractE...,event_loop,/Volumes/t7/mac/gitProjects/openai-basics/data...,"[0.0161242987960577, 0.031800419092178345, -0...."
2,def client(request: FixtureRequest) -> Iterato...,client,/Volumes/t7/mac/gitProjects/openai-basics/data...,"[-0.014911573380231857, 0.030834101140499115, ..."


In [52]:
def search_functions(df, code_query, n=3, pprint=True, n_lines=7):
    """
    Rank the functions in `df` by similarity to a query string.

    Args:
        df: DataFrame with 'code_embedding', 'code', 'function_name' and
            'filepath' columns. It is not modified; scores are added to a copy.
        code_query: text to embed with get_embedding and compare against.
        n: number of best-scoring rows to return.
        pprint: when true, print each returned row's location, score and
            leading code lines.
        n_lines: number of code lines printed per row.

    Returns:
        DataFrame with the `n` highest-scoring rows, including a
        'similarities' column, sorted from most to least similar.
    """
    query_embedding = get_embedding(code_query)

    # assign() returns a new frame, so repeated searches do not leave a stale
    # 'similarities' column on the caller's DataFrame.
    scored = df.assign(
        similarities=df["code_embedding"].apply(
            lambda emb: cosine_similarity(emb, query_embedding)
        )
    )

    res = scored.sort_values('similarities', ascending=False).head(n)
    if pprint:
        for _, row in res.iterrows():
            print(f"{row['filepath']}:{row['function_name']}  score={round(row['similarities'], 3)}")
            print("\n".join(row['code'].split("\n")[:n_lines]))
            print('-' * 70)

    return res

In [53]:
# Rank the embedded functions against a natural-language query; the three best matches are printed and returned.
res = search_functions(df, 'fine-tuning input data validation logic', n=3)

(2, code              def client(request: FixtureRequest) -> Iterato...
function_name                                                client
filepath          /Volumes/t7/mac/gitProjects/openai-basics/data...
code_embedding    [-0.014911573380231857, 0.030834101140499115, ...
similarities                                               0.155826
Name: 2, dtype: object)
/Volumes/t7/mac/gitProjects/openai-basics/data/sample_code_as_data/openai-python/tests/conftest.py:client  score=0.156
def client(request: FixtureRequest) -> Iterator[OpenAI]:
    strict = getattr(request, "param", True)
    if not isinstance(strict, bool):
        raise TypeError(f"Unexpected fixture parameter type {type(strict)}, expected {bool}")

    with OpenAI(base_url=base_url, api_key=api_key, _strict_response_validation=strict) as client:
        yield client
----------------------------------------------------------------------
(0, code              def test_pydantic_v1(session: nox.Session) -> ...
function_name   