Install Packages

In [None]:
# Install the notebook's third-party dependencies with the %pip magic.
# NOTE(review): no versions are pinned here, so a fresh install may resolve to
# different releases over time — consider pinning for reproducibility.
%pip install llama-index langchain python-dotenv fastembed qdrant_client sentence_transformers llama-index-llms-mistralai langchain_community ragas
# bitsandbytes is installed in a separate step with -U (upgrade to the latest release).
%pip install -U bitsandbytes

Import Modules

In [None]:
import os
import json
import re
from llama_index.core.schema import Document
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core import SimpleDirectoryReader

import logging
from dotenv import load_dotenv
from fastembed import SparseTextEmbedding
from qdrant_client import QdrantClient, models
from tqdm import tqdm
from typing import List, Union
from sentence_transformers import CrossEncoder

from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain import HuggingFaceHub
from langchain.callbacks import get_openai_callback

from datetime import datetime


Load Configuration File

In [3]:
# Load environment variables via python-dotenv before anything reads them.
load_dotenv()

# NOTE: despite its name, `cwd` is the PARENT of the current working directory;
# every project path in this notebook is resolved relative to it.
cwd = os.path.dirname(os.getcwd())
config_path = os.path.join(cwd, "config", "global_config.json")
with open(config_path) as f:
    gconf = json.load(f)

# Connection settings shared by the indexing and retrieval cells.
Qdrant_URL = gconf['qdrant_URL']
# Element at index 7 of the configured 'Collection_Name' value.
Collection_Name = gconf['Collection_Name'][7]

Create Documents or Nodes

In [None]:
class CustomTransformation:
    """Callable text normaliser applied to documents before they are split."""

    @staticmethod
    def _normalise(text):
        """Lower-case ``text``, collapse whitespace runs to one space, then strip punctuation."""
        collapsed = re.sub(r'\s+', ' ', text.lower())
        return re.sub(r'[^\w\s]', '', collapsed)

    def __call__(self, documents):
        """Return a new list of ``Document`` objects with normalised text.

        Args:
            documents: Iterable of objects exposing ``get_content()`` and
                ``metadata``.

        Returns:
            list: One new ``Document`` per input, carrying the normalised
            text and the input's ``metadata`` object (passed through as-is).
        """
        return [
            Document(text=self._normalise(doc.get_content()), metadata=doc.metadata)
            for doc in documents
        ]
    
def Sentence_Splitter_docs_into_nodes(all_documents):
    """Split documents into nodes with a ``SentenceSplitter`` sized from the global config.

    Args:
        all_documents: Documents handed to the splitter.

    Returns:
        Whatever ``get_nodes_from_documents`` returns, or an empty list when
        any step raises (the error is printed, not re-raised).
    """
    try:
        # Both sizing options are read from gconf and coerced to int.
        sentence_splitter = SentenceSplitter(
            **{option: int(gconf[option]) for option in ('chunk_size', 'chunk_overlap')}
        )
        return sentence_splitter.get_nodes_from_documents(all_documents)
    except Exception as err:
        print(f"Error splitting documents into nodes: {err}")
    return []

def save_nodes(nodes, output_file):
    """Serialise nodes to a single JSON file.

    Args:
        nodes: Iterable of objects exposing a ``dict()`` method.
        output_file (str): Destination path. Missing parent directories are
            created; a bare file name (no directory part) is written to the
            current working directory.

    Errors are printed rather than raised, so callers get a best-effort save.
    """
    try:
        # A bare file name has an empty dirname, and os.makedirs('') raises,
        # so only create the parent directory when there is one.
        parent_dir = os.path.dirname(output_file)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        # Serialise everything BEFORE opening the file so that a failing
        # node.dict() does not leave a truncated output file behind.
        nodes_dict = [node.dict() for node in nodes]

        with open(output_file, 'w', encoding='utf-8') as file:
            json.dump(nodes_dict, file, indent=4)
        print(f"Saved nodes to {output_file}")
    except Exception as e:
        print(f"Error saving nodes to file: {e}")

if __name__ == '__main__':
    try:
        # Read everything under the configured input directory.
        input_dir = os.path.join(cwd, gconf['input_directory'])
        documents = SimpleDirectoryReader(input_dir=input_dir).load_data()
        print(f"Loaded {len(documents)} documents")

        if not documents:
            print("No documents to process.")
        else:
            # Normalise the raw text before chunking.
            custom_transform = CustomTransformation()
            documents = custom_transform(documents)

            # Chunk the normalised documents into nodes.
            nodes = Sentence_Splitter_docs_into_nodes(documents)
            print(f"Created {len(nodes)} nodes")

            # Persist all nodes to one JSON file (path taken from gconf['node_json']).
            output_file = os.path.join(cwd, gconf['node_json'])
            save_nodes(nodes, output_file)

    except Exception as err:
        # Best-effort cell: report the failure instead of raising.
        print(f"Error processing documents: {err}")

Create Qdrant Collection and Insert Documents

In [None]:
class QdrantIndexing:
    """
    Index pre-split text nodes into a Qdrant collection using sparse vectors.

    Typical use: ``load_nodes`` -> ``client_collection`` -> ``documents_insertion``.
    """

    def __init__(self) -> None:
        """
        Initialize the QdrantIndexing object.

        Reads the node JSON path and the sparse model name from the global
        config and opens a Qdrant client against ``Qdrant_URL``.
        """
        self.data_path = os.path.join(cwd, gconf['node_json'])
        self.sparse_embedding_model = SparseTextEmbedding(model_name=gconf['fastembed_sparse_model'])
        self.qdrant_client = QdrantClient(url=Qdrant_URL)
        # Parallel lists: metadata[i] belongs to documents[i].
        self.metadata = []
        self.documents = []
        logging.info("QdrantIndexing object initialized.")

    def load_nodes(self, input_file):
        """
        Load nodes from a JSON file and extract metadata and documents.

        Each node must provide ``'metadata'`` and ``'text'`` keys. Entries are
        appended to ``self.metadata`` / ``self.documents``, so calling this
        more than once accumulates nodes.

        Args:
            input_file (str): The path to the JSON file.
        """
        with open(input_file, 'r', encoding='utf-8') as file:
            self.nodes = json.load(file)

        for node in self.nodes:
            self.metadata.append(node['metadata'])
            self.documents.append(node['text'])

        logging.info("Loaded %d nodes from JSON file.", len(self.nodes))

    def client_collection(self):
        """
        Create the collection in Qdrant if it does not exist yet.

        The collection is declared with a named dense vector ('dense') and a
        named sparse vector ('sparse').
        """
        if self.qdrant_client.collection_exists(collection_name=Collection_Name):
            return

        self.qdrant_client.create_collection(
            collection_name=Collection_Name,
            vectors_config={
                # NOTE(review): documents_insertion only writes the 'sparse'
                # vector; the 'dense' slot (fixed size 384) is declared but
                # never populated by this class — confirm it is still wanted.
                'dense': models.VectorParams(
                    size=384,
                    distance=models.Distance.COSINE,
                )
            },
            sparse_vectors_config={
                "sparse": models.SparseVectorParams(
                    index=models.SparseIndexParams(
                        on_disk=False,
                    ),
                )
            },
        )
        logging.info("Created collection '%s' in Qdrant vector database.", Collection_Name)

    def create_sparse_vector(self, text):
        """
        Embed ``text`` with the configured sparse model and wrap it for Qdrant.

        Args:
            text (str): The text to embed.

        Returns:
            models.SparseVector: Indices and values of the sparse embedding.

        Raises:
            ValueError: If the embedding lacks ``indices`` or ``values``.
        """
        embeddings = list(self.sparse_embedding_model.embed([text]))[0]

        if not (hasattr(embeddings, 'indices') and hasattr(embeddings, 'values')):
            raise ValueError("The embeddings object does not have 'indices' and 'values' attributes.")

        return models.SparseVector(
            indices=embeddings.indices.tolist(),
            values=embeddings.values.tolist()
        )

    def _upsert_batch(self, points):
        """Send one batch of points to the configured collection."""
        self.qdrant_client.upsert(
            collection_name=Collection_Name,
            points=points
        )

    def documents_insertion(self, batch_size=256):
        """
        Embed every loaded document and upsert it into the collection.

        Points are sent in batches of at most ``batch_size`` instead of one
        request holding every point, and nothing is sent when there are no
        documents. Point ids are the positions of the documents in
        ``self.documents``.

        Args:
            batch_size (int): Maximum number of points per upsert request.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer.")

        pending = []
        upserted = 0
        for i, (doc, metadata) in enumerate(tqdm(zip(self.documents, self.metadata), total=len(self.documents))):
            pending.append(
                models.PointStruct(
                    id=i,
                    vector={
                        'sparse': self.create_sparse_vector(doc),
                    },
                    payload={
                        'text': doc,
                        **metadata  # Include all metadata
                    }
                )
            )

            if len(pending) >= batch_size:
                self._upsert_batch(pending)
                upserted += len(pending)
                pending = []

        # Flush the final, possibly partial batch.
        if pending:
            self._upsert_batch(pending)
            upserted += len(pending)

        logging.info("Upserted %d points with sparse vectors into Qdrant vector database.", upserted)

    
if __name__ == '__main__':
    # INFO level so the indexing progress messages are emitted.
    logging.basicConfig(level=logging.INFO)

    indexing = QdrantIndexing()
    # Read the saved nodes, make sure the collection exists, then push the points.
    indexing.load_nodes(input_file=indexing.data_path)
    indexing.client_collection()
    indexing.documents_insertion()

Retriever Class - Sparse Search Technique with Fastembed Embedding Model

In [4]:
class Sparse_search():
    """
    Retriever that runs sparse-vector searches against the Qdrant collection.
    """

    def __init__(self) -> None:
        """
        Initialize the Sparse_search object with the sparse embedding model and a Qdrant client.
        """
        self.sparse_embedding_model = SparseTextEmbedding(model_name=gconf['fastembed_sparse_model'])
        self.qdrant_client = QdrantClient(
            url=Qdrant_URL,
            timeout=30
        )

    def metadata_filter(self, file_names: Union[str, List[str]]) -> "models.Filter":
        """
        Build a Qdrant filter restricting results to the given file name(s).

        Args:
            file_names: A single file name, or a collection of file names of
                which any one may match.

        Returns:
            models.Filter: Filter with one ``must`` condition on the
            ``file_name`` payload key.
        """
        if isinstance(file_names, str):
            # Single file name -> exact match.
            match = models.MatchValue(value=file_names)
        else:
            # Several file names -> match any of them.
            match = models.MatchAny(any=list(file_names))

        file_name_condition = models.FieldCondition(key="file_name", match=match)
        logging.debug("file_name_condition %s", file_name_condition)
        return models.Filter(must=[file_name_condition])

    def query_sparse_search(self, query, metadata_filter=None, limit=5):
        """
        Retrieve the text of the best-matching points for ``query``.

        Args:
            query (str): The search query.
            metadata_filter: Optional filter, e.g. from ``metadata_filter``.
            limit (int): Maximum number of documents to return.

        Returns:
            list: The ``'text'`` payload of each returned point, in the order
            Qdrant returned them.
        """
        # Embed the query using the sparse embedding model.
        sparse_query = next(iter(self.sparse_embedding_model.embed([query])))

        results = self.qdrant_client.query_points(
            collection_name=Collection_Name,
            prefetch=[
                models.Prefetch(
                    query=models.SparseVector(
                        indices=sparse_query.indices.tolist(),
                        values=sparse_query.values.tolist(),
                    ),
                    using="sparse",
                    limit=limit,
                ),
            ],
            query_filter=metadata_filter,
            query=models.FusionQuery(fusion=models.Fusion.RRF),
            # Without this the root query falls back to the client's default
            # limit, silently capping any larger `limit` requested by the caller.
            limit=limit,
        )

        # Only the text payload is returned; ids and scores are dropped.
        return [point.payload['text'] for point in results.points]

ReRanker Technique

In [5]:
class reranking():
    """Re-rank retrieved documents against a query with a cross-encoder."""

    def __init__(self) -> None:
        """Load the CrossEncoder model named by ``gconf['reranker_model']``."""
        self.model = CrossEncoder(gconf['reranker_model'])

    def rerank_documents(self, query, documents, top_k=2):
        """
        Score each document against ``query`` and keep the best ones.

        Args:
            query (str): The user query.
            documents: Candidate document texts.
            top_k (int | None): How many documents to keep (default 2, the
                previously hard-coded value). ``None`` keeps all of them.

        Returns:
            list: Up to ``top_k`` documents ordered by descending score;
            an empty list when there is nothing to rank.
        """
        # Materialise once so the candidates can be both scored and zipped.
        documents = list(documents)

        # Nothing to rank (or nothing requested): skip the model call entirely.
        if not documents or (top_k is not None and top_k <= 0):
            return []

        # Compute the similarity score between the query and each document.
        scores = self.model.predict([(query, doc) for doc in documents])

        # Highest score first; sorted() is stable, so ties keep retrieval order.
        ranked_documents = sorted(zip(documents, scores), key=lambda pair: pair[1], reverse=True)

        return [doc for doc, _score in ranked_documents[:top_k]]

LLM Prompt Engineering

In [6]:
class prompt_template_generation():
    """Retrieve and re-rank context for a query and pair it with the RAG prompt template."""

    def __init__(self) -> None:
        """Create the retriever, the re-ranker and the prompt template."""
        self.search = Sparse_search()
        self.reranker = reranking()
        # The template text below (including its indentation) is runtime data
        # sent to the model; keep it byte-for-byte when editing this class.
        self.prompt_str = """You are an AI assistant specializing in explaining complex topics related to Retrieval-Augmented Generation(RAG). Your task is to provide a clear, concise, and informative explanation based on the following context and query.

        Context:
        {context_str}

        Query: {query_str}

        Please follow these guidelines in your response:
        1. Start with a brief overview of the concept mentioned in the query.
        2. Provide at least one concrete example or use case to illustrate the concept.
        3. If there are any limitations or challenges associated with this concept, briefly mention them.
        4. Conclude with a sentence about the potential future impact or applications of this concept.

        Response: Your explanation should be informative yet accessible, suitable for someone with a basic understanding of RAG. If the query asks for information not present in the context, please state that you don't have enough information to provide a complete answer, and only respond based on the given context.
        Output: Remember output should be only the predicted answer.
        """
        self.prompt_tmpl = PromptTemplate(template=self.prompt_str, input_variables=["context_str","query_str"])

    def prompt_generation(self, query: str, filename: Union[str, List[str]]):
        """
        Build the context for ``query`` and report how long each stage took.

        Args:
            query: The user question.
            filename: File name, or list of file names, used to restrict
                retrieval via the retriever's metadata filter.

        Returns:
            tuple: ``(prompt_template, context, retrieval_seconds, rerank_seconds)``
            where ``context`` is the re-ranked documents separated by a blank
            line and both timings are rounded to two decimals.
        """
        metadata_filter = self.search.metadata_filter(filename)

        # Time the retrieval step.
        cont_st_time = datetime.now()
        results = self.search.query_sparse_search(query, metadata_filter)
        cont_time = round((datetime.now() - cont_st_time).total_seconds(), 2)

        # Time the re-ranking step; with nothing retrieved there is nothing to rank.
        rank_st_time = datetime.now()
        reranked_documents = self.reranker.rerank_documents(query, results) if results else []
        rank_time = round((datetime.now() - rank_st_time).total_seconds(), 2)

        # Separate passages with a real blank line (the old "/n/n" inserted
        # the literal characters '/n/n' into the context).
        context = "\n\n".join(reranked_documents)

        return self.prompt_tmpl, context, cont_time, rank_time



Define LLM Model

In [7]:
def llm_model():
    """Build the ``HuggingFaceHub`` LLM described by the global config.

    Side effect: sets the ``HUGGINGFACEHUB_API_TOKEN`` environment variable
    from ``gconf['huggingface_token']`` (presumably where the client looks
    for its credentials — confirm against the library).

    Returns:
        The configured ``HuggingFaceHub`` instance.
    """
    os.environ['HUGGINGFACEHUB_API_TOKEN'] = gconf['huggingface_token']

    # Decoding options: beam count and top_k are fixed here, temperature and
    # the new-token budget come from config and are coerced to numbers.
    generation_kwargs = {
        "num_beams": 3,
        "top_k": 1,
        "temperature": float(gconf['hugging_gen_temperature']),
        "max_new_tokens": int(gconf['hugging_gen_max_token']),
    }

    return HuggingFaceHub(
        repo_id=gconf['hugging_gen_model'],
        task=gconf['hugging_gen_task'],
        model_kwargs=generation_kwargs,
    )

LLM Inference

In [8]:
def inference(inputdata):
    """Answer ``inputdata`` end to end: retrieve, re-rank, prompt the LLM, post-process.

    Args:
        inputdata (str): The user question.

    Returns:
        tuple: ``(datarec, cont_time, gentime, rank_time)`` where ``datarec``
        is a dict with single-element ``question`` / ``answer`` / ``contexts``
        lists and the three timings are in seconds.
    """
    # Retrieval is restricted to the files present in the input directory.
    pipeline = prompt_template_generation()
    source_files = os.listdir(os.path.join(cwd, gconf['input_directory']))
    prompt_tmpl, contextpmt, cont_time, rank_time = pipeline.prompt_generation(
        query=inputdata, filename=source_files
    )

    generator = llm_model()
    llm_chain = LLMChain(prompt=prompt_tmpl, llm=generator)

    # Run the chain inside the callback context so usage can be reported.
    with get_openai_callback() as cb:
        gen_started = datetime.now()
        raw_answer = llm_chain.run({"context_str": contextpmt, "query_str": inputdata})
        gentime = round((datetime.now() - gen_started).total_seconds(), 2)
        print(raw_answer)
        print(f'Spent a total of {cb.total_tokens} tokens')
        print(f"Prompt Tokens: {cb.prompt_tokens}")
        print(f"Completion Tokens: {cb.completion_tokens}")
        print(f"Total Cost (USD): ${cb.total_cost}")

    # Keep only the text after the last indented blank line, then flatten
    # the remaining newlines into spaces.
    answer = raw_answer.split('\n        \n')[-1].replace('\n', ' ')

    datarec = {
        "question": [inputdata],
        "answer": [answer],
        "contexts": [[contextpmt]],
    }
    return datarec, cont_time, gentime, rank_time

Test retriever and generation for complex queries

In [None]:
# Query sent through the full pipeline. The commented-out lines below are
# alternative sample questions; uncomment exactly one to try it instead.
inputdata = "what is the formula for Property tax liability?"
#inputdata = "what is property tax meaning?"
#inputdata = "Connecticut is not plagued by?"
#inputdata = "What are the components for determining the property tax liabilities?"
#inputdata = "What is the homestead value for real property used for Minnesota Analysis?"
#inputdata = "What are the two cities where assessment limits reduce taxes by 60 percent?"
#inputdata = "what is the formula to calculate the Net Tax Bill?"

# Wall-clock time around the whole inference() call; the per-stage timings
# (retrieval, re-ranking, generation) are measured inside inference() itself.
st_time = datetime.now()
response, cont_time, gentime, ranktime = inference(inputdata)
ed_time = datetime.now()
response_time = round((ed_time - st_time).total_seconds(),2)
# `response` is the dict returned by inference() (question / answer / contexts).
print("Generation Response:",response)
print(f"Context_Retrieval_Latency:    {cont_time} secs")
print(f"ReRanking_Latency:            {ranktime} secs")
print(f"RESPONSE_Generation_Latency:  {gentime} secs")
print(f"Overall_RESPONSE_TIME:        {response_time} secs")

# Remove the named variables from the interactive namespace; -f skips the
# confirmation prompt.
# NOTE(review): in the code shown, hugllm, hllm and resp are only ever assigned
# as function locals (inside llm_model and inference), so there may be nothing
# at notebook scope for these magics to remove — confirm they are still needed.
%reset_selective -f hugllm
%reset_selective -f hllm
%reset_selective -f resp