# 1. Generate Chunks & Summaries

The first step is to take the input data as a PDF (the Harry Potter books in our case) and generate **chunks** and **summaries** from it.

## a). Import required modules

We will import all the required modules for this notebook

In [18]:
# Standard library
import argparse
import gc
import json
import os
import random
from time import time

# Third-party packages
import pandas as pd
import pymupdf
import torch
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
# ChatOpenAI was imported twice from different packages; only the langchain_openai one is kept
from langchain_openai import ChatOpenAI
from sentence_transformers import SentenceTransformer, util
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from transformers.cache_utils import DynamicCache, OffloadedCache, QuantizedCache

# Local settings; expected to provide OPEN_AI_KEY, HF_TOKEN, MODEL_NAME_PARENT and MODEL_NAME_CHILD
from config import *

## b). Initialize llm and prompt

In [2]:
llm = ChatOpenAI(temperature=0, openai_api_key = OPEN_AI_KEY, model_name = MODEL_NAME_PARENT)

embeddings = OpenAIEmbeddings(api_key=OPEN_AI_KEY)


prompt_template = """Write a concise summary of the following:
                    {text}
                    CONCISE SUMMARY:
                    """

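
To see what the model receives, the template can be filled in with plain `str.format`, which mirrors the `{text}` substitution that `PromptTemplate` performs by default. This is a minimal sketch; `sample_text` is a made-up input and the indentation of the template is dropped for readability.

```python
# Same wording as the template above, with "CONCISE" spelled correctly.
prompt_template = """Write a concise summary of the following:
{text}
CONCISE SUMMARY:
"""

# Hypothetical sample input, for illustration only.
sample_text = "Harry receives a letter from Hogwarts."

# Substitute the {text} placeholder with the chunk to be summarized.
filled_prompt = prompt_template.format(text=sample_text)
print(filled_prompt)
```

Printing the filled prompt for one chunk before running the whole loop is a cheap way to catch template mistakes.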


## c). Load Data

We will load the Harry Potter PDF using the `pymupdf` module.

In [None]:
def load_data(data_path: str) -> pymupdf.Document:
    """
        Reading a PDF Document using PyMuPdf.
    """
    doc = pymupdf.open(data_path)
    return doc

data = load_data("./harrypotter.pdf")

## d). Generating Chunks

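We will break the document into chunks of roughly 10,000 words each (the code counts whitespace-separated words, which only approximates the token count) and store them in a dictionary. We will also persist the dictionary as JSON.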

In [None]:
def generate_chunks(data: pymupdf.Document, chunk_size: int = 10000) -> dict:
    """
        Split the document into chunks of roughly `chunk_size` words and store them as JSON.
    """
    text = ""
    chunk = 0
    final_dict = {}
    for page in data:
        text = text + "\n" + page.get_text()
        # Close the current chunk once it exceeds the word budget
        if len(text.split()) > chunk_size:
            final_dict[f"chunk_{chunk}"] = text
            chunk += 1
            text = ""

    # Keep the trailing pages that did not fill a whole chunk
    if text.strip():
        final_dict[f"chunk_{chunk}"] = text

    with open("./chunks.json", 'w') as file:
        json.dump(final_dict, file)

    return final_dict

chunk_dictionary = generate_chunks(data, 10000)
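
The chunking rule is easier to follow on toy data. The sketch below applies the same word-count threshold to a plain list of page strings instead of a PDF; `chunk_pages` and the sample pages are hypothetical names introduced only for this illustration, and the final partial chunk is kept as its own entry.

```python
def chunk_pages(pages: list[str], chunk_size: int) -> dict[str, str]:
    """Group consecutive page texts into chunks of more than `chunk_size` words."""
    chunks = {}
    text = ""
    for page_text in pages:
        text = text + "\n" + page_text
        # Close the chunk once the accumulated word count passes the threshold.
        if len(text.split()) > chunk_size:
            chunks[f"chunk_{len(chunks)}"] = text
            text = ""
    # Keep whatever is left after the last full chunk.
    if text.strip():
        chunks[f"chunk_{len(chunks)}"] = text
    return chunks


# Hypothetical pages of 4 words each, with a tiny chunk size for readability.
pages = ["one two three four"] * 5
toy_chunks = chunk_pages(pages, chunk_size=6)
print({name: len(body.split()) for name, body in toy_chunks.items()})
```

Because a chunk is only closed at a page boundary, each chunk overshoots the threshold by up to one page of text.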

## e). Generate Summaries

Next, we will generate a summary for each chunk and store the summaries in a dictionary. We will also persist the dictionary as JSON.

In [None]:
def generate_chunk_summaries(chunk_dictionary: dict, prompt: str) -> dict:
    """
        For each chunk, generate a summary and store the results as JSON.
    """
    # Build the template from the `prompt` argument rather than the global variable
    summary_prompt = PromptTemplate(template=prompt, input_variables=["text"])
    chain = LLMChain(llm=llm, prompt=summary_prompt)

    summary_dict = {}

    # Summaries are stored under the same key as the chunk they describe
    for chunk_name, chunk_text in chunk_dictionary.items():
        summary_dict[chunk_name] = chain.run(chunk_text)

    with open("./summary.json", 'w') as file:
        json.dump(summary_dict, file)

    return summary_dict

summary_dictionary = generate_chunk_summaries(chunk_dictionary, prompt_template)
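
The shape of the result matters for the later steps: every summary is stored under the same key as its chunk, so a summary can be mapped back to its chunk by name. The sketch below shows that mapping and the JSON round trip with a stand-in summarizer; `summarize_chunks` and `first_words` are hypothetical helpers, and no LLM is called.

```python
import json
import os
import tempfile
from typing import Callable


def summarize_chunks(chunks: dict[str, str], summarize: Callable[[str], str]) -> dict[str, str]:
    """Apply `summarize` to every chunk, keeping the chunk names as keys."""
    return {name: summarize(text) for name, text in chunks.items()}


def first_words(text: str, n: int = 3) -> str:
    """Hypothetical stand-in for the LLM call: keep only the first `n` words."""
    return " ".join(text.split()[:n])


toy_chunks = {
    "chunk_0": "Harry lives under the stairs",
    "chunk_1": "Hagrid delivers the letter",
}
toy_summaries = summarize_chunks(toy_chunks, first_words)

# Persist and reload to confirm the JSON round trip preserves the mapping.
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "summary.json")
    with open(path, "w") as file:
        json.dump(toy_summaries, file)
    with open(path) as file:
        reloaded = json.load(file)

print(reloaded)
```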

# 2. Generate K-V Cache

Once we have the dictionaries ready, we will use the **chunks** (not the summaries) to create their **K-V Cache**.

## a). Initialize the tokenizer and model

In [14]:
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME_CHILD, token=HF_TOKEN)
model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME_CHILD,
            torch_dtype=torch.float16,
            device_map="auto",
            token=HF_TOKEN
        )

## b). Load the chunks dictionary

We will load the chunks dictionary that we just created

In [4]:
def load_json_data(data_path: str) -> dict:
    """
        Function to load a JSON file from the given path
    """
    with open(data_path, 'r') as file:  
        final_dict = json.load(file)
    return final_dict

chunks_dictionary = load_json_data("./chunks.json")

## c). Iterate to create Dynamic Cache

Next, we will iterate through the chunk dictionary and generate a k-v cache for each chunk. For each chunk, we will do the following.

We will first create a **prompt instruction**. Here we will pass the chunk in a structured prompt format

In [None]:
answer_instruction = "Answer the question with a super short answer."

# We will take the very first chunk for example purpose.
chunk = chunks_dictionary[list(chunks_dictionary.keys())[0]]
chunk_name = list(chunks_dictionary.keys())[0]
knowledges = f"""
                <|begin_of_text|>
                <|start_header_id|>system<|end_header_id|>
                You are an assistant for giving short answers based on given context.<|eot_id|>
                <|start_header_id|>user<|end_header_id|>
                Context information is below.
                ------------------------------------------------
                {chunk}
                ------------------------------------------------
                {answer_instruction}
                Question:
                Summarize the entire document while keeping all the key points intact.
                """

We will then use the **model** we selected to create the **k-v cache** for this chunk

In [None]:
# Select the device
embed_device = model.model.embed_tokens.weight.device
print(f"device selected - {embed_device}")

input_ids = tokenizer.encode(knowledges, return_tensors="pt").to(embed_device)
past_key_values = OffloadedCache()

with torch.no_grad():
    outputs = model(
        input_ids=input_ids,
        past_key_values=past_key_values,
        use_cache=True,
        output_attentions=False,
        output_hidden_states=False
    )

Finally, we will **save the k-v cache to disk**

In [None]:
# Extract the cache from model
kv = outputs.past_key_values

# Extract Keys
key_cache = kv.key_cache

# Extract Values
value_cache = kv.value_cache

# Extract the original device(s) of the cached tensors
original_device = kv.original_device

# Save everything
torch.save(key_cache, f"{chunk_name}_key.pt")
torch.save(value_cache, f"{chunk_name}_value.pt")
torch.save(original_device, f"{chunk_name}_od.pt")
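
To get a feel for how much disk space the saved tensors take, the cache size can be estimated from the model configuration: every layer stores one key and one value vector per KV head for every token. The numbers below are assumptions chosen for illustration (32 layers, 8 KV heads, head dimension 128, float16, a 13,000-token chunk); the model behind `MODEL_NAME_CHILD` is not shown here, so substitute the values from `model.config`. `kv_cache_bytes` is a hypothetical helper.

```python
def kv_cache_bytes(
    num_tokens: int,
    num_layers: int,
    num_kv_heads: int,
    head_dim: int,
    bytes_per_value: int = 2,
) -> int:
    """Estimate the size of the key and value tensors for one sequence."""
    # The factor 2 accounts for storing both keys and values in every layer.
    return 2 * num_layers * num_kv_heads * head_dim * num_tokens * bytes_per_value


# Assumed configuration (not taken from the notebook): 32 layers, 8 KV heads,
# head dimension 128, float16 values, and a chunk of 13,000 tokens.
per_token = kv_cache_bytes(1, num_layers=32, num_kv_heads=8, head_dim=128)
per_chunk = kv_cache_bytes(13_000, num_layers=32, num_kv_heads=8, head_dim=128)
print(f"{per_token / 1024:.0f} KiB per token, {per_chunk / 1024**3:.2f} GiB per chunk")
```

Multiplying the per-chunk figure by the number of chunks gives a rough lower bound on the disk space needed for the saved key and value files.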

Now that we know how the cache for an individual chunk is generated and stored, let's see how to **iterate over all the chunks** and create a cache for each one. This step writes a large amount of data to disk, so make sure you have **at least 50 GB** of free space.

In [None]:
def find_gpu_allocation():
    """
        Function to find the gpu usage
    """
    allocated = torch.cuda.memory_allocated() / 1024**2
    reserved = torch.cuda.memory_reserved() / 1024**2
    print(f"Memory Allocated: {allocated}, Memory Reserved: {reserved}")

def preprocess_knowledge(
    model: AutoModelForCausalLM,
    tokenizer: AutoTokenizer,
    prompt: str,
) -> DynamicCache:
    """
    Prepare knowledge kv cache for CAG.
    Args:
        model: HuggingFace model with automatic device mapping
        tokenizer: HuggingFace tokenizer
        prompt: The knowledge to preprocess, which is basically a prompt

    Returns:
        DynamicCache: KV Cache
    """
    print("Before Embedding Step:")
    find_gpu_allocation()
    embed_device = model.model.embed_tokens.weight.device
    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(embed_device)
    past_key_values = OffloadedCache()
    with torch.no_grad():
        outputs = model(
            input_ids=input_ids,
            past_key_values=past_key_values,
            use_cache=True,
            output_attentions=False,
            output_hidden_states=False
        )
    print("After Caching Step:")
    find_gpu_allocation()
    result = outputs.past_key_values
    
    # Follow below steps to clean the GPU memory
    outputs.past_key_values = None
    del outputs.past_key_values
    del outputs
    del input_ids
    del embed_device
    del model
    del past_key_values
    torch.cuda.empty_cache()
    gc.collect()
    
    print("After Deletion of Everything Step:")
    find_gpu_allocation()
    
    return result

def write_kv_cache(kv: DynamicCache, chunk):
    """
    Write the KV Cache to a file.
    """
    # Make sure the output directory exists before saving
    os.makedirs("./chunk_caches", exist_ok=True)
    key_cache = kv.key_cache
    value_cache = kv.value_cache
    original_device = kv.original_device
    torch.save(key_cache, f"./chunk_caches/{chunk}_key.pt")
    torch.save(value_cache, f"./chunk_caches/{chunk}_value.pt")
    torch.save(original_device, f"./chunk_caches/{chunk}_od.pt")


def prepare_kvcache(documents, answer_instruction: str = None, chunk = None):
    # Prepare the knowledges kvcache

    if answer_instruction is None:
        answer_instruction = "Answer the question with a super short answer."
    knowledges = f"""
    <|begin_of_text|>
    <|start_header_id|>system<|end_header_id|>
    You are an assistant for giving short answers based on given context.<|eot_id|>
    <|start_header_id|>user<|end_header_id|>
    Context information is below.
    ------------------------------------------------
    {documents}
    ------------------------------------------------
    {answer_instruction}
    Question:
    Summarize the entire document while keeping all the key points intact.
    """
    # Get the knowledge cache
    t1 = time()
    kv = preprocess_knowledge(model, tokenizer, knowledges)
    print("kvlen: ", kv.key_cache[0].shape[-2])
    write_kv_cache(kv, chunk)
    t2 = time()
    return kv, t2 - t1

def dynamic_cache_creator(knowledges, chunk):
    answer_instruction = None
    knowledge_cache, prepare_time = prepare_kvcache(knowledges, answer_instruction=answer_instruction, chunk=chunk)
    kv_len = knowledge_cache.key_cache[0].shape[-2]
    print(f"KVcache prepared in {prepare_time} seconds")
    return knowledge_cache, prepare_time, kv_len

dynamic_cache_dict = {}

for i, (chunk, content) in enumerate(chunks_dictionary.items()):
    torch.cuda.empty_cache()
    torch.cuda.ipc_collect()
    gc.collect()
    print("*********")
    print(f"iteration - {i}")
    print("word count: ", len(content.split()))
    knowledge_cache, prepare_time, kv_len = dynamic_cache_creator(content, chunk)

print("KV cache generated successfully.")

# 3. Generate Embeddings

Now that we have created the K-V Cache, the other component needed for answer generation is the embeddings of the chunk summaries.


## a). Load Chunk Summaries

We will load the chunk summaries that we persisted as a json in step 1

In [5]:
summary_dictionary = load_json_data("./summary.json")

## b). Create a Vector Store

Based on the summaries, we will **create the vectorstore** in **Chroma DB**.

In [8]:
def create_and_initialize_retriever(summary_dict):
    id_key = "doc_id"
    doc_ids = list(summary_dict.keys())
    summary_texts = [Document(page_content=s, metadata={id_key: doc_ids[i]}) for i, s in enumerate(summary_dict.values())]
    vectorstore = Chroma.from_documents(documents=summary_texts, embedding=embeddings)

    return vectorstore

vectorstore = create_and_initialize_retriever(summary_dictionary)

# 4. Generate Answers

Now that both the cache and the vector store are ready, we will move on to the answer generation step.

## a). Fetch Correct Chunks

Based on the query of the user, we will search the Vector DB to find the **most relevant chunk** which has the answer

In [9]:
def fetch_correct_chunk(query, vectorstore):

    embedding_vector = embeddings.embed_query(query)
    docs = vectorstore.similarity_search_by_vector(embedding_vector)

    chunk = docs[0].metadata["doc_id"]

    return chunk

query = "Who is Nagini?"

chunk_name = fetch_correct_chunk(query, vectorstore)

print(f"The extracted chunk is - {chunk_name}")

The extracted chunk is - chunk_99
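
Conceptually, the lookup compares the query vector with every summary vector and returns the `doc_id` of the closest one. The sketch below illustrates the idea with cosine similarity on made-up 3-dimensional vectors. The metric Chroma actually uses depends on how the collection is configured, so treat this as an illustration of the principle rather than of Chroma's internals; `most_similar_chunk` is a hypothetical helper.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def most_similar_chunk(query_vector: list[float], summary_vectors: dict[str, list[float]]) -> str:
    """Return the doc_id whose summary vector is closest to the query vector."""
    return max(
        summary_vectors,
        key=lambda doc_id: cosine_similarity(query_vector, summary_vectors[doc_id]),
    )


# Hypothetical 3-dimensional embeddings; real embeddings have many more dimensions.
toy_summary_vectors = {
    "chunk_0": [1.0, 0.0, 0.0],
    "chunk_1": [0.0, 1.0, 0.0],
    "chunk_2": [0.6, 0.8, 0.0],
}
toy_query_vector = [0.1, 0.9, 0.0]
print(most_similar_chunk(toy_query_vector, toy_summary_vectors))
```

Since only the single best match is used, the quality of the answer depends heavily on how well each summary represents its chunk.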


## b). Extract the correct k-v cache

Based on the chunk name, we will extract its **Dynamic Cache**

In [10]:
def initialize_knowledge_cache(chunk):
    knowledge_cache = OffloadedCache()
    knowledge_cache.key_cache = torch.load(f"./chunk_caches/{chunk}_key.pt", weights_only=False)
    knowledge_cache.value_cache = torch.load(f"./chunk_caches/{chunk}_value.pt", weights_only=False)
    knowledge_cache.prefetch_stream = torch.cuda.Stream()
    knowledge_cache.original_device = torch.load(f"./chunk_caches/{chunk}_od.pt", weights_only=False)
    return knowledge_cache

knowledge_cache = initialize_knowledge_cache(chunk_name)
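
Loading depends on three files per chunk being present in `./chunk_caches`. A small check before calling `torch.load` can turn a confusing error into a clear message. The helpers below are hypothetical and only mirror the file naming used in `write_kv_cache`.

```python
from pathlib import Path


def cache_file_paths(chunk: str, cache_dir: str = "./chunk_caches") -> dict[str, Path]:
    """Paths of the three files written for one chunk (keys, values, original device)."""
    base = Path(cache_dir)
    return {
        "key": base / f"{chunk}_key.pt",
        "value": base / f"{chunk}_value.pt",
        "original_device": base / f"{chunk}_od.pt",
    }


def missing_cache_files(chunk: str, cache_dir: str = "./chunk_caches") -> list[str]:
    """Names of the expected cache files that are not present on disk."""
    paths = cache_file_paths(chunk, cache_dir)
    return [str(path) for path in paths.values() if not path.exists()]


missing = missing_cache_files("chunk_0")
if missing:
    print("Missing cache files:", missing)
```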

## c). Use KV Cache to generate answer embeddings

First we will create a **structured prompt** for user query

In [11]:
# The query was already defined in the chunk retrieval step above
generation_prompt = f"""
    {query}
    Give very concise answer. In max one sentence
    <|eot_id|>
    <|start_header_id|>assistant<|end_header_id|>
    """

Next, we will **convert the prompt to its tokens**

In [15]:
input_ids = tokenizer.encode(generation_prompt, return_tensors="pt").to(model.device)

Now, we will generate a **response** from the model. Despite the variable name `answer_embeddings`, the result is a tensor of token IDs.

In [16]:
def generate(
    model,
    input_ids: torch.Tensor,
    past_key_values,
    max_new_tokens: int = 300
) -> torch.Tensor:
    """
    Generate text with greedy decoding.

    Args:
        model: HuggingFace model with automatic device mapping
        input_ids: Input token ids
        past_key_values: KV Cache for knowledge
        max_new_tokens: Maximum new tokens to generate
    """

    embed_device = model.model.embed_tokens.weight.device

    origin_ids = input_ids
    input_ids = input_ids.to(embed_device)

    output_ids = input_ids.clone()
    next_token = input_ids

    with torch.no_grad():
        for _ in range(max_new_tokens):
            outputs = model(
                input_ids=next_token, 
                past_key_values=past_key_values,
                use_cache=True
            )
            next_token_logits = outputs.logits[:, -1, :]
            next_token = next_token_logits.argmax(dim=-1).unsqueeze(-1)
            next_token = next_token.to(embed_device)

            past_key_values = outputs.past_key_values

            output_ids = torch.cat([output_ids, next_token], dim=1)

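            # eos_token_id may be a single int or a list of ints, depending on the model config
            eos_ids = model.config.eos_token_id
            eos_ids = eos_ids if isinstance(eos_ids, (list, tuple)) else [eos_ids]
            if next_token.item() in eos_ids:
                break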
    return output_ids[:, origin_ids.shape[-1]:]

answer_embeddings = generate(model, input_ids, knowledge_cache)
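
The control flow of greedy decoding can be seen without a model: pick the highest-scoring token, append it, and stop at an end-of-sequence token or at the length limit. In the sketch below, `toy_scores` is a hypothetical stand-in for the model. Unlike the real loop, it receives the whole sequence at each step, because there is no KV cache to hold the earlier context.

```python
from typing import Callable


def greedy_decode(
    next_token_scores: Callable[[list[int]], list[float]],
    prompt_ids: list[int],
    eos_ids: set[int],
    max_new_tokens: int = 10,
) -> list[int]:
    """Repeatedly append the highest-scoring token until EOS or the length limit."""
    output_ids = list(prompt_ids)
    for _ in range(max_new_tokens):
        scores = next_token_scores(output_ids)
        # Greedy step: pick the index of the largest score.
        next_id = max(range(len(scores)), key=scores.__getitem__)
        output_ids.append(next_id)
        if next_id in eos_ids:
            break
    # Return only the newly generated tokens, not the prompt.
    return output_ids[len(prompt_ids):]


def toy_scores(ids: list[int]) -> list[float]:
    """Hypothetical stand-in for a model: always favors (last token + 1) mod 5."""
    favored = (ids[-1] + 1) % 5
    return [1.0 if token == favored else 0.0 for token in range(5)]


print(greedy_decode(toy_scores, prompt_ids=[0, 1], eos_ids={4}))
```

As in `generate`, the end-of-sequence token is included in the returned IDs; the real decoding step hides it through `skip_special_tokens=True`.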

Lastly, we will **decode this response back to text**

In [17]:
generated_text = tokenizer.decode(answer_embeddings[0], skip_special_tokens=True)
print(generated_text)

 Nagini is a snake that was once a Horcrux created by Albus Dumbledore to protect his son Ariana, but was later used by Gellert Grindelwald to gain power, and was later used by Voldemort to kill Ariana Dumbledore.
