S011 Self Query over semantically chunked data
- Use for all LLMs, except Cohere
- Requires markdown documents
- Use LangChain Markdown parser to semantically chunk documents
- Store the chunks in a vector database (along with necessary metadata)
- Use a Langchain "chain" for the RAG flow
-   Use retriever to fetch chunks from vector database
-   Retriever will translate natural language query into filters on chunk metadata 
-   Modify these chunks to include metadata information 
-   Pass the retrieved chunks to LLM for generation


In [1]:
import os

import nest_asyncio
from llama_index.core.base.response.schema import Response
from llama_index.core import Settings
from llama_index.core.evaluation import (
    BatchEvalRunner,
    CorrectnessEvaluator,
)

from llama_index.llms.openai import OpenAI
import openai

#from chunker import threadpool_map
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_anthropic import ChatAnthropic
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from langchain_cohere import ChatCohere, CohereEmbeddings
from langchain_together import ChatTogether
from langchain_fireworks import ChatFireworks
from langchain_core.documents import Document
from langchain_text_splitters import MarkdownHeaderTextSplitter
import tiktoken
from langchain_community.document_loaders import TextLoader
from langchain_community.document_loaders import DirectoryLoader
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain.prompts import ChatPromptTemplate
from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever



import os
from langsmith import Client

from datetime import datetime
import pandas as pd
import random

from evaluation_utils import threadpool_map

nest_asyncio.apply()

In [2]:
from config import set_environment
set_environment()

Choose the LLM for generation

In [3]:
generation_llm_family = os.environ["GENERATION_LLM_FAMILY"]
generation_llm_model = os.environ["GENERATION_LLM_MODEL"]


if generation_llm_family == "OPENAI":
   llm = ChatOpenAI(model_name=generation_llm_model, temperature=0)
elif generation_llm_family == "ANTHROPIC":
   llm = ChatAnthropic(model_name=generation_llm_model, temperature=0)
elif generation_llm_family == "GOOGLE":
   llm = ChatGoogleGenerativeAI(model=generation_llm_model, temperature=0)
elif generation_llm_family == "COHERE":
   llm = ChatCohere(model=generation_llm_model, temperature=0)
elif generation_llm_family == "META":
   #llm = ChatTogether(model=generation_llm_model, temperature=0)
   llm = ChatFireworks( model=generation_llm_model,temperature=0)
elif generation_llm_family == "QWEN":
   llm = ChatTogether(model=generation_llm_model, temperature=0)
elif generation_llm_family == "MISTRALAI":
   llm = ChatTogether(model=generation_llm_model, temperature=0)

Choose the LLM for embedding

In [4]:
embedding_llm_family = os.environ["EMBEDDING_LLM_FAMILY"]
embedding_llm_model = os.environ["EMBEDDING_LLM_MODEL"]
embedding_dimensions = int(os.environ["EMBEDDING_DIMESIONS"])

if embedding_llm_family == "OPENAI":
    embeddings_model = OpenAIEmbeddings()
elif generation_llm_family == "GOOGLE":
    embeddings_model = GoogleGenerativeAIEmbeddings(model=embedding_llm_model)
elif generation_llm_family == "COHERE":
    embeddings_model = CohereEmbeddings(model=embedding_llm_model)

In [5]:
eval_name = os.environ["EVAL_NAME"]
eval_directory = os.environ["EVAL_DIRECTORY"]
eval_file = os.environ["EVAL_FILE"]
eval_questions = os.environ["EVAL_QUESTIONS"]
eval_results_dir = os.environ["EVAL_RESULTS_DIR"]
eval_quick_test = os.environ["EVAL_QUICK_TEST"]
eval_db = os.environ["EVAL_DB"]

rag_strategy = os.environ["RAG_STRATEGY"]
similarity_top_k = int(os.environ["SIMILARITY_TOP_K"]) 

prompt_template = os.environ["RAG_PROMPT_TEMPLATE"]

Pick the strategy

In [6]:
embed_string = embedding_llm_model.replace("models/", "") if "models/" in embedding_llm_model else embedding_llm_model
generation_string = generation_llm_model.replace("meta-llama/", "").replace("accounts/fireworks/models/","").replace("Qwen/", "").replace("models/", "").replace("mistralai/", "") 

if rag_strategy == "S011_00":
    rag_strategy_desc = "Self Query"
    batch_id = f"{eval_name}_{rag_strategy}_GM_{generation_string}_EM_{embed_string}_K_{similarity_top_k}_{random.randint(0, 999):03}"

output_file = f"{eval_results_dir}/{batch_id}.xlsx"

Setup Langsmith tracing

In [7]:
import os
os.environ['LANGCHAIN_TRACING_V2'] = 'true'
os.environ['LANGCHAIN_ENDPOINT'] = 'https://api.smith.langchain.com'
os.environ['LANGCHAIN_PROJECT'] = eval_name

Read the documents, create chunks, calculate embeddings, store in a vector database

In [8]:
if os.path.exists(eval_db) and os.path.isdir(eval_db):
    vectorstore = Chroma(persist_directory=eval_db,
                  embedding_function=embeddings_model)
else:
    loader = DirectoryLoader(eval_directory, glob="**/*.md", loader_cls=TextLoader)
    text_data = loader.load()
    page_contents = [item.page_content for item in text_data]
    text_concatenated = "\n\n".join(page_contents)

    headers_to_split_on = [
        ("#", "Header 1"),
        ("##", "Header 2"),
        ("###", "Header 3"),
        ("####", "Header 4"),
        ("####", "Header 5")
    ]

    markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False)
    md_header_splits = markdown_splitter.split_text(text_concatenated)
    vectorstore = Chroma.from_documents(documents=md_header_splits, 
                                    embedding=embeddings_model,
                                    persist_directory=eval_db)
    vectorstore.persist()

retriever = vectorstore.as_retriever(search_kwargs={"k": similarity_top_k})

Create a self query retriever

In [9]:
import os

# Function to extract level one headers from all markdown files in a directory
def extract_level_one_headers_from_directory(directory):
    headers = []
    for filename in os.listdir(directory):
        if filename.endswith('.md'):  # Only process markdown files
            file_path = os.path.join(directory, filename)
            with open(file_path, 'r') as file:
                for line in file:
                    if line.startswith('# '):  # Level one header in markdown
                        headers.append(line.strip('# ').strip())
                        break  # Only need the first level one header
    return headers

In [10]:
# Extract headers from all markdown files in the directory
headers_from_directory = extract_level_one_headers_from_directory(eval_directory)
headers_directory_output = f"One of {headers_from_directory}"
headers_directory_output

"One of ['Maintenance and Support', 'Non Billable Indepdt Contractor Labor', 'Non-Employee meetings or events', 'Sponsorships Fees']"

In [11]:
headers_from_directory = extract_level_one_headers_from_directory(eval_directory)

metadata_field_info = [
    AttributeInfo(
        name="Header 1",
        description=f"The category of procurement . One of {headers_from_directory} ",
        type="string",
    ),
    
]
document_content_description = "Procurement policies for a particular category of procurement"
llm = ChatOpenAI(temperature=0)
retriever = SelfQueryRetriever.from_llm(
    llm,
    vectorstore,
    document_content_description,
    metadata_field_info,
)

In [12]:
import re

def replace_title_in_text(text, title):
    # Create a regex pattern to match one or more # followed by the title
    pattern = re.compile(rf'#+\s+{re.escape(title)}')
    # Replace all matches with the title
    result = pattern.sub(title, text)
    return result

In [13]:
def format_documents(retrieved_chunks):
    
    result = "\n"
    
    for chunk in retrieved_chunks:
    
        header_1 = chunk.metadata.get("Header 1", "")
        header_2 = chunk.metadata.get("Header 2", "")
        header_3 = chunk.metadata.get("Header 3", "")
        header_4 = chunk.metadata.get("Header 4", "")
        header_5 = chunk.metadata.get("Header 5", "")

        headers = [header_1, header_2, header_3, header_4, header_5]
        parents = []

        for header in headers:
            if header == "":
                break
            parents.append(header)
    
        # Identify the title as the last non-empty header
        title = parents[-1] if parents else "Untitled"
        text = replace_title_in_text(chunk.page_content, title)
        
        parents_concat = '\n'.join(parents)

        result += (
                    f"\n# Relevant Document Title:\n{title}\n"
                    f"## Document Text:\n{text}\n"
                    f"## This document is contained under the following sections:\n{parents_concat}\n"
            ) 
        

    return result


In [14]:
prompt = ChatPromptTemplate.from_template(prompt_template)

In [15]:
rag_chain_from_docs = (
    RunnablePassthrough.assign(context=(lambda x: format_documents(x["context"])))
    | prompt
    | llm
    | StrOutputParser()
)

retrieve_docs = (lambda x: x["question"]) | retriever

rag_chain = RunnablePassthrough.assign(context=retrieve_docs).assign(
    answer=rag_chain_from_docs
)


Quick Test

In [13]:
#eval_quick_test = """
#How do I enter or change a start date for my hire?

#"""

In [16]:
response = rag_chain.invoke({"question": eval_quick_test})
print(f"Question:{eval_quick_test}{chr(10)}")
print(f"Response:{chr(10)}{response['answer']}{chr(10)}")


Question:One of the sponsors I am using has some questions around PII security, what should I do?

Response:
If one of the sponsors you are using has questions around Personally Identifiable Information (PII) security, you should refer to the Global Marketing Compliance, Privacy and Security Sponsorships Guidance document provided in the Additional Ordering Instructions section. This document will provide you with guidance on how to handle PII security concerns and outline the necessary steps to ensure compliance with privacy and security regulations. Additionally, it is mentioned that 3rd Party Sponsorships require C&E approval, so you may need to seek approval from the appropriate department before proceeding with the sponsorship.



In [17]:
from langchain.callbacks.tracers import LangChainTracer
from langchain_core.tracers.context import tracing_v2_enabled

In [18]:

def run_rag_pipeline(row):

    metadata = {
        "eval_name": eval_name,
        "batch_id":batch_id,
        "query_num": row["query_num"],
        "rag_strategy": rag_strategy,
        "rag_strategy_desc": rag_strategy_desc,
        "parameter_1": similarity_top_k,
        "parameter_2": "",
        "parameter_3": "",
        "parameter_4": "",
        "parameter_5": "",
        "model": generation_llm_model,
        "embed_model": embedding_llm_model,
        "embed_dimensions": embedding_dimensions,
    }
    

    with tracing_v2_enabled(project_name=eval_name):
        response = rag_chain.invoke({"question": row["query"]},{"metadata": metadata})   
    
    return {
        "query_num": row["query_num"],
        "generated_answer": response['answer'],
        "sources": response['context']
    }



In [19]:
# Load the evaluation questions
queries = pd.read_excel(eval_questions)

In [20]:
results = threadpool_map(run_rag_pipeline, [{"row": item[1]} for item in list(queries.iterrows())],num_workers=1)

100%|██████████| 18/18 [01:22<00:00,  4.60s/it]


In [21]:
results

[{'query_num': 'CMP_PROC_GUIDE_001',
  'generated_answer': "Based on the information provided in the context, for purchases at a value equal to or greater than $0 involving Personal Information (PI) exchange, a contract is required regardless of the amount. However, if no PI is being exchanged, a contract is required for transactions of $25,000 or more.\n\nIn the case of a US tradeshow where the purchase amount for one of the sponsors will cost over $120,000 and there is no contract in place, the steps you should take are as follows:\n\n1. Contact the Category Manager to request an exception to the contract requirement for purchases over $25,000. The Category Manager has the authority to grant exceptions to the contract requirement.\n2. If the purchase involves the exchange of Personal Information (PI), ensure that the language from the Marketing Contact Data List SOW template is applicable to the purchase.\n3. If the purchase does not involve the exchange of PI, ensure that the approp

In [22]:
df = queries.merge(pd.DataFrame(results), on="query_num", how="inner")
assert len(df) == len(queries)  # Ensure that all queries have been processed

Choose the LLM for evaluations

In [23]:
evaluation_llm_family = os.environ["EVALUATION_LLM_FAMILY"]
evaluation_llm_model = os.environ["EVALUATION_LLM_MODEL"]

if evaluation_llm_family == "OPENAI":
    Settings.eval_llm = OpenAI(temperature=0, model=evaluation_llm_model)


In [24]:
eval_lidx_c = CorrectnessEvaluator(llm=Settings.eval_llm)

runner = BatchEvalRunner(
    {"correctness": eval_lidx_c},
    workers=16,
)

LI_eval_results = await runner.aevaluate_responses(
    queries=df["query"].tolist(),
    responses=[Response(response=x) for x in df["generated_answer"].tolist()],
    reference=[{"reference": x} for x in df["expected_answer"].tolist()],
)

In [25]:
df["correctness_result"] = LI_eval_results["correctness"]
df["correctness_llm"] = df["correctness_result"].map(lambda x: x.score)
df["feedback_llm"] = df["correctness_result"].map(lambda x: x.feedback)
print(f"""Average score: {df["correctness_llm"].mean()}""")

Average score: 3.7222222222222223


In [26]:
responses_df = pd.DataFrame()
responses_df = df[['query_num', 'query', 'expected_answer', 'generated_answer', 'correctness_llm']]
responses_df['correctness_human'] = responses_df['correctness_llm']
responses_df.loc[:, ['faithfulness_llm', 'faithfulness_human']] = ""
responses_df['rag_strategy'] = rag_strategy
responses_df['rag_strategy_desc'] = rag_strategy_desc
responses_df['parameter_1'] = similarity_top_k
responses_df.loc[:, ['parameter_2', 'parameter_3', 'parameter_4', 'parameter_5']] = ""
responses_df['model'] = generation_string 
responses_df['embed_model'] = embedding_llm_model 
responses_df['eval_model'] = evaluation_llm_model
responses_df['embed_dimensions'] = embedding_dimensions   
responses_df['reranker'] = ""
responses_df['run_date'] = datetime.today().strftime('%Y-%m-%d') 
responses_df['eval_name'] = eval_name
responses_df['batch_id'] = batch_id

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  responses_df['correctness_human'] = responses_df['correctness_llm']
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  responses_df['rag_strategy'] = rag_strategy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  responses_df['rag_strategy_desc'] = rag_strategy_desc
A value is trying to be set on a copy o

Get Performance Metrics from Langsmith

In [27]:
client = Client()
runs = client.list_runs (
    project_name=eval_name, 
    filter=f"and(eq(metadata_key, 'batch_id'), eq(metadata_value, '{batch_id}'))",
    is_root=True
)

In [28]:
usage_data = []

for run in runs:
        
    usage_data.append(
        {
            "query_num": run.extra["metadata"]["query_num"],
            "total_tokens": run.total_tokens,
            "prompt_tokens": run.prompt_tokens,
            "completion_tokens": run.completion_tokens,
            "total_cost": f"${run.total_cost:.4f}"
            if run.total_cost
            else None,
            "prompt_cost": f"${run.prompt_cost:.4f}"
            if run.prompt_cost
            else None,
            "completion_cost": f"${run.completion_cost:.4f}"
            if run.completion_cost
            else None,
            "latency": (run.end_time - run.start_time).total_seconds()
            if run.end_time
            else None,  # Pending runs have no end time
            "first_token_ms": (run.first_token_time - run.start_time).total_seconds()*1000
            if run.first_token_time
            else None,  # Pending runs have no end time
        }
    )

usage_df = pd.DataFrame(usage_data)

In [29]:
responses_df = responses_df.merge(usage_df, on='query_num', how='left')

In [30]:
correctness_sum = df['correctness_llm'].sum()
correctness_mean = df['correctness_llm'].mean()

# Create a new DataFrame for the summary
summary_df = pd.DataFrame({
    'Metric': ['Sum', 'Mean'],
    'Value': [correctness_sum, correctness_mean]
})

In [31]:
correctness_df = pd.DataFrame()
correctness_df = df[['query_num', 'query', 'expected_answer', 'generated_answer', 'correctness_llm', 'feedback_llm']]
correctness_df['correctness_human'] = correctness_df['correctness_llm']
correctness_df['feedback_human'] = ""
correctness_df['batch_id'] = batch_id

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  correctness_df['correctness_human'] = correctness_df['correctness_llm']
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  correctness_df['feedback_human'] = ""
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  correctness_df['batch_id'] = batch_id


In [32]:
sources_df = df[['query_num', 'query', 'sources']]
sources_df['batch_id'] = batch_id

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  sources_df['batch_id'] = batch_id


In [33]:
with pd.ExcelWriter(output_file) as writer:
   responses_df.to_excel(writer, sheet_name="Responses", index=False)
   sources_df.to_excel(writer, sheet_name="Sources", index=False)
   summary_df.to_excel(writer, sheet_name="Summary", index=False)
   correctness_df.to_excel(writer, sheet_name="Correctness", index=False)