In [None]:
!pip install langchain_community sentence-transformers rank_bm25 ragatouille google-generativeai

In [1]:
import os
import pandas as pd
from langchain_community.document_loaders import DataFrameLoader
from typing import List, Optional, Tuple
from langchain.docstore.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from transformers import AutoTokenizer
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain_core.runnables import ConfigurableField
from langchain_community.vectorstores import FAISS
from langchain_community.vectorstores.utils import DistanceStrategy
from ragatouille import RAGPretrainedModel
import keras_nlp
from langchain.prompts.prompt import PromptTemplate
from typing import Tuple, List, Optional
import re
import openai
import os
import time
from IPython.display import Markdown
import google.generativeai as genai
from google.colab import userdata

# Set environment variables
# NOTE(review): keras_nlp is already imported above, so KERAS_BACKEND is set
# too late to influence that import; it needs to be set before the first
# keras import to take effect.
os.environ["KERAS_BACKEND"] = "jax"  # Or "torch" or "tensorflow".
os.environ["XLA_PYTHON_CLIENT_MEM_FRACTION"] = "1.00"  # Avoid memory fragmentation on JAX backend.

# Never hard-code credentials in the notebook: read the Gemini API key from
# the environment first, then from the Colab secrets store. The key that was
# previously committed here must be treated as leaked and revoked.
api_key = os.environ.get("GOOGLE_API_KEY") or userdata.get("GOOGLE_API_KEY")

genai.configure(api_key=api_key)


# Mount Google Drive so the CSV stored there is readable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

# Load data
data = pd.read_csv('/content/drive/My Drive/RAG/rag_data.csv')

# Build the text that is indexed and later shown to the LLM, one entry per row.
# Missing cells become "" and every column is coerced to str first: plain
# "text" + Series concatenation yields NaN for rows with a missing cell and
# raises TypeError for non-string columns.
context_parts = data[['Outer Key', 'Inner Key', 'Value', 'Descriptions']].fillna('').astype(str)
data['LLM_context'] = (
    "Outer Key: " + context_parts['Outer Key'] +
    ",\nInner Key: " + context_parts['Inner Key'] +
    ",\nValue: " + context_parts['Value'] +
    ",\nDescription: " + context_parts['Descriptions']
)

# Each row becomes one Document whose page_content is the LLM_context text.
loader = DataFrameLoader(data, page_content_column="LLM_context")
docs = loader.load()

# Constants
# Hugging Face model id. It is used both as the default tokenizer for
# split_documents and as the model_name of the embedding model below.
EMBEDDING_MODEL_NAME = "BAAI/bge-base-en-v1.5"
# Passed as chunk_size to split_documents, which measures length with the
# Hugging Face tokenizer named above.
CHUNK_SIZE = 512  # We choose a chunk size adapted to our model

def split_documents(chunk_size: int, knowledge_base: List[Document], tokenizer_name: Optional[str] = EMBEDDING_MODEL_NAME) -> List[Document]:
    """Split documents into tokenizer-measured chunks and drop duplicate chunks.

    Args:
        chunk_size: Maximum chunk length, measured with the Hugging Face
            tokenizer. Consecutive chunks overlap by a tenth of this value.
        knowledge_base: Documents to split.
        tokenizer_name: Hugging Face tokenizer id. ``None`` falls back to
            ``EMBEDDING_MODEL_NAME``.

    Returns:
        The chunks in their original order, keeping only the first chunk for
        any given ``page_content``.
    """
    text_splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(
        # The parameter is Optional, so honour None instead of crashing.
        AutoTokenizer.from_pretrained(tokenizer_name or EMBEDDING_MODEL_NAME),
        chunk_size=chunk_size,
        chunk_overlap=chunk_size // 10,
        add_start_index=True,
        strip_whitespace=True,
    )

    seen_texts = set()
    unique_chunks = []
    for chunk in text_splitter.split_documents(knowledge_base):
        if chunk.page_content in seen_texts:
            continue
        seen_texts.add(chunk.page_content)
        unique_chunks.append(chunk)

    return unique_chunks

# Chunk the loaded rows using the tokenizer of the embedding model.
chunked_docs = split_documents(
    CHUNK_SIZE,
    docs,
    tokenizer_name=EMBEDDING_MODEL_NAME,
)

# Embedding model used to build the FAISS index below.
embedding_model = HuggingFaceEmbeddings(
    model_name=EMBEDDING_MODEL_NAME,
    multi_process=True,
    model_kwargs=dict(device="cpu"),
    # Normalized embeddings are required for cosine similarity.
    encode_kwargs=dict(normalize_embeddings=True),
)
# Default number of documents the FAISS retriever returns.
num_docs = 5

# Lexical retriever. Its `k` can be overridden per call through the
# "search_kwargs_bm25" configurable id.
bm25_k_field = ConfigurableField(
    id="search_kwargs_bm25",
    name="k",
    description="The search kwargs to use",
)
bm25_retriever = BM25Retriever.from_documents(chunked_docs).configurable_fields(
    k=bm25_k_field
)

# Dense retriever backed by a cosine-distance FAISS index. Its search kwargs
# can be overridden per call through the "search_kwargs_faiss" configurable id.
faiss_vectorstore = FAISS.from_documents(
    chunked_docs,
    embedding_model,
    distance_strategy=DistanceStrategy.COSINE,
)
faiss_search_kwargs_field = ConfigurableField(
    id="search_kwargs_faiss",
    name="Search Kwargs",
    description="The search kwargs to use",
)
faiss_retriever = faiss_vectorstore.as_retriever(
    search_kwargs={"k": num_docs}
).configurable_fields(search_kwargs=faiss_search_kwargs_field)

# Hybrid retriever; the weights set each retriever's share in the ensemble.
vector_database = EnsembleRetriever(
    retrievers=[bm25_retriever, faiss_retriever],
    weights=[0.5, 0.5],
)

# ColBERT model used to rerank the retrieved passages.
reranker = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
# Prompt sent to the generation model. The {context} and {question}
# placeholders are filled by RAG_PROMPT_TEMPLATE.format(...) in answer_with_rag.
# NOTE(review): the example's Model Response opens a double quote that is never
# closed.
# NOTE(review): the example answer is free-form prose, while
# extract_keys_values parses "Outer Key: ... / Inner Key: ... / Value: ... /
# Description: ..." labelled text from the answer - confirm which output
# format is intended.
prompt_template = """
"When given a user query, search for the row in the data where the 'Inner Key' or the 'Descriptions' matches the query. Once you find the matching row, return the answer from the corresponding value from the 'Value' column and 'Descriptions' column"

Example:

User Query: "How to test Contactor Sequence?"
Model Response: "You can test Contactor Sequence using 'V' Command. It is used for testing and simulating all contactors in a required charging sequence, the sequence simulates the contactor behavior when exiting the chargeloop.


CONTEXT:
{context}

QUESTION:
{question}

ANSWER:
"""

# Template object with the two input variables declared explicitly.
RAG_PROMPT_TEMPLATE = PromptTemplate(
    input_variables=["context", "question"],
    template=prompt_template,
)

from functools import lru_cache

@lru_cache(maxsize=100)
def cached_rerank(question, page_contents, k):
    """Rerank passages for a question with the module-level ``reranker``, memoized.

    Args:
        question: The user query.
        page_contents: Passage texts. Must be hashable (a tuple) because the
            arguments form the ``lru_cache`` key.
        k: Maximum number of passages to return.

    Returns:
        Whatever ``reranker.rerank`` returns for the given passages, or an
        empty list when there are no passages.
    """
    documents = tuple(page_contents)
    if not documents:
        # Nothing to score; avoid calling the model with an empty batch.
        return []
    # RAGatouille refuses a k larger than the number of documents (it aborts
    # and returns None), so clamp k to what is actually available.
    return reranker.rerank(question, documents, k=min(k, len(documents)))

def answer_with_rag(question: str, knowledge_index: EnsembleRetriever, reranker: Optional[RAGPretrainedModel] = None, num_retrieved_docs: int = 5, num_docs_final: int = 5) -> Tuple[str, List[str]]:
    """Retrieve context for a question, optionally rerank it, and ask Gemini.

    Args:
        question: The user query.
        knowledge_index: Retriever exposing the "search_kwargs_faiss" and
            "search_kwargs_bm25" configurable fields.
        reranker: When truthy, retrieved passages are reranked through
            ``cached_rerank``.
        num_retrieved_docs: ``k`` requested from each underlying retriever.
        num_docs_final: Maximum number of passages kept after reranking.

    Returns:
        ``(answer, relevant_docs)``: the stripped model answer and the texts
        of the passages that were kept. The answer is also printed.
    """
    retriever_config = {
        "configurable": {
            "search_kwargs_faiss": {"k": num_retrieved_docs},
            "search_kwargs_bm25": num_retrieved_docs,
        }
    }
    retrieved = knowledge_index.invoke(question, config=retriever_config)
    relevant_docs = [doc.page_content for doc in retrieved]

    if reranker and relevant_docs:
        # NOTE(review): cached_rerank uses the module-level `reranker`, not
        # the instance passed in here - confirm that this is intended.
        # The retrievers may return fewer passages than num_docs_final, and
        # the reranker rejects a k larger than the number of passages.
        top_k = min(num_docs_final, len(relevant_docs))
        reranked = cached_rerank(question, tuple(relevant_docs), k=top_k)
        if reranked:
            # Keep the retrieval order if the reranker produced nothing.
            relevant_docs = [doc["content"] for doc in reranked]

    relevant_docs = relevant_docs[:num_docs_final]
    # Only the top-ranked passage is given to the model as context.
    context = relevant_docs[0] if relevant_docs else ""
    final_prompt = RAG_PROMPT_TEMPLATE.format(context=context, question=question)

    # Use Google's Gemini model for generation
    system_instructions = "You are a helpful assistant."
    model_name = 'gemini-1.5-flash'  # or 'gemini-1.0-pro', 'gemini-1.5-pro'
    temperature = 0.5
    stop_sequence = ''
    model = genai.GenerativeModel(model_name, system_instruction=system_instructions)
    generation_config = genai.GenerationConfig(temperature=temperature, stop_sequences=[stop_sequence])
    response = model.generate_content(contents=[final_prompt], generation_config=generation_config)
    answer = response.text.strip()
    print(answer)

    return answer, relevant_docs

# Labels of the fields, in the order they are returned.
_FIELD_LABELS = ("Outer Key", "Inner Key", "Value", "Description")
_ANY_FIELD_LABEL = "|".join(re.escape(label) for label in _FIELD_LABELS)
# A field runs until a comma that introduces another label, or to the end of
# the text. Commas and newlines inside a field are therefore preserved.
_FIELD_PATTERNS = tuple(
    re.compile(
        rf"{re.escape(label)}:\s*(.*?)(?=\s*,\s*(?:{_ANY_FIELD_LABEL}):|\s*\Z)",
        re.DOTALL,
    )
    for label in _FIELD_LABELS
)


def extract_keys_values(text: Optional[str]) -> Tuple[Optional[str], ...]:
    """Parse "Outer Key / Inner Key / Value / Description" labelled text.

    The expected layout is ``"Outer Key: a,\\nInner Key: b,\\nValue: c,\\n
    Description: d"``. A field ends where the next label starts (after a
    comma) or at the end of the text, so values containing commas are kept
    whole and the last field needs no trailing comma.

    Args:
        text: Text to parse. ``None`` or an empty string is accepted.

    Returns:
        ``(outer_key, inner_key, value, description)``. Each element is the
        stripped field text, or ``None`` when its label is absent.
    """
    if not text:
        return (None, None, None, None)

    fields = []
    for pattern in _FIELD_PATTERNS:
        match = pattern.search(text)
        fields.append(match.group(1).strip() if match else None)
    return tuple(fields)

# `display` is only injected as a builtin by IPython kernels; import it
# explicitly instead of relying on that.
from IPython.display import display

# Interactive question/answer loop; runs until the user types 'exit'.
while True:
    print("\033[1mPlease enter your question or type 'exit' to quit:\033[0m", end=" ")
    try:
        question = input().strip()  # Change from hardcoded question to input from user
    except (EOFError, KeyboardInterrupt):
        # Closed stdin or an interrupted kernel ends the session cleanly.
        print("\n\033[1mGoodbye!\033[0m")
        break
    if question.lower() == 'exit':
        print("\033[1mGoodbye!\033[0m")
        break
    if not question:
        # Do not spend a retrieval and an LLM call on an empty query.
        continue

    answer, relevant_docs = answer_with_rag(question, vector_database, reranker)
    outer_key, inner_key, value, descriptions = extract_keys_values(answer)
    if relevant_docs and all(
        field is None for field in (outer_key, inner_key, value, descriptions)
    ):
        # The prompt asks for a prose answer, which carries no field labels.
        # Fall back to the top retrieved row, which is in the labelled format.
        outer_key, inner_key, value, descriptions = extract_keys_values(relevant_docs[0])

    display(Markdown("### Answer"))
    print("\033[1mCategory:\033[0m", outer_key)
    print("\033[1mTask:\033[0m", inner_key)
    print("\033[1mCommand:\033[0m", value)
    print("\033[1mDescription:\033[0m", descriptions)

