# RAG with watsonx

💡 To keep things simple, this lab builds a RAG application that answers questions about a single PDF file. You can use the PDF files provided with this repository or bring your own.

### Contents

1. [Setup](#setup_environment)
1. [PDF to Text](#pdf_text)
1. [Text to Chunks](#text_chunks)
1. [Text Embeddings](#text_embeddings)
1. [Semantic Search](#semantic_search)
    1. [Visualizing Semantic Search](#visualizing_semantic_search)
    1. [Semantic Search function](#semantic_search_function)
1. [Prompt Building](#prompt_building)
1. [Create the inference function](#inference_function)


<a id="setup_environment"></a>
## 1. Set up the environment

In [None]:
!pip install -U ibm-watson-machine-learning --quiet

In [None]:
credentials = {
    "url": "URL",
    "apikey": "API_KEY"
}

In [None]:
project_id = 'PROJECT_ID'

In [None]:
import re

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from chromadb.api.types import EmbeddingFunction

from langchain.document_loaders import PyPDFLoader
from sentence_transformers import SentenceTransformer

from sklearn.manifold import TSNE
from sklearn.neighbors import NearestNeighbors
from typing import Literal, Optional, Any

### 1.2 List available models

All available models are listed in the `ModelTypes` class.

In [None]:
from ibm_watson_machine_learning.foundation_models.utils.enums import ModelTypes

print([model.value for model in ModelTypes])

<a id="pdf_text"></a>
## 2. PDF to Text

First, we extract the text from the PDF file. We also preprocess this text, replacing line breaks and runs of whitespace with single spaces, to keep it concise and clean.

In [None]:
def pdf_to_text(path: str = "../documents/bank_faq_en.pdf",
                start_page: int = 1,
                end_page: Optional[int] = None) -> list[str]:
    """
    Converts PDF to plain text.

    Args:
        path (str): Path to the PDF file.
        start_page (int): First page to get text from (1-based).
        end_page (int, optional): Last page to get text from (inclusive). Defaults to the last page.

    Returns:
        list[str]: One cleaned text string per page.
    """
    loader = PyPDFLoader(path)
    pages = loader.load()

    if end_page is None:
        end_page = len(pages)

    text_list = []
    for i in range(start_page-1, end_page):
        text = pages[i].page_content
        text = text.replace('\n', ' ')
        text = re.sub(r'\s+', ' ', text)
        text_list.append(text)

    return text_list

In [None]:
text_list = pdf_to_text("../documents/bank_faq_en.pdf")
print(text_list)
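
To see the cleanup step in isolation, here is a minimal sketch on a made-up string (the sample text is an assumption, not taken from the PDF). Because `\s` already matches newlines, a single `re.sub` call handles both the line breaks and the repeated spaces.

```python
import re

# Hypothetical raw page text, similar to what a PDF loader might return.
raw = "First line\nsecond   line \n\n third  line"

# Collapse every run of whitespace (spaces, tabs, newlines) into one space.
clean = re.sub(r"\s+", " ", raw).strip()

print(clean)
```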

<a id="text_chunks"></a>
## 3. Text to Chunks

After extracting and processing the text, the next step is to split it into chunks of roughly equal size.
Here, we will use a generic approach and set the maximum number of words in each chunk. A short leftover at the end of a page is carried over to the start of the next page instead of becoming its own small chunk.

In [None]:
def text_to_chunks(texts: list[str],
                   word_length: int = 100,
                   start_page: int = 1) -> list[str]:
    """
    Splits the text into chunks of at most `word_length` words.

    Args:
        texts (list[str]): List of page texts to be converted into chunks.
        word_length (int): Maximum number of words in each chunk.
        start_page (int): Starting page number for the chunks (only used by the commented-out page label).

    Returns:
        list[str]: The chunks of text.
    """
    text_toks = [t.split(' ') for t in texts]
    chunks = []

    for idx, words in enumerate(text_toks):
        for i in range(0, len(words), word_length):
            chunk = words[i:i+word_length]
            if (i+word_length) > len(words) and (len(chunk) < word_length) and (
                len(text_toks) != (idx+1)):
                text_toks[idx+1] = chunk + text_toks[idx+1]
                continue
            chunk = ' '.join(chunk).strip() 
            # chunk = f'[Page no. {idx+start_page}]' + ' ' + '"' + chunk + '"'
            chunks.append(chunk)
            
    return chunks

In [None]:
chunks = text_to_chunks(text_list)

for chunk in chunks:
    print(chunk + '\n')
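
The carry-over behavior is easier to follow on toy input. The sketch below is a simplified, self-contained variant of the same idea, not the function above: `chunk_pages` is a hypothetical name, and the tiny `word_length` is chosen only to keep the output readable.

```python
def chunk_pages(pages: list[str], word_length: int = 4) -> list[str]:
    """Split pages into chunks of at most `word_length` words.

    A short leftover at the end of a page is carried over to the start
    of the next page instead of becoming its own tiny chunk.
    """
    chunks = []
    carry: list[str] = []
    for idx, page in enumerate(pages):
        words = carry + page.split()
        carry = []
        is_last_page = idx == len(pages) - 1
        for i in range(0, len(words), word_length):
            piece = words[i:i + word_length]
            if len(piece) < word_length and not is_last_page:
                carry = piece  # defer the short tail to the next page
                break
            chunks.append(" ".join(piece))
    return chunks


toy_pages = ["a b c d e f", "g h i"]
print(chunk_pages(toy_pages))
```

The leftover words `e f` from the first page are prepended to the second page, so only the very last chunk can be shorter than `word_length`.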

<a id="text_embeddings"></a>
## 4. Text Embeddings

Now it is time to convert those pieces of text into embeddings, represented as multidimensional vectors. To achieve this, we use `all-MiniLM-L12-v2`, a sentence-transformers model hosted on Hugging Face. It encodes text into 384-dimensional vectors that can be used for text classification, semantic similarity, clustering and other natural language tasks.

In [None]:
%%time
# Load the sentence-transformers model (downloaded from the Hugging Face Hub on first use)

class MiniLML12V2EmbeddingFunction(EmbeddingFunction):
    # Loaded once at class-definition time and shared by all instances
    MODEL = SentenceTransformer('all-MiniLM-L12-v2')
    # MODEL = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
    def __call__(self, texts):
        return MiniLML12V2EmbeddingFunction.MODEL.encode(texts).tolist()

emb_function = MiniLML12V2EmbeddingFunction()

In [None]:
def get_text_embedding(texts: list[str],
                       batch: int = 1000) -> np.ndarray:
    """
    Get the embeddings from the text.

    Args:
        texts (list[str]): List of chunks of text.
        batch (int): Batch size.

    Returns:
        np.ndarray: Array of shape (len(texts), embedding_dim).
    """
    embeddings = []
    for i in range(0, len(texts), batch):
        text_batch = texts[i:(i+batch)]
        # Embeddings model
        emb_batch = emb_function(text_batch)
        embeddings.append(emb_batch)
    # Stack the per-batch results into a single 2D array
    embeddings = np.vstack(embeddings)
    return embeddings

In [None]:
embeddings = get_text_embedding(chunks)

print(embeddings.shape)
print(f"Our text was embedded into {embeddings.shape[1]} dimensions")
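
To build some intuition for why embeddings are useful, the sketch below compares toy vectors with cosine similarity, a common measure of semantic closeness. The 3-dimensional vectors and the `cosine_similarity` helper are illustrative assumptions; the real embeddings have many more dimensions, and the search step later in this notebook relies on scikit-learn's default distance rather than this function.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# Made-up vectors standing in for sentence embeddings.
v_a = [1.0, 0.0, 1.0]
v_b = [1.0, 0.0, 0.9]   # points in almost the same direction as v_a
v_c = [0.0, 1.0, 0.0]   # orthogonal to v_a

print(cosine_similarity(v_a, v_b))
print(cosine_similarity(v_a, v_c))
```

Vectors that point in nearly the same direction score close to 1, while unrelated (orthogonal) vectors score 0.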

<a id="semantic_search"></a>
## 5. Semantic Search

In this section we will fit our Nearest Neighbors algorithm, using the full-sized embeddings.

<a id="visualizing_semantic_search"></a>
### 5.1 Visualizing the Semantic Search

Since it is very hard for humans to visualize more than three dimensions, let alone 384, we will reduce the dimensionality of our embeddings.

We will use the t-SNE algorithm to bring it down to two dimensions, allowing us to visualize our data points.

In [None]:
question = 'How many miles does black card grant?'
emb_question = emb_function([question])

In [None]:
tsne = TSNE(n_components=2, random_state=42, perplexity=5)

embeddings_with_question = np.vstack([embeddings, emb_question])
embeddings_2d = tsne.fit_transform(embeddings_with_question)

In [None]:
embeddings_2d.shape

In [None]:
def visualize_embeddings(embeddings_2d: np.ndarray,
                         question: bool = False,
                         neighbors: Optional[np.ndarray] = None) -> None:
    """
    Plot 2D (t-SNE-reduced) embeddings, labeling each data point with its index.

    Args:
        embeddings_2d (numpy.ndarray): Array of shape (num_samples, 2) with the reduced embeddings.
        question (bool, optional): If True, the last row is treated as the question and
                                   plotted as a red dot labeled 'q'. Default is False.
        neighbors (numpy.ndarray, optional): Indices of the points to highlight in purple.
                                             Default is None.
    """

    embeddingsdf = pd.DataFrame()
    embeddingsdf['x'] = embeddings_2d[:,0]
    embeddingsdf['y'] = embeddings_2d[:,1]

    # Scatter plot the 2D embeddings and label each data point with its index
    plt.figure(figsize=(4, 4))
    
    if question:
        question_point = embeddingsdf.iloc[-1]
        plt.scatter(question_point.x, question_point.y, color='red', label='q', alpha=1)
        plt.annotate('q', xy=(question_point.x, question_point.y), xytext=(5, 2), textcoords='offset points', color='black')
        
    if neighbors is not None:
        if question:
            embeddingsdf = embeddingsdf[:-1]
        
        for i, row in embeddingsdf.iterrows():
            if i in neighbors:
                plt.scatter(row.x, row.y, color='purple', alpha=1)
                plt.annotate(str(i), xy=(row.x, row.y), xytext=(5, 2), textcoords='offset points', color='black')
            else:
                plt.scatter(row.x, row.y, color='blue', alpha=0.7)
                plt.annotate(str(i), xy=(row.x, row.y), xytext=(5, 2), textcoords='offset points', color='black')
    else:
        if question:
            embeddingsdf = embeddingsdf[:-1]

        for i, row in embeddingsdf.iterrows():
                plt.scatter(row.x, row.y, color='blue', alpha=0.7)
                plt.annotate(str(i), xy=(row.x, row.y), xytext=(5, 2), textcoords='offset points', color='black')
        
    # Add the title and axis labels, then render the plot
    plt.title('t-SNE Visualization of 384-dimensional Embeddings')
    plt.xlabel('Dimension 1')
    plt.ylabel('Dimension 2')
    plt.show()

In [None]:
visualize_embeddings(embeddings_2d[:-1])

In [None]:
# Pass the full array: with question=True the last row is plotted as the question
visualize_embeddings(embeddings_2d, True)

In [None]:
nn_2d = NearestNeighbors(n_neighbors=3)
nn_2d.fit(embeddings_2d[:-1])

In [None]:
neighbors = nn_2d.kneighbors(embeddings_2d[-1].reshape(1, -1), return_distance=False)
neighbors

In [None]:
visualize_embeddings(embeddings_2d, True, neighbors)

<a id="semantic_search_function"></a>
### 5.2 Semantic Search function

Because t-SNE is a non-linear algorithm that loses some information, we will not use the 2-dimensional vectors for retrieval; they were used solely for visualization. The search itself runs on the full-sized embeddings.

In [None]:
nn = NearestNeighbors(n_neighbors=3)
nn.fit(embeddings)

In [None]:
def get_chunks(question, chunks, nn):
    emb_question = emb_function([question])

    neighbors = nn.kneighbors(emb_question, return_distance=False)
    
    topn_chunks = [chunks[i] for i in neighbors.tolist()[0]]
    
    return topn_chunks

In [None]:
topn_chunks = get_chunks(question='Do all cards grant Mastercard Airport Experience?', chunks=chunks, nn=nn)
print(topn_chunks)
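
Conceptually, the lookup above ranks every chunk embedding by its distance to the question embedding and keeps the closest ones. The brute-force sketch below shows that idea with Euclidean distance on made-up 2D points. The `k_nearest` helper is hypothetical and is not how scikit-learn implements `NearestNeighbors` internally.

```python
import math


def k_nearest(query: list[float], vectors: list[list[float]], k: int = 3) -> list[int]:
    """Return the indices of the k vectors closest to `query` (Euclidean distance)."""
    distances = [(math.dist(query, v), i) for i, v in enumerate(vectors)]
    distances.sort()  # smallest distance first
    return [i for _, i in distances[:k]]


# Toy "embeddings" and a toy "question" vector.
toy_vectors = [[0.0, 0.0], [1.0, 1.0], [5.0, 5.0], [0.9, 1.2]]
toy_query = [1.0, 1.0]

nearest = k_nearest(toy_query, toy_vectors, k=2)
print(nearest)
```

The returned indices play the same role as `neighbors` in `get_chunks`: they are used to look up the matching chunks of text.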

<a id="prompt_building"></a>
## 6. Prompt Building

Now it is time to build our prompt: the chunks retrieved for the question go in as context, followed by the instructions and the question itself.

In [None]:
def build_prompt(question, chunks, nn):
    prompt = ""
    prompt += "Context: \n '''"

    # Retrieve only the chunks most relevant to the question
    topn_chunks = get_chunks(question=question, chunks=chunks, nn=nn)

    # Separate chunks with a newline so their words do not run together
    for c in topn_chunks:
        prompt += c + '\n'

    prompt += "'''\n\n"

    prompt += "Your task is to answer the question using the context given delimited with '''.\n"\
            "- Don't add any additional information.\n"\
            "- If you don't have an answer to the question, respond with \"I didn't find an answer to this question in my knowledge base.\"\n"\
            "- The answer should be short and concise\n"

    prompt += f"\n\nQuestion: {question}\n\nAnswer: "

    return prompt

In [None]:
prompt = build_prompt(question, chunks=chunks, nn=nn)
print(prompt)

<a id="inference_function"></a>
## 7. Create the inference function

In this section we define a small wrapper class that loads a watsonx foundation model and generates text from a prompt.

In [None]:
from ibm_watson_machine_learning.foundation_models import Model

class GenAI:
    def __init__(self, credentials, project_id):
        self.credentials = credentials
        self.project_id = project_id
        self._model = None

    def model(self, model_id, parameters):
        # Store the client under a separate name so it does not overwrite this method
        self._model = Model(
            model_id=model_id,
            params=parameters,
            credentials=self.credentials,
            project_id=self.project_id
        )

    def generate(self, prompt_input):
        response = self._model.generate_text(prompt=prompt_input)
        return response

You might need to adjust the model `parameters` for different models or tasks. To do so, refer to the documentation of the `GenTextParamsMetaNames` class.

In [None]:
from ibm_watson_machine_learning.metanames import GenTextParamsMetaNames as GenParams
from ibm_watson_machine_learning.foundation_models.utils.enums import DecodingMethods

parameters = {
    GenParams.DECODING_METHOD: DecodingMethods.GREEDY,
    GenParams.MAX_NEW_TOKENS: 150,
    GenParams.MIN_NEW_TOKENS: 30,
    GenParams.STOP_SEQUENCES: ["."],
    GenParams.REPETITION_PENALTY: 1.5
}

In [None]:
genai = GenAI(credentials=credentials, project_id=project_id)

genai.model(model_id='meta-llama/llama-2-70b-chat', parameters=parameters)

In [None]:
question = "How many miles does black card grant?"

prompt = build_prompt(question, chunks=chunks, nn=nn)
print(prompt)

In [None]:
generated_response = genai.generate(prompt)
print(generated_response)
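
The last two cells can be folded into a single helper. The sketch below is one possible shape for it, not part of the original lab: `answer_question` is a hypothetical name, and the `fake_build` and `fake_generate` stand-ins exist only so the example runs without credentials.

```python
from typing import Callable


def answer_question(question: str,
                    build: Callable[[str], str],
                    generate: Callable[[str], str]) -> str:
    """Build a prompt for `question`, send it to the model, and tidy the reply."""
    prompt = build(question)
    return generate(prompt).strip()


# Stand-ins for the real prompt builder and model call (assumptions for this sketch).
def fake_build(question: str) -> str:
    return f"Context: (retrieved chunks)\n\nQuestion: {question}\n\nAnswer: "


def fake_generate(prompt: str) -> str:
    return "  (model reply would appear here)  "


reply = answer_question("How many miles does black card grant?", fake_build, fake_generate)
print(reply)
```

In this notebook, the real pieces could be passed in as `lambda q: build_prompt(q, chunks=chunks, nn=nn)` and `genai.generate`.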