# Store posts to Vector Store FAISS

Goal here is to ask questions related to content of the blog post material. 
To achieve this, we need to use Retrieval-Augmented-Generation (RAG) and create prompts that contain relevant context along with the query.

In the previous [notebook](scrape_blogpost.ipynb) we extracted blog post content data from AWS Machine Learning blogs.

In this notebook, we'll clean, split, encode and store the extracted blog post contents to a Vector Store ([FAISS](https://github.com/facebookresearch/faiss))

In [1]:
import re
from pathlib import Path
from typing import Dict, List

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from bs4 import BeautifulSoup
from langchain.docstore.document import Document
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores.faiss import FAISS
from nltk.tokenize import sent_tokenize
from pydantic import BaseModel
from rich import print
from torch import Tensor
from tqdm import tqdm
from transformers import AutoModel, AutoTokenizer

import faiss

In [2]:
# Root directory where all extracted blog post files are
DATADIR = Path("./data")

# Use rglob to recursively find all parquet files
file_list = list(DATADIR.rglob("*.parquet"))

In [3]:
file_list[0:15]

[PosixPath('data/aws/ml_blog_posts/rss/democratize-computer-vision-defect-detection-for-manufacturing-quality-using-no-code-machine-learning-with-amazon-sagemaker-canvas.parquet'),
 PosixPath('data/aws/ml_blog_posts/rss/retain-original-pdf-formatting-to-view-translated-documents-with-amazon-textract-amazon-translate-and-pdfbox.parquet'),
 PosixPath('data/aws/ml_blog_posts/rss/integrate-saas-platforms-with-amazon-sagemaker-to-enable-ml-powered-applications.parquet'),
 PosixPath('data/aws/ml_blog_posts/rss/recommend-and-dynamically-filter-items-based-on-user-context-in-amazon-personalize.parquet'),
 PosixPath('data/aws/ml_blog_posts/rss/use-proprietary-foundation-models-from-amazon-sagemaker-jumpstart-in-amazon-sagemaker-studio.parquet'),
 PosixPath('data/aws/ml_blog_posts/rss/use-the-aws-cdk-to-deploy-amazon-sagemaker-studio-lifecycle-configurations.parquet'),
 PosixPath('data/aws/ml_blog_posts/rss/deploy-a-serverless-ml-inference-endpoint-of-large-language-models-using-fastapi-aws-lamb

## Extract Metadata from the stored parquet file

Each parquet file is loaded to a pandas dataframe. Dataframe has the following columns
- Title
- Authors
- Published Date
- Tags
- Content
- URL
- Image URLs (only when extracted via BeautifulSoup)

We extract all columns, except Content, and prepare a *metadata* dictionary object.
This *metadata* dictionary will be used when constructing Langchain's *document* object (`langchain.docstore.document.Document`)

### Encode and store processed text chunks to Vector Store

Define text embedding model to encode the text chunks and store it to FAISS Vector Store

We use **intfloat/e5-large-v2** model from huggingface as our embedding model.

#### Save extracted doc embeddings to FAISS Index Locally

- Here we make use of LangChains `FAISS.from_documents` helper function to ingest docs to FAISS with text embedding model.
- If Index already exists on disk then skips creating a new index.
- Here we do NOT use `TextSplitter` or `CharacterSplitter` as we have already split our documents with `create_langchain_documents` function.

In [12]:
# Function to clean text of html tags, URLs and special characters
def clean_text(text):
    # Remove HTML tags
    text = BeautifulSoup(text, "html.parser").get_text()

    # Remove URLs
    text = re.sub(r"http[s]?://\S+", "", text)

    # Lowercase the text
    text = text.lower()

    # Remove punctuation and special characters (optional)
    # text = re.sub(r"\W", " ", text)

    # Substitute multiple spaces with a single space
    text = re.sub(r"\s+", " ", text).strip()

    return text


# Function to split content to paragraphs and then to passages
def create_passages(
    df: pd.DataFrame,
    passage_window: int = 4,
) -> List[str]:
    # Extract blog post content as Corpus
    corpus = clean_text(df.Content[0])

    # Split content as paragraphs and then into passages
    paragraphs = []
    for paragraph in corpus.replace("\r\n", "\n").split("\n\n"):
        if len(paragraph.strip()) > 0:
            paragraphs.append(sent_tokenize(paragraph.strip()))

    # We combine up to 3 sentences into a passage.
    # You can choose smaller or larger values for window_size
    # Smaller value: Context from other sentences might get lost
    # Lager values: More context from the paragraph remains, but results are longer

    window_size = passage_window
    passages = []
    for paragraph in paragraphs:
        for start_idx in range(0, len(paragraph), window_size):
            end_idx = min(start_idx + window_size, len(paragraph))
            passages.append(" ".join(paragraph[start_idx:end_idx]))

    # print("Paragraphs: ", len(paragraphs))
    # print("Sentences: ", sum([len(p) for p in paragraphs]))
    # print("Passages: ", len(passages))
    return passages


# Function to ingest passages into a faiss index
def ingest_passages_to_faiss(
    docs: List[str], faiss_index: faiss.IndexFlatL2, index_filename: str
):
    for doc in tqdm(docs, total=len(docs), desc="Ingesting passages"):
        embeddings = encode_e5(doc)
        faiss_index.add(embeddings)

    print(f"Saving index to {index_filename}")
    faiss.write_index(faiss_index, index_filename)


def encode_e5(corpus: str, normalize: bool = True) -> Tensor:
    model_id = "intfloat/e5-large-v2"
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    model = AutoModel.from_pretrained(model_id)

    def average_pool(last_hidden_states: Tensor, attention_mask: Tensor) -> Tensor:
        last_hidden = last_hidden_states.masked_fill(
            ~attention_mask[..., None].bool(), 0.0
        )
        return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None]

    # tokenize input
    tokenized_input = tokenizer(
        corpus,
        padding=True,
        truncation=True,
        max_length=512,
        return_tensors="pt",
    )

    with torch.no_grad():
        outputs = model(**tokenized_input)
        embeddings = average_pool(
            outputs.last_hidden_state, tokenized_input["attention_mask"]
        )

    if normalize:
        # (Optionally) normalize embeddings
        embeddings = F.normalize(embeddings, p=2, dim=1)
    # print(type(embeddings))
    # print(embeddings.shape)
    # convert tensor to numpy array as faiss index accepts vectors as ndarrays
    embeddings = embeddings.numpy()
    # print(type(embeddings))
    # print(embeddings.shape)
    return embeddings


def get_embedding_dimensions(model_id: str) -> int:
    model = AutoModel.from_pretrained(model_id)

    # The embedding dimension is the size of the hidden states
    embedding_dimension = model.config.hidden_size
    return embedding_dimension


def save_index(index, filename):
    """
    Save the FAISS index to a file.
    """
    faiss.write_index(index, filename)


# Helper function to extract metadata from dataframe
def extract_metadata(df) -> Dict:
    data = {}
    for column in df.columns:
        for value in df[column]:
            if str(column) != "Content":
                if isinstance(value, np.ndarray):
                    data[column] = value.tolist()
                else:
                    data[column] = value

    # Update the key 'url' to 'source'
    data["source"] = data.pop("URL")

    # convert values to lowercase, replace spaces with underscore
    metadata = {}
    for key, value in data.items():
        new_key = key.lower().replace(" ", "_")
        metadata[new_key] = value
    return metadata


# Function to split content to paragraphs and then to passages
def create_langchain_documents(
    df: pd.DataFrame,
    passage_window: int = 4,
) -> List[Document]:
    # Extract blog post content as Corpus
    corpus = df.Content[0]

    # Split content as paragraphs and then into passages
    paragraphs = []
    for paragraph in corpus.replace("\r\n", "\n").split("\n\n"):
        if len(paragraph.strip()) > 0:
            paragraphs.append(sent_tokenize(paragraph.strip()))

    # We combine up to 3 sentences into a passage.
    # You can choose smaller or larger values for window_size
    # Smaller value: Context from other sentences might get lost
    # Lager values: More context from the paragraph remains, but results are longer

    window_size = passage_window
    passages = []
    for paragraph in paragraphs:
        for start_idx in range(0, len(paragraph), window_size):
            end_idx = min(start_idx + window_size, len(paragraph))
            passages.append(" ".join(paragraph[start_idx:end_idx]))

    # print("Paragraphs: ", len(paragraphs))
    # print("Sentences: ", sum([len(p) for p in paragraphs]))
    # print("Passages: ", len(passages))
    docs = list()
    num_passages = len(passages)

    metadata = extract_metadata(df)
    for i, passage in enumerate(passages):
        # Create a copy of the metadata dictionary
        metadata_copy = metadata.copy()
        # Add current passage # as a new key-value pair
        metadata_copy["passage"] = f"{i+1}/{num_passages}"
        docs.append(Document(page_content=clean_text(passage), metadata=metadata_copy))

    return docs

### Extract and Prepare data for encoding (embedding vector)


To retrieve (search) relevant chunks of text for a give question, we perform a similarity search to find relevant chunks of text that are similar to the question.
As our problem here is a "asymmetric search" problem, we'll need to choose a text embedding model that performs well for Retrieval tasks.

We read the parquet file to a pandas dataframe and then extract all columns, except Content, as metadata.
Read the "Content" of the blog_post as corpus and then split the content as paragraphs and then to passages comprising of 3 sentences.
We then encode these passage chunks with an appropriate text embedding model and then store these embeddings into a vector store.

In [9]:
# Use rglob to recursively find all parquet files
file_list = list(DATADIR.rglob("*.parquet"))
INDEX_DIR = Path("./faiss")
INDEX_DIR.mkdir(exist_ok=True, parents=True)

# Define index name and index fullpath
INDEX_NAME = "aws_ml_blog_posts"
INDEX_FULLPATH = INDEX_DIR.joinpath(f"{INDEX_NAME}.faiss").absolute()

if INDEX_FULLPATH.exists():
    # Load the index from the file
    index = faiss.read_index(str(INDEX_FULLPATH))
    print(f"Loaded index from {INDEX_FULLPATH}")
else:
    # Create a new FAISS index
    dimension = get_embedding_dimensions("intfloat/e5-large-v2")
    print(f"Creating index: {INDEX_NAME} with dimensions {dimension}")
    index = faiss.IndexFlatL2(dimension)
    faiss.write_index(index, str(INDEX_FULLPATH))

In [14]:
INDEX_DIR = Path("./faiss")
INDEX_DIR.mkdir(exist_ok=True, parents=True)

# Define index name and index fullpath
INDEX_NAME = "aws_ml_blog_posts"
INDEX_FULLPATH = INDEX_DIR.joinpath(f"{INDEX_NAME}.faiss").absolute()

MODEL_PATH = "intfloat/e5-base-v2"


class EmbedConfig(BaseModel):
    embedding_model_name: str = MODEL_PATH
    embedding_model_kwargs: Dict = {"device": "cpu"}
    encode_kwargs: Dict = {"normalize_embeddings": True}
    faiss_index_name: str = INDEX_NAME


def get_embedding_model(Config: BaseModel):
    print(f"Using embedding model: {Config.embedding_model_name}")
    hf_embeddings = HuggingFaceEmbeddings(
        model_name=Config.embedding_model_name,
        model_kwargs=Config.embedding_model_kwargs,
        encode_kwargs=Config.encode_kwargs,
    )
    return hf_embeddings


config = EmbedConfig()

# If index exists then load the local index
if INDEX_FULLPATH.exists():
    db = FAISS.load_local(
        INDEX_DIR,
        embeddings=get_embedding_model(Config=config),
        index_name=config.faiss_index_name,
    )
    print(f"Loaded existing index: {INDEX_DIR}/{INDEX_NAME}")
else:
    print("No local index found, creating index from docs")
    for parquet_file in file_list:
        print(f"Ingesting docs for {parquet_file}")
        loaded_df = pd.read_parquet(parquet_file, engine="pyarrow")
        docs = create_langchain_documents(loaded_df)
        db = FAISS.from_documents(docs, get_embedding_model(Config=config))
    print("Saving index locally")
    FAISS.save_local(db, INDEX_DIR, INDEX_NAME)

  text = BeautifulSoup(text, "html.parser").get_text()


  text = BeautifulSoup(text, "html.parser").get_text()


  text = BeautifulSoup(text, "html.parser").get_text()


  text = BeautifulSoup(text, "html.parser").get_text()


  text = BeautifulSoup(text, "html.parser").get_text()


  text = BeautifulSoup(text, "html.parser").get_text()


  text = BeautifulSoup(text, "html.parser").get_text()


  text = BeautifulSoup(text, "html.parser").get_text()


In [None]:
# Read each parquet file from file_list, create passages and ingest passages to faiss index
for parquet_file in file_list[0:3]:
    print(f"Ingesting passages for {parquet_file}")
    loaded_df = pd.read_parquet(parquet_file, engine="pyarrow")
    # passages = create_passages(loaded_df)
    # ingest_passages_to_faiss(passages, index, str(INDEX_FULLPATH))
    docs = create_langchain_documents(loaded_df)
    index = FAISS.from_documents(docs, get_embedding_model(Config=config))

In [None]:
# (Optional) Delete faiss index from disk
# Uncomment below code to delete index from disk
# if INDEX_FULLPATH.exists():
#     INDEX_FULLPATH.unlink()

### Evaluate results using similarity search

We first encode the query vector and using the same embedding model then we use the `db.similarity_search_by_vector()` function to search for similar docs to our query

In [17]:
# Prepare query
query = "query: retain pdf formatting"

# Encode query vector
query_vector = encode_e5(query)

results = db.similarity_search(query, k=5)

# Print results
print(results)