In [2]:
import numpy as np
from sentence_transformers import SentenceTransformer
from opensearchpy import OpenSearch, RequestsHttpConnection

SERVER_URL = "http://localhost:9200"
INDEX_NAME = "llama-mix-index"
model = SentenceTransformer('sentence-transformers/all-mpnet-base-v2') 

  from .autonotebook import tqdm as notebook_tqdm


In [36]:
# https://towardsdatascience.com/text-search-vs-vector-search-better-together-3bd48eb6132a
# bm25 vs tfidf https://www.infoq.com/articles/similarity-scoring-elasticsearch/
## 1. load data
##    -> define vector field
##    -> read the text
##    -> get vector of text (embed model)
##    -> load vector into vector field
##    -> load the text to a text field (for bm25 implementation)
## 2. implement retrieval/query
##    -> do retrieval using bm25
##    -> do retrieval using vector (knn - cosinesimilarity)
##    -> normalize the bm25 (0 - 1)
##    -> reduce results
##       -> overlapping results (bm25 + vector) -> apply boost
##       -> unique results (set the lowest) -> can be set to 0 or not add to the list
##       -> sort the results
## 3. Summarization of the results (top 5)


def normalize_bm25_formula(score, max_score):
    return score / max_score


def normalize_bm25(bm_results):
    hits = (bm_results["hits"]["hits"])
    max_score = bm_results["hits"]["max_score"]
    for hit in hits:
        hit["_score"] = normalize_bm25_formula(hit["_score"], max_score)
    bm_results["hits"]["max_score"] = hits[0]["_score"]
    bm_results["hits"]["hits"] = hits
    return bm_results

def run_queries(os_client, query="What did the author do growing up?", vector_boost_level=1.0, bm25_boost_level=1.0):
        # reduce the scores by 1 when using cpu
        cpu_request_body = {
            "size": 5,
            "query": {
                "script_score": {
                    "query": {
                        "match_all": {}
                    },
                    "script": {
                        "source": "knn_score",
                        "lang": "knn",
                        "params": {
                            "field": "description_vector",
                            "query_value": get_vector_sentence_transformers(query).tolist(),
                            "space_type": "cosinesimil"
                        }
                    }
                }
            },
            "_source": ["text_field"],
        }

        bm25_query = {
            "size": 5,
            "query": {
                "match": {
                    "text_field": query
                }
            },
            "_source": ["text_field"],
        }
        vector_search_results = {"hits": {"hits": []}}
        if vector_boost_level != 0:
            vector_search_results = os_client.search(body=cpu_request_body, index=INDEX_NAME)
        bm25_results = {"hits": {"hits": []}}
        if bm25_boost_level != 0:
            bm25_results = os_client.search(body=bm25_query, index=INDEX_NAME)
            bm25_results = normalize_bm25(bm25_results)
        combined_results = interpolate_results(vector_search_results["hits"]["hits"],
                                                bm25_results["hits"]["hits"])
        sorted_elements = apply_boost(combined_results, vector_boost_level, bm25_boost_level)

        result_data_dictionary = extract_results_data(vector_search_results["hits"]["hits"],
                                                        bm25_results["hits"]["hits"])
        construct_response(result_data_dictionary, sorted_elements)


def extract_results_data(vector_data, bm25_data):
    result_data_dictionary = {}
    for vector_hit in vector_data:
        text_description = vector_hit["_source"]["text_field"]
        result_data_dictionary[vector_hit["_id"]] = [text_description]
    for bm25_hit in bm25_data:
        text_description = bm25_hit["_source"]["text_field"]
        result_data_dictionary[bm25_hit["_id"]] = [text_description]
    return result_data_dictionary


def construct_response(result_data_dictionary, sorted_elements):
    for index, sorted_element in enumerate(sorted_elements):
        print(index + 1, result_data_dictionary[sorted_element])


def get_vector_sentence_transformers(text_input):
    return model.encode(text_input)


def normalize_data(data):
    return data / np.linalg.norm(data, ord=2)


def get_client(server_url: str) -> OpenSearch:
    os_instance = OpenSearch(SERVER_URL, use_ssl=False, verify_certs=False,
                             connection_class=RequestsHttpConnection)
    # print("OS connected")
    return os_instance


def get_min_score(common_elements, elements_dictionary):
    if len(common_elements):
        return min([min(v) for v in elements_dictionary.values()])
    else:
        # No common results - assign arbitrary minimum score value
        return 0.01


def interpolate_results(vector_hits, bm25_hits):
    # gather all entry ids
    bm25_ids_list = [hit["_id"] for hit in bm25_hits]
    vector_ids_list = [hit["_id"] for hit in vector_hits]
    # find common entry ids
    common_results = set(bm25_ids_list) & set(vector_ids_list)
    results_dictionary = dict((key, []) for key in common_results)
    for common_result in common_results:
        for index, vector_hit in enumerate(vector_hits):
            if vector_hit["_id"] == common_result:
                results_dictionary[common_result].append(vector_hit["_score"])
        for index, BM_hit in enumerate(bm25_hits):
            if BM_hit["_id"] == common_result:
                results_dictionary[common_result].append(BM_hit["_score"])
    min_value = get_min_score(common_results, results_dictionary)
    # assign minimum value scores for all unique results
    for vector_hit in vector_hits:
        if vector_hit["_id"] not in common_results:
            new_scored_element_id = vector_hit["_id"]
            results_dictionary[new_scored_element_id] = [min_value]
    for BM_hit in bm25_hits:
        if BM_hit["_id"] not in common_results:
            new_scored_element_id = BM_hit["_id"]
            results_dictionary[new_scored_element_id] = [min_value]

    return results_dictionary


def apply_boost(combined_results, vector_boost_level, bm25_boost_level):
    for element in combined_results:
        if len(combined_results[element]) == 1:
            combined_results[element] = combined_results[element][0] * vector_boost_level + \
                                        combined_results[element][0] * bm25_boost_level
        else:
            combined_results[element] = combined_results[element][0] * vector_boost_level + \
                                        combined_results[element][1] * bm25_boost_level
    # sort the results based on the new scores
    sorted_results = [k for k, v in sorted(combined_results.items(), key=lambda item: item[1], reverse=True)]
    return sorted_results


# def main():


In [47]:
os_client = get_client(SERVER_URL)
query = "What are the common programming languages?"
run_queries(os_client, vector_boost_level=0.4, bm25_boost_level=0.9)


1 ["Now that I could write essays again, I wrote a bunch about topics I'd had stacked up. I kept writing essays through 2020, but I also started to think about other things I could work on. How should I choose what to do? Well, how had I chosen what to work on in the past? I wrote an essay for myself to answer that question, and I was surprised how long and messy the answer turned out to be. If this surprised me, who'd lived it, then I thought perhaps it would be interesting to other people, and encouraging to those with similarly messy lives. So I wrote a more detailed version for others to read, and this is the last sentence of it."]
2 ["I learned a lot in the color class I took at RISD, but otherwise I was basically teaching myself to paint, and I could do that for free. So in 1993 I dropped out. I hung around Providence for a bit, and then my college friend Nancy Parmet did me a big favor. A rent-controlled apartment in a building her mother owned in New York was becoming vacant. D