Vektordatenbank im Podcastexplorer

Dieses Notebook soll einen guten Überblick darübr geben, wie man mit Vektordatenbanken in Zusammenhang mit Large-Language-Models (LLMs) arbeitet. 
Under anderem inkludiert das folgende Teilaufgaben und Konzepte: Textchunking, Vectorembedding, Metadaten, Indizess, Similarity Search.

Die Implementierung erfoglt in Pinecone (mit dem Python Pinecone-client) , der openAI library, und dem Langchain Framework und den schon transkriebierten Podcastfolgen.

Als erstes möchte ich eine Definitionen vorweg nehmen.

Tokenisierung: Der Prozess der Aufteilung einer Zeichenkette (in unserem Fall ein transkribierte Podacastfolge) in Tokens, wird als Tokenisierung bezeichnet. Tokens entsprechen in englischer Sprache durchschnittlich vier Zeichen.

Tokenisierung wird von einem Tokenizer übernommen in python mit der tiktoken library. Ziel des ganzen ist das Textdokument in vielzahl von "Chunks" aufzuteilen, die aus einer vorgegebenen Anzahl aus Tokens bestehen, aber alles noch im Klartext.


ich entscheide mich diesmal gegen den Blob-Storage Connector aus dem Langchain Framework aus dem einfachen Grund, weil ich diesmal den Blobnamen als Metadatum zum jedem Vector mitabspeichern möchte, das macht das arbeiten mit dem Blobtrigger um einiges einfacher. Um Files hereinzuladen, und auch wieder zu löschen....

als erstes definieren wir uns die lenght funktionen, die vom Textsoplitter genutzt wird, den trankribierten text in Chunks gewünschter länge zu verteilen

In [1]:
# alle benötigten 
import tiktoken
from langchain.text_splitter import RecursiveCharacterTextSplitter
import os
from azure.storage.blob import BlobServiceClient
import logging
import json
import re
from langchain.embeddings import OpenAIEmbeddings
import pinecone
from common import upload_blob_to_storage
import azure.functions as func
from support_allInOne import  search_pinecone, generate_JSON_respone, ask_GTP_api, generate_container_sas, generate_prompt_input, check_for_answer_capacity_in_tokens
import traceback
import openai
from azure.storage.blob import BlobServiceClient, generate_container_sas, BlobSasPermissions
import datetime

In [None]:

#als erstes definieren wir uns die lenght funktionen, die vom Textsoplitter genutzt wird, den trankribierten text in Chunks gewünschter länge zu verteilen
# als erstes schreiben wir uns die "Lenght Function"  die das Von uns angegebene tokenizer Model benutzt
# diese Lenght Function übergeben wir dem Textsplitter objekt der für uns das splitten übernimmt


tokenizer = tiktoken.get_encoding('cl100k_base') # das tokenizer model



# create the length function
def tiktoken_len(text):

    try:

        tokens = tokenizer.encode(
            text,
            disallowed_special=()
        )
        return len(tokens)
    
    except:
        logging.error("Something went wrong in the tiktoken_len")
        #raise Exception("Something went wrong in the tiktoken_len")


# initialisieren des Textsplitters
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=400, # 400 hat sich als gute chunk size herausgestellt
chunk_overlap=20,  # number of tokens overlap between chunks
length_function=tiktoken_len, 
separators=['\n\n', '\n', ' ', '']
)


: 

In [None]:
#1. wir connecten uns mit der python SDK zu der Blob-Storage in der die Files liegen und lassen uns eine Blob liste generieren
# 2. Wir generieren uns eine Blob-List und können so auf einzelne Blob und deren Metadaten (Properties zugreifen)


# Im Quell Ordner gibt es eine Unterordner für 2021 / 2022 / 2023 damit die Funktion nicht zu lange läuft, lasse ich die Funktion dreimal laufen (manuell, weil den relativen Blob path händisch umstelle)
# das machen wir für jedes Jahr


connection_string = os.environ['storage_PodcastExplorer']
        
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
      
container_client = blob_service_client.get_container_client('transcriptions')

blob_list = container_client.list_blobs(name_starts_with= 'Angebissen - der Angelpodcast/2021')



In [None]:
# wir definieren und die Sink connection

app_setting = 'my_storage'

connection_string = os.environ[app_setting] 

blob_service_client = BlobServiceClient.from_connection_string(connection_string)

container_client = blob_service_client.get_container_client('cleartextchunks/2021')

In [None]:
# 3. Wir unterteilen jeden einzelne Dokument in Klartext chunk und speichern den dazugehörige  und Blobnamen und de reltiven Source Path an (für spätere SAS-Token generierung) ab in einem Python Dictionary
# 4. Wir erstellen für jeden Dokument ein Json Dokument (python dictionary wird zum json geparsed) und speichern es in einer Blob Storage ab (als JSON-File)

start_point = 'https://stpodcastexplorer.blob.core.windows.net/'
relative_path = 'transcriptions/'

for blob in blob_list:

            text_chunks_with_metaData = []

            # connect to the specific blob
            blob_client = container_client.get_blob_client(blob)
            
            data = blob_client.download_blob()

            # chunking the text of the blob into several chunks of size 400
            chunks = text_splitter.split_text(data.content_as_text())

            for txt in chunks:

                a = {
        
                    'text' : txt, 
                    'blob_url': start_point + relative_path + str(data.name),
                    'blob_name': data.name

                }

                text_chunks_with_metaData.append(a)

            try:
                # parse python dictinary to json data
                json_data = json.dumps(text_chunks_with_metaData)
                # we do  not want the '.txt' ending
                output_blob = container_client.get_blob_client(str(data.name[:-4]) + '_chunked.json')
                output_blob.upload_blob(json_data,overwrite=True)
                logging.info("Blob for file : " + data.name + ' was successfully written to storage' )

            except:

                logging.warning("Blob for file : " + data.name + ' could NOT BE WRITTEN!!!!' )


Nachdem wir jetzt den ersten schritt gemacht haben und die Klartext chunks erstellt haben. Werden wir diese jetzt in Vektoren übersetzen (embedden) lassen via der OpenAI api und dem Text-embedding model:

Vektorembedding: Voktorembedding von Klartext-Chunks bezieht sich auf den Prozess, bei dem Wörter oder Textabschnitte als numerische Vektoren repräsentiert werden, um deren semantische Bedeutung und kontextuelle Beziehungen zu erfassen. Diese Technik wird häufig in der Verarbeitung natürlicher Sprache (NLP) und maschinellem Lernen eingesetzt.

Ein Index in einer Vektordatenbank beschreibt meistens nur eine "Datenbank". Innerhalb dieses Indexes (Datenbank) kannst du mehrere Collections, Namspaces o.Ä haben (Wie mehrere Tabellen in einer Datenbank). Da kommt es ganz auf den Anbieter drauf an wie er das genau benennt.

Der Begriff Index hat sich durchgesetzt für nichts weiter als einen Speicherort, weil du durch den hohen Berechnungsaufwand der "Similarity Search" im Vektorraum Mathematische Heuristiken entwickelt werden mussten, um den Exponentiellen Berechnungsaufwand logarithmisch skalieren zu lassen. Ein sogenannter Index tut genau dieses. (Mit FAISS der openSource library von Facebook kann man das selbst programmatisch ausprobieren)

Aber da dieser "Index" (der als Datenbank fungiert) einer Vektordatnebank unsere Vektoren automatisch Indiziert, wird er meistend Index genannt.

Similarity Search: Die Similarity Searchbezieht sich auf die Suche nach ähnlichen Elementen oder Datensätzen in einer Vektordatenbank. Dafür gibt es verschiedene Mathematische Ansätze die Ähnlichkeit von Vektoren zu bestimmen.

Warum sind Metadaten wichtig:
Metadaten sind wichtig im Zusammenhang mit Vektordatenbanken, da sie zusätzliche Informationen über die gespeicherten Vektoren liefern. Diese zusätzlichen Informationen ermöglichen eine bessere Verwaltung, Suche und Analyse der Daten in der Datenbank. Für den Menschen sind Vektoren nicht Human-Readable. Metadaten geben den Vektoren Bedeutung und Kontext. Ohne Metadaten sind Vektoren nur numerische Werte ohne klare Interpretation. 
Metadaten ermöglichen eine präzisere Suche und Filterung in der Datenbank, da man den Search Space verkleinern kann. Sie können Vektoren in logische oder semantische Gruppen einteilen oder zeitliche Abhängigkeiten hinzufügen. Und noch vieles weitere.

In [None]:
# 1. Wir nehmen uns die davor erstellten Json-Dokumente 
#2. Wir embedden den Klartext zu einem Vektor via der OpenAI API, dort werden wir das "text-embedding-ada-002" model benutzen
#3. Mit Regex ziehen wir uns die Metadaten zu den Vektoren aus dem Blob name heraus und speichern sie im JSON format ab (BEACHTE: die meisten Voktordatenbanken folgen dem NOSQL prinzip, das Json-Schema ist nicht fest vorgegeben und man kann jedem Vektor eine beliebieg Menge an Metadaten mitgeben)
#4. Über den Pinecone client inserten wir (upserten) den Vektor inklusive metadaten und klartext in unseren vorher definierten Index.


connection_string = os.environ['my_storage']
       
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

      
container_client = blob_service_client.get_container_client('cleartextchunks')

# hier laden wir uns wieder die Klartext chunks in eine BlobListe
blob_list = container_client.list_blobs(name_starts_with= 'Angebissen - der Angelpodcast/2023')

logging.info("got the Blob List")
      
# hier verbinden wir uns zu unserem Vektordatnebank auf Pinecone
pinecone.init(      
api_key='<<api-key>>',      
environment='asia-southeast1-gcp-free'      
)      
index = pinecone.Index('mypodcastindex')
 
      
logging.info("start embedding")
embeddings = OpenAIEmbeddings(deployment="text-embedding-ada-002")

# list slicing for testing purposes

for blob in blob_list:

  vectors = []

  blob_client = container_client.get_blob_client(blob)
            
  data = blob_client.download_blob()

  json_data = json.loads(data.content_as_text())

        # update identity
  stats_response = index.describe_index_stats()

  # calculate identity counter
  identity_counter = stats_response['total_vector_count'] +1
  chunk_counter = 1

  for chunk in json_data:

      # hier ziehen wir uns die Metadaten die in den BlobNamen gespeichert sind raus

      # für die Episodennummer
      match_sequence = re.search(r"/(\d+)_", chunk['blob_name_short'])
      if match_sequence:
            episode = match_sequence.group(1)
      else:
            episode = ""

      # für das Erscheinungsdaten
      match_date = re.search(r"(\d{4})(\d{2})(\d{2})", chunk['blob_name_short'])
      if match_date:
            date = f"{match_date.group(1)}{match_date.group(2)}{match_date.group(3)}"
      else:
            date = ""

      # für den Titel
      match_title = re.search(r"/(\d+_\d+_)(.+?)_", chunk['blob_name_short'])
      if match_title:
            title = match_title.group(2).replace("_", " ")
      else:
            title = ""

       # so definieren wir unser upsert Objekt, wie und was man alles abspeichern kann hängt von dem Anbieter     
      curr = {
                  "id": "vec" + str(identity_counter),
                  "values" : embeddings.embed_query(chunk['text']),
                  "metadata": {
                  "blob_url": chunk['blob_url'], 
                  "year" : '2023',
                  "date" : date, 
                  "episode" : episode,
                  "title": title,
                  "clear_text": chunk['text'],
                  "blob_name": chunk['blob_name_short'],
                  "chunk_counter": chunk_counter,
                  "total_doc_chunks" : len(json_data)
                  }        
      }

      identity_counter += 1
      chunk_counter +=1 


      vectors.append(curr)

      # insert Vector + Metadata into DB
      index.upsert(vectors=[curr], namespace="angebissen")

      #--------------- innere For Schleife beendet--------------------------

# hier definieren wir uns den JSON-File den wir uns abspichern für den Db Inhalt
  json_to_return = {
      "vectors": vectors,

      "namespace": "angebissen"

  }
         
  upload_blob_to_storage('embeddedchunks', json.dumps(json_to_return), data.name[:-5] +'_upserted.json')

#--------------------- äußere For schleife beendet------------





Nachdem wir alles in die Datenbank inserted haben, wollen wir die Datenbank auch benutzen! Zeit Fragen zu stellen und das läuft so ab.

1. Wir bekommen eine Frage gestellt vom User (unsere Function wird via HTTP Trigger getriggert per Post Request)
2. Aus dem JSON Body extrahieren wir uns die Frage und embedden diese. Nun haben wir einen Search Vector (!!! Wichig: Wir müssen das gleiche Embedding Model benutzen, sonst vergleichen wir Äpfel mit Birnen!!!)
3. Jeztt performen wir die "Similarity Search". Die Vektordatenbank gibt uns die k most similar Vektoreinträge zurück inklusive Metadaten, denn diese interessieren uns eigentlich. (k ist ein natürlicher Parameter, der nur durch die Chunk Size und Tokenin- und Tokenoutput vom LLM bedingt wird)
4. Wir setzen uns einen prompt zusammen. Das Prompt Template ist in einem Textfile ausgelagert. Dort schreiben wir den Kontext rein (extrahierte Klartext zu den Vektoren und Metadaten, die wir zu einem großen String konkatenieren) und eben die User Frage
5. Wir schicken diesen Prompt zu ChatGTP und erhalten eine genaue Antwort auf Basis unsere Daten!
6. Wie erstellen eine JSON Antwort, die die Frage, die Antwort, die DB Response und SAS-Links zu den referenzioerten Dokumenten in der Datenbank.
7. Wir schicken diese zurück und das Frontend muss den Rest übernehmen.


In [None]:
try:
         
        # get the Question out of the Request Body

        req_body = req.get_json()


        question = req_body['question']

        # embed the query to a vector
        embeddings = OpenAIEmbeddings(deployment="text-embedding-ada-002")
        question_embedding = embeddings.embed_query(question)


        # ------------------ now query the database
        # top_k als parameter für die Anzahl für die most similar einträge die zurück gegeben werden
        query_response = search_pinecone(question_embedding, top_k=6)

        # ---------------- now generate the answer --------------------

        # hier wird die große Json zurückgegeben 
        response = generate_JSON_respone(ask_GTP_api(query_response, question), query_response, question)
        

        logging.info("Generated answer , sending it back")

        #return func.HttpResponse(json.dumps(response), status_code=200)

except Exception as e:

        error_traceback = traceback.format_exc()

        #return func.HttpResponse("Irgendwas ist schiefgelaufen:" + str(e) + " --------" + error_traceback , status_code=400)

Funktionen, die in einen Python File ausgelagert habe, um den Code "sauber" zu Halten

In [None]:
# hier setzen wir uns den sogenannten Context für den Prompt zusammen. Wir benutzen die Metadaten Epsiodennummer, Titel und berechnen wo im Dokuemnt diese Referenz liegt

def generate_content_for_prompt(query_response):
    try:

        logging.info("now we are concatenating the input")


        string_to_return = ""
        for i in query_response['matches']:

            # hier berechnen wir uns die Stelle , wo es im Dokument vorkommt
            a = i['metadata']['chunk_counter'] /i['metadata']['total_doc_chunks']

            if a >= 0.75:
                meta_zeit = 'Am Ende'
            elif a < 0.75 and a > 0.25:
                meta_zeit = "In der Mitte"
            else:
                meta_zeit = 'Am Anfang'

            string_to_return += str(i['metadata']['clear_text']) + meta_zeit +" der Folge " + str(i['metadata']['episode']) + " mit dem Name " + str(i['metadata']['title']) + "."


        return string_to_return
    
    except:
        logging.error("Something went wrong genrating the Content for the Prompt")
        raise Exception("Something went wrong genrating the Content for the Prompt")

In [None]:
# so lese ich das Prompt Template aus dem Textfile heraus und setze meine "Bausteine" ein

def generate_prompt_input(context, question):
    with open('prompt_template.txt') as f:
        prompt = f.read()
    
    prompt = prompt.replace("<<Context>>", context)
    prompt = prompt.replace("<<Frage>>", question)


    return prompt


In [None]:
# so gebe ich eine query in Pinecone ab, top_k =5 ist als default wert eingestellt.

def search_pinecone(question_embedding, top_k = 5):

    pinecone.init(      
	    api_key='<<api-key>>',      
	    environment='asia-southeast1-gcp-free'      
        )      
    index = pinecone.Index('mypodcastindex')      

    query_response = index.query(
            top_k=top_k,
            include_values=False, # wir brauchen die Vektoren Values nicht
            include_metadata=True, # wire brauchen auf jeden fall die metadaten
            vector=question_embedding,
            namespace='angebissen'
        )
    
    return query_response

In [None]:
# jetzt haben wir alles zusammen, um einen prompt an "chatGTP" zu stellen das mache wir über die openai library

def ask_GTP_api(query_response, question):

    context = generate_content_for_prompt(query_response)


    prompt = generate_prompt_input(context, question)


    openai.api_key = os.environ['OPENAI_API_KEY']
    openai.api_base= os.environ['OPENAI_API_BASE']
    openai.api_type = os.environ['OPENAI_API_TYPE']
    openai.api_version =os.environ['OPENAI_API_VERSION']

    # diese funktion monitored unseren input und berechnet den maximal mnöglichen output für das LLM
    max_tokens = check_for_answer_capacity_in_tokens(prompt)

    # wir wollen die temparture bei 0 halten
    completion = openai.Completion.create(engine="text-davinci-003", prompt=prompt, temperature=0, max_tokens=max_tokens)

    return completion



In [None]:
# so generiere SAS Tokens, sodass der User direkt auf die Dokumente zugreifen kann auch wenn er nicht Contributer der Blob storage ist oder ähnliches

def generate_SAS_urls_for_sources(query_response):

    try:


        # now we are going to 

        start_time = datetime.datetime.now(datetime.timezone.utc)
        expiry_time = start_time + datetime.timedelta(days=1)

        app_setting = 'storage_PodcastExplorer'

        connection_string = os.environ[app_setting] 

        blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        container_client = blob_service_client.get_container_client('transcriptions')

        sas_token = generate_container_sas(
            account_name=container_client.account_name,
            container_name=container_client.container_name,
            permission=BlobSasPermissions(read=True),
            account_key=container_client.credential.account_key, 
            expiry=expiry_time,
            start=start_time
        )
        

        # now concatenate
        finished_links = []

        for x in query_response['matches']:

            a =  x['metadata']['blob_url'] + '?' + str(sas_token)

            finished_links.append(a)

        return finished_links
    
    except Exception as e:

        logging.error('SAS token genration hat nicht so gut funktioniert')
        raise Exception('SAS token genration hat nicht so gut funktioniert')

In [None]:
# am ende setzten wir einfach alles zusammen und schicken die json antwort an frontend zurück

def generate_JSON_respone(completion, query_response, question):

    return {
        "question": question,
    
        "GTP_answer": completion.choices[0].text,

        "query_response": str(query_response['matches']),

        "SAS_links": generate_SAS_urls_for_sources(query_response)
    }

So sieht mein Prompt aus:

Nutze ausschließlich den folgenden Kontext, um eine konkrete Antwort auf die Frage am Ende zu geben. Wenn du die Frage nicht beantworten kannst, antworte mit "Das kann ich nicht beantworten".
Referenziere in deiner Antwort aus welcher Folge und Episoden, du die Informationen herausnimmst ebenfalls wann es in der Folge vorkommt. Halte dich dabei aber kurz! Schreibe einen fließenden Text.
###
<<Context>>
###

Frage: <<Frage>>
