## Validating FAISS vector database library

In [None]:
import faiss
import numpy as np

In [None]:
# Dimensionality shared by every vector and index in this FAISS sanity check.
dimension = 6
# Flat L2 index over `dimension`-sized vectors; used below as the search baseline.
index = faiss.IndexFlatL2(dimension)

In [None]:
# Number of random reference vectors to generate.
numVectors = 5
# Random (numVectors, dimension) matrix, cast to float32 before being handed to FAISS.
vectors = np.random.rand(numVectors, dimension).astype('float32')

In [None]:
# Store the reference vectors in the flat index and report how many it now holds.
index.add(vectors)
print(f"Number of vectors in index: {index.ntotal}")

In [None]:
# Build an IVF index on top of a flat L2 quantizer, train it if needed, then add
# the same reference vectors that were added to the flat index.
# NOTE(review): only `numVectors` (5) points are available to train `nlist` (5)
# centroids — FAISS may warn about too few training points; confirm this is intended.
nlist = 5 # number of centroids
quantizer = faiss.IndexFlatL2(dimension)
indexIVF = faiss.IndexIVFFlat(quantizer, dimension, nlist)
# Train only when the index reports that it has not been trained yet.
if not indexIVF.is_trained:
    indexIVF.train(vectors)
indexIVF.add(vectors)

In [None]:
# Single random query vector. FAISS operates on float32 data, so the query must use
# the same dtype as the indexed vectors ('float' would produce float64).
queryVector = np.random.rand(1, dimension).astype('float32')
# Number of nearest neighbours to retrieve.
k = 5
# `distances` and `indices` each have one row per query and k columns, ordered by rank.
distances, indices = index.search(queryVector, k)
print(f"Distances: {distances}")
print(f"Indices of nearest neighbors: {indices}")

In [None]:
# Show the query vector so the search results can be checked by hand.
print(f"Query: {queryVector}")

In [None]:
# Cross-check FAISS against a NumPy computation of the same distances.
print("Reference vectors")
print(indices)
# `distances[0]` is ordered by result rank, not by vector id, so pair each
# returned id with its rank instead of indexing distances by the id itself.
for rank, ind in enumerate(indices[0]):
    if ind < 0:
        # FAISS pads the result with -1 when fewer than k neighbours exist.
        continue
    referenceVector = vectors[ind]
    l2_distance = np.linalg.norm(referenceVector - queryVector)
    # IndexFlatL2 reports squared L2 distances; take the square root so both
    # numbers are on the same scale (clamped at 0 to guard against rounding).
    faiss_distance = np.sqrt(max(distances[0][rank], 0.0))
    print(f"{referenceVector}: {l2_distance} compared to {faiss_distance}")

## Utilizing PyMuPDF to extract text from PDF for vector encoding

In [None]:
import fitz

In [None]:
import re

In [None]:
import nltk
# Fetch the 'punkt_tab' resource; presumably the tokenizer data that
# nltk.tokenize.sent_tokenize relies on further down — TODO confirm.
nltk.download('punkt_tab')

In [None]:
def dilate_array(arr, kernel_size=1):
    """Binary dilation of a 1-D array.

    Position ``i`` of the output is 1 when any element of ``arr`` within
    ``kernel_size`` positions of ``i`` equals 1, otherwise 0. The window is
    truncated at the array edges. The output has the shape and dtype of ``arr``.
    """
    length = len(arr)
    dilated = np.zeros_like(arr)  # every slot starts at 0
    for center in range(length):
        lo = max(0, center - kernel_size)
        hi = min(length, center + kernel_size + 1)
        if np.any(arr[lo:hi] == 1):
            dilated[center] = 1.0
    return dilated

def erode_array(arr, kernel_size=1):
    """Binary erosion of a 1-D array.

    Position ``i`` of the output is 1 only when every element of ``arr`` within
    ``kernel_size`` positions of ``i`` equals 1, otherwise 0. The window is
    truncated at the array edges. The output has the shape and dtype of ``arr``.
    """
    length = len(arr)
    eroded = np.zeros_like(arr)  # every slot starts at 0
    for center in range(length):
        lo = max(0, center - kernel_size)
        hi = min(length, center + kernel_size + 1)
        if np.all(arr[lo:hi] == 1):
            eroded[center] = 1.0
    return eroded

In [None]:
# Empirically, this seems to work across a few different types of PDF files
# With more investigation, I would try turning this into a DP problem where up to some set percentage (maybe 15%) CAN be classified
# as an "Introduction" section, thus allowing each page to bid for their slot and disincentivizing other pages.
def GetTableOfContentsEstimator(linksPerPage, pageCount, lookback=5):
    """Estimate the index of the last table-of-contents / introduction page.

    Each page from ``lookback`` onwards is scored by how far its link count
    falls below the mean of the preceding ``lookback`` pages. The score is
    weighted by how link-heavy those preceding pages were and by an
    exponential bias towards the front of the document. Scores of at least
    0.5 are marked, the marks are cleaned up, and the last marked page wins.

    Args:
        linksPerPage: number of links on each page; expected to hold
            ``pageCount`` entries.
        pageCount: total number of pages in the document.
        lookback: how many preceding pages form the comparison window.

    Returns:
        The 0-based page index of the last marked page, or ``None`` when the
        document has at most ``lookback`` pages or no page is marked. (The
        short-document case used to return an array, whose truth value is
        ambiguous for callers.)
    """
    if pageCount <= lookback:
        return None
    # look for local maxima in sliding window over linksPerPage to identify candidates for 'table of contents' sections
    tableOfContentsEstimator = np.zeros(pageCount - lookback)
    averageLinks = np.mean(linksPerPage)
    for i in range(lookback, len(linksPerPage)):
        pastAverage = np.mean(linksPerPage[(i - lookback):i])
        current = linksPerPage[i]
        if current > pastAverage:
            # more links than the recent past: not a drop, score stays 0
            continue
        dropRatio = max((pastAverage - current), 1) / (pastAverage + 1e-6)
        magnitude = pastAverage / (pastAverage + averageLinks + 1e-6)
        frontBias = np.exp(-(5.0 / pageCount) * (i - lookback))  # bias towards zero at the end of the array
        tableOfContentsEstimator[i - lookback] = dropRatio * magnitude * frontBias
    binaryResult = np.where(tableOfContentsEstimator >= 0.5, 1, 0)
    # morphological opening (erode, then dilate) with k=1 to suppress isolated marks
    binaryResult = dilate_array(erode_array(binaryResult, 1), 1)
    # last position where binaryResult == 1 is likely the final page of the introduction / table of contents
    markedPositions = np.flatnonzero(binaryResult == 1)
    if markedPositions.size == 0:
        return None
    return int(markedPositions[-1]) + lookback

In [None]:
# Extract paragraph-level text from the PDF in two passes: the first gathers
# per-page metadata, the second collects text from the pages worth keeping.
# The context manager closes the document even if a step below raises.
with fitz.open("Observability-Engineering.pdf") as doc:
    pageCount = doc.page_count

    # metadata
    linksPerPage = []
    blocksPerPage = []
    linksToPage = {}
    maxPageContentDimensions = [np.inf, np.inf, -np.inf, -np.inf]
    # once-over to compute metadata before storing any paragraph info
    for pageIndex in range(pageCount):
        page = doc.load_page(pageIndex)
        blocks = page.get_text("blocks")
        links = page.get_links()
        linksPerPage.append(len(links))
        blocksPerPage.append(len(blocks))
        for block in blocks:
            # grow the bounding box (x0, y0, x1, y1) to cover every block seen so far
            maxPageContentDimensions[:2] = np.minimum(maxPageContentDimensions[:2], block[:2])
            maxPageContentDimensions[2:] = np.maximum(maxPageContentDimensions[2:4], block[2:4])
        for link in links:
            linkTo = link.get('page')
            # ignore links without a usable in-document target; a negative or
            # out-of-range page would corrupt the content-page range below
            if linkTo is None or not 0 <= linkTo < pageCount:
                continue
            linksToPage[linkTo] = linksToPage.get(linkTo, 0) + 1

    # approximate which pages may be part of the introduction / table of contents
    lastTableOfContentsPage = GetTableOfContentsEstimator(linksPerPage, pageCount, 5) # underlying array looks something like [1,1,1,1,0,0,0,0,0,0,0,0,0,0]
    print(lastTableOfContentsPage)
    # only an integer page index is a usable cutoff; anything else means "no estimate"
    hasTableOfContentsCutoff = isinstance(lastTableOfContentsPage, (int, np.integer))

    # estimate which are the most likely content pages
    if linksToPage:
        startingContentPage = min(linksToPage)
        endingContentPage = max(linksToPage)
        likelyContentPages = np.zeros(pageCount, dtype=int)
        # the last linked-to page is itself a link target, so the range is inclusive
        likelyContentPages[startingContentPage:endingContentPage + 1] = 1
    else:
        likelyContentPages = np.ones(pageCount, dtype=int)
    print(likelyContentPages)

    # get metric data on blocks per page
    averageBlocksPerPage = np.mean(blocksPerPage)
    print(averageBlocksPerPage)

    all_sentences = []
    for pageIndex in range(pageCount):
        if hasTableOfContentsCutoff and pageIndex < lastTableOfContentsPage:
            continue
        # decide from the cached block counts so skipped pages are never re-extracted
        blockCount = blocksPerPage[pageIndex]
        if not likelyContentPages[pageIndex]:
            if blockCount == 0 or blockCount < averageBlocksPerPage / 2:
                continue
        for block in doc.load_page(pageIndex).get_text("blocks"):
            # block tuple: (x0, y0, x1, y1, text, block_no, block_type); type 0 is text,
            # other types (images) only carry a placeholder string
            if len(block) > 6 and block[6] != 0:
                continue
            paragraph_text = block[4].strip().replace("\n", " ")
            # hacky, but remove all very-short phrases as they're likely not substantial content
            if paragraph_text.isnumeric() or len(paragraph_text) < 10:
                continue
            all_sentences.append(paragraph_text)

In [None]:
# Dump the raw paragraph strings collected from the PDF for manual inspection.
print(all_sentences)

In [None]:
# group adjacent sentences depending on punctuation
def sentence_groups(lines):
    """Yield lists of text fragments, one list per punctuation-terminated sentence.

    Each input string is stripped and split on '.', '?' and '!'. Fragments that
    are not purely numeric and are at least 10 characters long accumulate into
    the current group, which may span several input lines. A terminator closes
    the group and yields it. Empty groups are never yielded; previously '.' and
    '?' yielded empty lists because ``and`` binds tighter than ``or``.

    Args:
        lines: iterable of strings.

    Yields:
        Non-empty lists of fragment strings (terminators are not included).
    """
    terminators = ('.', '?', '!')
    group = []
    for w in lines:
        for part in re.split(r'([.?!])', w.strip()):
            if part in terminators:
                # close the current sentence, but only if it collected something
                if group:
                    yield group
                    group = []
                continue
            if not part.isnumeric() and len(part) >= 10:
                group.append(part)
    # flush a trailing sentence that had no closing punctuation
    if group:
        yield group

In [None]:
def parse_sentences(text_list):
    """Join the text chunks with spaces and split the result into sentences.

    Sentences come from ``nltk.tokenize.sent_tokenize``; each one is stripped,
    and only those longer than 10 characters after stripping are returned.
    """
    joined = ' '.join(text_list)
    kept = []
    for raw in nltk.tokenize.sent_tokenize(joined):
        trimmed = raw.strip()
        if len(trimmed) > 10:
            kept.append(trimmed)
    return kept

# Tokenize the collected paragraphs and print each sentence followed by a separator line.
sentencesProcessed = parse_sentences(all_sentences)
for sentence in sentencesProcessed:
    print(sentence, '----', sep='\n')

In [None]:
#simplifiedSentences = [' '.join(group) for group in sentence_groups(all_sentences)]
#for sentence in simplifiedSentences:
#    print(sentence)
#    print('---')

## Utilizing SBERT library to perform semantic encoding of chunks

In [None]:
from sentence_transformers import SentenceTransformer

In [None]:
import torch

In [None]:
# Load the "all-MiniLM-L6-v2" sentence-embedding model used for both corpus and queries.
model = SentenceTransformer("all-MiniLM-L6-v2")

In [None]:
# Embed every processed sentence once up front; convert_to_tensor=True requests a
# tensor result, which the torch-based ranking below consumes.
corpus_embeddings = model.encode_document(sentencesProcessed, convert_to_tensor=True)

In [None]:
# Query set about Medicare home health coverage.
# NOTE(review): `queries` is rebound by the following assignment in this notebook,
# so this list only takes effect if that later cell is skipped.
queries = [
    "What are the eligibility requirements for Medicare home health services?",
    "What qualifies a patient as homebound under CMS guidelines?",
    "What skilled nursing services are considered reasonable and necessary?",
    "What conditions must be met for physical therapy to be covered?",
    "What must be included in the physician’s plan of care?",
    "What are the requirements for the physician face-to-face encounter?",
    "What documentation is needed to prove medical necessity for skilled services?",
    "How should changes to the plan of care be documented during the episode?",
    "Under what conditions are home health aide services covered?",
    "What are the supervision requirements for home health aides?"
]

In [None]:
# Query set about monitoring, testing and observability; replaces any earlier
# value of `queries` when this cell runs.
queries = [
    "What types of monitoring are needed for companies that run a large portion of their own systems on low-level hardware?",
    "Describe some advantages of test-driven development regarding upkeep of a software product.",
    "How can observability be coupled with a development effort to prevent rolling back deployments?"
]

In [None]:
# Rank corpus sentences against each query and print the best matches with context.
top_k = 5
# torch.topk raises when asked for more entries than exist, so cap at the corpus size.
resultCount = min(top_k, len(sentencesProcessed))
for query in queries:
    query_embedding = model.encode_query(query, convert_to_tensor=True)

    # We use cosine-similarity and torch.topk to find the highest scores
    similarity_scores = model.similarity(query_embedding, corpus_embeddings)[0]
    scores, indices = torch.topk(similarity_scores, k=resultCount)

    print("\nQuery:", query)
    print(f"Top {resultCount} most similar sentences in corpus:")

    # .tolist() turns the 0-dim tensors into plain floats / ints for formatting and slicing
    for score, idx in zip(scores.tolist(), indices.tolist()):
        # context window: two sentences before the hit through one after it; the start
        # is clamped at 0 because a negative start would wrap around to the end of the
        # list and usually produce an empty slice
        context = sentencesProcessed[max(idx - 2, 0):idx + 2]
        print(f"(Score: {score:.4f})", context)
