# Build Document Embedding Tool

This is a fork from the retrieval-v1 within the local-rag development.

In this notebook the goal is to design an approach for loading content from a PDF document into a vector database. For this experiment we will use Pinecone because it will most likely be our weapon of choice during production-grade development.

In [1]:
# load libraries
import os
import requests
from tqdm.auto import tqdm # for progress bars
import random
import re
import torch

  from .autonotebook import tqdm as notebook_tqdm


## Extract content from PDFs

Data will be extracted from each PDF file we load. The content can vary from text to images to tables, so we will need to keep developing different functions to load each type accurately into our notebook.

The text is cleaned to remove noise before it is stored. This will help increase the accuracy of the embeddings further down the process.

In [None]:


### Document Loader

import pymupdf

# extract image from pdf
def get_page_images(doc, page_content, page_index):
    """
    Save every image referenced by a PDF page as a PNG file.

    Args:
        doc: the open pymupdf document that owns the page.
        page_content: the page to inspect; must provide get_images().
        page_index: page number, used in log messages and output file names.

    Returns:
        List of the file paths written, each named
        "page_<page_index>-image_<n>.png" (n starts at 1) and relative to the
        current working directory. Extraction is best effort: if an error
        occurs it is printed and the paths saved so far are returned.
    """
    image_paths = []
    try:
        image_list = page_content.get_images()

        # print number of images found on page
        if image_list:
            print(f"found {len(image_list)} images on page {page_index}")

        for image_index, img in enumerate(image_list, start=1):
            xref = img[0]  # get XREF of image
            pix = pymupdf.Pixmap(doc, xref)  # create a Pixmap

            if pix.n - pix.alpha > 3:  # CMYK: convert to RGB first
                # pymupdf's predefined RGB colorspace is csRGB; the previous
                # "cdRGB" raised AttributeError, which aborted the whole page
                pix = pymupdf.Pixmap(pymupdf.csRGB, pix)

            image_path = f"page_{page_index}-image_{image_index}.png"
            pix.save(image_path)  # save the image as png
            pix = None  # drop the reference so the pixmap can be freed

            image_paths.append(image_path)

    except Exception as e:
        print(f"error occurred getting images: {e}")

    return image_paths

# clean text
def clean_text(text: str) -> str:
    """
    Strip formatting noise from raw page text.

    Runs of two or more consecutive dots (the document we are experimenting
    with is full of them) become a single space, every newline character
    becomes a space, and leading/trailing whitespace is trimmed.
    """
    without_dot_runs = re.sub(r'\.{2,}', ' ', text)
    single_line = without_dot_runs.replace("\n", " ")

    # Add more formatting if needed
    return single_line.strip()

# parse document
def parse_document(filepath):
    """
    Extract the text, images and basic statistics of every page in a document.

    Args:
        filepath: path of the document to open with pymupdf.

    Returns:
        A list with one dictionary per page containing:
            "page_number": 0-based position of the page
            "page_char_count": length of the cleaned text
            "page_word_count": number of space-separated pieces
            "page_sentence_count_raw": number of pieces separated by ". "
            "page_token_count": estimated tokens (~4 characters each)
            "images": file paths returned by get_page_images
            "text": the cleaned page text
    """

    print("---PARSING DOCUMENT---")
    pages_and_texts = []

    # the context manager closes the document even if a page fails to parse;
    # previously the document was opened and never closed
    with pymupdf.open(filepath) as doc:
        for page_number, page in enumerate(doc):
            text = clean_text(page.get_text())  # get text from page and clean it
            pages_and_texts.append({
                "page_number": page_number,
                "page_char_count": len(text),
                "page_word_count": len(text.split(" ")),
                "page_sentence_count_raw": len(text.split(". ")),
                "page_token_count": len(text) / 4,  # average token = ~4 char
                "images": get_page_images(doc, page, page_number),
                "text": text,
            })

    return pages_and_texts



Now run 'parse_document' to extract data from local file

In [None]:
# PDF to ingest; the path is relative to the directory the notebook runs from
filepath = "./raw-data/61-65 Certifications.pdf"

# parse document into a list with one dictionary per page
pdf_content = parse_document(filepath)

In [None]:
pdf_content

We successfully retrieved the content from the PDF document.

Now this content needs to be broken down into chunks that will fit into the context window of our LLM.

We will use spaCy to break the text into sentences. It's an NLP library, so it will be more accurate than splitting with: text.split(". ")

In [None]:
from spacy.lang.en import English

# initialize the pipeline and its sentencizer once so chunk_content can reuse them
nlp = English()
nlp.add_pipe("sentencizer")

def chunk_content(content):
    """
    Split the text of every page into sentences with the spaCy pipeline.

    Each item of content is updated in place with:
        "sentences": the sentences found in item["text"], as plain strings
        "page_sentence_count_spacy": how many sentences were found

    Returns the same content object that was passed in.
    """
    for page in tqdm(content):
        # run the pipeline over the page text and keep each sentence as a string
        sentence_strings = [str(span) for span in nlp(page["text"]).sents]

        page["sentences"] = sentence_strings
        page["page_sentence_count_spacy"] = len(sentence_strings)

    return content


In [None]:
# add the "sentences" and "page_sentence_count_spacy" keys to every page
pdf_content = chunk_content(pdf_content)

# inspect one random page
random.sample(pdf_content, k=1)


In [None]:
# split size
sentences_in_chunk = 10

# split a list into consecutive slices of the desired size
def split_list(input_list: list,
               slice_size: int) -> list[list[str]]:
    """
    Break input_list into consecutive sublists holding slice_size items each.

    The final sublist is shorter when the length of input_list is not a
    multiple of slice_size; an empty input_list yields an empty list.
    """
    slices = []
    for start in range(0, len(input_list), slice_size):
        slices.append(input_list[start:start + slice_size])
    return slices

# loop through the pages and group each page's sentences into chunks of
# sentences_in_chunk sentences; also record how many chunks the page produced
for item in tqdm(pdf_content):
    item["sentence_chunks"] = split_list(input_list=item["sentences"],
                                          slice_size=sentences_in_chunk)
    item["num_chunks"] = len(item["sentence_chunks"])

In [None]:
# sample example from group
random.sample(pdf_content, k=1)

In [None]:
import re

# split each chunk into its own item: one dictionary per chunk holding the
# page it came from, the chunk text and a few size statistics
pages_and_chunks = []
for item in tqdm(pdf_content):
    for sentence_chunk in item["sentence_chunks"]:
        # join sentences together into a paragraph-like structure, aka a chunk (single string).
        # The sentences are joined with a space: joining with "" glued the end of one
        # sentence to the start of the next whenever the boundary was not "." followed
        # by a capital letter (e.g. after "?" or before a digit).
        # NOTE(review): assumes the sentence strings carry no trailing whitespace; if
        # they do, the doubled space is collapsed by the replace below.
        joined_sentence_chunk = " ".join(sentence_chunk).replace("  ", " ").strip()
        # ".A" -> ". A" for any full stop/capital letter combo left inside a sentence
        joined_sentence_chunk = re.sub(r'\.([A-Z])', r'. \1', joined_sentence_chunk)

        pages_and_chunks.append({
            "page_number": item["page_number"],
            "sentence_chunk": joined_sentence_chunk,
            # stats about the chunk
            "chunk_char_count": len(joined_sentence_chunk),
            "chunk_word_count": len(joined_sentence_chunk.split(" ")),
            "chunk_token_count": len(joined_sentence_chunk) / 4,  # 1 token = ~4 char
        })

In [None]:
# view random sample
random.sample(pages_and_chunks, k=1)

Filter out chunks that are too short (10 estimated tokens or fewer) before embedding

In [None]:
import pandas as pd

df = pd.DataFrame(pages_and_chunks)

# chunks whose estimated token count is at or below this value are dropped
min_token_length = 10

# show one random chunk that is about to be dropped, if there is any;
# sampling one row from an empty selection raises ValueError, so guard it
short_chunks = df[df["chunk_token_count"] <= min_token_length]
if not short_chunks.empty:
    for _, row in short_chunks.sample(1).iterrows():
        print(f'Chunk token count: {row["chunk_token_count"]} | Text: {row["sentence_chunk"]} | Page number: {row["page_number"]}')

# keep only the chunks above the threshold, as a list of dictionaries
pages_and_chunks_over_min_token_len = df[df["chunk_token_count"] > min_token_length].to_dict(orient="records")
pages_and_chunks_over_min_token_len[:2]

**Embedding text chunks**

In [None]:
from sentence_transformers import SentenceTransformer

# load the sentence-embedding model used for both the chunks and, later, the queries
embedding_model = SentenceTransformer(model_name_or_path="all-mpnet-base-v2",
                                      device="cpu") # choose device to load model to

# Notes: this will embed using local computing power. Learn more about the benefits (if any)
# of computing in the cloud

# Make sure the model is on the CPU
# NOTE(review): device="cpu" is already passed above, so this call looks redundant
embedding_model.to("cpu")

# Embed each chunk one by one and store the vector on the chunk under "embedding"
for item in tqdm(pages_and_chunks_over_min_token_len):
    item["embedding"] = embedding_model.encode(item["sentence_chunk"])

In [None]:
pages_and_chunks_over_min_token_len[0]

**Save to file**

In [None]:
# Save the chunks, their statistics and their embeddings to a CSV file in the working directory
# NOTE(review): the "embedding" column holds array objects, which CSV can only store
# as text; confirm that whatever reloads this file parses them back into vectors
text_chunks_and_embeddings_df = pd.DataFrame(pages_and_chunks_over_min_token_len)
embeddings_df_save_path = "text_chunks_and_embeddings_df.csv"
text_chunks_and_embeddings_df.to_csv(embeddings_df_save_path, index=False)

## Add to Pinecone

Using Pinecone will allow us to retrieve documents seamlessly. These embeddings will be stored persistently outside the notebook so they can be retrieved whenever they are needed.

First we will need to initialize a connection

In [None]:
# NOTE(review): this runs before dotenv.load_dotenv() in the next cell, and api_key is
# not referenced again in the visible cells (the client reads the variable itself)
api_key = os.getenv("PINECONE_API_KEY")

In [None]:
# Initialize connection
# load_dotenv() runs first so that PINECONE_API_KEY can come from a local .env file
import dotenv
dotenv.load_dotenv()

from pinecone import Pinecone

# configure client with the API key taken from the environment
pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))

In [None]:
from pinecone import ServerlessSpec

# cloud provider and region come from the environment when set (and non-empty),
# otherwise they fall back to 'aws' / 'us-east-1'
cloud = os.environ.get('PINECONE_CLOUD') or 'aws'
region = os.environ.get('PINECONE_REGION') or 'us-east-1'

# deployment spec passed to create_index when the index has to be created
spec = ServerlessSpec(cloud=cloud, region=region)

In [None]:
# name of the Pinecone index that stores this notebook's vectors
index_name = "rag-retriever-v2"

In [None]:
# check if index already exists
if index_name not in pc.list_indexes().names():
    # if does not exist, create index
    # NOTE(review): dimension must equal the length of the vectors produced by the
    # embedding model; 768 is presumably the output size of all-mpnet-base-v2 - confirm
    pc.create_index(
        index_name,
        dimension=768,
        metric="cosine",
        spec=spec,
    )
# connect to index
index = pc.Index(index_name)
# view index stats
index.describe_index_stats()

In [None]:
# give every chunk a string id equal to its position in the list
for position, chunk in enumerate(pages_and_chunks_over_min_token_len):
    chunk["ids"] = str(position)

In [None]:
pages_and_chunks_over_min_token_len[1]

In [None]:
from time import sleep
batch_size = 100 # number of precomputed embeddings sent to Pinecone per request

for i in tqdm(range(0, len(pages_and_chunks_over_min_token_len), batch_size)):
    # slicing clamps at the end of the list, so the last batch may be shorter
    batch = pages_and_chunks_over_min_token_len[i:i + batch_size]

    # get ids
    ids_batch = [x["ids"] for x in batch]
    # get the embeddings computed earlier, converted to plain Python floats
    # (mirrors the .tolist() conversion applied to query vectors)
    embeddings = [[float(value) for value in x["embedding"]] for x in batch]

    # clean metadata: keep only the fields that should be stored next to each vector
    meta_batch = [{
        "text": x["sentence_chunk"],
        "ids": x["ids"],
        "page_number": x["page_number"],
        "chunk_char_count": x["chunk_char_count"],
        "chunk_word_count": x["chunk_word_count"],
        "chunk_token_count": x["chunk_token_count"],
    } for x in batch]

    # upsert to pinecone as (id, vector, metadata) tuples
    index.upsert(vectors=list(zip(ids_batch, embeddings, meta_batch)))

These are now uploaded into Pinecone with the necessary metadata.

We can confirm the accuracy between the notebook variables and pinecone by searching for ids

Embedding our query:

In [None]:
# embed the question with the same model that embedded the chunks
query = "what do i need to sign for a student to solo?"
res = embedding_model.encode(query)

# convert the embedding to a plain list before sending it to Pinecone
xq = res.tolist()

# res is reused: from here on it holds the query response (top 2 matches with metadata)
res = index.query(vector=xq, top_k=2, include_metadata=True)

In [None]:
res

We can retrieve relevant documents by finding items that are similar to our query.

Now, we will put this into a retrieval function

In [None]:
# character budget that retrieve() compares the joined context length against
limit = 3750

def retrieve(query, top_k=2):
    """
    Build an LLM prompt containing the stored chunks most similar to query.

    The query is embedded with embedding_model, the closest vectors are
    fetched from the Pinecone index, and the "text" metadata of the matches
    is placed between an instruction header and the question.

    Args:
        query: the question to answer.
        top_k: number of matches requested from the index (default 2).

    Returns:
        The prompt string. Matches are added in ranking order for as long as
        the joined context does not exceed the module-level `limit`
        (characters). If even the best match is longer than `limit`, it is
        truncated to `limit` characters. With no matches the context section
        is empty.
    """
    # retrieve from Pinecone
    xq = embedding_model.encode(query).tolist()
    res = index.query(vector=xq, top_k=top_k, include_metadata=True)
    contexts = [x['metadata']['text'] for x in res['matches']]

    # append context until hitting limit. The previous loop left `prompt`
    # unassigned for fewer than two matches and could exceed the limit.
    separator = "\n\n---\n\n"
    selected = []
    for context in contexts:
        if len(separator.join(selected + [context])) > limit:
            break
        selected.append(context)

    if contexts and not selected:
        # the best match alone is over budget: keep as much of it as fits
        selected = [contexts[0][:limit]]

    # build our prompt with the retrieved context included
    prompt_start = (
        "Answer the question based on the context below. \n\n"
        "Context: \n"
    )
    prompt_end = f"\n\nQuestion: {query}\nAnswer:"
    return prompt_start + separator.join(selected) + prompt_end

In [None]:
# first we retrieve relevant items from pinecone and wrap them,
# together with the question, into a single prompt
query_with_context = retrieve(query)
print(query_with_context)

Using OpenAI for LLM

For our generator we will use OpenAI's API to create content. This can be run locally down the road if needed, but for now this is sufficient.

In [None]:
from openai import OpenAI

In [None]:
# create the OpenAI client used by the cells below
client = OpenAI()

# Define the conversation as a list of message objects.
# It is kept in its own variable: reusing the name `query` replaced the
# question string that the retrieval cells pass to retrieve().
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "what is an airplane?"}
]

# quick check that the API call works before wiring in the retrieved context
completion = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=messages
)

print(completion.choices[0].message.content)

In [None]:
def complete(prompt, model="gpt-3.5-turbo"):
    """
    Send prompt to the OpenAI chat completions API and return the reply text.

    Args:
        prompt: text sent as the user message.
        model: name of the chat model to call (defaults to the model this
            notebook has used so far).

    Returns:
        The content of the first choice in the response.
    """
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompt}
    ]

    completion = client.chat.completions.create(
        model=model,
        messages=messages
    )

    return completion.choices[0].message.content


In [None]:
# test function: ask the bare question, without any retrieved context
query = "what do i need to sign for a student to solo?"

print(complete(query))

In [None]:
# same call, this time with the prompt that includes the retrieved context
print(complete(query_with_context))