#### Extracting Python Code

##### 独自のopenai-python コード リポジトリを使用します。
- 埋め込み、インデックス作成、クエリが可能な、Python ファイルからのファイル解析と
- 関数の抽出のシンプルなバージョンを実装します。
- https://github.com/openai/openai-python
- *
- 埋め込み、
- インデックス作成、
- クエリが可能な、Python ファイルからのファイル解析と
- 関数の抽出のシンプルなバージョンを実装します。

In [None]:
# Helper Function
import pandas as pd
from pathlib import Path

DEF_PREFIXES = ['def ', 'async def ']
NEWLINE = '\n'

def get_function_name(code):
    """
    Extract function name from a line beginning with 'def' or 'async def'.
    """
    for prefix in DEF_PREFIXES:
        if code.startswith(prefix):
            return code[len(prefix): code.index('(')]


def get_until_no_space(all_lines, i):
    """
    Get all lines until a line outside the function definition is found.
    """
    ret = [all_lines[i]]
    for j in range(i + 1, len(all_lines)):
        if len(all_lines[j]) == 0 or all_lines[j][0] in [' ', '\t', ')']:
            ret.append(all_lines[j])
        else:
            break
    return NEWLINE.join(ret)


def get_functions(filepath):
    """
    Get all functions in a Python file.
    """
    with open(filepath, 'r') as file:
        all_lines = file.read().replace('\r', NEWLINE).split(NEWLINE)
        for i, l in enumerate(all_lines):
            for prefix in DEF_PREFIXES:
                if l.startswith(prefix):
                    code = get_until_no_space(all_lines, i)
                    function_name = get_function_name(code)
                    yield {
                        'code': code,
                        'function_name': function_name,
                        'filepath': filepath,
                    }
                    break


def extract_functions_from_repo(code_root):
    """
    Extract all .py functions from the repository.
    """
    code_files = list(code_root.glob('**/*.py'))

    num_files = len(code_files)
    print(f'Total number of .py files: {num_files}')

    if num_files == 0:
        print('Verify openai-python repo exists and code_root is set correctly.')
        return None

    all_funcs = [
        func
        for code_file in code_files
        for func in get_functions(str(code_file))
    ]

    num_funcs = len(all_funcs)
    print(f'Total number of functions extracted: {num_funcs}')

    return all_funcs

# Load Data ----------------------------------------------------
# Set user root directory to the 'openai-python' repository
root_dir = Path.home()

# Assumes the 'openai-python' repository exists in the user's root directory
code_root = root_dir / 'openai-python'

# Extract all functions from the repository
all_funcs = extract_functions_from_repo(code_root)

# text-embedding-3-smallベクトル埋め込み ---------------------------
from utils.embeddings_utils import get_embedding

df = pd.DataFrame(all_funcs)
df['code_embedding'] = df['code'].apply(lambda x: get_embedding(x, model='text-embedding-3-small'))
df['filepath'] = df['filepath'].map(lambda x: Path(x).relative_to(code_root))
df.to_csv("data/code_search_openai-python.csv", index=False)
df.head()


#### Python codeの抜き出し

In [3]:
# 
import re

# ファイルを読み込む
with open('./text/embedding.txt', 'r') as file:
    content = file.read()

# 正規表現を使って、```pythonと```の間にあるPythonコードを抽出する
pattern = re.compile(r'```python(.*?)```', re.DOTALL)
matches = pattern.findall(content)

# 抽出されたコードを変数に代入する
programs = []
for match in matches:
    programs.append(match.strip())

# 結果を確認
for i, program in enumerate(programs):
    print(f"Program {i+1}:--------------------\n{program}\n")

Program 1:--------------------
from openai import OpenAI
client = OpenAI()

response = client.embeddings.create(
    input="Your text string goes here",
    model="text-embedding-3-small"
)

print(response.data[0].embedding)

Program 2:--------------------
from openai import OpenAI
client = OpenAI()

def get_embedding(text, model="text-embedding-3-small"):
   text = text.replace("\n", " ")
   return client.embeddings.create(input = [text], model=model).data[0].embedding

df['ada_embedding'] = df.combined.apply(lambda x: get_embedding(x, model='text-embedding-3-small'))
df.to_csv('output/embedded_1k_reviews.csv', index=False)

Program 3:--------------------
import pandas as pd

df = pd.read_csv('output/embedded_1k_reviews.csv')
df['ada_embedding'] = df.ada_embedding.apply(eval).apply(np.array)

Program 4:--------------------
from openai import OpenAI
import numpy as np

client = OpenAI()

def normalize_l2(x):
    x = np.array(x)
    if x.ndim == 1:
        norm = np.linalg.norm(x)
     

In [None]:
# AST
import re
import ast

# ファイルを読み込む
with open('./text/embedding.txt', 'r') as file:
    content = file.read()

# 正規表現を使って、```pythonと```の間にあるPythonコードを抽出する
pattern = re.compile(r'```python(.*?)```', re.DOTALL)
matches = pattern.findall(content)

# ASTを使ってチャンクを抽出する関数
def extract_chunks(code):
    chunks = []
    tree = ast.parse(code)
    
    current_chunk = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            if current_chunk:
                chunks.append(current_chunk)
                current_chunk = []
            current_chunk.append(node)
        elif isinstance(node, ast.Call):
            if current_chunk:
                chunks.append(current_chunk)
                current_chunk = []
            current_chunk.append(node)
        elif isinstance(node, ast.stmt):
            current_chunk.append(node)
    
    if current_chunk:
        chunks.append(current_chunk)
    
    return chunks

# コードをチャンク化する
all_chunks = []
for match in matches:
    code = match.strip()
    chunks = extract_chunks(code)
    all_chunks.extend(chunks)

# チャンクを元のコードに戻す関数
def unparse_chunk(chunk):
    return "\n".join(ast.unparse(node) for node in chunk)

# 各チャンクを表示
for i, chunk in enumerate(all_chunks):
    print(f"Chunk {i+1}:\n{unparse_chunk(chunk)}\n")

# ここで、ベクトル化のためのコードを追加する
# 例: 各チャンクを文字列として保存し、ベクトル化する準備を行う
chunks_as_strings = [unparse_chunk(chunk) for chunk in all_chunks]

# ベクトル化の例（ここでは簡単に文字列の長さをベクトルとして使用）
vectors = [len(chunk) for chunk in chunks_as_strings]

# 結果を確認
for i, vector in enumerate(vectors):
    print(f"Vector {i+1}: {vector}")


In [None]:
# Final python code
import re
import ast
from openai import OpenAI
import numpy as np
import pandas as pd
from sklearn.preprocessing import normalize

# OpenAI APIキーを設定
# openai.api_key = 'your-api-key'

# ファイルを読み込む
with open('./text/embedding.txt', 'r') as file:
    content = file.read()

# 正規表現を使って、```pythonと```の間にあるPythonコードを抽出する
pattern = re.compile(r'```python(.*?)```', re.DOTALL)
matches = pattern.findall(content)

# 抽出されたコードを変数に代入する
programs = []
for match in matches:
    programs.append(match.strip())

# ASTを使って関数レベルでチャンクを抽出する関数
def extract_functions(code):
    functions = []
    tree = ast.parse(code)
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef):
            functions.append(ast.unparse(node))
    return functions

# 関数レベルでのチャンク化
all_functions = []
for program in programs:
    functions = extract_functions(program)
    all_functions.extend(functions)

# 各関数をベクトル化する関数
def get_embedding(text):
    client = OpenAI()
    response = client.embeddings.create(
        input=text,
        model="text-embedding-3-small"
    )
    return response.data[0].embedding

# 各関数をベクトル化
function_embeddings = []
for func in all_functions:
    embedding = get_embedding(func)
    function_embeddings.append(embedding)

# ベクトルデータをデータフレームに変換
df = pd.DataFrame(function_embeddings)
df['function'] = all_functions

# ベクトルデータの正規化
normalized_embeddings = normalize(df.drop(columns=['function']), axis=1)
df_normalized = pd.DataFrame(normalized_embeddings)
df_normalized['function'] = df['function']

# Vector Storeへの登録（ここではFaissを例に使用）
import faiss

# Faissインデックスを作成
d = df_normalized.shape[1] - 1  # ベクトルの次元数
index = faiss.IndexFlatL2(d)

# ベクトルをFaissインデックスに追加
index.add(np.array(df_normalized.drop(columns=['function'])))

# Faissインデックスを保存
faiss.write_index(index, 'function_embeddings.index')

# 結果を確認
print(df_normalized.head())

# 検索機能の実装例
def search_function(query, k=5):
    query_embedding = get_embedding(query)
    query_embedding = normalize([query_embedding], axis=1)
    D, I = index.search(np.array(query_embedding).astype(np.float32), k)
    return df_normalized.iloc[I[0]]['function'].tolist()

# サンプル検索
query = "データを読み込む関数"
results = search_function(query)
print("検索結果:")
for result in results:
    print(result)


In [None]:
# Final doc faiss 
import re
import openai
import numpy as np
import pandas as pd
from sklearn.preprocessing import normalize
import faiss

# OpenAI APIキーを設定
# openai.api_key = 'your-api-key'

# ファイルを読み込む
with open('./text/text_generation.txt', 'r') as file:
    content = file.read()

# 段落ごとに分割
paragraphs = re.split(r'\n\s*\n', content)

# コードスニペットを抽出する正規表現
code_pattern = re.compile(r'```python(.*?)```', re.DOTALL)

# 段落とコードスニペットをリストに格納
chunks = []
for paragraph in paragraphs:
    code_matches = code_pattern.findall(paragraph)
    if code_matches:
        for code in code_matches:
            chunks.append(code.strip())
    else:
        chunks.append(paragraph.strip())

# 各チャンクをベクトル化する関数
def get_embedding(text):
    client = OpenAI()
    response = client.embeddings.create(
        input=text,
        model="text-embedding-3-small"
    )
    return response.data[0].embedding

# 各チャンクをベクトル化
chunk_embeddings = []
for chunk in chunks:
    embedding = get_embedding(chunk)
    chunk_embeddings.append(embedding)

# ベクトルデータをデータフレームに変換
df = pd.DataFrame(chunk_embeddings)
df['chunk'] = chunks

# ベクトルデータの正規化
normalized_embeddings = normalize(df.drop(columns=['chunk']), axis=1)
df_normalized = pd.DataFrame(normalized_embeddings)
df_normalized['chunk'] = df['chunk']

# Faissインデックスを作成
d = df_normalized.shape[1] - 1  # ベクトルの次元数
index = faiss.IndexFlatL2(d)

# ベクトルをFaissインデックスに追加
index.add(np.array(df_normalized.drop(columns=['chunk'])))

# Faissインデックスを保存
faiss.write_index(index, 'chunk_embeddings.index')

# 結果を確認
print(df_normalized.head())

# 検索機能の実装例
def search_chunk(query, k=5):
    query_embedding = get_embedding(query)
    query_embedding = normalize([query_embedding], axis=1)
    D, I = index.search(np.array(query_embedding).astype(np.float32), k)
    return df_normalized.iloc[I[0]]['chunk'].tolist()

# サンプル検索
query = "テキスト生成モデルの概要"
results = search_chunk(query)
print("検索結果:")
for result in results:
    print(result)


In [None]:
# Chorome DB
# pip install chromedb
# chrome_db_api_key = 'your-chromedb-api-key'
# chrome_db_url = 'your-chromedb-url'
# Final doc ChromeDB 
import re
import openai
import numpy as np
import pandas as pd
from sklearn.preprocessing import normalize
import chromadb
from chromadb.utils import embedding_functions

# OpenAI APIキーを設定
# openai.api_key = 'your-api-key'

# ファイルを読み込む
with open('./text/text_generation.txt', 'r') as file:
    content = file.read()

# 段落ごとに分割
paragraphs = re.split(r'\n\s*\n', content)

# コードスニペットを抽出する正規表現
code_pattern = re.compile(r'```python(.*?)```', re.DOTALL)

# 段落とコードスニペットをリストに格納
chunks = []
for paragraph in paragraphs:
    code_matches = code_pattern.findall(paragraph)
    if code_matches:
        for code in code_matches:
            chunks.append(code.strip())
    else:
        chunks.append(paragraph.strip())

# OpenAI Embedding 関数を設定
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
    api_key="your-openai-api-key",
    model_name="text-embedding-3-small"
)

# ChromaDBクライアントを作成
client = chromadb.Client()

# コレクションを作成
collection = client.create_collection(name="text_generation_chunks", embedding_function=openai_ef)

# チャンクをコレクションに追加
collection.add(
    documents=chunks,
    ids=[f"chunk_{i}" for i in range(len(chunks))]
)

# 検索機能の実装例
def search_chunk(query, k=5):
    results = collection.query(
        query_texts=[query],
        n_results=k
    )
    return results['documents'][0]

# サンプル検索
query = "テキスト生成モデルの概要"
results = search_chunk(query)
print("検索結果:")
for result in results:
    print(result)

## Milvau の場合
# Final doc Milvus
- docker run -d --name milvus-standalone -p 19530:19530 -p 9091:9091 milvusdb/milvus:latest


In [None]:
# case Milvus
# !pip install pymilvus
"""docker run -d --name milvus-standalone -p 19530:19530 -p 9091:9091 milvusdb/milvus:latest"""
import re
import openai
import numpy as np
import pandas as pd
from sklearn.preprocessing import normalize
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection

# Milvusサーバーに接続
connections.connect("default", host="localhost", port="19530")

# Milvusの設定
collection_name = "text_chunks"

# コレクションが既に存在する場合は削除
if Collection.exists(collection_name):
    collection = Collection(collection_name)
    collection.drop()

# コレクションのスキーマを定義
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),  # ベクトルの次元数
    FieldSchema(name="chunk", dtype=DataType.VARCHAR, max_length=65535)
]
schema = CollectionSchema(fields, description="Collection of text chunks and their embeddings")
collection = Collection(name=collection_name, schema=schema)

# OpenAI APIキーを設定
# openai.api_key = 'your-api-key'

# ファイルを読み込む
with open('./text/text_generation.txt', 'r') as file:
    content = file.read()

# 段落ごとに分割
paragraphs = re.split(r'\n\s*\n', content)

# コードスニペットを抽出する正規表現
code_pattern = re.compile(r'```python(.*?)```', re.DOTALL)

# 段落とコードスニペットをリストに格納
chunks = []
for paragraph in paragraphs:
    code_matches = code_pattern.findall(paragraph)
    if code_matches:
        for code in code_matches:
            chunks.append(code.strip())
    else:
        chunks.append(paragraph.strip())

# 各チャンクをベクトル化する関数
def get_embedding(text):
    client = OpenAI()
    response = client.embeddings.create(
        input=text,
        model="text-embedding-3-small"
    )
    return response.data[0].embedding

# 各チャンクをベクトル化
chunk_embeddings = []
for chunk in chunks:
    embedding = get_embedding(chunk)
    chunk_embeddings.append(embedding)

# ベクトルデータをデータフレームに変換
df = pd.DataFrame(chunk_embeddings)
df['chunk'] = chunks

# ベクトルデータの正規化
normalized_embeddings = normalize(df.drop(columns=['chunk']), axis=1)
df_normalized = pd.DataFrame(normalized_embeddings)
df_normalized['chunk'] = df['chunk']

# Milvusにベクトルデータを追加
insert_data = [
    df.index.tolist(),  # IDとしてインデックスを使用
    normalized_embeddings.tolist(),
    df['chunk'].tolist()
]
collection.insert(insert_data)

# インデックスを作成
collection.create_index(field_name="embedding", index_params={"index_type": "IVF_FLAT", "params": {"nlist": 128}})

# コレクションのロード
collection.load()

# 結果を確認
print(df_normalized.head())

# 検索機能の実装例
def search_chunk(query, k=5):
    query_embedding = get_embedding(query)
    query_embedding = normalize([query_embedding], axis=1).tolist()[0]
    search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
    results = collection.search([query_embedding], "embedding", search_params, limit=k)
    return [result.entity.get("chunk") for result in results[0]]

# サンプル検索
query = "テキスト生成モデルの概要"
results = search_chunk(query)
print("検索結果:")
for result in results:
    print(result)