<a href="https://colab.research.google.com/github/lehuong240823/rag-company-knowledge-consultant-chatbot/blob/main/ChatBot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Get Started

In [None]:
# @title Install Libraries
# Install the third-party packages imported by the cells below (-q keeps pip output quiet).
!pip -q install pinecone
!pip -q install langchain_community
!pip -q install langchain_openai
!pip -q install langchain_pinecone
!pip -q install langchain-huggingface
!pip -q install langsmith

In [None]:
# @title Import Libraries
import os, yaml, inspect, json, datetime, pytz, hashlib
from os.path import join
from google.colab import userdata
from langchain.chains import ConversationalRetrievalChain
from langchain.document_loaders import TextLoader
from langchain.document_loaders.csv_loader import CSVLoader
from langchain.prompts import PromptTemplate
from langchain.text_splitter import CharacterTextSplitter
from langchain_huggingface import HuggingFaceEndpointEmbeddings
from langchain_openai import ChatOpenAI
from langchain.prompts.chat import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate, AIMessagePromptTemplate
from langchain_pinecone.vectorstores import PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec
from langsmith import Client, traceable

In [None]:
# @title Set Environment Variables
# Secrets are read from the Colab user-data store; fixed values are set inline.
_env = os.environ
_secret = userdata.get

_env['HUGGINGFACEHUB_API_TOKEN'] = _secret('HF_TOKEN')
_env['LANGSMITH_API_KEY'] = _secret('LANGSMITH_API_KEY')
_env['LANGSMITH_ENDPOINT'] = 'https://api.smith.langchain.com'
# NOTE(review): the LangSmith project name reuses the PINECONE_INDEX_NAME secret — confirm this is intended.
_env['LANGSMITH_PROJECT'] = _secret('PINECONE_INDEX_NAME')
_env['LANGSMITH_TRACING'] = 'true'
# The OpenAI-compatible client is pointed at the Hugging Face router
# (alternative kept for reference: 'https://openrouter.ai/api/v1' with userdata.get('OPENROUTER_API_KEY')).
_env['OPENAI_API_BASE'] = 'https://router.huggingface.co/v1'
_env['OPENAI_API_KEY'] = _secret('HF_TOKEN')
_env['PINECONE_API_KEY'] = _secret('PINECONE_API_KEY')
_env['PINECONE_INDEX_NAME'] = _secret('PINECONE_INDEX_NAME')
_env['GITHUB_PERSONAL_ACCESS_TOKEN'] = _secret('GITHUB_PERSONAL_ACCESS_TOKEN')

# Data folders, all located under the configured root path.
root_path = _secret('ROOT_PATH')
embedding_path, reference_path, evaluators_path = (
  join(root_path, folder)
  for folder in ('embedding_data', 'reference_data', 'evaluators')
)

# Method

In [None]:
# @title Get Pinecone Index
def get_index(index_name=None):
  '''Return a handle to the Pinecone index, creating the index first if it is missing.

  Args:
    index_name: Name of the index. When omitted (None), the value of the
      PINECONE_INDEX_NAME environment variable is read at call time, so the
      function can be defined before the variable is set and picks up later
      changes to it.

  Returns:
    The object returned by `Pinecone().Index(index_name)`.

  Raises:
    KeyError: If no name is given and PINECONE_INDEX_NAME is not set.
  '''
  if index_name is None:
    index_name = os.environ['PINECONE_INDEX_NAME']

  # No API key is passed explicitly; presumably the client reads PINECONE_API_KEY — confirm.
  pc = Pinecone()
  if not pc.has_index(index_name):
    pc.create_index(
        name=index_name,
        # NOTE(review): 4096 presumably matches the embedding model's vector size — confirm.
        dimension=4096,
        metric='cosine',
        spec=ServerlessSpec(
            cloud='aws',
            region='us-east-1'
        )
    )

  return pc.Index(index_name)

In [None]:
# @title Create Text Splitter
def create_text_splitter(chunk_size=1000, chunk_overlap=0):
  '''Build a CharacterTextSplitter with the given chunk size and overlap.'''
  splitter_options = {'chunk_size': chunk_size, 'chunk_overlap': chunk_overlap}
  return CharacterTextSplitter(**splitter_options)

In [None]:
def get_timestamp():
  '''Return the current time in the Asia/Ho_Chi_Minh zone as an ISO 8601 string.'''
  now_in_vietnam = datetime.datetime.now(pytz.timezone('Asia/Ho_Chi_Minh'))
  return now_in_vietnam.isoformat()

def hash(text):
  '''Return the hexadecimal MD5 digest of `text` (encoded as UTF-8), used to track changes.

  Note that this name shadows the builtin `hash()` for the rest of the notebook.
  '''
  encoded = text.encode('utf-8')
  digest = hashlib.md5(encoded)
  return digest.hexdigest()

In [None]:
# @title Add Documents to Vector Store
def Loader(file):
  '''Pick the document loader that matches the file's extension.

  Args:
    file: Path of the file to load; must end in '.csv' or '.txt'.

  Returns:
    A CSVLoader for '.csv' files or a TextLoader for '.txt' files.

  Raises:
    ValueError: If the file has any other extension.
  '''
  if file.endswith('.csv'):
    return CSVLoader(file)
  if file.endswith('.txt'):
    return TextLoader(file)
  # Fail with an explicit message instead of an UnboundLocalError from a
  # loader variable that was never assigned.
  raise ValueError(f'Unsupported file type: {file!r} (expected .csv or .txt)')

def add_documents(path, ids):
  '''Load one file, timestamp its documents and split them into chunks.

  Args:
    path: File to load; passed to Loader to choose the loader.
    ids: Prefix intended for the chunk ids; unused while the upload below is disabled.
  '''
  loaded = Loader(path).load()
  for document in loaded:
    document.metadata['timestamp'] = get_timestamp()
  print([document.metadata for document in loaded])
  print(f'Loaded {len(loaded)} document(s) from {path}')
  chunks = text_splitter.split_documents(loaded)
  # NOTE(review): the upload is disabled, so nothing is written to the vector
  # store and the message printed below overstates what happened.
  #vectorstore.add_documents(chunks, ids=[f'{ids}_{str(i)}' for i in range(0, len(chunks))])
  print(f'Added {len(chunks)} text chunk(s) to vector store.')

In [None]:
# @title Add All Ref Data to Vectorstore
# Walk the reference-data folder and feed every supported file to add_documents.
for root, dirs, files in os.walk(reference_path):
  for name in files:
    path = join(root, name)
    if not name.endswith(('.csv', '.txt')):
      # Only .csv and .txt can be loaded; skip anything else (hidden files,
      # checkpoints, ...) so one stray file does not abort the whole walk.
      print(f'Skipped unsupported file: {path}')
      continue
    # Use the real extension boundary rather than assuming four characters.
    stem = os.path.splitext(name)[0]
    add_documents(path, stem)

In [None]:
# @title Add Texts to Vector Store
def add_texts(text):
  '''Split raw text into chunks and add them to the vector store.'''
  chunks = text_splitter.split_text(text)
  vectorstore.add_texts(chunks)
  chunk_count = len(chunks)
  print(f'Added {chunk_count} text chunks to vector store.')

In [None]:
# @title Get Evaluator Path
def get_evaluator_path(evaluator_name):
  '''Return the folder of the named evaluator inside `evaluators_path`.'''
  return '{}/{}'.format(evaluators_path, evaluator_name)

In [None]:
# @title Load Output Schema
def load_output_schema(evaluator_name):
  '''Load an evaluator's output schema from its output_schema.yaml file.

  Args:
    evaluator_name: Name of the evaluator folder (resolved by get_evaluator_path).

  Returns:
    The parsed YAML content of the file.
  '''
  file_path = f'{get_evaluator_path(evaluator_name)}/output_schema.yaml'
  # Decode explicitly as UTF-8 so non-ASCII text is read the same way
  # whatever the platform's default encoding is.
  with open(file_path, 'r', encoding='utf-8') as f:
    return yaml.safe_load(f)

In [None]:
# @title Load Chat Prompt
def load_chat_prompt(evaluator):
  '''Build a ChatPromptTemplate from an evaluator's prompt.yaml file.

  The file must contain a `messages` list; each item needs a `role`
  ('system', 'user' or 'ai') and a `template` string.

  Args:
    evaluator: Name of the evaluator folder (resolved by get_evaluator_path).

  Returns:
    A ChatPromptTemplate made of one message template per listed message.

  Raises:
    KeyError: If `messages`, `role` or `template` is missing, or a role is
      not one of the supported ones.
  '''
  file_path = f'{get_evaluator_path(evaluator)}/prompt.yaml'
  # Decode explicitly as UTF-8 so non-ASCII prompt text is read the same way
  # whatever the platform's default encoding is.
  with open(file_path, 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f)

  # Maps the role names used in the YAML file to their message template classes.
  role_map = {'system': SystemMessagePromptTemplate, 'user': HumanMessagePromptTemplate, 'ai': AIMessagePromptTemplate}

  messages = [
    role_map[msg_config['role']].from_template(msg_config['template'])
    for msg_config in config['messages']
  ]

  return ChatPromptTemplate.from_messages(messages)

In [None]:
# @title Create LangSmith Dataset
def create_dataset(file_name, dataset_name=None, provider='langsmith'):
  '''Create a LangSmith dataset from a JSON file of examples, unless it already exists.

  Args:
    file_name: JSON file located in `<embedding_path>/<provider>/`.
    dataset_name: Name of the dataset; defaults to `file_name` without its
      last five characters (the '.json' suffix).
    provider: Sub-folder of `embedding_path` that holds the file.

  Returns:
    The newly created dataset, or None when a dataset with that name
    already exists (nothing is created or uploaded in that case).
  '''
  if dataset_name is None:
    dataset_name = file_name[:-5]

  if client.has_dataset(dataset_name=dataset_name):
    return None

  # Parse the file before creating anything, so an unreadable or invalid
  # file does not leave an empty dataset behind.
  with open(join(embedding_path, provider, file_name), encoding='utf-8') as f:
    examples = json.load(f)

  # Keep the created dataset so its id can be used for the upload; returning
  # straight away would skip the examples entirely.
  dataset = client.create_dataset(dataset_name=dataset_name)
  client.create_examples(
    dataset_id=dataset.id,
    examples=examples
  )
  return dataset

In [None]:
# @title Create Structured Grader LLM
def create_structured_grader_llm(model='qwen/qwen3-8b:free', temperature=0, evaluator=None):
  '''Return a ChatOpenAI model whose output follows the evaluator's schema.

  Args:
    model: Model id passed to ChatOpenAI.
    temperature: Sampling temperature passed to ChatOpenAI.
    evaluator: Evaluator name whose output_schema.yaml defines the output.
  '''
  # NOTE(review): the default model id looks like an OpenRouter-style name,
  # while OPENAI_API_BASE points at the Hugging Face router — confirm it resolves.
  grader = ChatOpenAI(model=model, temperature=temperature)
  schema = load_output_schema(evaluator)
  return grader.with_structured_output(schema, method='json_schema', strict=True)

# Evaluators

In [None]:
# @title Initialize Components
# Pinecone index backing the vector store (get_index creates it if missing).
index = get_index()

# LangSmith client used for datasets and experiments.
client = Client()

# Chat model that answers questions (previous choice kept for reference).
#llm = ChatOpenAI(model='qwen/qwen3-8b:free')
llm = ChatOpenAI(model='Qwen/Qwen3-8B:featherless-ai')

embeddings = HuggingFaceEndpointEmbeddings(model='Qwen/Qwen3-Embedding-8B')

vectorstore = PineconeVectorStore(index=index, embedding=embeddings)

text_splitter = create_text_splitter()

# The number of documents to retrieve has to be passed through search_kwargs;
# a bare `k=2` keyword is not a search setting, so the default count would apply.
retriever = vectorstore.as_retriever(search_kwargs={'k': 2})

# Retrieval chain that also returns the documents it used for each answer.
qa_chain = ConversationalRetrievalChain.from_llm(
  llm,
  retriever=retriever,
  combine_docs_chain_kwargs={'prompt': load_chat_prompt('qa_chain')},
  return_source_documents=True
)

# One-off step: uncomment to create the dataset from fqa.json.
#create_dataset('fqa.json')

In [None]:
# Ad-hoc check: ask one question with an empty history and show which
# source documents the chain retrieved for it.
query = 'công ty này ở phường nào'
chat_history = []
response = qa_chain.invoke(dict(question=query, chat_history=chat_history))

sources = response['source_documents']
print(sources)

In [None]:
# @title Correctness
def correctness(inputs: dict, outputs: dict, reference_outputs: dict) -> bool:
  '''Have the grader LLM judge the answer against the reference answer.

  The evaluator folder (prompt and output schema) is looked up under this
  function's own name, read from the current frame.

  Returns:
    The `correct` field of the grader's structured output.
  '''
  name = inspect.currentframe().f_code.co_name
  judge = create_structured_grader_llm(evaluator=name)
  filled_prompt = load_chat_prompt(name).format(
    question=inputs['question'],
    reference=reference_outputs['answer'],
    answer=outputs['answer'],
  )
  verdict = judge.invoke(filled_prompt)
  return verdict['correct']

In [None]:
# @title Relevance
def relevance(inputs: dict, outputs: dict) -> bool:
  '''Have the grader LLM judge the answer against the question.

  The evaluator folder (prompt and output schema) is looked up under this
  function's own name, read from the current frame.

  Returns:
    The `relevant` field of the grader's structured output.
  '''
  name = inspect.currentframe().f_code.co_name
  judge = create_structured_grader_llm(evaluator=name)
  filled_prompt = load_chat_prompt(name).format(
    question=inputs['question'],
    answer=outputs['answer'],
  )
  verdict = judge.invoke(filled_prompt)
  return verdict['relevant']

In [None]:
# @title Groundedness
def groundedness(inputs: dict, outputs: dict) -> bool:
  '''Have the grader LLM judge the answer against the retrieved documents.

  The page contents of `outputs['documents']` are joined with newlines and
  passed to the prompt as `facts`. The evaluator folder is looked up under
  this function's own name, read from the current frame.

  Returns:
    The `grounded` field of the grader's structured output.
  '''
  name = inspect.currentframe().f_code.co_name
  judge = create_structured_grader_llm(evaluator=name)
  prompt = load_chat_prompt(name)
  facts = '\n'.join(document.page_content for document in outputs['documents'])
  filled_prompt = prompt.format(facts=facts, answer=outputs['answer'])
  verdict = judge.invoke(filled_prompt)
  return verdict['grounded']

In [None]:
# @title Retrieval relevance
def retrieval_relevance(inputs: dict, outputs: dict) -> bool:
  '''Have the grader LLM judge the retrieved documents.

  The page contents of `outputs['documents']` are joined with newlines and
  passed to the prompt as `facts`. The evaluator folder is looked up under
  this function's own name, read from the current frame.

  Returns:
    The `relevant` field of the grader's structured output.
  '''
  name = inspect.currentframe().f_code.co_name
  judge = create_structured_grader_llm(evaluator=name)
  prompt = load_chat_prompt(name)
  facts = '\n'.join(document.page_content for document in outputs['documents'])
  # NOTE(review): the prompt receives the generated answer, not the question
  # (`inputs` is unused); retrieval relevance is usually judged against the
  # question — confirm against this evaluator's prompt.yaml.
  filled_prompt = prompt.format(facts=facts, answer=outputs['answer'])
  verdict = judge.invoke(filled_prompt)
  return verdict['relevant']

In [None]:
# @title Target for Experiment
@traceable()
def rag_bot(question: str) -> dict:
  '''Answer one question with the QA chain, using an empty chat history.

  Returns:
    A dict with the chain's `answer` and, under `documents`, its
    `source_documents`.
  '''
  result = qa_chain.invoke({'question': question, 'chat_history': []})
  answer, sources = result['answer'], result['source_documents']
  return {'answer': answer, 'documents': sources}

def target(inputs: dict) -> dict:
  '''Experiment entry point: pass the example's question to rag_bot.'''
  question = inputs['question']
  return rag_bot(question)

In [None]:
# @title Experiment
def experiment():
  '''Run the evaluation over the 'fqa' dataset and return its results.

  The results are also bound to the module-level name `experiment_results`:
  the notebook calls `experiment()` without capturing the return value and
  then reads `experiment_results` in a later cell, which would raise
  NameError if the results only lived in a local variable.

  Returns:
    The object returned by `client.evaluate`.
  '''
  global experiment_results
  experiment_results = client.evaluate(
    target,
    data='fqa',
    evaluators=[correctness, relevance, groundedness, retrieval_relevance],
    experiment_prefix='rag-doc-relevance',
  )
  return experiment_results

In [None]:
# Run the evaluation experiment; the return value is not captured here.
experiment()

In [None]:
# Print the results as a Markdown table; reads the module-level name
# `experiment_results`, which must have been set by an earlier cell.
print(experiment_results.to_pandas().to_markdown())