In [None]:
import os
import pandas as pd
from transformers import pipeline
from operator import itemgetter
from tqdm import tqdm
from datasets import Dataset
import datetime
import uuid
from scipy.stats import hmean
from typing import List, Dict, Any, Callable
from azure.search.documents import SearchClient
from azure.core.credentials import AzureKeyCredential
from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings
from langchain_community.vectorstores.azuresearch import AzureSearch
from langchain_community.retrievers import AzureAISearchRetriever
from langchain.prompts.chat import ChatPromptTemplate
from langchain.schema.runnable import RunnableLambda, RunnablePassthrough
from langchain.output_parsers import ResponseSchema, StructuredOutputParser
from ragas import evaluate
from ragas.metrics import (
    answer_relevancy,
    faithfulness,
    context_recall,
    context_precision,
    answer_relevancy,
    answer_correctness,
    answer_similarity,
    context_utilization,
    context_entity_recall
)
from ragas.metrics.critique import harmfulness, coherence, conciseness, correctness
from configs.widgets import *


def create_ragas_dataset(rag_pipeline:Callable, eval_dataset:List[Dict[str, Any]])-> Dataset:
  """
  Run every evaluation question through the RAG pipeline and collect the results.

  Args:
    rag_pipeline: Object exposing ``invoke``. It is called with
      ``{"question": <question>}`` and its result must provide ``"response"``
      (an object with a ``content`` attribute) and ``"context"`` (an iterable
      of objects with a ``page_content`` attribute).
    eval_dataset: Iterable of rows, each providing a "question" and a
      "ground_truth" entry.

  Returns:
    Dataset: One row per input row with the columns "question", "answer",
    "contexts" (list of retrieved page contents) and "ground_truths"
    (single-element list holding the row's ground truth).
  """
  def _to_record(row):
    # One pipeline call per question; the result carries both answer and contexts.
    result = rag_pipeline.invoke({"question" : row["question"]})
    return {
        "question" : row["question"],
        "answer" : result["response"].content,
        "contexts" : [doc.page_content for doc in result["context"]],
        "ground_truths" : [row["ground_truth"]],
    }

  records = [_to_record(row) for row in tqdm(eval_dataset)]
  return Dataset.from_pandas(pd.DataFrame(records))


def calculate_ragas_score(answer_relevancy:float,
                          faithfulness:float,
                          contextual_precision:float,
                          contextual_recall:float,
                          answer_correctness:float,
                          answer_similarity:float)->float:
  """
  Calculate the RAGAS score as the harmonic mean of six metrics.

  Parameters:
    answer_relevancy (float): The answer relevancy metric.
    faithfulness (float): The faithfulness metric.
    contextual_precision (float): The contextual precision metric.
    contextual_recall (float): The contextual recall metric.
    answer_correctness (float): The answer correctness metric.
    answer_similarity (float): The answer similarity metric.

  Returns:
    float: The harmonic mean of the six inputs.

  Raises:
    AssertionError: If any of the six input values is not greater than 0
      (NaN is rejected as well, because ``nan > 0`` is False).
  """
  scores = [answer_relevancy, faithfulness,
            contextual_precision, contextual_recall,
            answer_correctness, answer_similarity]

  # The harmonic mean is only meaningful for strictly positive values, so all
  # six inputs are validated (not just the first four). The error is raised
  # explicitly rather than via `assert` so the check survives `python -O`,
  # while callers still see the documented AssertionError.
  if not all(score > 0 for score in scores):
    raise AssertionError("All input values mustbe greater than 0")

  return hmean(scores)


# Set up the environment, the chat and embedding models, and the Azure AI Search clients
# Read connection settings from the "scope01" secret scope via dbutils.
# NOTE(review): `environment` is not referenced again in the visible code;
# the later cells use `environment_widget.value` instead -- confirm which is intended.
environment = dbutils.secrets.get(scope = "scope01", key = "Environment")
# NOTE(review): despite their names, these two hold the "OpenAIEndpoint" and
# "OpenAIKey" secrets and are used below as the Azure OpenAI endpoint and API key.
client_id = dbutils.secrets.get("scope01", "OpenAIEndpoint")
client_secret = dbutils.secrets.get("scope01", "OpenAIKey")

# Chat model shared by question generation, answer generation, the RAG chain
# and the RAGAS evaluation. The *_widget objects are not defined in this
# section (presumably they come from `configs.widgets` -- confirm).
custom_model = AzureChatOpenAI(
    openai_api_version=openai_api_version_widget.value,
    azure_deployment=azure_deploy_widget.value,
    azure_endpoint=client_id,
    openai_api_key=client_secret,
)
# Embedding model; passed to the vector store below and to `evaluate(...)` later.
embeddings: AzureOpenAIEmbeddings = AzureOpenAIEmbeddings(
    azure_deployment=azure_ai_search_model_name_widget.value,
    openai_api_version=azure_ai_search_api_version_widget.value,
    azure_endpoint=azure_ai_search_endpoint_widget.value,
    api_key=azure_ai_embedding_api_key_widget.value,
)
# NOTE(review): `vector_store` is not referenced again in the visible code, and it
# reads `azure_ai_search_index_name_widget` whereas the retriever and search client
# read `azure_ai_search_service_index_name_widget` -- confirm the indexes match.
vector_store: AzureSearch = AzureSearch(
    azure_search_endpoint=ai_search_vectorstore_address_widget.value,
    azure_search_key=azure_ai_search_api_key_widget.value,
    index_name=azure_ai_search_index_name_widget.value,
    embedding_function=embeddings.embed_query,
    # Configure max retries for the Azure client
    additional_search_client_options={"retry_total": 4},
)
# Retriever used as the "context" step of the RAG chain. Document text is read
# from the "chunk" field; top_k=1 presumably limits retrieval to one hit per query.
retriever = AzureAISearchRetriever(
  service_name = azure_ai_search_service_name_widget.value,
  api_key=azure_ai_search_api_key_widget.value,
  content_key="chunk",
  top_k=1,
  index_name=azure_ai_search_service_index_name_widget.value
)

# Direct search client on the same index as the retriever; used later to list
# the indexed documents that seed question generation.
search_client = SearchClient(
    endpoint=ai_search_vectorstore_address_widget.value,
    index_name=azure_ai_search_service_index_name_widget.value,
    credential=AzureKeyCredential(azure_ai_search_api_key_widget.value)
)

In [None]:
# Fetch documents from the search index with a wildcard query and keep an
# independent copy of each hit for the question/answer generation steps below.
results = search_client.search(search_text="*", include_total_count=True)  # Adjust the query as needed
documents = [hit.copy() for hit in results]

# Question generation
# Ask the chat model for one context-specific question per document and collect
# the parsed results (plus their source document) in `qac_triples`.
question_schema = ResponseSchema(
  name = "question",
  description = "A question about the context"
)
question_response_schemas = [question_schema]
question_output_parser = StructuredOutputParser.from_response_schemas(question_response_schemas)
format_instructions = question_output_parser.get_format_instructions()
question_generation_llm = custom_model
# Pass-through template: the already-rendered prompt messages are injected as "{content}".
bare_prompt_template = "{content}"
bare_template = ChatPromptTemplate.from_template(template=bare_prompt_template)
qa_template = """\
You are a University Professor creating a test for advanced students. For each context,
create a question that is specific to the context. Avoid creating generic or general questions.
question: a question about the context.
Format the output as JSON with the following keys:
question
context: {context}
"""
prompt_template = ChatPromptTemplate.from_template(template=qa_template)
# NOTE(review): qa_template has no {format_instructions} placeholder, so the
# parser's format instructions passed below are not rendered into the prompt --
# confirm whether they should be added to the template.
messages = prompt_template.format_messages(
    context=documents[0],
    format_instructions=format_instructions
)
question_generation_chain = bare_template | question_generation_llm
# One-off trial run on the first document; its result is not added to
# `qac_triples`. Raises if `documents` is empty or the reply cannot be parsed.
response = question_generation_chain.invoke({"content" : messages})
output_dict = question_output_parser.parse(response.content)
qac_triples = []
# Only the first 10 documents are used to build the evaluation set.
for text in tqdm(documents[:10]):
  messages = prompt_template.format_messages(
      context=text,
      format_instructions=format_instructions
  )
  response = question_generation_chain.invoke({"content" : messages})
  try:
    output_dict = question_output_parser.parse(response.content)
  except Exception as e:
    # Best effort: a reply the parser rejects only costs us this document, but
    # report it so a shrinking evaluation set does not go unnoticed.
    tqdm.write(f"Skipping document: generated question could not be parsed ({e})")
    continue
  output_dict["context"] = text
  qac_triples.append(output_dict)

# Answer generation
# Ask the chat model to answer each generated question from its source context;
# the answers later become the ground truth of the evaluation set.
answer_generation_llm = custom_model
answer_schema = ResponseSchema(
    name="answer",
    description="an answer to the question"
)
answer_response_schemas = [answer_schema]
answer_output_parser = StructuredOutputParser.from_response_schemas(answer_response_schemas)
# Rebinds `format_instructions` to the answer parser's instructions.
format_instructions = answer_output_parser.get_format_instructions()
answer_qa_template = """\
You are a University Professor creating a test for advanced students. For each question and context, create an answer.
answer: a answer about the context.
Format the output as JSON with the following keys:
answer
question: {question}
context: {context}
"""
answer_prompt_template = ChatPromptTemplate.from_template(template=answer_qa_template)
# NOTE(review): answer_qa_template has no {format_instructions} placeholder, so
# the instructions passed below are not rendered into the prompt -- confirm intent.
messages = answer_prompt_template.format_messages(
    context=qac_triples[0]["context"],
    question=qac_triples[0]["question"],
    format_instructions=format_instructions
)
answer_generation_chain = bare_template | answer_generation_llm
# One-off trial run on the first triple; raises if `qac_triples` is empty or
# the reply cannot be parsed.
answer_response = answer_generation_chain.invoke({"content" : messages})
output_dict = answer_output_parser.parse(answer_response.content)
answered_triples = []
for triple in tqdm(qac_triples):
  messages = answer_prompt_template.format_messages(
      context=triple["context"],
      question=triple["question"],
      format_instructions=format_instructions
  )
  response = answer_generation_chain.invoke({"content" : messages})
  try:
    output_dict = answer_output_parser.parse(response.content)
  except Exception as e:
    # Best effort: report the failure and leave this question out entirely.
    tqdm.write(f"Dropping question: generated answer could not be parsed ({e})")
    continue
  triple["answer"] = output_dict["answer"]
  answered_triples.append(triple)
# Keep only triples that actually received an answer; a triple without an
# "answer" key would otherwise surface as a missing ground truth downstream.
qac_triples = answered_triples

# Evaluation dataset generation
# Tabulate the generated triples, reduce each context to the string form of its
# "chunk" field, and expose the generated answer under the name "ground_truth".
ground_truth_qac_set = (
    pd.DataFrame(qac_triples)
    .assign(context=lambda frame: frame["context"].map(lambda doc: str(doc['chunk'])))
    .rename(columns={"answer" : "ground_truth"})
)
eval_dataset = Dataset.from_pandas(ground_truth_qac_set)

# Build the RAG prompt. This is an f-string: the system message from the widget
# is interpolated right away, while the doubled braces leave {context} and
# {question} as placeholders for ChatPromptTemplate.
template = f"""
{system_message_widget.value}
Context: {{context}}
Question: {{question}}
"""
prompt = ChatPromptTemplate.from_template(template)
# RAG chain, step by step:
#   1. send the question to the retriever to obtain "context"; pass "question" through
#   2. re-assign "context" to itself (appears to be a no-op -- confirm before removing)
#   3. produce "response" from prompt | model and forward the retrieved "context"
# The "response"/"context" output keys are what create_ragas_dataset reads.
retrieval_augmented_qa_chain = (
    {"context": itemgetter("question") | retriever, "question": itemgetter("question")}
    | RunnablePassthrough.assign(context=itemgetter("context"))
    | {"response": prompt | custom_model, "context": itemgetter("context")}
)
# Metrics computed by RAGAS. Only the first six feed the overall score below.
metrics=[
      context_precision,  # Retrieval: relevance and ranking of the retrieved context chunks.
      faithfulness,  # Generation: factual consistency of the generated answer against the given context.
      answer_relevancy,  # Generation: how pertinent the generated answer is to the question (per RAGAS docs).
      context_recall,  # Retrieval: how well the retrieved context covers the ground truth.
      answer_correctness,  # Generation: accuracy of the generated answer when compared to the ground truth.
      answer_similarity,  # Generation: semantic resemblance between the generated answer and the ground truth.
      context_utilization, context_entity_recall,
      harmfulness, coherence, conciseness, correctness
      ]
# Run the chain over every evaluation question, then score the collected answers.
basic_qa_ragas_dataset = create_ragas_dataset(retrieval_augmented_qa_chain, eval_dataset)
basic_qa_result = evaluate(basic_qa_ragas_dataset, metrics=metrics, llm=custom_model, embeddings=embeddings)
# Overall score: harmonic mean of six metrics, passed positionally in the order
# calculate_ragas_score expects (relevancy, faithfulness, precision, recall,
# correctness, similarity). It raises if a checked metric is not greater than 0.
ragas_score = calculate_ragas_score(basic_qa_result['answer_relevancy'],
                                    basic_qa_result['faithfulness'], basic_qa_result['context_precision'],
                                    basic_qa_result['context_recall'], basic_qa_result['answer_correctness'],
                                    basic_qa_result['answer_similarity'])

# Identify this evaluation run: a second-resolution timestamp (also used as the
# table-name suffix) combined with a random UUID.
run_time = datetime.datetime.now()
sanitized_run_id = run_time.strftime('%Y%m%d%H%M%S')
run_id = f"{sanitized_run_id}_{uuid.uuid4()}"

# Headline metrics, rounded to four decimals.
result_dict = {
    metric_name: round(basic_qa_result[metric_name], 4)
    for metric_name in (
        'context_precision',
        'faithfulness',
        'answer_relevancy',
        'context_recall',
        'answer_correctness',
        'answer_similarity',
    )
}

# One-row summary: metrics plus the retriever/model settings of this run.
# Missing values are replaced by the sentinel -111.
# NOTE(review): `evaluate_df` is not written anywhere in the visible code.
evaluate_df = (
    pd.DataFrame([result_dict])
    .assign(
        top_k=retriever.top_k,
        model_temperature=custom_model.temperature,
        max_token=custom_model.max_tokens,
        top_p=custom_model.top_p,
        embedding_model=embeddings.model,
        overall_ragas_score=round(ragas_score, 4),
        run_time=run_time,
        run_id=run_id,
    )
    .fillna(-111)
)

# Per-question results, tagged with the run id.
all_questions_df = basic_qa_result.to_pandas().assign(run_id=run_id)

spark.sql(f"CREATE DATABASE IF NOT EXISTS {environment_widget.value}_enriched_chatbot")

# Persist the per-question results as a Delta table named after the run timestamp.
evaluation_metrics_sdf = spark.createDataFrame(all_questions_df)
(
    evaluation_metrics_sdf.write
    .format("delta")
    .option("mergeSchema", "true")
    .saveAsTable(
        f"{environment_widget.value}_local.{environment_widget.value}_enriched_chatbot.chatbot_evaluation_detailed_{sanitized_run_id}"
    )
)