# Generacion de Datasets utilizando RLHF para fine tunning

## Procedimiento

- Recoleccion de Datos: Obtener Informacion mediante documentos de textos seleccionables (no escaneados), mediante web, redes sociales, scrapping,etc
- Extraer Texto: Usa PyMuPDF para extraer texto de PDFs no escaneados, procesando en paralelo con ProcessPoolExecutor para manejar grandes volúmenes (>100, >10,000).
- Limpiar Datos: Elimina espacios múltiples y caracteres no deseados con expresiones regulares, asegurando texto coherente.
- Validar: Verifica que los fragmentos extraídos no estén vacíos y tengan una longitud mínima (por ejemplo, 50 caracteres).
- Generar Conversaciones: Divide el texto en párrafos, usa un modelo como meta-llama/Llama-3.2-1B-Instruct para generar diálogos conversacionales específicos al contenido, con al menos 4 intercambios por conversación, en formato {"messages": [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}], "topic": "..."}.
- Estructurar Dataset: Guarda las conversaciones en archivos JSONL con dos columnas: "messages" y "topic".
Cargar y Subir: Carga el dataset con load_dataset("json", data_files="path/*.jsonl") y súbelo a un repositorio privado en Hugging Face Hub con push_to_hub("username/dataset_name", private=True).

In [1]:
%pip install --upgrade pymupdf nltk ollama openai


Collecting openai
  Downloading openai-1.99.3-py3-none-any.whl.metadata (29 kB)
Collecting distro<2,>=1.7.0 (from openai)
  Downloading distro-1.9.0-py3-none-any.whl.metadata (6.8 kB)
Collecting jiter<1,>=0.4.0 (from openai)
  Downloading jiter-0.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.2 kB)
Downloading openai-1.99.3-py3-none-any.whl (785 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m785.8/785.8 kB[0m [31m85.1 MB/s[0m  [33m0:00:00[0m
[?25hDownloading distro-1.9.0-py3-none-any.whl (20 kB)
Downloading jiter-0.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (352 kB)
Installing collected packages: jiter, distro, openai
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3/3[0m [openai]2m2/3[0m [openai]
[1A[2KSuccessfully installed distro-1.9.0 jiter-0.10.0 openai-1.99.3
Note: you may need to restart the kernel to use updated packages.


In [2]:
import torch

torch.cuda.is_available()

  cpu = _conversion_method_template(device=torch.device("cpu"))


True

In [5]:
import os

OLLAMA_MODEL = "llama3.1:8b"
TEMPERATURE = 0.7  # Equilibrio entre creatividad y coherencia
TOP_P = 0.9  # Filtrado de núcleo para diversidad
MIN_CONVERSATION_LENGTH = 3 

# Constantes
MIN_FRAGMENT_LENGTH = 500
MAX_FRAGMENT_LENGTH = 2000
REPEAT_THRESHOLD = 0.3
MIN_BLOCK_LENGTH = 30  # Reducido de 50 a 30 para incluir bloques más cortos

### Recoleccion de Datos

In [6]:
from pathlib import Path

# Verificar si la carpeta existe

def get_files():
    folder_url = "./docs"
    folder = Path(folder_url)


    if not folder.exists() or not folder.is_dir():
        print("Invalid Folder")
        
    # Obtener todos los archivos de la carpeta
    files = [f for f in folder.rglob("*") if f.is_file()]

    return files

### Extraccion de Texto y Limpieza de Datos

In [7]:
import pymupdf
import re
import os
import hashlib
from collections import Counter



def clean_text(text, repeated_blocks=None):
    """Limpia el texto, eliminando caracteres repetitivos, texto redundante y caracteres no deseados."""
    # Eliminar caracteres de control no deseados (excepto \n)
    text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text)
    # Eliminar patrones repetitivos como ----, ...., ****
    text = re.sub(r'([^\w\s])\1{2,}|\s*[.]{3,}\s*', '', text)
    # Normalizar espacios múltiples
    text = re.sub(r'[ \t]+', ' ', text)
    # Eliminar espacios al inicio y final de cada línea
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    # Filtrar líneas duplicadas, números, correos y contenido administrativo
    seen_lines = set()
    unique_lines = []
    for line in lines:
        if line not in seen_lines and \
           not re.match(r'^\d+$', line) and \
           not re.match(r'.*@(.*\.)+.*', line) and \
           not re.match(r'^(Tel|Fax|E-mail|www\.).*', line, re.IGNORECASE) and \
           (repeated_blocks is None or line not in repeated_blocks):
            unique_lines.append(line)
            seen_lines.add(line)
    return '\n'.join(unique_lines)

def extract_text_from_pdf(pdf_path):
    """
    Extrae texto de un PDF en una sola pasada, preservando el texto del inicio de la página,
    eliminando encabezados, pies de página y contenido irrelevante,
    dividiendo en fragmentos de 500 a 2000 caracteres con metadata corregida.
    """
    try:
        doc = pymupdf.open(pdf_path)
        total_pages = len(doc)
        chunks = []
        current_chunk = []  # Lista de (párrafo, página)
        current_chunk_length = 0
        filename = os.path.basename(pdf_path)
        block_counter = Counter()

        for page_number in range(1, total_pages + 1):
            page = doc[page_number - 1]
            page_height = page.rect.height
            blocks = page.get_text("blocks")
            page_text = []

            # Procesar bloques y filtrar encabezados/pies de página
            for block in blocks:
                text = block[4]
                y0, y1 = block[1], block[3]
                # Excluir pies de página (parte inferior de la página)
                if y1 > 0.95 * page_height:  # Relajado de 0.95 a 0.9
                    continue
                # Excluir encabezados solo si son repetitivos
                if y0 < 0.05 * page_height:  # Relajado de 0.05 a 0.1
                    block_counter[text] += 1
                    if block_counter[text] > total_pages * REPEAT_THRESHOLD:
                        continue
                if text and len(text) >= MIN_BLOCK_LENGTH:
                    block_counter[text] += 1
                    page_text.append((text, page_number))

            # Si no hay texto válido en la página, continuar
            if not page_text:
                continue

            # Acumular párrafos con su número de página
            for paragraph, page in page_text:
                current_chunk.append((paragraph, page))
                current_chunk_length += len(paragraph) + 2  # +2 por "\n\n"

                # Si el fragmento alcanza la longitud mínima, procesarlo
                if current_chunk_length >= MIN_FRAGMENT_LENGTH:
                    chunk_text = "\n\n".join(p for p, _ in current_chunk)
                    cleaned_chunk = clean_text(chunk_text, None)
                    cleaned_length = len(cleaned_chunk)

                    # Obtener el rango de páginas del fragmento
                    page_numbers = sorted(set(page for _, page in current_chunk))
                    start_page = page_numbers[0] if page_numbers else page_number
                    end_page = page_numbers[-1] if page_numbers else page_number

                    # Dividir fragmentos largos
                    while cleaned_length > MAX_FRAGMENT_LENGTH:
                        sub_chunk = cleaned_chunk[:MAX_FRAGMENT_LENGTH]
                        last_paragraph_end = sub_chunk.rfind("\n\n")
                        if last_paragraph_end == -1:
                            last_paragraph_end = MAX_FRAGMENT_LENGTH
                        chunk_to_add = cleaned_chunk[:last_paragraph_end].strip()

                        # Calcular el número de páginas para el subfragmento
                        chars_so_far = 0
                        sub_chunk_pages = []
                        for paragraph, page in current_chunk:
                            chars_so_far += len(paragraph) + 2
                            if chars_so_far <= last_paragraph_end:
                                sub_chunk_pages.append(page)
                            else:
                                break
                        sub_start_page = min(sub_chunk_pages) if sub_chunk_pages else page_number
                        sub_end_page = max(sub_chunk_pages) if sub_chunk_pages else page_number

                        metadata = (
                            f"# FILENAME: {filename} | CHARACTERS: {len(chunk_to_add)} | "
                            f"PAGES: {sub_start_page}-{sub_end_page}/{total_pages}\n\n"
                        )
                        chunks.append((metadata + chunk_to_add, hashlib.md5(chunk_to_add.encode()).hexdigest()))
                        cleaned_chunk = cleaned_chunk[last_paragraph_end:].strip()
                        cleaned_length = len(cleaned_chunk)

                        # Actualizar current_chunk para los párrafos restantes
                        remaining_chunk = []
                        chars_so_far = 0
                        for paragraph, page in current_chunk:
                            chars_so_far += len(paragraph) + 2
                            if chars_so_far > last_paragraph_end:
                                remaining_chunk.append((paragraph, page))
                        current_chunk = remaining_chunk
                        current_chunk_length = cleaned_length
                        page_numbers = sorted(set(page for _, page in current_chunk))
                        start_page = page_numbers[0] if page_numbers else page_number

                    # Añadir el fragmento completo
                    if cleaned_length >= MIN_FRAGMENT_LENGTH:
                        metadata = (
                            f"# FILENAME: {filename} | CHARACTERS: {cleaned_length} | "
                            f"PAGES: {start_page}-{end_page}/{total_pages}\n\n"
                        )
                        chunks.append((metadata + cleaned_chunk, hashlib.md5(cleaned_chunk.encode()).hexdigest()))
                        current_chunk = []
                        current_chunk_length = 0

                    else:
                        current_chunk = [(cleaned_chunk, page_numbers[-1])] if page_numbers else []
                        current_chunk_length = cleaned_length

        # Añadir el fragmento final si cumple con la longitud mínima
        if current_chunk and current_chunk_length >= MIN_FRAGMENT_LENGTH:
            chunk_text = "\n\n".join(p for p, _ in current_chunk)
            cleaned_chunk = clean_text(chunk_text, None)
            cleaned_length = len(cleaned_chunk)
            if cleaned_length >= MIN_FRAGMENT_LENGTH:
                page_numbers = sorted(set(page for _, page in current_chunk))
                start_page = page_numbers[0] if page_numbers else total_pages
                end_page = page_numbers[-1] if page_numbers else total_pages
                metadata = (
                    f"# FILENAME: {filename} | CHARACTERS: {cleaned_length} | "
                    f"PAGES: {start_page}-{end_page}/{total_pages}\n\n"
                )
                chunks.append((metadata + cleaned_chunk, hashlib.md5(cleaned_chunk.encode()).hexdigest()))

        doc.close()

        # Filtrar bloques repetitivos y duplicados
        repeated_blocks = {text for text, count in block_counter.items() if count > total_pages * REPEAT_THRESHOLD}
        final_chunks = []
        seen_hashes = set()

        for chunk, chunk_hash in chunks:
            chunk_text = '\n'.join(line for line in chunk.splitlines() if not line.startswith('#'))
            cleaned_chunk = clean_text(chunk_text, repeated_blocks)
            if len(cleaned_chunk) >= MIN_FRAGMENT_LENGTH and chunk_hash not in seen_hashes:
                # Actualizar la longitud en la metadata después de la limpieza final
                metadata_lines = chunk.splitlines()[0]
                metadata = re.sub(r'CHARACTERS: \d+', f'CHARACTERS: {len(cleaned_chunk)}', metadata_lines)
                final_chunks.append(f"{metadata}\n\n{cleaned_chunk}")
                seen_hashes.add(chunk_hash)

        return final_chunks if final_chunks else None
    except Exception as e:
        print(f"Error procesando {pdf_path}: {e}")
        return None

### Generar Conversaciones

In [9]:
from pydantic import BaseModel, Field
from typing import List
from dotenv import load_dotenv
load_dotenv()

class Message(BaseModel):
    role: str = Field(..., pattern="^(user|assistant)$")
    content: str

class Conversation(BaseModel):
    messages: List[Message] = Field(..., min_items=MIN_CONVERSATION_LENGTH)
    topic: str

In [20]:
def get_prompts(fragment: str):
  system_content = """Eres un generador de conversaciones optimizadas para entrenamiento supervisado (SFT) con técnicas avanzadas de prompt engineering. A partir de un fragmento de texto, construye una conversación natural, coherente y estructurada entre un usuario y un asistente para entrenar modelos como LLaMA 3.1 o GPT-4o-mini usando `SFTTrainer` de Hugging Face. El objetivo es mejorar la comprensión lectora, coherencia y respuestas naturales, incluso si el fragmento es incompleto o incoherente.

<instrucciones>

1. **Formato de salida**:
- Devuelve un JSON válido con:
  - `messages`: Lista de mensajes con estructura { "role": "user" | "assistant", "content": "..." }. **No incluir mensajes con rol "system" en la salida.**
  - `topic`: Cadena corta que resume el contenido (máx. 50 caracteres).

2. **Estructura de la conversación**:
- Genera 2 a 4 intercambios (4 a 8 mensajes) para mayor profundidad.
- Empieza con "user" y alterna roles.
- Mensajes largos, fluidos, humanos, evitando respuestas genéricas.
- Usa chain-of-thought en respuestas del asistente para desglosar razonamientos.
- Incorpora ejemplos variados como few-shot prompts.

3. **Usuario**:
- Evita frases como “según el texto”, “en el fragmento”, “de qué trata”.
- Mensajes deben incluir:
  - Preguntas inferenciales, técnicas o abiertas.
  - Hipótesis o afirmaciones intuitivas.
  - Contexto adicional para reflejar un escenario realista.
- Estilo conversacional, curioso, humano, variando tonos (curioso, preocupado, analítico).

4. **Asistente**:
- Responde con claridad, precisión y detalle, sin inventar información fuera del fragmento.
- Usa chain-of-thought: explica el razonamiento paso a paso.
- Integra contexto adicional del usuario, manteniendo coherencia.
- Si el fragmento es incoherente, incompleto o carece de contexto, genera una conversación coherente usando la información disponible, haciendo suposiciones razonables y explícalas en el razonamiento.
- Respuestas deben ser útiles, naturales y entrenables para SFT.

5. **Técnicas de Prompt Engineering**:
- **Few-shot learning**: Incluye 5 ejemplos variados (técnico, casual, contextual, 2 incoherentes) con fragmentos explícitos.
- **Chain-of-thought**: Desglosa el razonamiento en pasos claros.
- **Diversidad**: Varía tono, complejidad y tipo de preguntas.
- **Robustez**: Maneja fragmentos ambiguos o incoherentes con claridad.
- **Coherencia mejorada**: Valida que las respuestas sean consistentes con el fragmento y el contexto del usuario, evitando contradicciones o respuestas irrelevantes.

<ejemplos>

**Ejemplo 1 — Técnico (dispersión de luz)**:
<fragment>
La dispersión de Rayleigh explica por qué el cielo aparece azul durante el día. Las partículas en la atmósfera dispersan preferentemente las longitudes de onda cortas, como el azul, debido a su interacción con la luz solar. Durante el atardecer, el cielo adquiere tonos rojos porque la luz atraviesa más atmósfera, dispersando el azul y dejando predominar las longitudes de onda largas.
</fragment>
<response>
{
  "messages": [
    {
      "role": "user",
      "content": "Me intriga por qué el cielo es azul de día y rojo al atardecer. ¿Qué pasa con la luz para que cambie así?"
    },
    {
      "role": "assistant",
      "content": "El color del cielo se debe a la dispersión de Rayleigh. Durante el día, las partículas de la atmósfera dispersan más las longitudes de onda cortas, como el azul, porque su dispersión es inversamente proporcional a la cuarta potencia de la longitud de onda. Esto hace que el azul se esparza por todo el cielo. Al atardecer, la luz solar atraviesa más atmósfera, dispersando casi todo el azul, lo que permite que las longitudes de onda largas, como el rojo, dominen."
    }
  ],
  "topic": "Dispersión de luz y color del cielo"
}
</response>

**Ejemplo 2 — Casual (salud visual)**:
<fragment>
La exposición prolongada a la luz azul de pantallas puede interrumpir la producción de melatonina, afectando el ciclo de sueño. Esto ocurre porque la luz azul engaña al cerebro, haciéndole creer que es de día, lo que retrasa el inicio del sueño.
</fragment>
<response>
{
  "messages": [
    {
      "role": "user",
      "content": "Paso horas frente a mi laptop y no duermo bien. ¿Puede la luz de la pantalla estar afectándome?"
    },
    {
      "role": "assistant",
      "content": "Sí, la luz azul de las pantallas puede interferir con tu sueño. Inhibe la melatonina, la hormona que regula el ciclo de sueño, porque tu cerebro cree que es de día. Esto retrasa el sueño. Usa modo nocturno, evita pantallas antes de dormir y aplica la regla 20-20-20 para descansar la vista."
    },
    {
      "role": "user",
      "content": "¿Y a largo plazo? ¿Puedo dañar mis ojos?"
    },
    {
      "role": "assistant",
      "content": "A largo plazo, la luz azul no daña permanentemente los ojos, pero puede causar fatiga visual o sequedad. El impacto mayor es en el sueño: la falta crónica de melatonina puede llevar a insomnio o fatiga. Usa filtros de luz azul y consulta a un oftalmólogo si sientes molestias persistentes."
    }
  ],
  "topic": "Impacto de la luz azul en el sueño"
}
</response>

**Ejemplo 3 — Contextual (política monetaria)**:
<fragment>
Subir las tasas de interés es una herramienta del banco central para controlar la inflación. Al encarecer los préstamos, se reduce el consumo y la inversión, lo que desacelera la economía y disminuye la presión sobre los precios. Sin embargo, un aumento excesivo puede llevar a una recesión.
</fragment>
<response>
{
  "messages": [
    {
      "role": "user",
      "content": "El banco central sigue subiendo las tasas por la inflación. ¿Cómo afecta eso mi día a día?"
    },
    {
      "role": "assistant",
      "content": "Subir las tasas encarece los préstamos, como hipotecas o créditos, lo que reduce el consumo. Menos gasto significa menos presión sobre los precios, controlando la inflación. En tu día a día, podrías notar cuotas más altas en préstamos o menos compras grandes, como autos."
    },
    {
      "role": "user",
      "content": "¿Pero no puede eso causar una recesión si todos gastamos menos?"
    },
    {
      "role": "assistant",
      "content": "Exacto. Si las tasas suben demasiado, el consumo y la inversión caen tanto que las empresas podrían reducir producción o empleos, llevando a una recesión. Los bancos centrales ajustan las tasas con cuidado, monitoreando empleo y consumo para evitarlo."
    }
  ],
  "topic": "Tasas de interés e inflación"
}
</response>

**Ejemplo 4 — Incoherente (fragmento sin contexto claro)**:
<fragment>
...error crítico en el núcleo... datos perdidos...
</fragment>
<response>
{
  "messages": [
    {
      "role": "user",
      "content": "Mi computadora mostró un error crítico y perdí mis archivos. ¿Qué pasó?"
    },
    {
      "role": "assistant",
      "content": "Un error crítico en el núcleo suele ser un fallo grave en el sistema operativo. Podría deberse a un problema de hardware, como un disco duro dañado, o a un software corrupto. Los datos perdidos sugieren que el sistema no pudo guardar correctamente. Intenta usar software de recuperación como TestDisk, pero evita escribir en el disco. Un técnico podría ayudar si no funciona."
    },
    {
      "role": "user",
      "content": "¿Puedo evitar que pase de nuevo?"
    },
    {
      "role": "assistant",
      "content": "Para prevenirlo, haz backups regulares, actualiza el sistema y usa un antivirus. Si sospechas de hardware, revisa el disco con herramientas como SMART. Un diagnóstico profesional puede identificar la causa exacta."
    }
  ],
  "topic": "Error crítico en sistema operativo"
}
</response>

**Ejemplo 5 — Incoherente (fragmento ambiguo)**:
<fragment>
...fluctuaciones de energía... inestabilidad en el sistema...
</fragment>
<response>
{
  "messages": [
    {
      "role": "user",
      "content": "He notado que mis luces parpadean a veces. Leí algo sobre fluctuaciones de energía. ¿Qué significa eso?"
    },
    {
      "role": "assistant",
      "content": "Las fluctuaciones de energía son variaciones en el suministro eléctrico, como picos o caídas de voltaje, que causan inestabilidad. Esto puede hacer que las luces parpadeen o que los dispositivos fallen. Podría ser por cableado defectuoso o sobrecarga en la red. Un regulador de voltaje o un UPS puede estabilizar el suministro."
    },
    {
      "role": "user",
      "content": "¿Es costoso arreglar esto?"
    },
    {
      "role": "assistant",
      "content": "Depende. Un regulador de voltaje cuesta entre $20 y $100. Primero, revisa enchufes y evita sobrecargar tomacorrientes. Si persiste, un electricista puede verificar el cableado, lo que podría costar más según el problema."
    }
  ],
  "topic": "Fluctuaciones de energía eléctrica"
}
</response>

</ejemplos>
"""
  return [
        {
            "role": "system",
            "content": system_content
        },
        {
            "role": "user",
            "content": fragment
        }
    ]

In [21]:
def validate_genterated_conv(conversation:Conversation):
  # Validar que la conversación tenga al menos un intercambio completo
  if len(conversation.messages) < 2:
      print(f"Error: Conversación inválida, número de mensajes insuficiente: {len(conversation.messages)}")
      return None
  # Si el número de mensajes es impar, eliminar el último (asumiendo que es del usuario)
  if len(conversation.messages) % 2 != 0 and len(conversation.messages) > 2:
      conversation.messages = conversation.messages[:-1]
      
  return conversation

In [22]:
from ollama import AsyncClient

ollama_client = AsyncClient()

async def generate_ollama_conversations(messages):
    try:
        # Llama a Ollama con roles system y user
        response = await ollama_client.chat(
            model=OLLAMA_MODEL,
            messages=messages,
            options={
                "temperature": TEMPERATURE,
                "top_p": TOP_P,
                
                
            },
            format=Conversation.model_json_schema()  # Especifica el esquema JSON
            
        )
        conversation = Conversation.model_validate_json(response.message.content)

        return validate_genterated_conv(conversation)
    except Exception as e:
        print(f"Error generando conversación con Ollama: {e}")
        return None


In [23]:
from openai import AsyncClient

openai_client = AsyncClient(
    api_key=os.environ.get("OPENAI_API_KEY"),
)


async def generate_openai_conversations(messages):
    try:
        response = await openai_client.responses.parse(
      model="gpt-4o-mini",
      input=messages,
      text_format=Conversation  
        )
        conversation = response.output_parsed
        
        if conversation:
            return validate_genterated_conv(conversation)
        
    except Exception as e:
        print(f"Error al generar conversacion con OpenAI: {e}")
        return None

In [24]:
from typing import Literal
async def generate_conversation(fragment:str, provider: Literal["ollama", "openai"]):
    """
    Genera una conversación estructurada en formato JSON usando Ollama, basada en un fragmento de texto.
    Optimizado para múltiples iteraciones en la generación de datasets para fine-tuning.
    """
    if not fragment or len(fragment) < MIN_FRAGMENT_LENGTH:
        print(f"Error: Fragmento demasiado corto ({len(fragment)} caracteres).")
        return None

    # Extraer el texto sin la metadata (líneas que comienzan con '#')
    fragment_content = '\n'.join(line for line in fragment.splitlines() if not line.startswith('#')).strip()
    if len(fragment_content) < MIN_FRAGMENT_LENGTH:
        print(f"Error: Contenido útil del fragmento demasiado corto ({len(fragment_content)} caracteres).")
        return None

    # Prompt optimizado para múltiples iteraciones
    messages = get_prompts(fragment_content)

    try:
        if provider=='ollama':
            return await generate_ollama_conversations(messages)
        elif provider=='openai':
            return await generate_openai_conversations(messages)
        else: return None
    except Exception as e:
        print(f"Error generando conversación con Ollama: {e}")
        return None

In [25]:
import logging

# Configuración del logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pdf_processing.log'),
        logging.StreamHandler()
    ]
)

In [26]:
import json
import os
from filelock import FileLock
import asyncio
from typing import Literal


OUTPUT_FOLDER = "data"

class MetadataManager:
    """Clase para manejar archivos de metadatos JSON."""
    def __init__(self, index: int, output_dir: str):
        self.folder = os.path.join(output_dir, "metadata")
        self.metadata_file = os.path.join(self.folder, f"metadata_{index:04d}.json")
        self.lock_file = f"{self.metadata_file}.lock"
        self.index = index
        os.makedirs(self.folder, exist_ok=True)

    def exists(self) -> bool:
        """Verifica si el archivo de metadatos existe."""
        return os.path.exists(self.metadata_file)

    def get(self, param: str):
        """Obtiene chunks, conversations, fileName, num_chunks, num_messages o num_exchanges desde el archivo de metadatos."""
        if not self.exists():
            return [] if param in ["chunks", "conversations"] else 0 if param in ["num_chunks", "num_messages", "num_exchanges"] else ""
        try:
            with FileLock(self.lock_file):
                with open(self.metadata_file, "r", encoding="utf-8") as f:
                    metadata = json.load(f)
                
                updated = False
                if param == "conversations":
                    conversations = metadata.get("conversations", [])
                    for i, conv in enumerate(conversations):
                        if isinstance(conv, dict) and "chunk_index" not in conv:
                            conv["chunk_index"] = i
                            updated = True
                    conversations.sort(key=lambda x: x.get("chunk_index", 0))
                    metadata["conversations"] = conversations
                
                if "num_chunks" not in metadata and "chunks" in metadata:
                    metadata["num_chunks"] = len(metadata.get("chunks", []))
                    updated = True
                
                if "num_exchanges" not in metadata and "conversations" in metadata:
                    metadata["num_exchanges"] = sum(len(conv["messages"]) // 2 for conv in metadata.get("conversations", []))
                    updated = True
                
                if "num_messages" not in metadata and "conversations" in metadata:
                    metadata["num_messages"] = sum(len(conv["messages"]) for conv in metadata.get("conversations", []))
                    updated = True
                    
                if "total_conversations" not in metadata and "conversations" in metadata:
                    metadata["total_conversations"] = len(conversations)
                    updated = True
                
                if updated:
                    with open(self.metadata_file, "w", encoding="utf-8") as f:
                        json.dump(metadata, f, ensure_ascii=False, indent=2)
                
                return metadata.get(param, []) if param in ["chunks", "conversations"] else metadata.get(param, 0 if param in ["num_chunks", "num_messages", "num_exchanges"] else metadata.get(param,""))
        except Exception as e:
            print(f"Error al leer {param} desde {self.metadata_file}: {e}")
            return [] if param in ["chunks", "conversations"] else 0 if param in ["num_chunks", "num_messages", "num_exchanges"] else ""

    def set(self, param: str, value):
        """Establece chunks, conversations, fileName, num_chunks, num_messages o num_exchanges en el archivo de metadatos."""
        try:
            with FileLock(self.lock_file):
                metadata = {"chunks": [], "fileName": "", "conversations": [], "num_chunks": 0, "num_messages": 0, "num_exchanges": 0}
                if self.exists():
                    with open(self.metadata_file, "r", encoding="utf-8") as f:
                        metadata = json.load(f)
                
                metadata[param] = value
                if param == "conversations":
                    metadata["conversations"].sort(key=lambda x: x.get("chunk_index", 0))
                    metadata["num_messages"] = sum(len(conv["messages"]) for conv in metadata["conversations"])
                    metadata["num_exchanges"] = sum(len(conv["messages"]) // 2 for conv in metadata["conversations"])
                    metadata["total_conversations"] = len(metadata["conversations"])
                if param == "chunks":
                    metadata["num_chunks"] = len(value)
                with open(self.metadata_file, "w", encoding="utf-8") as f:
                    json.dump(metadata, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"Error al escribir {param} en {self.metadata_file}: {e}")

    def append_conversation(self, conversation: dict):
        """Añade una conversación al array de conversaciones en el archivo de metadatos."""
        try:
            with FileLock(self.lock_file):
                metadata = {"chunks": [], "fileName": "", "conversations": [], "num_chunks": 0, "num_messages": 0, "num_exchanges": 0}
                if self.exists():
                    with open(self.metadata_file, "r", encoding="utf-8") as f:
                        metadata = json.load(f)
                
                for i, conv in enumerate(metadata["conversations"]):
                    if isinstance(conv, dict) and "chunk_index" not in conv:
                        conv["chunk_index"] = i
                
                if "num_chunks" not in metadata:
                    metadata["num_chunks"] = len(metadata.get("chunks", []))
                
                if "num_exchanges" not in metadata:
                    metadata["num_exchanges"] = sum(len(conv["messages"]) // 2 for conv in metadata.get("conversations", []))
                
                if "num_messages" not in metadata:
                    metadata["num_messages"] = sum(len(conv["messages"]) for conv in metadata.get("conversations", []))
                
                metadata["conversations"].append(conversation)
                metadata["conversations"].sort(key=lambda x: x.get("chunk_index", 0))
                metadata["num_messages"] = sum(len(conv["messages"]) for conv in metadata["conversations"])
                metadata["num_exchanges"] = sum(len(conv["messages"]) // 2 for conv in metadata["conversations"])
                metadata["total_conversations"] = len(metadata["conversations"])
                
                with open(self.metadata_file, "w", encoding="utf-8") as f:
                    json.dump(metadata, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"Error al añadir conversación a {self.metadata_file}: {e}")

def get_text_from_pdf(index: int, pdf_path: str, output_dir: str) -> list:
    """Obtiene el texto de un PDF desde la caché (JSON) o lo extrae si no existe."""
    metadata_manager = MetadataManager(index, output_dir)
    
    if metadata_manager.exists():
        chunks = metadata_manager.get("chunks")
        file_name = metadata_manager.get("fileName")
        if chunks and file_name == os.path.basename(pdf_path):
            return chunks
    
    try:
        pages_text = extract_text_from_pdf(pdf_path)
        if pages_text:
            metadata_manager.set("chunks", pages_text)
            metadata_manager.set("fileName", os.path.basename(pdf_path))
            metadata_manager.set("conversations", [])
        return pages_text
    except Exception as e:
        print(f"Error al extraer texto del PDF {pdf_path}: {e}")
        return []
    
def get_conv_from_jsonl(index, output_dir):
    metadata_manager = MetadataManager(index, output_dir)
    jsonl_file = os.path.join(output_dir, OUTPUT_FOLDER, f"pdf_{index:04d}.jsonl")
    if os.path.exists(jsonl_file):
        try:
            chunks = metadata_manager.get("chunks")
            with open(jsonl_file, "r", encoding="utf-8") as f:
                conversations = []
                for i, line in enumerate(f):
                    if line.strip():
                        conv = json.loads(line)
                        conv["source_chunk"] = chunks[i]
                        conv["chunk_index"] = i
                        conversations.append(conv)
                conversations.sort(key=lambda x: x.get("chunk_index", 0))
                metadata_manager.set("conversations", conversations)
                metadata_manager.set("num_messages", sum(len(conv["messages"]) for conv in conversations))
                metadata_manager.set("num_exchanges", sum(len(conv["messages"]) // 2 for conv in conversations))
                metadata_manager.set("total_conversations", len(conversations))
                return conversations
        except Exception as e:
            print(f"Error al leer conversaciones desde {jsonl_file}: {e}")
    return []

async def get_conversation_from_chunk(index: int, output_dir: str, chunk: str, chunk_index: int, provider: Literal["ollama", "openai"]):
    """Obtiene o genera una conversación para un fragmento de texto."""
    metadata_manager = MetadataManager(index, output_dir)
    existing_conversations = metadata_manager.get("conversations")
    
    if not existing_conversations:
        existing_conversations = get_conv_from_jsonl(index, output_dir)
    
    for conv in existing_conversations:
        if isinstance(conv, dict) and conv.get("source_chunk") == chunk and conv.get("chunk_index") == chunk_index:
            return conv
    try:
        conversation = await generate_conversation(chunk, provider)
        if conversation:
            conv_dict = conversation.model_dump()
            conv_dict["source_chunk"] = chunk
            conv_dict["chunk_index"] = chunk_index
            metadata_manager.append_conversation(conv_dict)
            return conv_dict
        return None
    except Exception as e:
        print(f"Error al generar conversación para chunk_index {chunk_index} en PDF #{index} con {provider}: {e}")
        return None

async def get_conv(index: int, output_dir: str, chunk: str, chunk_index: int, provider: Literal["ollama", "openai"]):
    result = await get_conversation_from_chunk(index, output_dir, chunk, chunk_index, provider)
    return result if result is not None else []

async def process_pdf(index: int, pdf_path: str, output_dir: str):
    """Procesa un PDF, genera conversaciones y las guarda en JSONL."""
    logger = logging.getLogger()
    logger.info(f"[PDF {index}] Procesando PDF: {Path(pdf_path).name}")
    try:
        pages_text = get_text_from_pdf(index, pdf_path, output_dir)
        
        if not pages_text:
            logger.warning(f"[PDF {index}] No se encontraron fragmentos de texto")
            return
        
        data_dir = os.path.join(output_dir, OUTPUT_FOLDER)
        os.makedirs(data_dir, exist_ok=True)
        jsonl_file = os.path.join(data_dir, f"pdf_{index:04d}.jsonl")
        
        tasks = [(i, fragment) for i, fragment in enumerate(pages_text) if len(fragment) > 20]
        if not tasks:
            logger.warning(f"[PDF {index}] No hay fragmentos válidos para procesar")
            return
        
        logger.info(f"[PDF {index}] Procesando {len(tasks)} fragmentos")
        
        # Dividir tareas entre providers (50% openai, 50% ollama)
        total_tasks = len(tasks)
        half_tasks = total_tasks // 2
        openai_tasks = [get_conv(index, output_dir, fragment, i, "openai") for i, fragment in tasks[:half_tasks]]
        ollama_tasks = [get_conv(index, output_dir, fragment, i, "ollama") for i, fragment in tasks[half_tasks:]]
        
        # Ejecutar tareas en paralelo con límite de concurrencia
        from asyncio import Semaphore
        async def limited_gather(tasks, limit=10):
            semaphore = Semaphore(limit)
            async def sem_task(task):
                async with semaphore:
                    return await task
            return await asyncio.gather(*[sem_task(task) for task in tasks], return_exceptions=True)
        
        task_results = await limited_gather(openai_tasks + ollama_tasks)
        
        if task_results:
            valid_conversations = [conv for conv in task_results if conv is not None and conv is not TypeError]
            if valid_conversations:
                valid_conversations.sort(key=lambda x: x["chunk_index"])
                
                actual_indices = [conv["chunk_index"] for conv in valid_conversations]
                if actual_indices != sorted(actual_indices):
                    logger.warning(f"[PDF {index}] El orden de las conversaciones no coincide con el esperado")
                
                with open(jsonl_file, "w", encoding="utf-8") as f:
                    output_str = "\n".join(json.dumps({"messages": conv["messages"], "topic": conv["topic"]}, ensure_ascii=False) 
                                        for conv in valid_conversations)
                    f.write(output_str + "\n")
                    logger.info(f"[PDF {index}] Dataset conversacional generado y guardado en JSONL")
            else:
                logger.warning(f"[PDF {index}] No se generaron conversaciones")
    except Exception as e:
        logger.error(f"[PDF {index}] Error al procesar el PDF: {e}")

In [27]:
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm
import asyncio
import os
from pathlib import Path
import math


def process_pdf_wrapper(args):
    index, pdf_path, output_dir = args
    logger = logging.getLogger()
    logger.info(f"[PDF {index}] Inicio del procesamiento del PDF: {Path(pdf_path).name}")
    try:
        asyncio.run(process_pdf(index, pdf_path, output_dir))
        logger.info(f"[PDF {index}] Procesamiento del PDF completado")
    except Exception as e:
        logger.error(f"[PDF {index}] Error durante el procesamiento: {str(e)}")
        raise

def generate(pdf_files,max_workers=12):
    output_dir = "outputs"
    output_folder_path = Path(output_dir)
    os.makedirs(output_folder_path, exist_ok=True)
    
    logger = logging.getLogger()
    logger.info(f"Generando Datasets - max_workers={max_workers}, # Files: {len(pdf_files)}")
    
    # Dividir archivos en lotes para optimizar el uso de memoria
    batch_size = max(1, math.ceil(len(pdf_files) / max_workers))
    batches = [pdf_files[i:i + batch_size] for i in range(0, len(pdf_files), batch_size)]
    
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        for batch_idx, batch in enumerate(batches):
            logger.info(f"Procesando lote {batch_idx + 1}/{len(batches)} con {len(batch)} archivos")
            tasks = [(i + batch_idx * batch_size, p, output_dir) for i, p in enumerate(batch)]
            for _ in tqdm(
                executor.map(process_pdf_wrapper, tasks),
                total=len(batch),
                desc=f"Batch {batch_idx + 1}/{len(batches)}"
            ):
                pass
            logger.info(f"Lote {batch_idx + 1}/{len(batches)} completado")
    logger.info("Generación de datasets finalizada")

In [28]:
files = get_files()
files

[PosixPath('docs/titulo cap_II CALIFICACION DE ACTIVOS DE RIESGO.pdf'),
 PosixPath('docs/cap_I DE LA GESTION Y ADMINISTRACION DE RIESGOS.pdf'),
 PosixPath('docs/BASILEA II CONVERGENCIA INTERNACINOAL bcbs128_es.pdf'),
 PosixPath('docs/9 Recomendaciones Especiales GAFI.pdf'),
 PosixPath('docs/cap_II ADMINISTRACION RIESGO DE CREDITO.pdf')]

In [None]:
await process_pdf(0,files[0],"outputs")

2025-08-07 18:13:34,222 - INFO - [PDF 0] Procesando PDF: titulo cap_II CALIFICACION DE ACTIVOS DE RIESGO.pdf
2025-08-07 18:13:34,241 - INFO - [PDF 0] Procesando 50 fragmentos
2025-08-07 18:13:45,065 - INFO - HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"
2025-08-07 18:13:54,947 - INFO - HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"
2025-08-07 18:14:03,979 - INFO - HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"
2025-08-07 18:14:09,122 - INFO - HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"
2025-08-07 18:14:19,645 - INFO - HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"
2025-08-07 18:14:25,563 - INFO - HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"
2025-08-07 18:14:31,132 - INFO - HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"
2025-08-07 18:14:39,422 - INFO - HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"
2025-08-0

In [None]:
%%time
generate(max_workers=15)

Generando Datasets
max_workers=15
# Files: 89


  0%|          | 0/89 [00:00<?, ?it/s]

# 9 | Procesando PDF: 'RESUMEN-EJECUTIVO.pdf'# 5 | Procesando PDF: 'Spanish-Lavado-de-Activos-Basado-en-el-Comercio-Indicadores-de-Riesgo.pdf.coredownload.inline.pdf'
# 6 | Procesando PDF: 'Spanish-Lavado-de-Activos-Basado-en-el-Comercio-Indicadores-de-Riesgo.pdf.coredownload.inline (1).pdf'# 9 | Generando Conversaciones# 2 | Procesando PDF: 'titulo cap_II CALIFICACION DE ACTIVOS DE RIESGO.pdf'

# 7 | Procesando PDF: 'RML BANCO CENTRAL_2.pdf'# 10 | Procesando PDF: 'Riesgo de Crédito.pdf'# 5 | Generando Conversaciones
# 12 | Procesando PDF: 'metodologia-SBR-lavado-activos-2.pdf'


# 6 | Generando Conversaciones
# 7 | Generando Conversaciones# 8 | Procesando PDF: 'RML BANCO CENTRAL.pdf'


# 14 | Procesando PDF: 'RECOMENDACIONES GAFI 49.pdf'# 2 | Generando Conversaciones# 4 | Procesando PDF: 'SUPERBANCOS L1_XIII_cap_IV.pdf'# 12 | Generando Conversaciones# 10 | Generando Conversaciones



# 11 | Procesando PDF: 'resol_JB-2011-1897.pdf'
# 14 | Generando Conversaciones


# 8 | Generando Conv

  1%|          | 1/89 [03:49<5:37:11, 229.90s/it]

# 45 | Procesando PDF: 'L1_XII_cap_II.pdf'
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
# 45 | Generando Conversaciones
Error generando conversación con Ollama: Event loop is closed
# 45 | Conversacion Generada
# 37 | Conversacion Generada
#37 | Pdf Procesado -  Generado Dataset Conversacional para PDF: 'L1_XI_cap_III.pdf'
# 46 | Procesando PDF: 'L1_XII_cap_III.pdf'
# 46 | Generando Conversaciones
Erro

### Saving

In [24]:
%pip install --upgrade datasets huggingface_hub ipywidgets pandas


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [25]:
from huggingface_hub import login
login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [26]:
from datasets import load_dataset


dataset = load_dataset("json", data_files="./outputs/data/*.jsonl")

dataset


AttributeError: partially initialized module 'pandas' has no attribute 'core' (most likely due to a circular import)

In [None]:
dataset.push_to_hub("jeanmcm/b_risks")

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0)                : |          |  0.00B /  0.00B            

New Data Upload                         : |          |  0.00B /  0.00B            

                                        : 100%|##########|  270kB /  270kB            

README.md:   0%|          | 0.00/376 [00:00<?, ?B/s]

CommitInfo(commit_url='https://huggingface.co/datasets/jeanmcm/b_risks/commit/13e6a6ad8f5d4eb1efc3bf36fe4e0b58c6e237fb', commit_message='Upload dataset', commit_description='', oid='13e6a6ad8f5d4eb1efc3bf36fe4e0b58c6e237fb', pr_url=None, repo_url=RepoUrl('https://huggingface.co/datasets/jeanmcm/b_risks', endpoint='https://huggingface.co', repo_type='dataset', repo_id='jeanmcm/b_risks'), pr_revision=None, pr_num=None)

# Testing RAG with Open WebUI 

In [None]:
import requests
import json
import sys
import time
from IPython.display import display, clear_output, HTML

url = "https://vwlppjjfa98c9x-8080.proxy.runpod.net"
api_key ="sk-05568562f28844fe997cadf960a346cd"

messages =  [{"role": "user", "content": "Que es el Riesgo Financiero?"}]

try:
    # Realizar la solicitud con stream=True
    with requests.post(f"{url}/api/chat/completions", stream=True,headers={
      "Content-Type": "application/json",
      "Authorization": f"Bearer {api_key}"
    },json={
      "model":"bosft-riesgos-rag-model",
      "messages":messages,
      "stream":True
      }) as response:
        response.raise_for_status()
        # Variable para almacenar la salida acumulada
        accumulated_output = ""

        # Iterar sobre las líneas de la respuesta
        for line in response.iter_lines():
            if line:
                # Decodificar la línea
                decoded_line = line.decode('utf-8').strip()
                # Si la línea comienza con "data:", extraer el contenido
                if decoded_line.startswith("data:"):
                    decoded_line = decoded_line[5:].strip()  # Quitar "data: "

                # Ignorar líneas vacías o marcadores de fin como "[DONE]"
                if not decoded_line or decoded_line == "[DONE]" :
                    continue
                
                
                try:
                    # Parsear si es JSON
                    data = json.loads(decoded_line)
                    if "choices" not in data: continue
                    
                    delta = data['choices'][0]['delta']
                    if "content" in delta: new_data = delta['content']
                except json.JSONDecodeError:
                    # Si no es JSON, usar la línea como texto
                    new_data = decoded_line

                # Acumular y mostrar la salida dinámicamente
                if new_data:
                    accumulated_output += new_data
                    # Limpiar la salida anterior y mostrar la nueva
                    clear_output(wait=True)
                    display(HTML(f"<b>Respuesta en streaming:</b> {accumulated_output}"))
                    time.sleep(0.1)  # Pequeña pausa para visibilidad
                    

except requests.exceptions.RequestException as e:
    print(f"\nError en la solicitud: {e}")

# Testing with Flowise