Dependencias

In [None]:
"""
This script installs the necessary Python packages for the project. The packages include:

- llama-index: A package for creating and managing indexes.
- llama-index-llms-azure-openai: A package for integrating Llama Index with Azure OpenAI.
- python-dotenv: A package for reading key-value pairs from a .env file and setting them as environment variables.
- pymupdf==1.23.22: A package for working with PDF files.
- transformers: A package for state-of-the-art Natural Language Processing for Pytorch and TensorFlow 2.0.
- tiktoken: A package for tokenizing text.
- matplotlib: A package for creating static, animated, and interactive visualizations in Python.
"""
%pip install llama-index
%pip install llama-index-llms-azure-openai
%pip install python-dotenv
%pip install pymupdf==1.23.22
%pip install transformers tiktoken
%pip install matplotlib

Librerías

In [2]:
"""
This script imports various libraries and modules required for different functionalities such as:

- `json`: For handling JSON data.
- `os`: For interacting with the operating system.
- `requests`: For making HTTP requests.
- `fitz`: For working with PDF documents.
- `matplotlib.pyplot`: For creating visualizations.
- `numpy`: For numerical operations.
- `pandas`: For data manipulation and analysis.
- `tiktoken`: Third-party tokenizer library, used here to count the tokens in a text.
- `dotenv`: For loading environment variables from a .env file.
- `openai.AzureOpenAI`: For interacting with Azure OpenAI services.
- `llama_index.core.agent.FunctionCallingAgentWorker`: For agent-related functionalities.
- `llama_index.core.agent.AgentRunner`: For running agents.
- `llama_index.core.tools.FunctionTool`: For using function tools.
- `llama_index.llms.azure_openai.AzureOpenAI`: For Azure OpenAI language model services.
"""
import json
import os
import requests
import fitz
import json
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tiktoken
from dotenv import load_dotenv
from llama_index.core.agent import FunctionCallingAgentWorker
from llama_index.core.agent import AgentRunner
from llama_index.core.tools import FunctionTool
from llama_index.llms.azure_openai import AzureOpenAI

Carga de variables de entorno

In [3]:
"""
This script loads environment variables and assigns them to corresponding variables for use in an Azure OpenAI and Bing Search API integration.
Environment Variables:
- AZURE_OPENAI_ENDPOINT: The endpoint URL for the Azure OpenAI service.
- AZURE_OPENNAI_DEPLOYMENT_NAME: The deployment name for the Azure OpenAI service.
- AZURE_OPENAI_API_KEY: The API key for accessing the Azure OpenAI service.
- AZURE_OPENAI_ENDPOINT_EMBEDINGS: The endpoint URL for the Azure OpenAI embeddings service.
- BING_SEARCH_API_KEY: The API key for accessing the Bing Search API.
- AZURE_OPENAI_EMBEDINGS_API_KEY: The API key for accessing the Azure OpenAI embeddings service.
- AZURE_OPENAI_EMBEDINGS_DEPLOYMENT_NAME: The deployment name for the Azure OpenAI embeddings service.
Note:
Ensure that the .env file contains the above environment variables with valid values.
"""
# Read the .env file into the process environment. override=True is passed so
# that values from the file replace variables that are already set.
load_dotenv(override=True)

# os.getenv returns None for any variable that is missing, so a typo in the
# .env file only surfaces later, when the clients are created or used.
azure_openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
# NOTE(review): the key is spelled "OPENNAI" (double N). It must match the key
# used in the .env file, so confirm both sides before renaming it.
azure_openai_deployment_name = os.getenv("AZURE_OPENNAI_DEPLOYMENT_NAME")
azure_openai_api_key = os.getenv("AZURE_OPENAI_API_KEY")
# Settings for the embeddings client (separate endpoint, key and deployment).
azure_openai_endpoint_embeding = os.getenv("AZURE_OPENAI_ENDPOINT_EMBEDINGS")
bing_search_api_key = os.getenv("BING_SEARCH_API_KEY")
azure_openai_embeding_api_key = os.getenv("AZURE_OPENAI_EMBEDINGS_API_KEY")
azure_openai_embeding_deployment_name = os.getenv("AZURE_OPENAI_EMBEDINGS_DEPLOYMENT_NAME")

Crea el cliente Azure OpenAI en LlamaIndex

In [4]:
"""
Initializes two Azure OpenAI clients that come from two different libraries.
Attributes:
    azure_openai_client (llama_index.llms.azure_openai.AzureOpenAI): LlamaIndex LLM wrapper configured with the chat deployment (engine), endpoint, API key, and API version.
    azure_openai_client_embeding (openai.AzureOpenAI): OpenAI SDK client configured with the embeddings endpoint and API key and the same API version; the embeddings deployment name is passed later, on each embeddings request.
Parameters:
    azure_openai_deployment_name (str): The name of the deployment for the general use engine.
    azure_openai_endpoint (str): The endpoint URL for the general use Azure OpenAI service.
    azure_openai_api_key (str): The API key for authenticating with the general use Azure OpenAI service.
    azure_openai_embeding_deployment_name (str): The name of the deployment for the embedding engine.
    azure_openai_endpoint_embeding (str): The endpoint URL for the embedding Azure OpenAI service.
    azure_openai_embeding_api_key (str): The API key for authenticating with the embedding Azure OpenAI service.
"""
# LLM used by the LlamaIndex agent. Here `AzureOpenAI` is the LlamaIndex class
# imported from llama_index.llms.azure_openai in the imports cell; `engine` is
# the Azure deployment name.
azure_openai_client = AzureOpenAI(
    engine=azure_openai_deployment_name,
    azure_endpoint=azure_openai_endpoint,
    api_key=azure_openai_api_key,
    api_version="2024-05-01-preview",
)

# The embeddings client is the plain OpenAI SDK client, which has the same
# class name as the LlamaIndex wrapper. It is imported under an alias so the
# name `AzureOpenAI` keeps pointing at the LlamaIndex class; otherwise
# re-running this cell would build `azure_openai_client` with the wrong class.
from openai import AzureOpenAI as OpenAIAzureClient

azure_openai_client_embeding = OpenAIAzureClient(
    azure_endpoint=azure_openai_endpoint_embeding,
    api_key=azure_openai_embeding_api_key,
    api_version="2024-05-01-preview",
)

El usuario inicializa el prompt de sistema

In [5]:
"""
This script defines a simple conversational agent system that stores messages in a list.
Functions:
    store_message_in_list(message_list, message_role, **kwargs):
        Stores a message in the provided list with the given role and additional keyword arguments.
    get_user_input(prompt, default_message):
        Prompts the user for input and returns the input if provided, otherwise returns a default message.
Constants:
    SYSTEM_MESSAGE:
        A system message obtained from user input or a default message if no input is provided.
"""
# Conversation history; store_message_in_list appends one dict per message.
messages = []

def store_message_in_list(message_list, message_role, **kwargs):
    """Append a chat message to ``message_list`` in place.

    Args:
        message_list (list): List that receives the new message.
        message_role (str): Value stored under the ``"role"`` key.
        **kwargs: Extra fields copied into the message (for example
            ``content="..."``).

    Returns:
        None. The list is modified in place.
    """
    message_list.append({"role": message_role, **kwargs})

def get_user_input(prompt, default_message):
    """Ask the user for a line of text, falling back to a default.

    Args:
        prompt (str): Text shown to the user by ``input``.
        default_message (str): Value returned when the user enters nothing.

    Returns:
        str: The typed text, or ``default_message`` when it is the empty
        string.
    """
    answer = input(prompt)
    return default_message if answer == "" else answer

# Ask for the system prompt (the default is used when the user enters nothing)
# and always append the brevity instruction to whichever text was chosen.
SYSTEM_MESSAGE = get_user_input("Prompt de Sistema: ","Eres un asistente que ayuda. La respuesta siempre la devolverás en el idioma en el que te hablen.") + " Debes de ser breve y conciso en tus respuestas."

# Seed the conversation history with the system prompt.
store_message_in_list(messages, "system", content=SYSTEM_MESSAGE)

Añadimos tool: búsqueda en Bing

In [6]:
"""
Searches for data using the Bing Search API and returns the snippet of the first result.
Args:
    query (str): The search query string.
Returns:
    str: The snippet of the first search result if available, otherwise "No results found."
Raises:
    requests.exceptions.HTTPError: If the HTTP request returned an unsuccessful status code.
"""
def search_for_data_in_bing(query, timeout=10):
    """Return the snippet of the first Bing web result for ``query``.

    Args:
        query (str): The search query string.
        timeout (float): Seconds to wait for the Bing API before giving up.
            Without a timeout a stalled connection would block forever.

    Returns:
        str: The snippet of the first result, ``"no snippet"`` when that
        result has no snippet field, or ``"No results found."`` when the
        response contains no web pages.

    Raises:
        requests.exceptions.HTTPError: If the API answers with an error status.
        requests.exceptions.Timeout: If the API does not answer in time.
    """
    headers = {"Ocp-Apim-Subscription-Key": bing_search_api_key}
    params = {"q": query, "count": 1}

    response = requests.get(
        "https://api.bing.microsoft.com/v7.0/search",
        headers=headers,
        params=params,
        timeout=timeout,
    )
    response.raise_for_status()
    data = response.json()

    # `or` guards against a "webPages"/"value" key that is present but null.
    results = (data.get("webPages") or {}).get("value") or []
    if not results:
        return "No results found."
    return results[0].get("snippet", "no snippet")
    

Añadimos tool de búsqueda en índice

In [7]:
"""
Calculate the embeddings for a given text using the specified Azure OpenAI client.
Args:
    text (str): The input text to calculate embeddings for.
    client: The Azure OpenAI client to use for generating embeddings. Defaults to azure_openai_client_embeding.
Returns:
    list: The embeddings for the input text.
"""
pass
"""
Calculate the cosine similarity between two vectors.
Args:
    vec1 (numpy.ndarray): The first vector.
    vec2 (numpy.ndarray): The second vector.
Returns:
    float: The cosine similarity between the two vectors.
"""
pass
"""
Find the most similar documents to the input text from a given dataset.
Args:
    input_text (str): The input text to compare against the dataset.
    data (list): A list of dictionaries containing 'text' and 'embeddings' keys.
    desired_doc_count (int): The number of most similar documents to return. Defaults to 1.
Returns:
    list: A list of tuples containing the most similar documents and their similarity scores.
"""
pass

def calculate_embeddings(text, client=azure_openai_client_embeding):
    """Return the embedding vector of ``text``.

    Args:
        text (str): Input text to embed.
        client: Client exposing ``embeddings.create``. The default is bound
            when the function is defined, so it is the embeddings client that
            existed at that moment.

    Returns:
        The ``embedding`` attribute of the first item in the response data.
    """
    response = client.embeddings.create(
        model=azure_openai_embeding_deployment_name,
        input=text,
    )
    first_item = response.data[0]
    return first_item.embedding


def cosine_similarity(vec1, vec2):
    """Return the cosine similarity of two vectors.

    Args:
        vec1: First vector (anything ``numpy`` can treat as a 1-D array).
        vec2: Second vector, with the same length as ``vec1``.

    Returns:
        The dot product divided by the product of the two Euclidean norms.
        There is no guard for zero-length vectors (the denominator would be
        zero).
    """
    magnitude_product = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    return np.dot(vec1, vec2) / magnitude_product

def find_most_similar(input_text, data, desired_doc_count=1):
    """Rank the entries of ``data`` by similarity to ``input_text``.

    Args:
        input_text (str): Text to compare against the dataset.
        data: Iterable of dicts with ``"text"`` and ``"embeddings"`` keys.
        desired_doc_count (int): How many of the best matches to return.

    Returns:
        list: ``(text, similarity)`` tuples, highest similarity first,
        truncated to ``desired_doc_count`` items.
    """
    query_embedding = calculate_embeddings(input_text)

    def score(entry):
        # Similarity is computed before the text lookup, as in a plain loop.
        similarity = cosine_similarity(query_embedding, np.array(entry["embeddings"]))
        return entry["text"], similarity

    ranked = sorted(map(score, data), key=lambda pair: pair[1], reverse=True)
    return ranked[:desired_doc_count]

Localización de los pdfs usados en RAG

In [8]:
"""
Retrieves PDF files from a specified source, which can be a local directory, a local file, or a URL.
Args:
    source (str, optional): The path to a local directory, a local PDF file, or a URL to download a PDF file from. 
                            If not provided or empty, the current working directory is used.
Returns:
    list: A list of paths to the PDF files found or downloaded.
The function performs the following actions based on the source:
- If the source is a URL, it downloads the PDF file from the URL if it does not already exist locally.
- If the source is a local directory, it lists all PDF files in the directory.
- If the source is a local PDF file, it adds the file to the list.
- If the source is invalid, it prints an error message.
Example:
    pdf_files = get_pdfs("http://example.com/sample.pdf")
    pdf_files = get_pdfs("/path/to/local/directory")
    pdf_files = get_pdfs("/path/to/local/file.pdf")
"""
def get_pdfs(source=None, timeout=30):
    """Collect PDF paths from a directory, a single file, or a download URL.

    Args:
        source (str, optional): Local directory, local PDF file, or an
            http(s) URL. When ``None`` or empty, the current working directory
            is used.
        timeout (float): Seconds to wait for the server when downloading.

    Returns:
        list: Paths of the PDF files found or downloaded. The list is empty
        when the source is invalid or the download fails with a non-200
        status.

    Notes:
        - For URLs the local file name is taken from the URL *path*, so query
          strings and fragments do not end up in the file name. An existing
          file with that name is reused instead of downloaded again.
        - The ``.pdf`` extension check is case-insensitive.
        - Directory listings are sorted so the result order is deterministic.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import unquote, urlparse

    if source is None or source == "":
        source = os.getcwd()

    if source.startswith(("http://", "https://")):
        # basename() after unquote() also removes any decoded path separator.
        local_filename = os.path.basename(unquote(urlparse(source).path)) or "download"
        if not local_filename.lower().endswith(".pdf"):
            local_filename += ".pdf"

        if os.path.exists(local_filename):
            print(f"File {local_filename} already exists.")
            return [local_filename]

        print(f"File {local_filename} does not exist. Downloading from {source}...")
        response = requests.get(source, timeout=timeout)
        if response.status_code != 200:
            print(f"Failed to download file: {response.status_code}")
            return []

        with open(local_filename, "wb") as file:
            file.write(response.content)
        print(f"File downloaded and saved as {local_filename}")
        return [local_filename]

    if os.path.isdir(source):
        return [
            os.path.join(source, name)
            for name in sorted(os.listdir(source))
            if name.lower().endswith(".pdf")
        ]

    if os.path.isfile(source) and source.lower().endswith(".pdf"):
        return [source]

    print(f"Invalid source: {source}")
    return []

# Ask the user where the PDFs are; an empty answer makes get_pdfs fall back to
# the current working directory.
pdf_files = get_pdfs(input("Indica la ruta del archivo o archivos en PDF o la URL de descarga del archivo PDF: "))

Explorar los archivos pdf

In [9]:
"""
Formats the given text by replacing newline characters with spaces and stripping leading/trailing whitespace.
Args:
    text (str): The text to be formatted.
Returns:
    str: The formatted text.
"""
pass
"""
Opens a PDF file, reads its content, and extracts text from each page. The text is then tokenized and word count is calculated.
Args:
    pdf_path (str): The path to the PDF file.
Returns:
    list: A list of dictionaries, each containing information about a page in the PDF, including:
        - file_name (str): The name of the PDF file.
        - page_number (int): The page number.
        - page_word_count (int): The word count of the page.
        - page_token_cont (int): The token count of the page.
        - text (str): The extracted text from the page.
"""
pass
"""
Filters out pages with zero word count from the given list of pages and texts.
Args:
    pages_and_texts_sublist (list): A list of dictionaries, each containing information about a page.
Returns:
    list: A filtered list of dictionaries, each containing information about a page with a word count greater than zero.
"""
pass

def text_formatter(text: str) -> str:
    """Put ``text`` on a single line.

    Every newline becomes one space, then leading and trailing whitespace is
    removed. Runs of spaces inside the text are left as they are.
    """
    return " ".join(text.split("\n")).strip()

def open_and_read_pdf(pdf_path: str) -> list:
    """Extract the text of every page of a PDF with basic size statistics.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        One dict per page, in page order, with these keys:
            - ``file_name``: the ``pdf_path`` that was passed in.
            - ``page_number``: zero-based page index.
            - ``page_word_count``: number of whitespace-separated words.
            - ``page_token_cont``: number of tokens under the "gpt-4o"
              encoding (the key spelling is kept for compatibility).
            - ``text``: page text after ``text_formatter``.
    """
    tokenizer = tiktoken.encoding_for_model("gpt-4o")
    pages_and_texts = []

    # The context manager closes the document even if a page fails to parse;
    # without it the file handle stays open.
    with fitz.open(pdf_path) as doc:
        for page_number, page in enumerate(doc):
            text = text_formatter(page.get_text())

            pages_and_texts.append({
                "file_name": pdf_path,
                "page_number": page_number,
                "page_word_count": len(text.split()),
                "page_token_cont": len(tokenizer.encode(text)),
                "text": text,
            })

    return pages_and_texts

# One list of page dicts per PDF file, in the same order as pdf_files.
pages_and_texts =[open_and_read_pdf(local_filename) for local_filename in pdf_files]

def get_pages_and_texts(pages_and_texts_sublist):
    """Drop the pages that contain no words.

    Args:
        pages_and_texts_sublist (list): Page dicts, each with a
            ``"page_word_count"`` key.

    Returns:
        list: The page dicts whose word count is greater than zero, in their
        original order.
    """
    return list(filter(lambda page: page["page_word_count"] > 0, pages_and_texts_sublist))

# Apply the empty-page filter to each PDF separately (still one list per PDF).
filtered_pages_and_texts =[get_pages_and_texts(pages_and_texts_sublist) for pages_and_texts_sublist in pages_and_texts]

Solapar las páginas, de tal manera que una página tenga un trozo de la anterior y de la siguiente, para evitar cortes de bloques. Un 20% es un valor de referencia común.

In [10]:
"""
Create contextual texts for each PDF by combining text from adjacent pages with a specified overlap ratio.
Args:
    filtered_pages_and_texts (list of list of dict): A list where each element is a list of dictionaries containing 
                                                        'text' and 'file_name' keys for each page of a PDF.
    overlap_ratio (float, optional): The ratio of text overlap between adjacent pages. Default is 0.2.
Returns:
    list of dict: A list of dictionaries where each dictionary contains:
                    - 'file_name': The name of the PDF file.
                    - 'texts': A list of strings, one per page, each combining the page text with the overlapping context taken from its neighbouring pages.
"""
def create_contextual_texts_per_pdf(filtered_pages_and_texts, overlap_ratio=0.2):
    """Add a slice of the neighbouring pages to every page of each PDF.

    Each page gets the tail of the previous page as a prefix and the head of
    the next page as a suffix, so text that straddles a page break is not cut.
    Each side contributes ``overlap_ratio / 2`` of the neighbour's length in
    characters, rounded down.

    Args:
        filtered_pages_and_texts (list of list of dict): One list of page
            dicts per PDF; each dict has ``"text"`` and ``"file_name"`` keys.
        overlap_ratio (float): Total overlap ratio, split evenly between the
            previous and the next page.

    Returns:
        list of dict: One dict per PDF with ``"file_name"`` (empty string for
        a PDF without pages) and ``"texts"``, a list with one combined string
        per page.
    """
    all_output_texts = []

    for pdf_pages in filtered_pages_and_texts:
        page_texts = [page["text"] for page in pdf_pages]
        file_name = pdf_pages[0]["file_name"] if pdf_pages else ""
        last_index = len(page_texts) - 1

        output_texts = []
        for i, current_text in enumerate(page_texts):
            previous_context = ""
            if i > 0:
                previous_text = page_texts[i - 1]
                tail_length = int(len(previous_text) * overlap_ratio / 2)
                # A length of 0 must yield no context: text[-0:] would return
                # the whole previous page.
                if tail_length > 0:
                    previous_context = previous_text[-tail_length:]

            next_context = ""
            if i < last_index:
                following_text = page_texts[i + 1]
                head_length = max(0, int(len(following_text) * overlap_ratio / 2))
                next_context = following_text[:head_length]

            output_texts.append(f"{previous_context} {current_text} {next_context}".strip())

        all_output_texts.append({
            "file_name": file_name,
            "texts": output_texts,
        })

    return all_output_texts

# Total overlap ratio; create_contextual_texts_per_pdf halves it for each side.
overlap_ratio = 0.2

# One {"file_name", "texts"} dict per PDF, with overlapped page texts.
overlapped_texts_per_pdf = create_contextual_texts_per_pdf(filtered_pages_and_texts, overlap_ratio=overlap_ratio)

Concatenamos páginas sucesivas con pocos tokens para tener textos de longitud similar

In [11]:
"""
Tokenizes and calculates the token size for each text in the provided list of texts.
This script uses the `tiktoken` library to encode texts for the "gpt-3.5-turbo" model.
It processes a list of dictionaries containing file names and their associated texts,
and generates a new list of dictionaries with the file name, text, and the token size.
Variables:
    tokenizer (tiktoken.Tokenizer): The tokenizer for the "gpt-3.5-turbo" model.
    input_texts_and_tokens (list): A list of dictionaries, each containing:
        - 'file_name' (str): The name of the file.
        - 'text' (str): The text content.
        - 'token_size' (int): The number of tokens in the text.
List comprehensions:
    input_texts_and_tokens: Iterates over each text in `overlapped_texts_per_pdf` and
    each `texto` in `text['texts']`, encoding the text and calculating its token size.
"""
# NOTE(review): open_and_read_pdf counts tokens with the "gpt-4o" encoding,
# while this cell uses "gpt-3.5-turbo". Confirm which tokenizer the chunk size
# limit should be measured with.
tokenizer = tiktoken.encoding_for_model("gpt-3.5-turbo")

# Flatten to one record per overlapped page text, keeping the source file name
# and the token count of that text.
input_texts_and_tokens = [{'file_name': text['file_name'],'text': texto, 'token_size': len(tokenizer.encode(texto))} for text in overlapped_texts_per_pdf for texto in text['texts']]

In [12]:
"""
Concatenates text documents into groups based on a maximum token size.
Args:
    docs (list of dict): A list of dictionaries where each dictionary represents a document with keys:
        - "file_name" (str): The name of the file the document belongs to.
        - "text" (str): The text content of the document.
        - "token_size" (int): The token size of the document.
    max_tokens (int): The maximum number of tokens allowed in a concatenated group.
Returns:
    list of dict: A list of dictionaries where each dictionary represents a concatenated group of documents with keys:
        - "file_name" (str): The name of the file the group belongs to.
        - "text" (str): The concatenated text content of the group.
        - "token_size" (int): The total token size of the concatenated group.
"""
# Upper bound on the summed token_size of a concatenated group.
max_tokens = 1000

def concatenate_documents(docs, max_tokens):
    """Merge consecutive documents of the same file into larger groups.

    Documents are processed in order. A document joins the current group when
    it belongs to the same file and the summed ``token_size`` stays within
    ``max_tokens``; otherwise the current group is closed and a new one is
    started. A single document larger than ``max_tokens`` forms its own group.

    Args:
        docs (list of dict): Dicts with ``"file_name"``, ``"text"`` and
            ``"token_size"`` keys. The input dicts are not modified.
        max_tokens (int): Maximum summed token size of a group.

    Returns:
        list of dict: Groups with the same three keys. ``"token_size"`` is the
        sum of the members' sizes, so it is an estimate: the joining spaces
        are not re-tokenized.
    """
    concatenated_docs = []
    current_group = {"file_name": "", "text": "", "token_size": 0}

    for doc in docs:
        if current_group["file_name"] != doc["file_name"]:
            # File changed: close the previous group unless it is empty.
            if current_group["token_size"] > 0:
                concatenated_docs.append(current_group)
            current_group = {"file_name": doc["file_name"], "text": doc["text"], "token_size": doc["token_size"]}
        elif current_group["token_size"] + doc["token_size"] <= max_tokens:
            # Join with a single space. Stripping the separator together with
            # the text would fuse the last word of one page with the first
            # word of the next.
            separator = " " if current_group["text"] else ""
            current_group["text"] += separator + doc["text"].strip()
            current_group["token_size"] += doc["token_size"]
        else:
            concatenated_docs.append(current_group)
            current_group = {"file_name": doc["file_name"], "text": doc["text"], "token_size": doc["token_size"]}

    if current_group["token_size"] > 0:
        concatenated_docs.append(current_group)

    return concatenated_docs

# Group the per-page records into blocks of at most max_tokens tokens each.
concatenated_input_texts_and_tokens = concatenate_documents(input_texts_and_tokens, max_tokens)

Calcular los embeddings

In [13]:
"""
This script calculates embeddings for a list of text blocks using Azure OpenAI and saves the results to a JSON file.
Functions:
    calculate_embeddings(text, client=azure_openai_client_embeding):
        Calculates embeddings for the given text using the specified Azure OpenAI client.
        Args:
            text (str): The input text to calculate embeddings for.
            client: The Azure OpenAI client to use for generating embeddings.
        Returns:
            list: The embeddings for the input text.
Variables:
    text_and_embeddings (list): A list of dictionaries containing block IDs, text, and their corresponding embeddings.
    oputput_file (str): The name of the output file where embeddings will be saved.
File Operations:
    Opens a file named "Embeddings.json" in write mode and saves the text and embeddings data in JSON format.
    No confirmation message is printed after the file is written.
"""
def calculate_embeddings(text, client=azure_openai_client_embeding):
    """Compute the embedding vector of ``text``.

    This repeats the definition given earlier in the notebook, so the cell can
    be run on its own.

    Args:
        text (str): Input text to embed.
        client: Client exposing ``embeddings.create``; the default is the
            embeddings client that existed when this function was defined.

    Returns:
        The ``embedding`` attribute of the first item in the response data.
    """
    result = client.embeddings.create(
        model=azure_openai_embeding_deployment_name,
        input=text,
    )
    return result.data[0].embedding

# One embeddings request is issued per concatenated block; block_id is the
# block's position in concatenated_input_texts_and_tokens.
text_and_embeddings = [{'block_id': block_id, 'text': text['text'], 'embeddings': calculate_embeddings(text['text'])} for block_id, text in enumerate(concatenated_input_texts_and_tokens)]

# NOTE: the variable name is misspelled ("oputput"), but a later cell reads it
# under this exact name, so it cannot be renamed here alone.
oputput_file = "Embeddings.json"

# Persist the index so it can be reloaded without recomputing the embeddings.
with open(oputput_file, "w") as file:
    json.dump(text_and_embeddings, file)

Tools de búsqueda en índice

In [28]:
"""
Calculate the cosine similarity between two vectors.
Parameters:
vec1 (numpy.ndarray): The first vector.
vec2 (numpy.ndarray): The second vector.
Returns:
float: The cosine similarity between vec1 and vec2.
"""
# function implementation
"""
Find the most similar documents to the input text based on cosine similarity.
Parameters:
input_text (str): The input text to compare.
data (list of dict): A list of dictionaries, each containing 'text' and 'embeddings' keys.
desired_doc_count (int, optional): The number of most similar documents to return. Defaults to 1.
Returns:
list of tuple: A list of tuples, each containing the text and its similarity score, sorted by similarity in descending order.
"""
# function implementation
def cosine_similarity(vec1, vec2):
    """Return the cosine similarity between ``vec1`` and ``vec2``.

    Args:
        vec1: First vector (array-like).
        vec2: Second vector, with the same length as ``vec1``.

    Returns:
        The dot product of the vectors divided by the product of their
        Euclidean norms. Zero-length vectors are not special-cased.
    """
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

def find_most_similar(input_text, data, desired_doc_count=1):
    """Return the entries of ``data`` that are most similar to ``input_text``.

    Args:
        input_text (str): Text to compare against the dataset.
        data (list of dict): Entries with ``"text"`` and ``"embeddings"`` keys.
        desired_doc_count (int): Number of best matches to return. Defaults to
            1; a default of 0 would always produce an empty result.

    Returns:
        list of tuple or None: ``(text, similarity)`` tuples sorted by
        similarity in descending order and truncated to ``desired_doc_count``,
        or ``None`` when ``data`` has no entries.
    """
    input_text_embeding = calculate_embeddings(input_text)
    similarities = []

    for entry in data:
        similarity = cosine_similarity(input_text_embeding, np.array(entry["embeddings"]))
        similarities.append((entry["text"], similarity))

    sorted_documents = sorted(similarities, key=lambda x: x[1], reverse=True)

    # With nothing to rank, return None instead of an empty list.
    return sorted_documents[:desired_doc_count] if sorted_documents else None

Tools de búsqueda de embeddings

In [31]:
"""
This script loads a list of embeddings from a JSON file and provides a function to search for the most similar text based on input text.
Functions:
    search_for_info(input_text, comparision_data=embedings_list, desired_doc_count=1):
        Searches for the most similar text to the input_text within the comparision_data.
Parameters:
    input_text (str): The text to search for similar entries.
    comparision_data (list, optional): The list of embeddings to compare against. Defaults to embedings_list.
    desired_doc_count (int, optional): The number of most similar documents to return. Defaults to 1.
Returns:
    str: The concatenated text of the most similar entries.
"""
# Reload the index written by the embeddings cell. The context manager closes
# the file handle, which a bare json.load(open(...)) leaves open.
with open(oputput_file, "r", encoding="utf-8") as embeddings_file:
    embedings_list = json.load(embeddings_file)

# Keep only the fields the search needs and turn each vector into a NumPy
# array once, up front.
embedings_list = [
    {"text": entry["text"], "embeddings": np.array(entry["embeddings"])}
    for entry in embedings_list
]

def search_for_info(input_text, comparision_data=embedings_list, desired_doc_count=1):
    """Look up the indexed text that best matches ``input_text``.

    Args:
        input_text (str): Text to search for.
        comparision_data (list): Entries with ``"text"`` and ``"embeddings"``
            keys. The default is bound when the function is defined, so it is
            the index that was loaded at that moment.
        desired_doc_count (int): Number of best matches to join together.

    Returns:
        str or None: The texts of the best matches joined with single spaces,
        or ``None`` when the search yields nothing.
    """
    ranked_matches = find_most_similar(input_text, comparision_data, desired_doc_count)
    if ranked_matches:
        return " ".join(match[0] for match in ranked_matches)
    return None

Probamos las tools

In [None]:
"""
This script contains functions to search for information and data.

Functions:
    search_for_info(input_text: str) -> str:
        Searches for information based on the input text and returns the result.
        Example usage:

    search_for_data_in_bing(query: str) -> str:
        Searches Bing for the query and returns the snippet of the first result (or "No results found.").
        Example usage:
"""
# Quick manual check of both tools: two lookups against the local embeddings
# index and one Bing search.
print(search_for_info(input_text="¿Valor nutricional de un plátano?"))
print(search_for_info(input_text="¿Tipos de variables en Python?"))
# NOTE(review): this result is not printed. A notebook echoes the last
# expression of a cell, but the value is discarded when run as a script.
search_for_data_in_bing("¿Quién ganó la eurocopa en 2024?")

Registro de tools para llamada a AzureOpenAI

In [54]:
"""
Registers a custom search tool with Azure OpenAI and returns an agent.
The custom search tool first searches for information using embeddings and, if no relevant information is found, 
it falls back to searching for data in Bing. The relevance of the information is determined by calculating the 
cosine similarity between the query and the response.
Returns:
    AgentRunner: An agent runner instance with the registered custom search tool.
Functions:
    custom_agent_worker(query: str) -> str:
        Executes the search tools in order and stops the search if relevant information is found.
        Args:
            query (str): The search query.
        Returns:
            str: The search result, either from embeddings or Bing.
    search_custom:
        A FunctionTool instance created from the custom_agent_worker function.
    agent_worker:
        A FunctionCallingAgentWorker instance created from the search_custom tool and Azure OpenAI client.
    agent:
        An AgentRunner instance created from the agent_worker.
"""
def register_search_tools_with_azure_openai(similarity_threshold=0.4):
    """Build an agent with a single tool that searches embeddings, then Bing.

    Args:
        similarity_threshold (float): Minimum cosine similarity between the
            query and the text found in the embeddings index for that text to
            be accepted. Below it, the tool falls back to Bing.

    Returns:
        AgentRunner: Agent whose only tool is the combined search.
    """
    def custom_agent_worker(query: str):
        """Search the local embeddings index first and fall back to Bing."""
        response = search_for_info(query)
        # Only a non-empty string counts as a usable index hit.
        if response and isinstance(response, str) and response.strip():
            # The index always returns its closest text, so check that the hit
            # is actually related to the query before trusting it.
            similarity_score = cosine_similarity(
                calculate_embeddings(query), calculate_embeddings(response)
            )

            print(similarity_score)
            if similarity_score >= similarity_threshold:
                return response
            # The message now states what really happens next: the tool falls
            # back to Bing instead of returning None.
            print(f"Respuesta irrelevante, similitud: {similarity_score}. Buscando en Bing.")

        # Single fallback for "nothing found" and "found but irrelevant".
        return search_for_data_in_bing(query)

    # Wrap the function as a tool the agent can call.
    search_custom = FunctionTool.from_defaults(
        fn=custom_agent_worker,
        description="Busca información en embeddings y, si no encuentra, en Bing."
    )

    agent_worker = FunctionCallingAgentWorker.from_tools(
        [search_custom],  # only the tool that coordinates both searches
        llm=azure_openai_client,
        verbose=True
    )

    return AgentRunner(agent_worker)

# Build the agent once so that later cells can query it.
agent = register_search_tools_with_azure_openai()

Llamada a API con tools

In [None]:
"""
This script queries an agent to find out who won the Euro Cup in 2024 and prints the response.
Functions:
    agent.query(question: str) -> str: Sends a query to the agent and returns the response.
Variables:
    response (str): The response from the agent to the query "Quién ganó la eurocopa en 2024?".
Usage:
    The script sends a query to the agent asking who won the Euro Cup in 2024 and prints the response.
"""
# Send one question to the agent. The question is about an event that the
# local PDF index may not cover, so it can exercise the Bing fallback.
response = agent.query("Quién ganó la eurocopa en 2024?")

# Formatting the response inside an f-string uses its string representation.
print(f"La respuesta es: {response}")