In [None]:
import os
from dotenv import load_dotenv
from langchain_ollama import OllamaLLM  # Replaces langchain_community.llms.Ollama
from langchain_huggingface import HuggingFaceEmbeddings  # New embedding option
from langchain_chroma import Chroma  # Adds Chroma vector storage
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_openai.chat_models import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_experimental.text_splitter import SemanticChunker
from giskard.rag import KnowledgeBase
import pdfplumber
import re
import textwrap
import pandas as pd
from uuid import uuid4
import torch 
import os
import pickle

# Load variables from a local .env file into the process environment.
load_dotenv()

# Torch device string: "cuda" when a CUDA device is available, otherwise "cpu".
device = "cuda" if torch.cuda.is_available() else "cpu"

# Read from the environment; None when the variable is not set.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Display name -> model identifier passed to the LLM client in init_llm.
AVAILABLE_LLMS = {
    "ChatGPT4o": "gpt-4o",
    "ChatGPT4o-mini": "gpt-4o-mini",
    "ChatGPT3.5-turbo": "gpt-3.5-turbo",
    "Llama3.2-3b": "llama3.2:3b",
}

# Display name -> embedding configuration used by init_emb.
# Keys mirror AVAILABLE_LLMS so one name selects both the LLM and its embeddings.
AVAILABLE_EMBS = {
    "ChatGPT4o": "openai",
    "ChatGPT4o-mini": "openai",
    "ChatGPT3.5-turbo": "openai",
    "Llama3.2-3b": "sentence-transformers/all-mpnet-base-v2",
}

In [2]:
def init_llm(model_name):
    """
    Initializes the LLM based on the provided model name.

    Parameters:
        model_name (str): The key of the model in AVAILABLE_LLMS (e.g., "ChatGPT3.5-turbo", "Llama3.2-3b").

    Returns:
        llm: A ChatOpenAI instance for names starting with "ChatGPT",
             an OllamaLLM instance for names starting with "Llama".

    Raises:
        ValueError: If `model_name` is not a key of AVAILABLE_LLMS, or if it is a key
                    but matches neither of the supported name prefixes.
    """
    if model_name not in AVAILABLE_LLMS:
        raise ValueError(f"Unsupported model: {model_name}. Please choose from: {list(AVAILABLE_LLMS.keys())}")

    model_id = AVAILABLE_LLMS[model_name]

    if model_name.startswith("ChatGPT"):
        # Read the key at call time so a key exported after the config cell ran is still picked up.
        api_key = os.getenv("OPENAI_API_KEY")
        return ChatOpenAI(openai_api_key=api_key, model=model_id)

    if model_name.startswith("Llama"):
        return OllamaLLM(model=model_id)

    # Only reachable when AVAILABLE_LLMS gains an entry with an unknown name prefix.
    raise ValueError(f"Model configuration for {model_name} is not supported.")

def init_emb(llm_model_name):
    """
    Builds the embedding model that is paired with the given LLM name.

    Parameters:
        llm_model_name (str): A key of AVAILABLE_EMBS (e.g., "ChatGPT3.5-turbo", "Llama3.2-3b").

    Returns:
        emb_model: OpenAIEmbeddings for names starting with "ChatGPT"; HuggingFaceEmbeddings
                   (model taken from AVAILABLE_EMBS) for names starting with "Llama".

    Raises:
        ValueError: If the name is not a key of AVAILABLE_EMBS or matches no supported prefix.
    """
    target_device = "cuda" if torch.cuda.is_available() else "cpu"

    if llm_model_name not in AVAILABLE_EMBS.keys():
        raise ValueError(f"Unsupported model: {llm_model_name}. Please choose from: {list(AVAILABLE_EMBS.keys())}")

    backend = AVAILABLE_EMBS[llm_model_name]

    if llm_model_name.startswith("ChatGPT"):
        # The configured value is not passed on: OpenAIEmbeddings is created with its defaults.
        return OpenAIEmbeddings()

    if llm_model_name.startswith("Llama"):
        # Here the configured value is the HuggingFace model name to load.
        return HuggingFaceEmbeddings(
            model_name=backend,
            model_kwargs={'device': target_device},
            encode_kwargs={'normalize_embeddings': False},
        )

    raise ValueError(f"Model configuration for {llm_model_name} is not supported.")


def process_table(table):
    """
    Renders a table (a sequence of rows of cells) as text.

    Cells that are None are skipped; the remaining cells are passed through
    `clean_text` and joined with tabs. Rows are joined with newlines.
    An empty or missing table yields an empty string.
    """
    if not table:
        return ""

    rendered_rows = []
    for row in table:
        cells = [clean_text(cell) for cell in row if cell is not None]
        rendered_rows.append("\t".join(cells))
    return "\n".join(rendered_rows)

def clean_text(text):
    """
    Normalizes a piece of extracted text.

    - Drops the bullet markers "•" and "➢".
    - Turns newlines, carriage returns and tabs into spaces.
    - Collapses every run of whitespace into one space and trims both ends.
    - Returns "" for None or any other falsy input.
    """
    if not text:
        return ""

    # (pattern, replacement) pairs, applied in order.
    substitutions = (
        (r'[•➢]', ''),
        (r'[\n\r\t]', ' '),
        (r'\s+', ' '),
    )
    cleaned = text
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
    

def read_files(filepath):
    """
    Extracts the content of every PDF under `filepath`, one record per page.

    The sub-folders "eu", "other/higher-value" and "other/lower-value" of `filepath`
    are scanned. Files whose name does not end with ".pdf" are ignored.

    Parameters:
        filepath (str): Root folder that contains the three sub-folders.

    Returns:
        pd.DataFrame: Columns "file_name", "page_number", "text", "processed_tables"
        and "metadata". Pages whose cleaned text is empty are dropped. The frame is
        empty (but still has these columns) when no page is found.

    Raises:
        FileNotFoundError: If one of the expected sub-folders does not exist.
    """
    folders = {
        "eu": os.path.join(filepath, "eu"),
        "other_higher": os.path.join(filepath, "other/higher-value"),
        "other_lower": os.path.join(filepath, "other/lower-value"),
    }

    # Folder-based metadata attached to every page read from that folder.
    metadata_mapping = {
        "eu": {"type": "eu"},
        "other_higher": {"type": "other", "value": "higher"},
        "other_lower": {"type": "other", "value": "lower"},
    }

    # Passed to the DataFrame constructor so the columns exist even when no page was read.
    record_columns = ["file_name", "page_number", "text", "processed_tables", "metadata"]
    all_data = []

    for folder_name, folder_path in folders.items():
        metadata = metadata_mapping[folder_name]

        # os.listdir returns names in arbitrary order; sorting keeps the record order
        # reproducible between runs and machines.
        for pdf_file in sorted(os.listdir(folder_path)):
            if not pdf_file.endswith(".pdf"):
                continue

            file_path = os.path.join(folder_path, pdf_file)
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    text = page.extract_text()
                    tables = page.extract_tables()

                    all_data.append({
                        "file_name": pdf_file,
                        "page_number": page.page_number,
                        "text": clean_text(text),
                        # NOTE(review): only the first table of the page is kept.
                        "processed_tables": process_table(tables[0]) if tables else "",
                        # Copy so that editing one record's metadata cannot affect the others.
                        "metadata": dict(metadata),
                    })

    df = pd.DataFrame(all_data, columns=record_columns)

    # Remove rows with no text
    df = df[df["text"] != ""].reset_index(drop=True)
    return df

from tqdm import tqdm

def init_vectorstore(df, embedding, persist_directory="vectorstore"):
    """
    Returns a Chroma vector store for the page texts in `df`, reusing a persisted one if present.

    When `persist_directory` already exists, the store is opened from it and `df` is not read.
    Otherwise the "text" of every row is split with a SemanticChunker, each chunk gets the
    metadata {"id": "<file_name>_page_<page_number>"} of its row, and all chunks are added to
    a new store created with `persist_directory`.

    Parameters:
        df (pd.DataFrame): Rows with "text", "file_name" and "page_number".
        embedding: The embedding model used for both chunking and the store.
        persist_directory (str): Directory to save/load the vector store.

    Returns:
        vec_store: The loaded or newly filled Chroma vector store.
    """
    store_exists = os.path.exists(persist_directory)
    if store_exists:
        print(f"Loading vector store from {persist_directory}...")
        loaded_store = Chroma(embedding_function=embedding, persist_directory=persist_directory)
        print("Loaded existing vector store.")
        return loaded_store

    print("Vector store not found. Initializing new vector store...")

    splitter = SemanticChunker(embedding, breakpoint_threshold_type="gradient")
    chunks = []

    # tqdm wraps the row iterator to show progress while chunking.
    rows = tqdm(df.iterrows(), total=len(df), desc="Processing documents")
    for _, record in rows:
        pieces = splitter.create_documents([record["text"]])
        page_id = f"{record.file_name}_page_{record.page_number}"

        for piece in pieces:
            # Replaces whatever metadata the splitter set with the page identifier.
            piece.metadata = {"id": page_id}
        chunks.extend(pieces)

    print(f"Processed {len(chunks)} chunks. Initializing vector store...")

    new_store = Chroma(embedding_function=embedding, persist_directory=persist_directory)
    chunk_ids = [str(uuid4()) for _ in chunks]  # one random id per chunk
    new_store.add_documents(documents=chunks, ids=chunk_ids)
    return new_store


def init_retriever(vec_store, k):
    """
    Wraps `vec_store` in a retriever that is configured with search_kwargs {"k": k}.

    Parameters:
        vec_store: Object exposing `as_retriever(search_kwargs=...)`.
        k (int): Number of most similar documents to return per query.

    Returns:
        The retriever produced by `vec_store.as_retriever`.
    """
    search_options = {"k": k}
    return vec_store.as_retriever(search_kwargs=search_options)

def build_rag_chain(retriever, llm):
    """
    Builds a chain that answers a question from retrieved context and also returns the documents.

    Parameters:
        retriever: Runnable that maps a question string to a list of documents
                   exposing `page_content`.
        llm: The language model that receives the filled-in prompt.

    Returns:
        A RunnableParallel. Invoked with a question string it yields a dict with
        "question" (the input), "answer" (single-line answer text) and
        "docs" (the retrieved documents, or None when nothing was retrieved).
    """
    template = """
    Use the following context to answer the question. Only use  information from the context provided.
    Do not ask questions.

    Context: {context}

    Question: {question}

    Answer: """

    prompt = PromptTemplate.from_template(template)

    def join_contents(docs):
        # Concatenate the retrieved passages; empty context when nothing was found.
        return " ".join(doc.page_content for doc in docs) if docs else ""

    def flatten_answer(text):
        # Put the answer on a single line.
        return text.replace("\n", " ").strip()

    doc_chain = retriever | (lambda docs: docs if docs else None)

    rag_chain = (
        {"context": retriever | join_contents,
        "question": RunnablePassthrough()}
        | prompt
        | llm
        # Parse to str BEFORE the string cleanup: a chat model returns a message
        # object, which has no `.replace`.
        | StrOutputParser()
        | flatten_answer
    )

    # The retriever appears in both `rag_chain` and `doc_chain`, so it runs twice per question.
    combined_chain = RunnableParallel(
        {
            "question": RunnablePassthrough(),
            "answer": rag_chain,
            "docs": doc_chain,
        }
    )

    return combined_chain

def handle_query(rag_chain, query):
    """
    Runs `query` through `rag_chain` and returns whatever the chain's `invoke` returns.
    """
    result = rag_chain.invoke(query)
    return result

def create_answer_fn(chain):
    """
    Adapts a RAG chain to the `answer_fn(question, history)` callable used for Giskard evaluation.

    Parameters:
        chain: The RAG chain; its result for a question must be indexable with "answer".

    Returns:
        A callable `answer_fn(question, history=None)` that returns the chain's "answer"
        for `question`.
    """
    def answer_fn(question, history=None):
        # `history` is accepted but not used: each question is answered on its own.
        return handle_query(chain, question)["answer"]

    return answer_fn


# Use another LLM to set up a knowledge base with `giskard.rag.knowledge_base.KnowledgeBase`

A class to handle the knowledge base and the associated vector store.

## Parameters:
- **`knowledge_base_df`** (`pd.DataFrame`)  
  A dataframe containing the whole knowledge base.

- **`columns`** (`Sequence[str]`, *optional*)  
  The list of columns from the knowledge base to consider. If not specified, all columns of the knowledge base dataframe will be concatenated to produce a single document.  
  **Example**: If your knowledge base consists of FAQ data with columns `"Q"` and `"A"`, the rows will be formatted into a single document as:  
  `Q: [question]\nA: [answer]`.

- **`seed`** (`int`, *optional*)  
  The seed to use for random number generation.

- **`llm_client`** (`LLMClient`, *optional*)  
  The LLM client to use for question generation. If not specified, a default OpenAI client will be used.

- **`embedding_model`** (`BaseEmbedding`, *optional*)  
  The embedding model to use for the knowledge base. By default, the Giskard default model is used, which is OpenAI `"text-embedding-ada-002"`.

- **`min_topic_size`** (`int`, *optional*)  
  The minimum number of documents to form a topic inside the knowledge base.

- **`chunk_size`** (`int`, *default: 2048*)  
  The number of documents to embed in a single batch.
  

In [None]:
# Show the model names that can be selected below.
print(list(AVAILABLE_LLMS))

In [None]:
from giskard.rag import KnowledgeBase, generate_testset

FILE_PATH = "./data/raw"
columns = ["text", "file_name", "page_number"]  # Which columns the evaluation should be based on
num_most_similar_docs = 5  # How many documents the RAG should take into account when constructing an answer
num_questions_in_testset = 30
language = "en"

# We use one LLM to set up the knowledge base - this should preferably be a stronger model than the one we use in the RAG, since this will produce the "correct answers".
# Models are selected by name (a key of AVAILABLE_LLMS) rather than by position,
# so editing that dict cannot silently switch the model.
rag_model_name = "ChatGPT3.5-turbo"
eval_model_name = "ChatGPT3.5-turbo"

print(f"Selected RAG model: {rag_model_name}")
print(f"Selected evaluation model: {eval_model_name}")

# Set up the LLMs
rag_llm = init_llm(rag_model_name)
eval_llm = init_llm(eval_model_name)

# Set up the embedding models
rag_emb = init_emb(rag_model_name)
eval_emb = init_emb(eval_model_name)

In [None]:
# Preprocess data and create vector store
# The extracted pages are cached as a pickle at PROCESSED_DATA_PATH and reloaded on later runs.
# The vector store is a Chroma directory at VECTORSTORE_PATH; init_vectorstore opens it when
# the directory exists and builds it otherwise.
FILE_PATH = "./data/raw"
columns = ["text", "file_name", "page_number"]  # Which columns the evaluation should be based on
num_most_similar_docs = 5  # How many documents the RAG should take into account when constructing an answer
num_questions_in_testset = 30
language = "en"
PROCESSED_DATA_PATH = "./data/processed/processed_data.pkl"
VECTORSTORE_PATH = "./chroma_langchain_db"

# Load or process the data
if os.path.exists(PROCESSED_DATA_PATH):
    print(f"Loading processed data from {PROCESSED_DATA_PATH}...")
    # pickle.load can execute arbitrary code: only load a cache file written by this notebook.
    with open(PROCESSED_DATA_PATH, "rb") as f:
        data = pickle.load(f)
    print(f"Loaded processed data. Number of records: {len(data)}")
else:
    print(f"Processed data file not found. Reading files from {FILE_PATH}...")
    data = read_files(FILE_PATH)
    print(f"Finished processing files. Number of records: {len(data)}")

    print(f"Saving processed data to {PROCESSED_DATA_PATH}...")
    # The cache folder may not exist on a fresh checkout.
    os.makedirs(os.path.dirname(PROCESSED_DATA_PATH), exist_ok=True)
    with open(PROCESSED_DATA_PATH, "wb") as f:
        pickle.dump(data, f)
    print("Processed data saved successfully.")

# Load or create the vector store
# The store is a directory managed by Chroma, not a pickle file, so loading is delegated
# to init_vectorstore, which is pointed at VECTORSTORE_PATH.
# NOTE(review): the store is embedded with eval_emb although the retriever built on it
# serves the RAG chain; identical while both model names match - confirm which is intended.
vec_store = init_vectorstore(data, eval_emb, persist_directory=VECTORSTORE_PATH)
print("Vector store ready.")

In [None]:
from openai import OpenAI

# Retrieval depth and knowledge-base columns used from this cell onwards.
num_most_similar_docs = 3
columns = ["text", "file_name", "page_number"]

# Retriever over the vector store
print(f"Initializing retriever with {num_most_similar_docs} most similar documents.")
retriever = init_retriever(vec_store, k=num_most_similar_docs)
print("Retriever initialized.")

# RAG chain: retriever + RAG model
print("Building the RAG chain...")
rag_chain = build_rag_chain(retriever=retriever, llm=rag_llm)
print("RAG chain built.")

# Callable that the evaluation invokes for every test question
print("Creating the answer function for the RAG chain...")
answer_fn = create_answer_fn(chain=rag_chain)
print("Answer function created.")

print("Setup complete! Ready to generate test sets or evaluate.")

# Knowledge base built from the processed pages; no llm_client or embedding_model is passed.
print("Creating the knowledgebase...")
gpt_knowledge_base = KnowledgeBase(
    data=data,
    columns=columns,
    seed=42,
    min_topic_size=2,
    chunk_size=2048,
)

# Size and language of the generated test set
num_questions_in_testset = 10
language = "en"

testset = generate_testset(
    knowledge_base=gpt_knowledge_base,
    num_questions=num_questions_in_testset,
    language=language,
    agent_description="This LLM assists with queries using a knowledge base.",
)

# Testset Generation

The `giskard.rag.generate_testset` function generates a test set from a knowledge base.

```python
giskard.rag.generate_testset(
    knowledge_base: KnowledgeBase, 
    num_questions: int = 120, 
    question_generators: QuestionGenerator | Sequence[QuestionGenerator] = None, 
    language: str | None = 'en', 
    agent_description: str | None = 'This agent is a chatbot that answers questions from users.'
) -> QATestset
```

## Parameters:
- **`knowledge_base`** (`KnowledgeBase`):  
  The knowledge base to generate questions from.

- **`num_questions`** (`int`):  
  The number of questions to generate. By default, 120.

- **`question_generators`** (`QuestionGenerator | Sequence[QuestionGenerator]`, optional):  
  Question generators to use for question generation. If multiple generators are specified, `num_questions` will be generated with each generator.  
  If not specified, all available question generators will be used.

- **`language`** (`str`, optional):  
  The language to use for question generation. The default is `"en"` for English.

- **`agent_description`** (`str`, optional):  
  Description of the agent to be evaluated. This will be used in the prompt for question generation to get more fitting questions.

## Returns:
- The generated test set.

### Return Type:
- **`QATestset`**

In [None]:
from giskard.rag import evaluate as gk_eval

# Run every test-set question through the RAG answer function and score the answers.
report = gk_eval(
    answer_fn,
    testset=testset,
    knowledge_base=gpt_knowledge_base,
)

In [None]:
# Bare last expression: the notebook displays the evaluation report as the cell output.
report