<a href="https://colab.research.google.com/github/istiaquehussain/ML-Staging/blob/main/RAG_as_service.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pypdf
!pip install panda
!pip install google.generativeai
!pip install pinecone-client
!pip install langchain
!pip install langchain_community
!pip install langchain-pinecone
!pip install langchain-google-genai

In [None]:
import json
import glob
import os
import pandas as pd
from google.colab import userdata
from pinecone import Pinecone, ServerlessSpec
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain.prompts import PromptTemplate
from langchain_community.document_loaders import PyPDFLoader, TextLoader, CSVLoader
from langchain_text_splitters import CharacterTextSplitter
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_retrieval_chain
from langchain_pinecone import PineconeVectorStore


In [None]:
class RAG:
  """Facade that wires an LLM wrapper, a vector store, an extractor and a utility helper.

  Attributes:
    util: helper used to normalise requests and sanitise CSV files.
    llm: LLM wrapper; must expose get_emabdding(), get_templates(),
      query() and query_with_docs().
    vector_store: store wrapper; must expose insert() and find().
    emabdding: embedding object obtained from ``llm.get_emabdding()``.
    extractor: object exposing extract_csv / extract_pdf / extract_text.
    loader: optional loader object; None until set_loader() is called.
  """

  def __init__(self,llm,vector_store,extractor,util):
    self.util = util
    self.llm = llm
    self.vector_store = vector_store
    self.emabdding = llm.get_emabdding()
    self.extractor = extractor
    # Initialised up front so get_loader() never hits a missing attribute.
    self.loader = None

  def set_util(self,util):
    self.util = util

  def get_util(self):
    return self.util

  def set_llm(self,llm):
    self.llm = llm

  def get_llm(self):
    return self.llm

  def set_vector_store(self,vector_store):
    self.vector_store = vector_store

  def get_vector_store(self):
    return self.vector_store

  def set_emabdding(self,emabdding):
    self.emabdding = emabdding

  def get_emabdding(self):
    return self.emabdding

  def set_extractor(self,extractor):
    self.extractor = extractor

  def get_extractor(self):
    return self.extractor

  def set_loader(self,loader):
    self.loader = loader

  def get_loader(self):
    return self.loader

  def create_sanitised_csv(self,csv_file_path_with_name:str,colums_to_return:list,total_row=0,file_name_extn="_sanitized"):
    """Delegate to ``util.sanitise_csv`` and return its result."""
    return self.util.sanitise_csv(csv_file_path_with_name,colums_to_return,total_row,file_name_extn)

  def extract_csv(self,csv_file_path_with_name:str):
    """Return the documents the extractor produces for a CSV file."""
    return self.extractor.extract_csv(csv_file_path_with_name)

  def extract_pdf(self,pdf_file_path_with_name:str):
    """Return the documents the extractor produces for a PDF file."""
    return self.extractor.extract_pdf(pdf_file_path_with_name)

  def extract_text(self,text_file_path_with_name:str):
    """Return the documents the extractor produces for a text file."""
    return self.extractor.extract_text(text_file_path_with_name)

  def insert(self, documents,chunk_size=100):
    """Hand ``documents`` to the vector store, ``chunk_size`` at a time."""
    self.vector_store.insert(documents,chunk_size)

  def find(self,query_string,total_docs:int,meta:dict):
    """Return the vector store's matches for ``query_string``."""
    return self.vector_store.find(query_string,total_docs,meta)

  def query(self,template,retriever_result,query):
    """Delegate to ``llm.query``."""
    return self.llm.query(template,retriever_result,query)

  def query_with_docs(self,template_string,docs,query):
    """Delegate to ``llm.query_with_docs``."""
    return self.llm.query_with_docs(template_string,docs,query)

  def load(self,load_request):
    """Normalise ``load_request`` and ingest the file it points to.

    Only the request types "csv" and "pdf" are handled; any other
    request type is ignored without an error.
    """
    load_request = self.util.generate_load_request(load_request)
    request_type = load_request['request_type']
    if request_type == "csv":
      self.load_csv(load_request['uri'],load_request['meta'])
    elif request_type == "pdf":
      self.load_pdf(load_request['uri'],load_request['meta'])

  def load_csv(self,csv_file_path_with_name:str,meta:dict):
    """Extract a CSV file, tag every document with ``meta`` and insert it."""
    documents = self.extract_csv(csv_file_path_with_name)
    self._attach_meta(documents,meta)
    self.insert(documents)

  def load_pdf(self,pef_file_path_with_name:str,meta:dict):
    """Extract a PDF file, tag every document with ``meta`` and insert it."""
    documents = self.extract_pdf(pef_file_path_with_name)
    self._attach_meta(documents,meta)
    self.insert(documents)

  @staticmethod
  def _attach_meta(documents,meta):
    """Merge ``meta`` into each document's metadata when ``meta`` is non-empty."""
    if meta:
      for document in documents:
        document.metadata.update(meta)

  def generate(self,query_request,should_log=False):
    """Answer a query request, or return None when it carries no query.

    ``query_request`` is normalised through ``util.generate_query_request``.
    The prompt template is currently always the one registered under
    'all'; the request's ``template_type`` is not consulted.
    """
    query_request = self.util.generate_query_request(query_request)
    query = query_request['query']
    if not query:
      return None
    # A missing/empty total_records falls back to 0.
    # NOTE(review): 0 is passed straight through as k; confirm the store accepts it.
    total_docs = int(query_request['total_records'] or 0)
    template_meta = {'type':'all'}
    return self.generate_response(query,total_docs,query_request['meta'],template_meta,should_log)

  def generate_response(self,query,total_docs:int,vector_meta:dict,template_meta:dict,should_log=False):
    """Retrieve matching documents and ask the LLM to answer ``query`` from them.

    Returns the LLM response, or None when the vector store finds nothing.
    With ``should_log`` set, every intermediate value is printed.
    """
    if should_log:
      print(f"query->{query},total_docs->{total_docs},vector_meta->{vector_meta},template_meta->{template_meta}")
    docs = self.find(query,total_docs,vector_meta)
    if should_log:
      for doc in docs:
        print("---------------------------------")
        print(f"doc->{doc.page_content}")
    if not docs:
      return None
    templates = self.llm.get_templates()
    if should_log:
      print(f"templates->{templates}")
    template = templates[template_meta['type']]
    if should_log:
      print(f"template->{template}")
    res = self.query_with_docs(template,docs,query)
    if should_log:
      print(f"res->{res}")
    return res


In [None]:
class Util:
  """Stateless helpers for CSV clean-up, chunked export and request parsing.

  Load and query requests may be given as a dict or as a JSON string and
  are expected to look like::

    load_request = {
        "request_type": "csv",
        "uri": "https://static.realpython.com/python-basics-sample-chapters.pdf",
        "meta": {"doc_type": "csv"}
    }
    query_request = {
        "query": "list issues with xbox",
        "template_type": "csv",
        "total_records": 4,
        "meta": {"doc_type": "csv"}
    }
  """

  def __init__(self):
    pass

  def sanitise_csv(self,csv_file_path_with_name:str,colums_to_return:list,total_row=0,file_name_extn="_sanitized")->str:
    """Write a sanitised copy of a CSV file next to the original.

    Args:
      csv_file_path_with_name: path of the CSV file to read.
      colums_to_return: columns to read and keep.
      total_row: when greater than 0, only the first ``total_row`` rows are kept.
      file_name_extn: suffix inserted between the file's base name and extension.

    Returns:
      Path of the newly written file.
    """
    df = pd.read_csv(csv_file_path_with_name,usecols=colums_to_return)
    if total_row > 0:
      df = df.iloc[:total_row]
    sanitized_df = self.sanitise_csv_custom(df,colums_to_return)

    directory, file_name = os.path.split(csv_file_path_with_name)
    file_base, file_extension = os.path.splitext(file_name)
    new_path_name = os.path.join(directory, f"{file_base}{file_name_extn}{file_extension}")
    sanitized_df.to_csv(new_path_name,index=False)
    return new_path_name

  def sanitise_csv_custom(self,data_frame:pd.DataFrame,colums_to_return:list)->pd.DataFrame:
    """Return a cleaned copy of the support-ticket data; the input is not modified.

    Missing 'Resolution' and 'Customer Satisfaction Rating' values get a
    placeholder text, every other missing value becomes ''. In 'Ticket
    Description' the '{product_purchased}' placeholder is replaced by the
    row's 'Product Purchased' and newlines become spaces. The rebuilt
    'Ticket Description' column ends up as the last column.
    """
    # Work on an explicit copy of the selected columns so the assignments
    # below never write through to (or warn about) the caller's frame.
    data_sanitised = data_frame[colums_to_return].copy()
    data_sanitised['Resolution'] = data_sanitised['Resolution'].fillna('No resolution comments')
    data_sanitised['Customer Satisfaction Rating'] = data_sanitised['Customer Satisfaction Rating'].fillna('No ratings')
    data_sanitised = data_sanitised.fillna('')

    descriptions = [
        description.replace('{product_purchased}', product).replace('\n', ' ')
        for description, product in zip(data_sanitised['Ticket Description'],
                                        data_sanitised['Product Purchased'])
    ]
    # Drop and re-add so the column lands at the end of the frame.
    data_sanitised = data_sanitised.drop(columns=['Ticket Description'])
    data_sanitised['Ticket Description'] = pd.Series(descriptions, index=data_sanitised.index, dtype=object)
    return data_sanitised

  def transform_csv_to_vector(self,data:pd.DataFrame)->pd.DataFrame:
    """Return a copy of ``data`` whose JSON-encoded 'vector' column is decoded.

    The result contains exactly the columns 'id', 'vector', 'data', 'meta'.
    """
    data_frame = data.copy(deep=True)
    data_frame['vector'] = data_frame['vector'].apply(json.loads)
    return data_frame[['id','vector','data','meta']]

  def export_data_to_csv(self,data:pd.DataFrame,file_path:str,chunk_size:int):
    """Write ``data`` to ``file_path`` as output_chunk_<n>.csv files of ``chunk_size`` rows.

    Raises:
      ValueError: if ``chunk_size`` is not a positive integer.
    """
    if chunk_size < 1:
      raise ValueError("chunk_size must be a positive integer")
    for chunk_number, start in enumerate(range(0, len(data), chunk_size), start=1):
      chunk = data.iloc[start:start + chunk_size]
      chunk.to_csv(os.path.join(file_path, f'output_chunk_{chunk_number}.csv'), index=False)

  @staticmethod
  def _parse_request(request)->dict:
    """Return ``request`` as a plain dict, decoding it from JSON when it is not a dict.

    Dict input is round-tripped through JSON so the caller gets an
    independent copy made only of JSON-compatible values.
    """
    if isinstance(request, dict):
      return json.loads(json.dumps(request))
    return json.loads(request)

  def generate_load_request(self,load_request)->dict:
    """Normalise a load request; absent keys come back as None."""
    parsed = self._parse_request(load_request)
    meta = parsed.get('meta')
    return {
        "uri": parsed.get('uri'),
        "request_type": parsed.get('request_type'),
        "meta": meta,
        "doc_type": meta.get('doc_type') if meta else None,
    }

  def generate_load_request_old(self,load_request)->dict:
    """Strict variant of generate_load_request: raises KeyError on absent keys."""
    parsed = self._parse_request(load_request)
    request_type = parsed['request_type'] or None
    uri = parsed['uri'] or None
    meta = parsed['meta'] or None
    doc_type = (meta['doc_type'] or None) if meta else None
    return {"uri":uri,"request_type":request_type,"meta":meta,"doc_type":doc_type}

  def generate_query_request(self,query_request)->dict:
    """Normalise a query request; absent keys come back as None."""
    parsed = self._parse_request(query_request)
    meta = parsed.get('meta')
    return {
        "query": parsed.get('query'),
        "total_records": parsed.get('total_records'),
        "template_type": parsed.get('template_type'),
        "meta": meta,
        "doc_type": meta.get('doc_type') if meta else None,
    }

  def generate_query_request_old(self,query_request)->dict:
    """Strict variant of generate_query_request: raises KeyError on absent keys."""
    parsed = self._parse_request(query_request)
    total_docs = parsed['total_records'] or None
    query = parsed['query'] or None
    template_type = parsed['template_type'] or None
    meta = parsed['meta'] or None
    doc_type = (meta['doc_type'] or None) if meta else None
    return {"query":query,"total_records":total_docs,"template_type":template_type,"meta":meta,"doc_type":doc_type}

  def get_genai_template(self)->str:
    """Return the summary prompt template (it expects a ``{context}`` value)."""
    return """You are a data analyst. Given the following data of customer support tickets, provide a detailed summary of the information.Make sure to highlight key points and provide an overall summary of the data.

Data:
{context}

Summary:"""

  def get_query_template_for_meta(self,meta_data:dict)->str:
    """Return the prompt template selected by ``meta_data['type']``.

    'doc' selects the question-answering template, 'data' the summary
    template; any other type raises KeyError.
    """
    qa_template = """
Answer the question based only on the following context:

{context}

---

Answer the question based on the above context: {query}
"""
    summary_template = """You are a data analyst. Given the following data of customer support tickets, provide a detailed summary of the information.Make sure to highlight key points and provide an overall summary of the data.

Data:
{context}

Summary:"""
    template_meta = {"doc":qa_template,
                     "data":summary_template}
    return template_meta[meta_data['type']]

In [None]:
class VectorStore:
  """Thin wrapper around a Pinecone index exposed through PineconeVectorStore.

  Call initialize() after construction; until then ``store`` is None and
  insert()/find() only print a hint instead of touching the database.
  """
  DB_INDEDX_NAME = "vector-global"
  DB_DIMENTION = 768
  DB_METRIX = "cosine"
  store = None
  DB_API_KEY = None
  vectordb = None

  def __init__(self,db_index_name,db_dimention,db_metrix,api_key):
    self.DB_INDEDX_NAME = db_index_name
    self.DB_DIMENTION = db_dimention
    self.DB_METRIX = db_metrix
    self.DB_API_KEY = api_key

  def initialize(self,llm_emabdding):
    """Connect to Pinecone, create the index if it is not listed, and build the store.

    Args:
      llm_emabdding: embedding object handed to PineconeVectorStore.
    """
    if self.DB_API_KEY:
      self.vectordb = Pinecone(api_key=self.DB_API_KEY)
    else:
      print("please initialize DB_API_KEY")

    if self.vectordb and self.DB_INDEDX_NAME and self.DB_DIMENTION and self.DB_METRIX:
      index_names = [index['name'] for index in self.vectordb.list_indexes()]
      if self.DB_INDEDX_NAME not in index_names:
        self.create_table()
      self.store = PineconeVectorStore(index_name=self.DB_INDEDX_NAME, embedding=llm_emabdding,pinecone_api_key=self.DB_API_KEY)
    else:
      print("please initialize  vector db ")

  def create_table(self):
    """Create the configured index as a serverless index on AWS us-east-1."""
    if self.vectordb and self.DB_INDEDX_NAME and self.DB_DIMENTION and self.DB_METRIX:
      self.vectordb.create_index(
          name=self.DB_INDEDX_NAME,
          dimension=self.DB_DIMENTION,
          metric=self.DB_METRIX,
          spec=ServerlessSpec(
              cloud="aws",
              region="us-east-1"
          )
        )
    else:
      print(f"please initialize  vectordb->{self.vectordb}, DB_INDEDX_NAME->{self.DB_INDEDX_NAME}, DB_DIMENTION->{self.DB_DIMENTION} , DB_METRIX->{self.DB_METRIX} ")

  def drop_table(self):
    """Delete the configured index."""
    if self.vectordb and self.DB_INDEDX_NAME:
      self.vectordb.delete_index(self.DB_INDEDX_NAME)
    else:
      print(f"please initialize vectordb->{self.vectordb}, DB_INDEDX_NAME->{self.DB_INDEDX_NAME}")

  def get_store(self):
    return self.store

  def insert(self, documents,chunk_size=100):
    """Add ``documents`` to the store in batches of ``chunk_size``."""
    if not self.store:
      # The attribute is `store`; the old message read a non-existent
      # `vector_store` attribute and raised instead of printing.
      print(f"please initialize vector_store->{self.store}")
      return
    for start in range(0, len(documents), chunk_size):
      self.store.add_documents(documents[start:start + chunk_size])

  def find(self,query_string,total_docs:int,meta:dict):
    """Return up to ``total_docs`` documents similar to ``query_string``.

    ``meta`` is passed to the store as the search filter. When the store
    has not been initialised a hint is printed and an empty list returned.
    """
    if not self.store:
      print(f"please initialize vector_store->{self.store}")
      return []
    return self.store.similarity_search(query=query_string,k=total_docs,filter=meta)

In [None]:
class LLM:
  """Holds the Google Generative AI embedding and chat models plus the prompt templates.

  Call initialize() after construction; until then ``emabdding`` and
  ``genai`` are None.
  """
  EMBEDING_MODEL_NAME = 'models/embedding-001'
  GENAI_MODEL_NAME = 'gemini-pro'
  LLM_API_KEY = None
  emabdding = None
  genai = None
  templates = None

  def __init__ (self,embadding_model_name,genai_model_name,llm_api_key,templates_json):
    self.EMBEDING_MODEL_NAME = embadding_model_name
    self.GENAI_MODEL_NAME = genai_model_name
    self.LLM_API_KEY = llm_api_key
    self.templates = templates_json

  def get_templates(self):
    return self.templates

  def initialize(self):
    """Create the embedding and chat model clients, or print a hint when no API key is set."""
    if self.LLM_API_KEY:
      self.emabdding = GoogleGenerativeAIEmbeddings(google_api_key=self.LLM_API_KEY,model=self.EMBEDING_MODEL_NAME)
      self.genai = ChatGoogleGenerativeAI(google_api_key=self.LLM_API_KEY,model=self.GENAI_MODEL_NAME)
    else:
      print("please initialize LLM_API_KEY")

  def get_genai(self):
    return self.genai

  def get_emabdding(self):
    return self.emabdding

  def query(self,template,retriever_result,query):
    """Run a stuff-documents chain built from ``template`` over ``retriever_result``.

    The chain is invoked with ``retriever_result`` as "context" and
    ``query`` as "input".
    """
    prompt = PromptTemplate.from_template(template)
    combine_docs_chain = create_stuff_documents_chain(self.genai, prompt)
    return combine_docs_chain.invoke({"context": retriever_result,"input":query})

  def query_with_docs(self,template_string,docs,query):
    """Fill ``template_string`` with the documents' text and ``query``, then call the chat model.

    Args:
      template_string: prompt template using ``{context}`` and/or ``{input}``.
      docs: objects exposing ``page_content``.
      query: text substituted for ``{input}``.
    """
    # Join into one string, one document per line. Passing the list itself
    # would put its Python repr (brackets, quotes, escaped "\n") in the prompt.
    context = "".join(f"{doc.page_content}\n" for doc in docs)
    prompt_template = PromptTemplate(input_variables=["context", "input"],template=template_string)
    prompt = prompt_template.format_prompt(context=context, input=query)
    return self.genai.invoke(prompt)

In [None]:
class Extractor:
    """Turns CSV, PDF and plain-text files into document lists via the LangChain loaders."""

    def __init__(self):
        pass

    def extract_csv(self,csv_file_path_with_name:str):
      """Load a CSV file with CSVLoader and return the loaded documents."""
      return CSVLoader(csv_file_path_with_name).load()

    def extract_pdf(self,pdf_file_path_with_name:str):
      """Load a PDF with PyPDFLoader and split it on "." (chunk size 800, overlap 80)."""
      pdf_loader = PyPDFLoader(pdf_file_path_with_name)
      splitter = self._sentence_splitter(800, 80)
      return splitter.split_documents(pdf_loader.load())

    def extract_text(self,text_file_path_with_name:str):
      """Load a text file with TextLoader and split it on "." (chunk size 250, overlap 50)."""
      text_loader = TextLoader(text_file_path_with_name)
      splitter = self._sentence_splitter(250, 50)
      return splitter.split_documents(text_loader.load())

    @staticmethod
    def _sentence_splitter(size, overlap):
      """Build the "."-separated CharacterTextSplitter shared by the PDF and text paths."""
      return CharacterTextSplitter(
          separator=".",
          chunk_size=size,
          chunk_overlap=overlap,
          length_function=len,
          is_separator_regex=False,
      )

In [None]:
class Template:
  """Registry of the prompt templates, keyed by template type.

  'doc' is the question-answering template, 'data' the ticket-summary
  template and 'all' a general assistant template.
  """
  qa_template = """
Answer the question based only on the following context:

{context}

---

Answer the question based on the above context: {input}
"""
  summary_template = """You are a data analyst. Given the following data of customer support tickets, provide a detailed summary of the information.Make sure to highlight key points and provide an overall summary of the data.

Data:
{input}

Summary:"""

  any_template = """
You are a helpful AI assistant.
Answer based on the context provided.
context: {context}
input: {input}
answer:
"""

  # Template type -> template text.
  template_meta = {"doc":qa_template,
                   "data":summary_template,
                   "all":any_template}

  def __init__(self):
    pass

  def get(self)->dict:
    """Return the whole type -> template mapping (the shared class-level dict)."""
    return self.template_meta

  def get_query_template_for_meta(self,meta_data:dict)->str:
    """Return the template registered for ``meta_data['type']``; unknown types raise KeyError."""
    return self.template_meta[meta_data['type']]


In [None]:
class Main:
  """Entry point that builds a RAG instance and exposes load/query/cleanup shortcuts.

  initialize() must be called before any other method; it stores the
  assembled RAG object on ``self.rag``.
  """

  def initialize(self,chat_model,embading_model,vector_db_index_name,vector_db_dimension,vector_db_metrix,API_KEY,vectordb_API_KEY)->RAG:
    """Build and initialise the LLM and vector store, then assemble the RAG object.

    Args:
      chat_model: chat model name passed to LLM.
      embading_model: embedding model name passed to LLM.
      vector_db_index_name: index name passed to VectorStore.
      vector_db_dimension: index dimension passed to VectorStore.
      vector_db_metrix: index metric passed to VectorStore.
      API_KEY: API key passed to LLM.
      vectordb_API_KEY: API key passed to VectorStore.

    Returns:
      The RAG instance, which is also stored on ``self.rag``.
    """
    # Build the template registry here; no module-level `template` object exists.
    llm=LLM(embading_model,chat_model,API_KEY,Template().get())
    llm.initialize()
    vector_store=VectorStore(vector_db_index_name,vector_db_dimension,vector_db_metrix,vectordb_API_KEY)
    vector_store.initialize(llm.get_emabdding())
    self.rag=RAG(llm,vector_store,Extractor(),Util())
    return self.rag

  def delete_db(self):
    """Drop the vector store's index."""
    self.rag.vector_store.drop_table()

  # Original (misspelled) name, kept so existing callers keep working.
  delele_db = delete_db

  def load_content(self,load_request):
    """Ingest the file described by ``load_request``."""
    self.rag.load(load_request)

  def generate_content(self,query_request,should_log=False):
    """Return the RAG answer for ``query_request`` (whatever ``rag.generate`` returns)."""
    return self.rag.generate(query_request,should_log)

In [None]:
# Runtime configuration; these values are passed to Main.initialize() below.
# Names of the chat model and the embedding model.
chat_model="gemini-pro"
embading_model="models/embedding-001"
# Name, dimension and metric of the vector index.
vector_db_index_name="cs-tickets"
vector_db_dimension=768
vector_db_metrix="cosine"
# API keys are read from the Colab user-data store rather than hard-coded.
API_KEY=userdata.get('GOOGLE_API_KEY')
vectordb_API_KEY=userdata.get('PINE_CONE_API_KEY')

In [None]:
# Raw ticket export and the columns of it that are kept when sanitising.
csv_file_path_with_name = '/content/drive/MyDrive/datasets/customer_support_tickets.csv'
colums_to_return = [
    # ticket identity and customer details
    'Ticket ID', 'Customer Name', 'Customer Email', 'Customer Age', 'Customer Gender',
    # purchase details
    'Product Purchased', 'Date of Purchase',
    # ticket content and handling
    'Ticket Type', 'Ticket Subject', 'Ticket Description', 'Ticket Status',
    'Resolution', 'Ticket Priority', 'Ticket Channel',
    'Customer Satisfaction Rating',
]

In [None]:
# Build the pipeline from the configuration above. Main.initialize creates the
# LLM and vector-store wrappers and stores the resulting RAG object on main.rag.
main=Main()
main.initialize(chat_model,embading_model,vector_db_index_name,vector_db_dimension,vector_db_metrix,API_KEY,vectordb_API_KEY)


In [None]:
#main.rag.vector_store.drop_table()

In [None]:
#saniticzed_csv_path_with_name=main.rag.create_sanitised_csv(csv_file_path_with_name,colums_to_return,550)

In [None]:
# The sanitising cell is normally left commented out, so on a fresh kernel the
# sanitised path is undefined. Fall back to the name Util.sanitise_csv produces
# with its default "_sanitized" suffix; a value from an earlier cell is kept.
if "saniticzed_csv_path_with_name" not in globals():
    _csv_base, _csv_extension = os.path.splitext(csv_file_path_with_name)
    saniticzed_csv_path_with_name = f"{_csv_base}_sanitized{_csv_extension}"

load_request={
      "request_type":"csv",
      "uri":saniticzed_csv_path_with_name,
      "meta":{
          "doc_type":"csv",
          "doc_name":"customer_support_tickets"
          }
      }
# Uncomment to (re)load the sanitised tickets into the vector store.
#main.load_content(load_request)

In [None]:
# Sample query text and template selector.
query = "List related to xbox"
template_meta = {'type': 'all'}

# Fully specified request variant (template type, record count, metadata filter).
query_request = {
    "query": "list issues with playsyation",
    "template_type": "csv",
    "total_records": 1,
    "meta": {"doc_type": "csv"},
}

# Minimal request variant; this reassignment is the one the next cell uses.
query_request = {"query": "list issues with playsyation", "total_records": 4}


In [None]:
# generate_content returns None when the request has no query or when no
# documents match, so guard before reading `.content`.
resonse=main.generate_content(query_request)
if resonse is not None:
    print(resonse.content)
else:
    print("No response generated: the query was empty or no matching documents were found.")

- I'm having an issue with the PlayStation. Please assist. Thank you.
- I'm having an issue with the PlayStation. Please assist. Thank you.
- I'm having an issue with I've recently updated the firmware of my PlayStation, and the issue started happening afterward. Could it be related to the update?
- I'm having an issue with the Sony PlayStation. Please assist.
- I'm having an issue with the PlayStation. Please assist. — Kevin
