# Vector Search and RAG Application Built on SuperDuperDB

In [1]:
import os
import click
from tqdm import tqdm

import sentence_transformers
from dotenv import load_dotenv
from superduper import (
    Document,
    Listener,
    Model,ObjectModel,
    Schema,
    VectorIndex,
    superduper,
    vector
)
# from superduper.backends.mongodb import
import superduper_mongodb
# Load variables from a local .env file (if present) into the environment
# before the os.getenv lookups that configure the database connection.
load_dotenv()

  from tqdm.autonotebook import tqdm, trange


True

## Connect to the MongoDB database

In [2]:
# Connection settings are read from the environment, with local fallbacks.
# NOTE(review): the fallback "superduperdb-demo" is not a full URI — confirm
# that superduper accepts it when MONGODB_URI is unset.
mongodb_uri = os.environ.get("MONGODB_URI", "superduperdb-demo")
artifact_store = os.environ.get("ARTIFACT_STORE", "data/artifact_store")

# Build the datalayer handle used by every cell below; the artifact store is
# addressed through a filesystem:// URI.
db = superduper(mongodb_uri, artifact_store="filesystem://" + artifact_store)

2024-Aug-31 01:58:30.80| INFO     | localhost.localdomain| superduper.base.build:56   | Data Client is ready. MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True, serverselectiontimeoutms=5000)
2024-Aug-31 01:58:30.80| INFO     | localhost.localdomain| superduper.base.build:35   | Connecting to Metadata Client with engine:  MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True, serverselectiontimeoutms=5000)
2024-Aug-31 01:58:30.81| INFO     | localhost.localdomain| superduper.backends.local.artifacts:29   | Creating artifact store directory
2024-Aug-31 01:58:30.81| INFO     | localhost.localdomain| superduper.base.build:141  | Connecting to compute client: Compute(uri=None, compute_kwargs={}, _path='superduper.backends.local.compute.LocalComputeBackend')
2024-Aug-31 01:58:30.81| INFO     | localhost.localdomain| superduper.base.datalayer:106  | Building Data Layer
2024-Aug-31 01:58:30.82| INFO     | localhost.locald

In [3]:
 # Display what the datalayer currently reports; the recorded run printed [].
 db.show()


[]

## Parse PDF files and store them in the database

In [4]:
from superduper.ext.unstructured.encoder import unstructured_encoder

# Register the encoder with the datalayer before inserting documents that use it.
db.add(unstructured_encoder)

pdf_folder = 'pdf-folders'

# os.listdir returns every entry of the folder in arbitrary order, so keep only
# PDF files and sort them to make the inserted documents reproducible.
pdf_paths = [
    os.path.join(pdf_folder, file_name)
    for file_name in sorted(os.listdir(pdf_folder))
    if file_name.lower().endswith(".pdf")
]
if not pdf_paths:
    # Fail early with a clear message instead of attempting an empty insert.
    raise FileNotFoundError(f"No PDF files found in {pdf_folder!r}")

# One document per PDF; `elements` holds the encoder-wrapped file path.
to_insert = [
    Document({"elements": unstructured_encoder(pdf_path)}) for pdf_path in pdf_paths
]
_ = db['source'].insert_many(to_insert).execute()

2024-Aug-31 01:58:49.74| INFO     | localhost.localdomain| superduper.backends.mongodb.data_backend:226  | Table source does not exist, auto creating...


In [5]:
# Fetch one stored document and unpack it to inspect its parsed `elements`.
db['source'].find_one().execute().unpack()

{'_id': ObjectId('66d2bf48185a2bd98429bddc'),
 'elements': [<unstructured.documents.elements.Header at 0x7f8d9bc1e910>,
  <unstructured.documents.elements.NarrativeText at 0x7f8d9b883150>,
  <unstructured.documents.elements.NarrativeText at 0x7f8d9b881fd0>,
  <unstructured.documents.elements.NarrativeText at 0x7f8d9b880f10>,
  <unstructured.documents.elements.NarrativeText at 0x7f8d9b883d50>,
  <unstructured.documents.elements.NarrativeText at 0x7f8d9b8803d0>,
  <unstructured.documents.elements.Title at 0x7f8d9b882610>,
  <unstructured.documents.elements.Footer at 0x7f8dd41fb5d0>,
  <unstructured.documents.elements.Text at 0x7f8d9d703410>,
  <unstructured.documents.elements.Title at 0x7f8d9b882e10>,
  <unstructured.documents.elements.Text at 0x7f8d9b883590>,
  <unstructured.documents.elements.Title at 0x7f8d9b881f10>,
  <unstructured.documents.elements.Text at 0x7f8d9b880e10>,
  <unstructured.documents.elements.Header at 0x7f8d9b595310>,
  <unstructured.documents.elements.Title at 0x7f

## Create a chunking model to split the parsed PDF elements into chunks

In [6]:
def merge_metadatas(metadatas, return_center=False):
    """Merge the bounding boxes of several element metadata dicts into one.

    Args:
        metadatas: Sequence of dicts, each providing
            ``["coordinates"]["points"]`` (four ``(x, y)`` corner points) and
            ``["page_number"]``.
        return_center: When true, report the centre of the merged box as
            ``{"x": ..., "y": ...}`` instead of its four corner points.

    Returns:
        ``{}`` for empty input; otherwise a dict with ``"points"`` (the merged
        corners, or the centre when ``return_center`` is true) and
        ``"page_number"`` (taken from the last metadata entry).
    """
    if not metadatas:
        return {}

    MAX_NUM = 999999999
    # Seed every corner with the opposite extreme so the first real point
    # always replaces it.
    p1, p2, p3, p4 = (MAX_NUM, MAX_NUM), (MAX_NUM, 0), (0, 0), (0, MAX_NUM)
    for metadata in metadatas:
        p1_, p2_, p3_, p4_ = metadata["coordinates"]["points"]
        p1 = (min(p1[0], p1_[0]), min(p1[1], p1_[1]))
        p2 = (min(p2[0], p2_[0]), max(p2[1], p2_[1]))
        p3 = (max(p3[0], p3_[0]), max(p3[1], p3_[1]))
        p4 = (max(p4[0], p4_[0]), min(p4[1], p4_[1]))

    # The page number is needed for both return shapes. It used to be assigned
    # only in the `return_center` branch, which made the default call fail.
    page_number = metadata["page_number"]

    if return_center:
        points = {"x": (p1[0] + p3[0]) / 2, "y": (p1[1] + p3[1]) / 2}
    else:
        points = (p1, p2, p3, p4)
    return {"points": points, "page_number": page_number}


def create_chunk_and_metadatas(page_elements, stride=3, window=10):
    """Slide an overlapping window over one page's elements and build chunks.

    A window of up to ``window`` elements is taken every ``stride`` elements.
    The element texts are joined with newlines, and the element metadata is
    merged into a single centre point via ``merge_metadatas``.

    Returns:
        A list of ``{"txt": ..., "metadata": ...}`` dicts, one per window.
    """

    def build_chunk(start):
        # Elements covered by the window that begins at `start`.
        window_elements = page_elements[start : start + window]
        element_metadatas = [element.metadata.to_dict() for element in window_elements]
        text = "\n".join([element.text for element in window_elements])
        return {
            "txt": text,
            "metadata": merge_metadatas(element_metadatas, return_center=True),
        }

    return [build_chunk(start) for start in range(0, len(page_elements), stride)]


def get_chunks(elements):
    """Group parsed elements by page and chunk each page independently.

    Returns:
        One flat list holding the chunks of every page, in the order in which
        the pages first appear in ``elements``.
    """
    from collections import defaultdict

    # Bucket the elements by the page they belong to.
    elements_by_page = defaultdict(list)
    for element in elements:
        elements_by_page[element.metadata.page_number].append(element)

    # Chunk page by page and concatenate the per-page results.
    all_chunks = []
    for page_elements in elements_by_page.values():
        all_chunks.extend(create_chunk_and_metadatas(page_elements))
    return all_chunks


In [7]:
MODEL_IDENTIFIER_CHUNK = "chunk"
# Wrap `get_chunks` as a model so a listener can run it over stored documents.
chunk_model = ObjectModel(
    identifier=MODEL_IDENTIFIER_CHUNK,
    object=get_chunks,
    flatten=True,
    model_update_kwargs={"document_embedded": False},
    # NOTE(review): the recorded run of this cell ended with
    # "InvalidDocument: cannot encode object: <..._Native object ...>".
    # The plain "string" field type below is the likely trigger — TODO confirm
    # the expected field types against the installed superduper version.
    output_schema=Schema(identifier="myschema", fields={"txt": "string"}),
)


# Listener that feeds the "elements" field of every `source` document to the
# chunking model.
upstream_listener= Listener(
        model=chunk_model,
        select=db['source'].find(),
        key="elements",
    )
db.apply(upstream_listener)

InvalidDocument: cannot encode object: <superduper.components.schema._Native object at 0x7f8d9d8e8850>, of type: <class 'superduper.components.schema._Native'>

In [None]:
# NOTE(review): this cell repeats the listener construction and `db.apply`
# call of the preceding cell verbatim and has no recorded execution; consider
# keeping only one of the two.
upstream_listener= Listener(
        model=chunk_model,
        select=db['source'].find(),
        key="elements",
    )
db.apply(upstream_listener)

## Embed all text chunks and build the vector index

In [None]:
SOURCE_KEY = "elements"
MODEL_IDENTIFIER_EMBEDDING = "embedding"
VECTOR_INDEX_IDENTIFIER = "vector-index"
# The chunk outputs are addressed by the same "_outputs.<key>.<model>" name,
# both as a table and as a key inside each output document.
# NOTE(review): confirm this naming against the installed superduper version.
COLLECTION_NAME_CHUNK = f"_outputs.{SOURCE_KEY}.{MODEL_IDENTIFIER_CHUNK}"
CHUNK_OUTPUT_KEY = f"_outputs.{SOURCE_KEY}.{MODEL_IDENTIFIER_CHUNK}"

# `Collection` is never imported in this notebook (NameError); use the
# `db[...]` table accessor, as the cells working on `source` already do.
chunk_collection = db[COLLECTION_NAME_CHUNK]

def preprocess(x):
    """Extract the text to embed from a model input.

    Non-dict inputs are returned unchanged. For a dict, the value stored under
    the greatest key (in sorted item order) is selected and its ``"txt"``
    entry is returned.
    """
    if not isinstance(x, dict):
        return x
    # For model chains, the logic of this key needs to be optimized.
    _, chunk = sorted(x.items())[-1]
    return chunk["txt"]

# Sentence-transformers embedding model applied to every text chunk.
# NOTE(review): this cell has no recorded execution; confirm the `Model`
# keyword arguments against the installed superduper version.
model = Model(
    identifier=MODEL_IDENTIFIER_EMBEDDING,
    object=sentence_transformers.SentenceTransformer("BAAI/bge-large-en-v1.5"),
    # bge-large-en-v1.5 emits 1024-dimensional embeddings; the previous
    # shape of 384 did not match the model's output size.
    encoder=vector(shape=(1024,)),
    predict_method="encode",
    preprocess=preprocess,
    postprocess=lambda x: x.tolist(),
    batch_predict=True,
)

# Build the vector index over the chunk outputs; the nested listener runs the
# embedding model on the documents selected from the chunk table.
db.add(
    VectorIndex(
        identifier=VECTOR_INDEX_IDENTIFIER,
        indexing_listener=Listener(
            select=chunk_collection.find(),
            key=CHUNK_OUTPUT_KEY,  # Key for the documents
            model=model,  # Specify the model for processing
            predict_kwargs={"max_chunk_size": 64},
        ),
    )
)

## Define a vector search function

In [None]:
from pprint import pprint
def vector_search(query, top_k=5):
    """Print the chunks most similar to ``query``, best match first.

    Args:
        query: Text to search for through the vector index.
        top_k: Number of nearest chunks requested from the index.

    Returns:
        None. Each hit's text, score and metadata are printed.
    """
    # `Collection` is never imported in this notebook (NameError); use the
    # `db[...]` table accessor, as the cells working on `source` already do.
    collection = db[COLLECTION_NAME_CHUNK]
    out = db.execute(
        collection.like(
            Document({CHUNK_OUTPUT_KEY: query}),
            vector_index=VECTOR_INDEX_IDENTIFIER,
            n=top_k,
        ).find({})
    )
    # An empty or missing result simply prints nothing.
    results = sorted(out or [], key=lambda x: x.content["score"], reverse=True)
    for r in results:
        chunk_data = r.outputs("elements", "chunk")
        chunk_message = {
            "score": r.content["score"],
            "metadata": chunk_data["metadata"],
        }
        print(chunk_data["txt"])
        print()
        print(chunk_message)
        print("\n\n", '-' * 20)

In [None]:
# Example query: print the top matching chunks for a question about the manual.
vector_search("What is the function of keys 10 to 12 on the left steering wheel keypad?")

## Define an LLM model

In [None]:
MODEL_IDENTIFIER_LLM = "llm"
# Prompt sent to the LLM; {context} and {input} are placeholders for the
# retrieved chunks and the user question.
prompt_template = (
    "The following is a document and question about the volvo user manual\n"
    "Only provide a very concise answer\n"
    "{context}\n\n"
    "Here's the question:{input}\n"
    "answer:"
)

# `VllmModel` is instantiated below, so its import must be active. It was
# commented out, which made this cell fail with a NameError.
# NOTE(review): confirm this module path exists in the installed superduper
# version.
from superduper.ext.llm.vllm import VllmModel
from superduper.ext.openai import OpenAIChatCompletion

llm = VllmModel(
    identifier=MODEL_IDENTIFIER_LLM,
    model_name="TheBloke/Mistral-7B-Instruct-v0.2-AWQ",
    prompt_template=prompt_template,
    vllm_kwargs={"max_model_len": 2048, "quantization": "awq"},
    inference_kwargs={"max_tokens": 2048},
)

# Add the llm instance
db.add(llm)

## Define a QA function

In [None]:
from IPython.display import Markdown
from IPython.display import display
import pandas as pd
def qa(query, vector_search_top_k=5):
    """Answer ``query`` with the LLM, using vector-search hits as context.

    Args:
        query: The question to answer.
        vector_search_top_k: Number of chunks retrieved as context.

    Returns:
        None. The answer and a table of the source chunks (page number,
        centre point, score) are displayed.
    """
    # `Collection` is never imported in this notebook (NameError); use the
    # `db[...]` table accessor, as the cells working on `source` already do.
    collection = db[COLLECTION_NAME_CHUNK]
    # NOTE(review): this cell has no recorded execution; confirm the
    # `db.predict` keywords against the installed superduper version.
    output, out = db.predict(
        model_name=MODEL_IDENTIFIER_LLM,
        input=query,
        context_select=collection.like(
            Document({CHUNK_OUTPUT_KEY: query}),
            vector_index=VECTOR_INDEX_IDENTIFIER,
            n=vector_search_top_k,
        ).find({}),
        context_key=f"{CHUNK_OUTPUT_KEY}.0.txt",
    )
    # Best match first; an empty or missing context yields an empty table.
    sources = sorted(out or [], key=lambda x: x.content["score"], reverse=True)
    page_messages = []
    for source in sources:
        metadata = source.outputs("elements", "chunk")["metadata"]
        page_messages.append(
            {
                "page_number": metadata["page_number"],
                "points": metadata["points"],
                # Read the score the same way the sort key above does.
                "score": source.content["score"],
            }
        )
    df = pd.DataFrame(page_messages)
    display(output.content)
    display(df)

In [None]:
# Example question: display the LLM answer together with its source chunks.
qa("What is the function of keys 10 to 12 on the left steering wheel keypad?")