In [1]:
from __future__ import annotations
import numpy as np
import openai
import pandas as pd
import pickle
from glob import glob
from itertools import islice
import tiktoken
import os
# Prefer a key that is already exported in the environment. The placeholder is
# only installed as a fallback, so a real key set outside the notebook is no
# longer overwritten. Never commit a real key in this cell.
os.environ.setdefault("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
openai.api_key = os.getenv("OPENAI_API_KEY")

In [2]:
# Model used to generate answers.
COMPLETIONS_MODEL = "text-davinci-003"

# Embedding model, its tokenizer encoding, and the maximum number of tokens
# sent per embedding request (used below as the chunk size).
EMBEDDING_MODEL = "text-embedding-ada-002"
EMBEDDING_CTX_LENGTH = 8191
EMBEDDING_ENCODING = "cl100k_base"

# Keyword arguments forwarded to every completion request.
# A temperature of 0.0 is used because it gives the most predictable, factual answer.
COMPLETIONS_API_PARAMS = dict(
    temperature=0.0,
    max_tokens=500,
    model=COMPLETIONS_MODEL,
)

In [3]:
def get_embedding(text: str, model: str=EMBEDDING_MODEL) -> list[float]:
    """Request an embedding for `text` from the OpenAI Embeddings API.

    `text` is sent as the `input` of a single `openai.Embedding.create` call
    using `model`. The function returns the `embedding` field of the first
    entry in the response's `data` list.
    """
    response = openai.Embedding.create(input=text, model=model)
    first_entry = response["data"][0]
    return first_entry["embedding"]

def truncate_text_tokens(text, encoding_name=EMBEDDING_ENCODING, max_tokens=EMBEDDING_CTX_LENGTH):
    """Encode `text` and keep at most the first `max_tokens` tokens.

    The text is encoded with the tiktoken encoding named `encoding_name`.
    The return value is the sliced encoded token sequence, not a string.
    """
    tokenizer = tiktoken.get_encoding(encoding_name)
    token_ids = tokenizer.encode(text)
    return token_ids[:max_tokens]

def batched(iterable, n):
    """Yield consecutive tuples of up to `n` items taken from `iterable`.

    Every tuple has exactly `n` items except possibly the final one.
    Example: batched('ABCDEFG', 3) --> ABC DEF G

    Raises ValueError (on first iteration, since this is a generator) when
    `n` is smaller than one.
    """
    if n < 1:
        raise ValueError('n must be at least one')
    source = iter(iterable)
    while True:
        group = tuple(islice(source, n))
        if not group:
            # The source is exhausted; stop without yielding an empty tuple.
            return
        yield group
        
def chunked_tokens(text, encoding_name, chunk_length):
    """Yield the tokens of `text` in consecutive groups of at most `chunk_length`.

    `text` is encoded with the tiktoken encoding named `encoding_name`, and
    the resulting token sequence is split by `batched`, so each yielded item
    is a tuple of tokens.
    """
    token_ids = tiktoken.get_encoding(encoding_name).encode(text)
    yield from batched(token_ids, chunk_length)
    
def len_safe_get_embedding(text, model=EMBEDDING_MODEL, max_tokens=EMBEDDING_CTX_LENGTH, encoding_name=EMBEDDING_ENCODING, average=True):
    """Embed `text` of any length by embedding it chunk by chunk.

    The text is tokenized with `encoding_name` and split into chunks of at
    most `max_tokens` tokens; each chunk is embedded with `get_embedding`.

    When `average` is true, the chunk embeddings are combined into a single
    vector: an average weighted by chunk token count, rescaled to unit
    length, and returned as a plain list. Otherwise the list of per-chunk
    embeddings is returned unchanged.
    """
    vectors = []
    weights = []
    for token_chunk in chunked_tokens(text, encoding_name=encoding_name, chunk_length=max_tokens):
        vectors.append(get_embedding(token_chunk, model=model))
        weights.append(len(token_chunk))

    if not average:
        return vectors

    # Longer chunks contribute proportionally more to the combined vector.
    combined = np.average(vectors, axis=0, weights=weights)
    combined = combined / np.linalg.norm(combined)  # normalizes length to 1
    return combined.tolist()

def compute_doc_embeddings(df: pd.DataFrame) -> dict[tuple[str, str], list[float]]:
    """
    Embed the `content` attribute of every row of `df` with `get_embedding`.

    Returns a dictionary keyed by the row index yielded by `df.iterrows()`,
    whose values are the embeddings computed for the corresponding rows.
    """
    embeddings_by_index = {}
    for row_index, row in df.iterrows():
        embeddings_by_index[row_index] = get_embedding(row.content)
    return embeddings_by_index

def load_embeddings(folder_loc):
    """Load every pickled embedding matched by a glob pattern.

    Parameters
    ----------
    folder_loc : str
        Glob pattern (or a single path) passed to `glob` to select the
        pickle files to load.

    Returns
    -------
    list[dict]
        One record per matched file, in the order `glob` returns them:
        ``{'file': <text path>, 'embedding': <unpickled object>}``.
        The text path is the pickle path with its extension replaced
        by ``.txt``.
    """
    embedding_list = []
    for pickle_file in glob(folder_loc):
        # Swap only the extension. A plain str.replace("pickle", "txt") would
        # also rewrite "pickle" wherever it appears in a directory or file name.
        text_file = os.path.splitext(pickle_file)[0] + '.txt'
        with open(pickle_file, 'rb') as f:
            # SECURITY: unpickling can execute arbitrary code. Only load
            # pickle files that were produced by this notebook.
            embedding = pickle.load(f)
        embedding_list.append({'file': text_file, 'embedding': embedding})
    return embedding_list

def read_docs(item, encoding=None):
    """Return the full text content of the file at `item`.

    Parameters
    ----------
    item : str
        Path of the text file to read.
    encoding : str or None
        Text encoding passed to `open`. The default `None` keeps the
        platform's default encoding, which is the previous behaviour.
        Pass e.g. ``"utf-8"`` for non-ASCII documents so the result does
        not depend on the machine's locale.
    """
    with open(item, encoding=encoding) as f:
        return f.read()
    
def vector_similarity(x: list[float], y: list[float]) -> float:
    """
    Compute the dot product of `x` and `y` as the similarity score.

    For unit-length vectors the dot product equals the cosine similarity,
    so no extra normalization is performed here.
    """
    left = np.array(x)
    right = np.array(y)
    return np.dot(left, right)

def order_document_sections_by_query_similarity(query: str, contexts: list[dict]) -> list[tuple[float, str, str]]:
    """
    Embed `query` and rank every pre-computed document embedding against it.

    Parameters
    ----------
    query : str
        Question text; embedded with `get_embedding`.
    contexts : list[dict]
        Records with an 'embedding' key (the document vector) and a 'file'
        key (path of the document text), as built by `load_embeddings`.

    Returns
    -------
    list[tuple[float, str, str]]
        ``(similarity, document_text, file_path)`` tuples sorted in
        descending order. Whole tuples are compared, so equal similarities
        are ordered by document text and then by file path.

    Note: the text of every document is read from disk via `read_docs`
    while building the list, not only the text of the best match.
    """
    query_embedding = get_embedding(query)

    scored_documents = [
        (
            vector_similarity(query_embedding, record['embedding']),
            read_docs(record['file']),
            record['file'],
        )
        for record in contexts
    ]
    scored_documents.sort(reverse=True)

    return scored_documents

def construct_prompt(question: str, context_embeddings: list[dict]) -> tuple[str, str]:
    """
    Build the completion prompt for `question` from the single best-matching document.

    Parameters
    ----------
    question : str
        The user's question.
    context_embeddings : list[dict]
        Document records forwarded unchanged to
        `order_document_sections_by_query_similarity`.

    Returns
    -------
    tuple[str, str]
        ``(prompt, file_path)``: the prompt text (instruction header, the
        document text as context, then the question) and the path of the
        document used as context.

    Raises
    ------
    IndexError
        If the ranking is empty, i.e. no documents were supplied.
    """
    ranked_documents = order_document_sections_by_query_similarity(question, context_embeddings)

    # Only the top-ranked entry is used; each entry is (similarity, text, file).
    best_match = ranked_documents[0]
    document_text = best_match[1]
    document_file = best_match[2]

    header = """Answer the question as truthfully as possible using the provided context, and if the answer is not contained within the text below, say "I don't know."\n\nContext:\n"""

    prompt = header + document_text + "\n\n Q: " + question + "\n A:"
    return prompt, document_file

def answer_query_with_context(
    query: str,
    document_embeddings: list[dict],
    show_prompt: bool = True
) -> tuple[str, str]:
    """
    Answer `query` with the completion model, using the best-matching document as context.

    Parameters
    ----------
    query : str
        The user's question.
    document_embeddings : list[dict]
        Document records forwarded unchanged to `construct_prompt`.
    show_prompt : bool
        When true, print the full prompt before sending it.

    Returns
    -------
    tuple[str, str]
        ``(answer, doc)``: the text of the first completion choice with
        surrounding spaces and newlines stripped, and the path of the
        document that was used as context.
    """
    prompt, doc = construct_prompt(
        query,
        document_embeddings,
    )

    if show_prompt:
        print(prompt)

    response = openai.Completion.create(
        prompt=prompt,
        **COMPLETIONS_API_PARAMS
    )

    answer = response["choices"][0]["text"].strip(" \n")
    return answer, doc

In [4]:
txt_file = "230501_한 여행자휴대품통관 길라잡이_short.txt"
# Swap only the extension; str.replace would rewrite every occurrence of the
# extension text inside the file name.
pickle_path = os.path.splitext(txt_file)[0] + ".pickle"

# NOTE(review): the file is read with the platform default encoding; confirm
# it matches how the document was saved.
with open(txt_file, "r") as f:  # read txt file
    text = f.read()

# Compute the embedding BEFORE opening the output file. Opening with "wb"
# truncates the file immediately, so a failed API call would otherwise leave
# an empty pickle behind.
doc_embedding = len_safe_get_embedding(text, average=True)

with open(pickle_path, "wb") as f:  # dump pickle
    pickle.dump(doc_embedding, f)

In [12]:
# Load the pickled embedding record(s) matched by `pickle_path`.
embedding_data = load_embeddings(pickle_path)

question = "니트로글리세린은 면세범위가 어떻게 돼?"

# NOTE: despite its name, `prompt` receives the model's answer text (the first
# element returned by answer_query_with_context); `doc` is the context file path.
prompt, doc = answer_query_with_context(
    query=question,
    document_embeddings=embedding_data,
    show_prompt=False,
)
print("===\n", prompt)

===
 니트로글리세린은 면세범위가 없습니다.
