In [1]:
%%bash
# Install/upgrade the notebook's dependencies in one pass.
# `typing` is part of the standard library, so the PyPI backport is not installed;
# duplicated `--upgrade` flags and the repeated `reportlab` entry were dropped.
pip install --quiet --upgrade haystack-ai anthropic-haystack pypdf PyMuPDF reportlab Pillow jinja2 gradio langdetect "sentence-transformers>=2.2.0"


     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 78.6/78.6 kB 7.7 MB/s eta 0:00:00
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 981.5/981.5 kB 57.2 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 494.0/494.0 kB 39.8 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 303.4/303.4 kB 28.1 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 20.0/20.0 MB 114.6 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.0/2.0 MB 94.3 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 54.1/54.1 MB 45.5 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 323.1/323.1 kB 30.2 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 95.2/95.2 kB 10.3 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 92.0/92.0 kB 9.8 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 11.5/11.5 MB 133.6 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 72.0/72.0 kB 7.4 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3

In [None]:
import os
import gradio as gr
from datetime import datetime
from pathlib import Path
from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators.utils import print_streaming_chunk
from haystack.utils import Secret
from haystack_integrations.components.generators.anthropic import AnthropicChatGenerator
from haystack.components.generators import HuggingFaceAPIGenerator, OpenAIGenerator
from typing import List, Optional, Dict
from haystack.dataclasses import ChatMessage, Document
from haystack import component
from jinja2 import Template
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
import numpy as np
from typing import Dict, Tuple

# Bestehende Komponenten
from haystack.components.writers import DocumentWriter
from haystack.components.converters import PyPDFToDocument
from haystack.components.preprocessors import DocumentSplitter, DocumentCleaner
from haystack.components.joiners import DocumentJoiner
from haystack.components.embedders import SentenceTransformersDocumentEmbedder, SentenceTransformersTextEmbedder
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers import InMemoryEmbeddingRetriever
from haystack.dataclasses import ChatMessage

# for pdf
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image
from reportlab.lib.units import mm
from reportlab.platypus import PageTemplate, Frame
from reportlab.pdfgen import canvas
from PIL import Image as PILImage
from reportlab.lib.colors import HexColor

from langdetect import detect

# detect language of uploaded documents
# to help the LLM answer in english if questions and documents are in english
def detect_language(text):
    """Return the language code reported by ``langdetect.detect`` for ``text``.

    Args:
        text: The text to classify.

    Returns:
        The detected language code, or ``'en'`` when ``text`` is empty,
        whitespace-only, not a string, or when detection fails.
    """
    # Nothing to classify: fall back to the default without calling the detector.
    if not isinstance(text, str) or not text.strip():
        return 'en'
    try:
        return detect(text)
    except Exception:
        # Default to English if detection fails. `Exception` (not a bare except)
        # so KeyboardInterrupt/SystemExit still propagate.
        return 'en'

# Global variables
# Module-level state shared by the UI callbacks; functions rebind these via
# `global` statements in their own bodies (a `global` statement at module level
# has no effect, so none is used here).
# "Nicht spezifiziert" is the sentinel for "no model chosen yet".
global_model_name = "Nicht spezifiziert"
global_output_dir = None


# some predefined questions to be presented in a dropdown menu
# the "manual input" entry switches the UI to the free-text question box
predefined_questions = [
    "Wie hoch ist die Solvenzquote?",
    "manual input"
]

# read api key for each selected model from local text file
def read_api_key(model_name, key_dir="/content"):
    """Read the API key that belongs to the provider of ``model_name``.

    The provider is chosen by substring match on the lower-cased model name:
    ``"claude"`` -> ``anthropic_api_key.txt``, ``"gpt"`` -> ``openai_api_key.txt``,
    anything else -> ``huggingface_api_key.txt``.

    Args:
        model_name: Name of the selected model.
        key_dir: Directory that holds the key files (defaults to ``/content``).

    Returns:
        The key with surrounding whitespace removed.

    Raises:
        FileNotFoundError: If the key file does not exist.
        ValueError: If the key file exists but contains no key.
    """
    # Order matters: "claude" is checked before "gpt", as in the original chain.
    key_file_names = {
        "claude": "anthropic_api_key.txt",
        "gpt": "openai_api_key.txt",
    }
    lowered_name = model_name.lower()
    file_name = "huggingface_api_key.txt"
    for marker, candidate in key_file_names.items():
        if marker in lowered_name:
            file_name = candidate
            break
    key_file = os.path.join(key_dir, file_name)

    try:
        # utf-8-sig drops a byte-order mark some editors prepend; a BOM would
        # otherwise become part of the key.
        with open(key_file, 'r', encoding='utf-8-sig') as file:
            api_key = file.read().strip()
    except FileNotFoundError as err:
        raise FileNotFoundError(f"API key file not found: {key_file}") from err

    if not api_key:
        raise ValueError(f"API key file is empty: {key_file}")
    return api_key

# initializing in-memory document store
# module-level store instance used by the PDF processing and retrieval code below
document_store = InMemoryDocumentStore()

# function to clear document store when documents are deleted from interface
def clear_document_store():
    """Rebind the module-level ``document_store`` to a new, empty store.

    The previous store object is not emptied; the global name simply points at a
    freshly created ``InMemoryDocumentStore`` afterwards.
    """
    global document_store
    fresh_store = InMemoryDocumentStore()
    document_store = fresh_store
    print("Document store cleared. A new empty store has been created.")

# batch embedding components - feed documents/text to the sentence-transformers embedders in small slices
# intended to reduce RAM and GPU requirements
@component
class BatchSentenceTransformersDocumentEmbedder:
    """Embed a list of documents in fixed-size batches.

    The input list is cut into slices of ``batch_size`` documents; each slice is
    run through a wrapped ``SentenceTransformersDocumentEmbedder`` and the
    results are concatenated in input order.
    """

    def __init__(self, batch_size: int = 8):
        """Create the wrapped embedder.

        Args:
            batch_size: Number of documents per embedder call; must be >= 1.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        # Fail early: 0 would only blow up inside range() at run time and a
        # negative step would silently embed nothing.
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")
        self.embedder = SentenceTransformersDocumentEmbedder()
        self.batch_size = batch_size

    def warm_up(self):
        """Warm up the wrapped embedder."""
        self.embedder.warm_up()

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]):
        """Embed ``documents`` batch by batch.

        Args:
            documents: Documents to embed; an empty list yields an empty result.

        Returns:
            ``{"documents": [...]}`` with the outputs of the wrapped embedder.
        """
        # Still warmed up here so the component also works when called directly.
        self.warm_up()
        embedded_documents = []
        for start in range(0, len(documents), self.batch_size):
            batch = documents[start:start + self.batch_size]
            embedded_documents.extend(self.embedder.run(documents=batch)['documents'])
        return {"documents": embedded_documents}

@component
class BatchSentenceTransformersTextEmbedder:
    """Embed an arbitrarily long text as a single vector.

    The text is cut into chunks of ``chunk_size`` characters (default: 4000),
    each chunk is embedded separately with a wrapped
    ``SentenceTransformersTextEmbedder``, and the unweighted mean of the chunk
    embeddings is returned as the representation of the complete text.
    """

    def __init__(self, chunk_size: int = 4000):
        """Create the wrapped embedder.

        Args:
            chunk_size: Maximum number of characters per chunk; must be >= 1.

        Raises:
            ValueError: If ``chunk_size`` is smaller than 1.
        """
        if chunk_size < 1:
            raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")
        self.embedder = SentenceTransformersTextEmbedder()
        self.chunk_size = chunk_size

    def warm_up(self):
        """Warm up the wrapped embedder."""
        self.embedder.warm_up()

    @component.output_types(embedding=List[float])
    def run(self, text: str):
        """Embed ``text`` and return the averaged chunk embedding.

        Args:
            text: The text to embed.

        Returns:
            ``{"embedding": [...]}`` with the mean embedding as a list of floats.
        """
        self.warm_up()
        # An empty text produces no chunks; embed it as a single (empty) chunk so
        # np.mean never sees an empty list (which would yield NaN, not a vector).
        chunks = [text[i:i + self.chunk_size] for i in range(0, len(text), self.chunk_size)] or [text]
        chunk_embeddings = [self.embedder.run(text=chunk)['embedding'] for chunk in chunks]
        avg_embedding = np.mean(chunk_embeddings, axis=0).tolist()
        return {"embedding": avg_embedding}

# template handling: the prompt template may be edited by the user at runtime
# validation (see PROTECTED_PLACEHOLDERS) checks that the required Jinja placeholders keep their
# expected counts, and the original template can be restored at any time
# Template Definition: Jinja template rendered with `all_documents` (mapping of name -> documents) and `question`
ORIGINAL_TEMPLATE = """
Role: You are acting as a Supervisory ESG Compliance Advisor.
Objective: Your task is to evaluate whether a financial fund qualifies as an ESG (Environmental, Social, and Governance) fund under the Sustainable Finance Disclosure Regulation (SFDR), and to determine whether it falls under Article 8 or Article 9 classification.

Instructions:
1. Stick strictly to the facts presented in the document.
2. If specific information is not available, clearly state: “I cannot find this information.”
3. For each piece of information, include the paragraph number and page number where it was found in the document.
4. Ensure the output is easy to read, visually separated, and traceable to the source document.

Contexts:
{% for doc_name, documents in all_documents.items() %}
Document: {{ doc_name }}
{% for document in documents %}
    {{ document.content }}
{% endfor %}

{% endfor %}

Question: {{ question }}

Answer:
"""

# Global variable to store the current template (starts as the original; rebound on update/reset)
current_template = ORIGINAL_TEMPLATE

# Placeholders that must stay in the template, mapped to the exact number of
# times each one has to occur (counted as literal substrings by validate_template)
PROTECTED_PLACEHOLDERS: Dict[str, int] = {
    "{% for doc_name, documents in all_documents.items() %}": 1,
    "{% for document in documents %}": 1,
    "{% endfor %}": 2,  # This placeholder should appear twice (one per loop)
    "{{ doc_name }}": 1,
    "{{ document.content }}": 1,
    "{{ question }}": 1
}

def validate_template(template: str, placeholders: Optional[Dict[str, int]] = None) -> Tuple[bool, str]:
    """Check that ``template`` contains every protected placeholder exactly as often as required.

    Occurrences are counted as literal substrings, so spacing inside the
    placeholder must match exactly.

    Args:
        template: The template text to check.
        placeholders: Mapping of placeholder -> required count. Defaults to the
            module-level ``PROTECTED_PLACEHOLDERS``.

    Returns:
        ``(True, "Template is valid")`` on success, otherwise ``(False, reason)``
        describing the first placeholder whose count is wrong.
    """
    required_counts = PROTECTED_PLACEHOLDERS if placeholders is None else placeholders

    for placeholder, required_count in required_counts.items():
        actual_count = template.count(placeholder)

        if actual_count < required_count:
            return False, f"Protected placeholder missing or insufficient: '{placeholder}'. Expected {required_count}, found {actual_count}."
        if actual_count > required_count:
            return False, f"Too many occurrences of protected placeholder: '{placeholder}'. Expected {required_count}, found {actual_count}."

    return True, "Template is valid"

def update_template(new_template: str) -> str:
    """Replace the active prompt template if ``new_template`` is acceptable.

    The template must pass ``validate_template`` and must compile as a Jinja
    template; only then is the module-level ``current_template`` rebound.

    Args:
        new_template: The candidate template text.

    Returns:
        ``"Template updated successfully"`` on success, otherwise a message
        starting with ``"Template update failed: "``.
    """
    global current_template
    is_valid, message = validate_template(new_template)
    if not is_valid:
        return f"Template update failed: {message}"

    # The placeholder count check cannot see broken Jinja syntax (e.g. an
    # unclosed tag elsewhere in the text); compile once here so the error shows
    # up at update time instead of when the next question is asked.
    from jinja2 import TemplateSyntaxError
    try:
        Template(new_template)
    except TemplateSyntaxError as err:
        return f"Template update failed: invalid Jinja syntax (line {err.lineno}): {err.message}"

    current_template = new_template
    return "Template updated successfully"

def update_template_with_validation(new_template: str) -> Tuple[str, str]:
    """Try to apply ``new_template`` and report what the editor should show.

    Returns:
        ``(status, template_text)``: the status message from ``update_template``
        and either the submitted text (update accepted) or the template that is
        still active (update rejected).
    """
    status = update_template(new_template)
    # Success is recognised by the wording of the status message.
    accepted = "successfully" in status
    shown_template = new_template if accepted else current_template
    return status, shown_template

def reset_template() -> Tuple[str, str]:
    """Restore the original prompt template.

    Returns:
        ``(template_text, status)`` - note the order: template first, then the
        status message.
    """
    global current_template
    restored = ORIGINAL_TEMPLATE
    current_template = restored
    return restored, "Template reset to original"

# convert the prompt string into a ChatMessage to be used by LLM
@component
class PromptToChatMessage:
    """Pipeline component that wraps a prompt string into a one-element list of user chat messages."""

    @component.output_types(messages=List[ChatMessage])
    def run(self, prompt: str):
        """Return ``{"messages": [user_message]}`` built from ``prompt``."""
        user_message = ChatMessage.from_user(prompt)
        return {"messages": [user_message]}

# Prompt builder that renders whatever template is active in `current_template`
@component
class CustomPromptBuilder:
    """Pipeline component that renders the active Jinja template into a prompt string."""

    @component.output_types(prompt=str)
    def run(self, question: str, documents: List[Document], all_documents: Optional[Dict[str, List[Document]]] = None):
        """Render the prompt.

        Args:
            question: The user question inserted into the template.
            documents: Documents used as the single group "Default" when
                ``all_documents`` is not given.
            all_documents: Optional mapping of group name -> documents; when
                provided it is rendered instead of ``documents``.

        Returns:
            ``{"prompt": rendered_text}``.
        """
        grouped = {"Default": documents} if all_documents is None else all_documents
        # The template is read at call time, so edits take effect on the next run.
        rendered = Template(current_template).render(all_documents=grouped, question=question)
        return {"prompt": rendered}
# end template handling

# functions to process pdfs
# change pdfs into a format to be processed by LLMs
def _build_preprocessing_pipeline(split_by, split_length, split_overlap):
    """Assemble the convert -> join -> clean -> split -> embed pipeline for PDFs."""
    pipeline = Pipeline()
    pipeline.add_component(instance=PyPDFToDocument(), name="pdf_converter") # converts pdf to text
    pipeline.add_component(instance=DocumentJoiner(), name="document_joiner") # joins documents together
    pipeline.add_component(instance=DocumentCleaner(), name="document_cleaner") # cleans text before processing
    # int(): UI sliders may hand over float values - cast defensively (assumption, harmless for ints)
    pipeline.add_component(
        instance=DocumentSplitter(split_by=split_by, split_length=int(split_length), split_overlap=int(split_overlap)),
        name="document_splitter",
    ) # splits large documents into smaller chunks
    pipeline.add_component(instance=BatchSentenceTransformersDocumentEmbedder(), name="document_embedder")

    pipeline.connect("pdf_converter", "document_joiner")
    pipeline.connect("document_joiner", "document_cleaner")
    pipeline.connect("document_cleaner", "document_splitter")
    pipeline.connect("document_splitter", "document_embedder")
    return pipeline


def process_pdfs(pdf_files, split_by, split_length, split_overlap):
    """Convert, split and embed the uploaded PDFs and store them in the document store.

    The document store is replaced by an empty one first, so only the files of
    this call are available for retrieval afterwards.

    Args:
        pdf_files: Uploaded files; each entry is either an object with a
            ``name`` attribute holding the path, or the path itself. ``None``
            is treated like an empty list.
        split_by: Unit passed to ``DocumentSplitter`` (e.g. "word").
        split_length: Chunk length passed to ``DocumentSplitter``.
        split_overlap: Chunk overlap passed to ``DocumentSplitter``.

    Returns:
        A status string for the UI. Errors of individual files are printed and
        listed in the status instead of being raised.
    """
    global document_store
    # clear the document store before processing new documents to avoid including old documents into the analysis
    clear_document_store()

    pdf_files = pdf_files or []

    # Build the pipeline once: rebuilding it per file would also re-create and
    # re-warm the embedding component for every PDF.
    try:
        preprocessing_pipeline = _build_preprocessing_pipeline(split_by, split_length, split_overlap)
    except Exception as e:
        print(f"Error building preprocessing pipeline: {str(e)}")
        return f"Error building preprocessing pipeline: {str(e)}"

    total_docs = 0
    failed_files = []
    for pdf_file in pdf_files:
        file_path = getattr(pdf_file, "name", pdf_file)
        try:
            print(f"Processing file: {file_path}")
            result = preprocessing_pipeline.run({"pdf_converter": {"sources": [file_path]}})
            print(f"Pipeline result keys: {result.keys()}")

            if 'document_embedder' in result and 'documents' in result['document_embedder']:
                docs = result['document_embedder']['documents']
                print(f"Processed {len(docs)} documents from {file_path}")

                # Store documents in the document store
                for doc in docs:
                    doc.meta['source'] = file_path # add meta data about data source to each document
                document_store.write_documents(docs) # save documents in document store
                total_docs += len(docs)
            else:
                print(f"No documents found in the result for {file_path}")
                failed_files.append(str(file_path))

        except Exception as e:
            print(f"Error processing {file_path}: {str(e)}")
            failed_files.append(str(file_path))

    print(f"Total documents processed and stored: {total_docs}")
    processed_count = len(pdf_files) - len(failed_files)
    status = f"{processed_count} PDFs processed. Total of {total_docs} documents stored in the Document Store. Parameters: split_by={split_by}, split_length={split_length}, split_overlap={split_overlap}"
    if failed_files:
        status += f" Failed: {', '.join(failed_files)}"
    return status

# pipelines for different llm model types: take into account different api specifications
def _build_retrieval_pipeline():
    """Create the part shared by all model pipelines: embedder -> retriever -> prompt_builder.

    The retriever is bound to whatever store the module-level ``document_store``
    refers to at the time of the call.
    """
    pipe = Pipeline()
    pipe.add_component("embedder", BatchSentenceTransformersTextEmbedder())
    pipe.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store, top_k=10))
    pipe.add_component("prompt_builder", CustomPromptBuilder())
    pipe.connect("embedder.embedding", "retriever.query_embedding")
    pipe.connect("retriever.documents", "prompt_builder.documents")
    return pipe


def setup_claude_pipeline(model_name):
    """Build the question-answering pipeline for an Anthropic chat model.

    The chat generator takes messages, so the rendered prompt passes through
    ``PromptToChatMessage`` before it reaches the LLM.

    Args:
        model_name: Anthropic model identifier; also used to locate the key file.

    Returns:
        The assembled ``Pipeline``.

    Raises:
        FileNotFoundError: If the API key file cannot be found (from ``read_api_key``).
    """
    pipe = _build_retrieval_pipeline()
    pipe.add_component("prompt_to_chat", PromptToChatMessage())

    # take api key provided by local text file
    api_key = read_api_key(model_name)
    os.environ["ANTHROPIC_API_KEY"] = api_key
    pipe.add_component("llm", AnthropicChatGenerator(
        api_key=Secret.from_token(api_key),
        streaming_callback=print_streaming_chunk,
        model=model_name
    ))

    pipe.connect("prompt_builder.prompt", "prompt_to_chat.prompt")
    pipe.connect("prompt_to_chat.messages", "llm.messages")

    return pipe

def setup_openai_pipeline(model_name):
    """Build the question-answering pipeline for an OpenAI model.

    Args:
        model_name: OpenAI model identifier; also used to locate the key file.

    Returns:
        The assembled ``Pipeline``.

    Raises:
        FileNotFoundError: If the API key file cannot be found (from ``read_api_key``).
    """
    pipe = _build_retrieval_pipeline()

    # take api key provided by local text file
    api_key = read_api_key(model_name)
    os.environ["OPENAI_API_KEY"] = api_key
    pipe.add_component("llm", OpenAIGenerator(
        api_key=Secret.from_token(api_key),
        model=model_name,
        streaming_callback=print_streaming_chunk
    ))

    pipe.connect("prompt_builder.prompt", "llm.prompt")

    return pipe

def setup_other_pipeline(model_name):
    """Build the question-answering pipeline for any other model via the Hugging Face serverless API.

    Args:
        model_name: Hugging Face model identifier.

    Returns:
        The assembled ``Pipeline``.

    Raises:
        FileNotFoundError: If the API key file cannot be found (from ``read_api_key``).
    """
    pipe = _build_retrieval_pipeline()

    # read api key provided by local text file
    api_key = read_api_key(model_name)
    os.environ["HUGGINGFACE_API_TOKEN"] = api_key
    pipe.add_component("llm", HuggingFaceAPIGenerator(
        api_type="serverless_inference_api",
        api_params={"model": model_name},
        token=Secret.from_token(api_key)
    ))

    pipe.connect("prompt_builder.prompt", "llm.prompt")

    return pipe

# controls the length of the response for the Gradio interface
def truncate_text(text: str, max_tokens: int) -> str:
    """Cut ``text`` after at most ``max_tokens`` whitespace-separated words.

    Words are only a rough stand-in for model tokens. The kept part retains its
    original whitespace (line breaks, paragraphs), so a truncated answer keeps
    its layout.

    Args:
        text: The text to shorten.
        max_tokens: Maximum number of words to keep; values <= 0 keep no words.

    Returns:
        ``text`` unchanged when it has at most ``max_tokens`` words, otherwise
        the first ``max_tokens`` words followed by ``'...'``.
    """
    import re

    word_spans = [match.span() for match in re.finditer(r"\S+", text)]
    if len(word_spans) <= max_tokens:
        return text
    if max_tokens <= 0:
        return '...'
    # Slice the original string instead of re-joining words with single spaces.
    start = word_spans[0][0]
    end = word_spans[max_tokens - 1][1]
    return text[start:end] + '...'

# process of taking a question and generating an answer by LLM
def _group_documents_by_source(documents):
    """Group documents by their ``meta['source']`` value ('Unknown' when missing)."""
    grouped = {}
    for doc in documents:
        grouped.setdefault(doc.meta.get('source', 'Unknown'), []).append(doc)
    return grouped


def _extract_llm_answer(result):
    """Pull the first answer out of a pipeline result and return it as a string."""
    if "llm" not in result:
        raise ValueError("No LLM output found in pipeline result")
    llm_output = result["llm"]
    if "replies" in llm_output:
        first_reply = llm_output["replies"][0]
        # Chat generators return ChatMessage objects, plain generators return strings
        answer = first_reply.text if isinstance(first_reply, ChatMessage) else first_reply
    elif "generated_texts" in llm_output:
        answer = llm_output["generated_texts"][0]
    else:
        raise ValueError("Unexpected response format from LLM")
    # answer needs to be in string format
    return answer if isinstance(answer, str) else str(answer)


def get_question_and_answer(model, predefined, manual, history, output_dir):
    """Answer a question with the selected model and record it in the history.

    Args:
        model: Name of the selected LLM; decides which pipeline is built.
        predefined: Value of the question dropdown; ``"manual input"`` selects
            the free-text question instead.
        manual: Free-text question.
        history: List of ``{"question", "answer"}`` dicts; a new entry is
            appended in place on success.
        output_dir: Report directory entered in the UI (may be empty/None).

    Returns:
        ``(answer_text, history, status_message)``. Errors are reported in the
        returned strings instead of being raised.
    """
    global global_model_name, global_output_dir
    # Remember the current choices for the PDF report. The values are set here
    # directly and the directory is normalised (stripped, empty -> None).
    global_model_name = model
    global_output_dir = output_dir.strip() if output_dir and output_dir.strip() else None
    print(f"In get_question_and_answer - Current model: {global_model_name}")
    print(f"In get_question_and_answer - Current output directory: {global_output_dir}")
    question = manual if predefined == "manual input" else predefined

    if not question or not question.strip():
        return "Please ask a question.", history, "no valid question asked"

    print(f"Processing question: {question}")
    print(f"Using model: {model}")

    try:
        # Pipe-Setup according to selected LLM
        if "claude" in model.lower():
            pipe = setup_claude_pipeline(model)
        elif "gpt" in model.lower():
            pipe = setup_openai_pipeline(model)
        else:
            pipe = setup_other_pipeline(model)

        print("Pipeline setup complete")

        # embeddings for question
        embedder = BatchSentenceTransformersTextEmbedder()
        question_embedding = embedder.run(text=question)["embedding"]

        print("Question embedding created")

        # retrieve relevant documents
        retriever = InMemoryEmbeddingRetriever(document_store=document_store, top_k=10)
        relevant_docs = retriever.run(query_embedding=question_embedding)["documents"]
        print(f"Retrieved {len(relevant_docs)} relevant documents")

        # group documents according to source
        grouped_documents = _group_documents_by_source(relevant_docs)
        print(f"Retrieved documents from: {list(grouped_documents.keys())}")

        # The grouped documents are handed to the prompt builder as `all_documents`,
        # which is what the template renders; the pipeline's own embedder still
        # needs the question as input.
        result = pipe.run(
            {
                "embedder": {"text": question},
                "prompt_builder": {"question": question, "all_documents": grouped_documents}
            }
        )
        print("Pipeline run complete")

        # extract answer from selected LLM (always a string afterwards)
        answer = _extract_llm_answer(result)
        print(f"LLM Answer: {answer[:100]}...")  # Print first 100 characters of the answer

        # clean answers if necessary
        answer = answer.split("[Document:")[0].strip()
        if "Question:" in answer:
            answer = answer.split("Question:")[0].strip()

        # Truncate the answer if it's too long (adjust the limit as needed)
        max_tokens = 1000  # Adjust this value based on your needs
        truncated_answer = truncate_text(answer, max_tokens)

        # truncate_text returns its input unchanged when nothing was cut, so a
        # difference is the reliable sign of truncation (lengths can mislead
        # because '...' is appended).
        if truncated_answer != answer:
            truncated_answer += "\n\n[answers trunkated due to token restrictions.]"

        # Update history
        history.append({"question": question, "answer": truncated_answer})
        print(f"Updated history length: {len(history)}")

        return truncated_answer, history, "Question successfully answered"

    except Exception as e:
        print(f"Error in get_question_and_answer: {str(e)}")
        return f"An error occurred: {str(e)}", history, "Error answering the question"


# update name of selected model for pdf report
def update_model_name(name):
    """Store ``name`` in the module-level ``global_model_name`` and echo it back.

    Returns:
        ``name`` unchanged, so the function can be wired as a UI callback.
    """
    global global_model_name
    global_model_name = name
    current = global_model_name
    print(f"update_model_name called. Updated model name: {current}")
    return name

# set directory to store pdf report
def update_output_dir(dir):
    """Store the stripped ``dir`` in ``global_output_dir`` (``None`` when empty).

    Returns:
        ``dir`` exactly as passed in (not the stripped value).
    """
    global global_output_dir
    cleaned = dir.strip() if dir else ""
    # Empty or whitespace-only input means "no directory chosen".
    global_output_dir = cleaned or None
    print(f"update_output_dir called. Updated output directory: {global_output_dir}")
    return dir

# function called when files are removed from interface
def on_files_removed(files):
    """Return a status message for the current upload list.

    When ``files`` is empty/None the document store is cleared; otherwise the
    user is asked to process the PDFs again.
    """
    if files:
        return "Some documents have been removed. Please process pdfs again to update document store."
    # If all files are removed
    clear_document_store()
    return "document store emptied. You can upload new documents."

# add FMA logo to report header
def add_header(canvas, doc, logo_path="/content/florence 1.JPG"):
    """Draw the orange header bar and the logo on the current report page.

    Used as the ``onPage`` callback of the report's page template.

    Args:
        canvas: The canvas of the page being drawn (shadows the imported
            ``canvas`` module inside this function).
        doc: The document template; its ``width``, ``height`` and margins
            position the header.
        logo_path: Image file to draw; for the time being the logo has to be
            stored locally.

    Returns:
        None. When the logo file does not exist the header is skipped, so the
        report can still be generated.
    """
    if not os.path.isfile(logo_path):
        print(f"Logo not found at {logo_path}; report header skipped.")
        return

    with PILImage.open(logo_path) as img:
        img_width, img_height = img.size
        img_mode = img.mode

    # Scale the logo to the frame width while keeping its aspect ratio.
    aspect = img_height / float(img_width)
    logo_width = doc.width
    logo_height = logo_width * aspect

    canvas.saveState()
    try:
        canvas.setFillColor(HexColor('#ec6600'))
        canvas.rect(0, doc.height, doc.width + doc.leftMargin + doc.rightMargin, logo_height, fill=True)

        canvas.drawImage(logo_path, doc.leftMargin, doc.height + doc.topMargin - logo_height,
                         width=logo_width, height=logo_height, mask='auto' if img_mode == 'RGBA' else None)
    finally:
        # Always undo saveState(), even if drawing fails.
        canvas.restoreState()

# functions for pdf report generation
def generate_pdf_report_wrapper(history):
    """UI callback: build the PDF report from ``history`` using the stored settings.

    Reads the model name and output directory from the module-level globals,
    substitutes defaults where they are unset, and delegates to
    ``generate_pdf_report``.

    Returns:
        The status string for the UI; errors are returned as text, not raised.
    """
    try:
        print(f"generate_pdf_report_wrapper called.")
        print(f"Current global_model_name: {global_model_name}")
        print(f"Current global_output_dir: {global_output_dir}")
        print(f"History length: {len(history)}")
        print(f"History content: {history}")

        if not history:
            return "No questions and answers available to generate report."

        # "Nicht spezifiziert" is the sentinel for "no model chosen yet"
        if global_model_name == "Nicht spezifiziert":
            model_name = "Standard-Modell"
        else:
            model_name = global_model_name
        output_dir = global_output_dir or os.getcwd()

        return generate_pdf_report(history, llm_model=model_name, output_dir=output_dir)
    except Exception as err:
        print(f"Error in generate_pdf_report_wrapper: {str(err)}")
        return f"Error in generate_pdf_report_wrapper: {str(err)}"

def generate_pdf_report(history, llm_model=None, output_dir=None):
    """Write the question/answer history to a timestamped PDF file.

    Args:
        history: List of dicts with ``'question'`` and ``'answer'`` entries.
        llm_model: Name of the model shown in the report header line.
        output_dir: Target directory; empty/None means the current working
            directory. The directory is created if it does not exist.

    Returns:
        A status string: the path of the generated PDF, a notice when there is
        nothing to report, or an error description (exceptions are not raised).
    """
    from xml.sax.saxutils import escape

    print(f"Received llm_model in generate_pdf_report: {llm_model}")
    print(f"Type of llm_model: {type(llm_model)}")
    print(f"Received output_dir in generate_pdf_report: {output_dir}")
    print(f"Type of output_dir: {type(output_dir)}")

    if not history:
        return "No questions and answers available to generate report."

    try:
        # look after output directory
        if output_dir is None or (isinstance(output_dir, str) and not output_dir.strip()):
            output_dir = os.getcwd()

        # output directory needs to be in string format
        output_dir = str(output_dir).strip()
        os.makedirs(output_dir, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        pdf_filename = f"qa_report_{timestamp}.pdf"
        pdf_path = os.path.join(output_dir, pdf_filename)

        # style the page of the generated report
        doc = SimpleDocTemplate(pdf_path, pagesize=A4, topMargin=40*mm)
        styles = getSampleStyleSheet()
        styles.add(ParagraphStyle(name='Orange', textColor=HexColor('#ec6600')))
        styles.add(ParagraphStyle(name='Small', fontSize=10, textColor=HexColor('#666666')))
        story = []

        page_template = PageTemplate(frames=[Frame(doc.leftMargin, doc.bottomMargin, doc.width, doc.height - 45*mm)],
                                     onPage=add_header)
        doc.addPageTemplates([page_template])

        story.append(Paragraph("Q&A report", styles['Title']))
        story.append(Spacer(1, 6))

        # store the name of the selected LLM
        model_text = "selected LLM model: "
        if llm_model and isinstance(llm_model, str) and llm_model.strip():
            model_text += llm_model.strip()
        else:
            model_text += "Nicht spezifiziert"

        print(f"Model text to be added to PDF: {model_text}")
        story.append(Paragraph(escape(model_text), styles['Small']))

        story.append(Spacer(1, 12))

        for i, qa in enumerate(history, 1):
            # Q&A need to be in string format. Paragraph interprets its text as
            # XML-like markup, so '<', '>' and '&' from questions or LLM answers
            # are escaped; line breaks become <br/> to keep the answer layout.
            question = escape(str(qa.get('question', 'no question available'))).replace('\n', '<br/>')
            answer = escape(str(qa.get('answer', 'no answer available'))).replace('\n', '<br/>')

            story.append(Paragraph(f"Question {i}: {question}", styles['Orange']))
            story.append(Paragraph(answer, styles['BodyText']))
            story.append(Spacer(1, 12))

        doc.build(story)
        return f"pdf report generated: {pdf_path}"
    except Exception as e:
        print(f"Error in generate_pdf_report: {str(e)}")
        return f"Error in generate_pdf_report: {str(e)}"

# Gradio interface
# set theme for interface: gray primary hue, orange secondary hue;
# the overrides below reference the theme's own color variables (e.g. "*secondary_500")
theme = gr.themes.Base(
    primary_hue="gray",
    secondary_hue="orange",
).set(
    # Button styles
    button_primary_background_fill="*secondary_500",
    button_primary_background_fill_dark="*secondary_600",
    button_primary_background_fill_hover="*neutral_200",  # Light gray on hover
    button_primary_background_fill_hover_dark="*neutral_700",  # Darker gray for dark mode
    button_primary_border_color="*secondary_500",
    button_primary_border_color_hover="*neutral_200",  # Light gray border on hover
    button_primary_text_color="white",
    button_primary_text_color_hover="*neutral_800",  # Dark text on light background
    button_secondary_background_fill="*secondary_500",
    button_secondary_background_fill_dark="*secondary_600",
    button_secondary_background_fill_hover="*neutral_200",  # Light gray on hover
    button_secondary_background_fill_hover_dark="*neutral_700",  # Darker gray for dark mode
    button_secondary_border_color="*secondary_500",
    button_secondary_border_color_hover="*neutral_200",  # Light gray border on hover
    button_secondary_text_color="white",
    button_secondary_text_color_hover="*neutral_800",  # Dark text on light background
    # Slider styles
    slider_color="*secondary_500",
    slider_color_dark="*secondary_600",
)

# Custom CSS to style the warning text with higher specificity and !important
# (targets elements carrying both the "warning-orange" and "custom-info" classes)
custom_css = """
.warning-orange.custom-info {
    margin-top: 0.5rem;
    padding: 0.5rem;
    border: 1px solid #ec6600;
    border-radius: 4px;
    background-color: #fff3e0;
}
.warning-orange.custom-info p {
    color: #ec6600 !important;
    font-weight: bold !important;
    margin: 0;
}
"""

# define parts of interface
# NOTE: a `with` body is not a new scope - every function defined inside it is a
# module-level name. No helper is therefore redefined here; the callbacks
# `update_model_name` / `update_output_dir` are the module-level functions that
# maintain `global_model_name` / `global_output_dir` for the PDF report.
with gr.Blocks(theme=theme, css=custom_css) as iface:
    # per-session list of {"question", "answer"} dicts
    history = gr.State([])

    with gr.Row():
        gr.Image("/content/florence 1.JPG", show_label=False, height=180, width=1500)

    gr.Markdown("# ESGenius")
    gr.Markdown("Upload one or more pdf documents, preprocess them, select an LLM model and pose your questions.")

    with gr.Tab("process pdfs"):
        pdf_files = gr.Files(label="upload pdf documents", file_types=[".pdf"])
        split_by = gr.Dropdown(choices=["word", "sentence", "passage"], value="word", label="Split By")
        split_length = gr.Slider(minimum=100, maximum=1000, value=250, step=50, label="Split Length", elem_classes="custom-slider")
        split_overlap = gr.Slider(minimum=0, maximum=500, value=50, step=50, label="Split Overlap", elem_classes="custom-slider")
        process_button = gr.Button("process pdfs")
        pdf_output = gr.Textbox(label="process status")
        process_button.click(process_pdfs, inputs=[pdf_files, split_by, split_length, split_overlap], outputs=[pdf_output])

        # event handler for removing files
        pdf_files.change(on_files_removed, inputs=[pdf_files], outputs=[pdf_output])

    with gr.Tab("Template Management"):
        gr.Markdown("Adapt the prompt template according to your needs! Changes will only take effect after clicking the button UPDATE TEMPLATE Buttons. Using the RESET TEMPLATE button, the original template can be restored. ")
        template_text = gr.TextArea(
            label="current template",
            value=current_template,
            lines=20
        )

        # the placeholder examples show the two Jinja delimiter pairs: {% ... %} and {{ ... }}
        warning_text = gr.Markdown(
            "⚠️ **Warning:** placeholders like {% for ... %}, {{ ... }}, must not be changed or deleted!",
            elem_classes=["warning-orange", "custom-info"]
        )

        with gr.Row():
            update_template_button = gr.Button("Update Template", size="sm")
            reset_template_button = gr.Button("Reset Template", size="sm")

        template_status = gr.Textbox(label="Template Status")

        # returns (status, template text)
        update_template_button.click(
            update_template_with_validation,
            inputs=[template_text],
            outputs=[template_status, template_text]
        )

        # returns (template text, status) - note the reversed order
        reset_template_button.click(
            reset_template,
            outputs=[template_text, template_status]
        )

    with gr.Tab("Q&A section"):
        model_dropdown = gr.Dropdown(
            choices=[
                "HuggingFaceH4/zephyr-7b-beta",
                "meta-llama/Meta-Llama-3.1-8B-Instruct",
                "meta-llama/Meta-Llama-3.1-70B-Instruct",
                "claude-3-opus-20240229",
                "claude-3-sonnet-20240229",
                "claude-3-haiku-20240307",
                "claude-3-7-sonnet-20250219",
                "gpt-3.5-turbo",
                "gpt-4",
                "gpt-4-turbo-preview"
            ],
            label="select LLM model",
            value="HuggingFaceH4/zephyr-7b-beta"
        )

        predefined_question_dropdown = gr.Dropdown(
            choices=predefined_questions,
            label="choose a question or 'manual input'",
            value="manual input"
        )

        manual_question_input = gr.Textbox(
            lines=2,
            placeholder="put your question here...",
            label="manual input",
            visible=True,
            interactive=True
        )


        answer_button = gr.Button("answer question")
        answer_output = gr.Textbox(label="answer")

        output_dir_input = gr.Textbox(label="output directory for pdf (optional)", placeholder="keep empty for current directory")

        generate_report_button = gr.Button("generate pdf report")
        report_status = gr.Textbox(label="status report")


    def clear_manual_input(choice):
        """Show the free-text box only for 'manual input' and empty it on every change."""
        return gr.update(visible=(choice == "manual input"), value="")

    def set_initial_state():
        """Make the free-text box visible when the page loads."""
        return gr.update(visible=True)

    iface.load(set_initial_state, outputs=[manual_question_input])

    # one handler covers both visibility and clearing of the free-text box
    predefined_question_dropdown.change(
        clear_manual_input,
        inputs=[predefined_question_dropdown],
        outputs=[manual_question_input]
    )

    def show_history(history):
        """Return the history as a display string (currently not wired to any event)."""
        return f"current history: {history}"

    model_dropdown.change(update_model_name, inputs=[model_dropdown], outputs=[model_dropdown])
    output_dir_input.change(update_output_dir, inputs=[output_dir_input], outputs=[output_dir_input])

    answer_button.click(
        get_question_and_answer,
        inputs=[model_dropdown, predefined_question_dropdown, manual_question_input, history, output_dir_input],
        outputs=[answer_output, history, report_status]
    )

    generate_report_button.click(
        generate_pdf_report_wrapper,
        inputs=[history],
        outputs=[report_status]
    )

# start interface
# debug=True keeps the cell running so errors and logs stay visible in the notebook output
if __name__ == "__main__":
    iface.launch(debug=True)




It looks like you are running Gradio on a hosted a Jupyter notebook. For the Gradio app to work, sharing must be enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://8ec436f87dbde07e63.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://8ec436f87dbde07e63.gradio.live
