# Modelo GAN

Este entrenamiento está basado en los modelos T5 y BERT, y los datos de entrenamiento provienen del Corpus ClearSim.

El token de Hugging Face está omitido por motivos de seguridad.

## Dependencias y librerías

In [None]:
# Instalar las dependencias
!pip install transformers torch datasets evaluate sacrebleu rouge-score nltk textstat huggingface_hub --quiet

In [None]:
# Importar las librerías
import json
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from transformers import (
    AutoTokenizer, AutoModel,
    T5ForConditionalGeneration, T5Tokenizer,
    get_linear_schedule_with_warmup
)
import nltk
import textstat
from rouge_score import rouge_scorer
from collections import defaultdict
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
from google.colab import files
import warnings
warnings.filterwarnings('ignore')

# Download the NLTK 'punkt' and 'stopwords' resources
for nltk_resource in ('punkt', 'stopwords'):
    nltk.download(nltk_resource)

# Upload the training file (JSON format) through the Colab helper
uploaded = files.upload()

## Clases secundarias del modelo GAN

In [None]:
# Clase para el dataset
class TextSimplificationDataset(Dataset):
    """Dataset of (original, simplified) text pairs tokenized on access.

    Args:
        data: indexable collection whose items expose the keys ``'TXT'``
            (original text) and ``'FAC'`` (simplified text).
        tokenizer: callable invoked as ``tokenizer(text, max_length=...,
            padding='max_length', truncation=True, return_tensors='pt')``
            returning ``'input_ids'`` and ``'attention_mask'`` tensors.
        max_length: length every sequence is padded/truncated to.
    """

    def __init__(self, data, tokenizer, max_length=2048):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def _encode(self, text):
        """Tokenize one text, padded/truncated to ``self.max_length``."""
        return self.tokenizer(
            text,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

    def __getitem__(self, idx):
        """Return the token tensors and the raw texts for sample ``idx``."""
        item = self.data[idx]
        original = item['TXT']
        simplified = item['FAC']

        original_tokens = self._encode(original)
        simplified_tokens = self._encode(simplified)

        # squeeze(0) removes only the leading batch dimension added by
        # return_tensors='pt'. A bare squeeze() would also collapse the
        # sequence dimension whenever it has length 1.
        return {
            'original_input_ids': original_tokens['input_ids'].squeeze(0),
            'original_attention_mask': original_tokens['attention_mask'].squeeze(0),
            'simplified_input_ids': simplified_tokens['input_ids'].squeeze(0),
            'simplified_attention_mask': simplified_tokens['attention_mask'].squeeze(0),
            'original_text': original,
            'simplified_text': simplified
        }


# Clase del modelo Generador basado en T5
class Generator(nn.Module):
    """GAN generator: a T5 conditional-generation model plus its tokenizer."""

    def __init__(self, model_name='t5-small'):
        super().__init__()
        self.model = T5ForConditionalGeneration.from_pretrained(model_name)
        self.tokenizer = T5Tokenizer.from_pretrained(model_name)

    def _sample(self, input_ids, attention_mask):
        """Run ``generate`` with the decoding settings shared by all callers."""
        return self.model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            max_length=2048,
            num_beams=4,
            early_stopping=True,
            do_sample=True,
            temperature=0.7
        )

    def forward(self, input_ids, attention_mask, decoder_input_ids=None, labels=None):
        """Return the model outputs when ``labels`` is given, else generated ids."""
        if labels is None:
            return self._sample(input_ids, attention_mask)

        return self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            decoder_input_ids=decoder_input_ids,
            labels=labels
        )

    def generar_facilitado(self, original_text):
        """Generate a simplified version of ``original_text`` and return it as text.

        The text is prefixed with ``"simplify: "``, tokenized, moved to the
        device of the model parameters and decoded without gradient tracking.
        """
        encoded = self.tokenizer(
            f"simplify: {original_text}",
            max_length=2048,
            padding=True,
            truncation=True,
            return_tensors='pt'
        )
        target_device = next(self.model.parameters()).device
        encoded = {name: tensor.to(target_device) for name, tensor in encoded.items()}

        with torch.no_grad():
            generated = self._sample(encoded['input_ids'], encoded['attention_mask'])

        # Only the first (and only) sequence of the batch is decoded.
        return self.tokenizer.decode(generated[0], skip_special_tokens=True)


# Clase del modelo Discriminador basado en BERT
class Discriminator(nn.Module):
    """GAN discriminator: encoder + dropout + linear head + sigmoid.

    ``forward`` returns one value in (0, 1) per input row, computed from the
    encoder's ``pooler_output``.
    """

    def __init__(self, model_name='bert-base-uncased'):
        super().__init__()
        self.bert = AutoModel.from_pretrained(model_name)
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(self.bert.config.hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, input_ids, attention_mask):
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        score = self.classifier(self.dropout(encoded.pooler_output))
        return self.sigmoid(score)

## Funciones auxiliares de manejo de datos

In [None]:
# Función para segmentar textos largos
def segmentar_textos(text, max_length=400, overlap=50):
    """Split ``text`` into segments of at most ``max_length`` words.

    Sentences (delimited by runs of ``.``, ``!`` or ``?``) are packed
    greedily into a segment. When the next sentence does not fit, the segment
    is closed and the new one starts with up to ``overlap`` trailing words of
    the previous segment followed by that sentence. A single sentence longer
    than ``max_length`` is cut into windows of ``max_length`` words that
    advance by ``max_length - overlap`` words.

    Args:
        text: text to split.
        max_length: maximum number of whitespace-separated words per segment.
        overlap: number of words shared between consecutive segments.

    Returns:
        List of segment strings; empty when ``text`` has no sentence content.

    Raises:
        ValueError: if ``max_length`` is not positive or ``overlap`` is not in
            ``[0, max_length)``.
    """
    import re

    if max_length <= 0:
        raise ValueError("max_length must be a positive number of words")
    if not 0 <= overlap < max_length:
        # overlap >= max_length would make the window step zero or negative.
        raise ValueError("overlap must satisfy 0 <= overlap < max_length")

    window_step = max_length - overlap
    segments = []
    current_segment = ""

    # NOTE: splitting on the terminators discards them, so the returned
    # segments contain no '.', '!' or '?' characters.
    for sentence in re.split(r'[.!?]+', text):
        sentence = sentence.strip()
        if not sentence:
            continue

        sentence_words = sentence.split()
        current_words = current_segment.split()

        if len(current_words) + len(sentence_words) <= max_length:
            current_segment = f"{current_segment} {sentence}" if current_segment else sentence
            continue

        # The sentence does not fit: close the segment being built.
        if current_segment:
            segments.append(current_segment.strip())

        if len(sentence_words) > max_length:
            # The sentence alone is too long: emit overlapping windows, and
            # stop as soon as a window reaches the end so the last segment is
            # never a pure subset of the previous one.
            start = 0
            while True:
                segments.append(" ".join(sentence_words[start:start + max_length]))
                if start + max_length >= len(sentence_words):
                    break
                start += window_step
            current_segment = ""
            continue

        # Carry over trailing words of the closed segment, but never more than
        # fit next to the sentence. The explicit `> 0` check matters because
        # `words[-0:]` would return the whole list instead of nothing.
        carry = min(overlap, max_length - len(sentence_words))
        carried_words = current_words[-carry:] if carry > 0 else []
        current_segment = " ".join(carried_words + [sentence])

    if current_segment:
        segments.append(current_segment.strip())

    return segments


# Función para analizar longitudes de textos
def analizar_lengths(data, tokenizer):
    """Measure the tokenized length of every original/simplified text.

    Args:
        data: iterable of items with the keys ``'TXT'`` and ``'FAC'``.
        tokenizer: object whose ``encode(text)`` returns a sized sequence.

    Returns:
        Dict with the per-item lengths (``'original_lengths'``,
        ``'simplified_lengths'``) and their maxima (``'max_original'``,
        ``'max_simplified'``). The maxima are 0 when ``data`` is empty.
    """
    original_lengths = []
    simplified_lengths = []

    for item in data:
        original_lengths.append(len(tokenizer.encode(item['TXT'])))
        simplified_lengths.append(len(tokenizer.encode(item['FAC'])))

    return {
        'original_lengths': original_lengths,
        'simplified_lengths': simplified_lengths,
        # default=0 keeps an empty corpus from raising ValueError in max().
        'max_original': max(original_lengths, default=0),
        'max_simplified': max(simplified_lengths, default=0)
    }


# Función para preprocesar textos largos
def adaptar_textos(data, max_length=400):
    """Segment every text pair so each side has at most ``max_length`` words.

    Both texts of an item are split with ``segmentar_textos``. When the two
    sides yield the same number of segments they are paired position by
    position; otherwise only the first segment of each side is kept. Items
    for which either side yields no segment are skipped.

    Args:
        data: iterable of items with the keys ``'URL'``, ``'TXT'``, ``'FAC'``.
        max_length: maximum number of words per segment.

    Returns:
        List of dicts with the keys ``'URL'``, ``'TXT'`` and ``'FAC'``.
    """
    processed_data = []

    for item in data:
        original_segments = segmentar_textos(item['TXT'], max_length)
        simplified_segments = segmentar_textos(item['FAC'], max_length)

        # An empty side cannot be paired; indexing [0] on it would raise
        # IndexError, so the item is dropped instead.
        if not original_segments or not simplified_segments:
            continue

        if len(original_segments) == len(simplified_segments):
            pairs = zip(original_segments, simplified_segments)
        else:
            # Segment counts differ, so positions cannot be aligned: keep
            # only the leading segment of each side.
            pairs = [(original_segments[0], simplified_segments[0])]

        processed_data.extend(
            {'URL': item['URL'], 'TXT': orig_seg, 'FAC': simp_seg}
            for orig_seg, simp_seg in pairs
        )

    return processed_data


# Función para cargar y dividir los datos
def dividir_corpus(json_file_path, test_size=0.1, random_state=42, segment_long_texts=True, max_length=400, max_tokens=2048):
    """Load the JSON corpus and split it into training and validation sets.

    Args:
        json_file_path: path of the UTF-8 JSON file holding the samples.
        test_size: validation fraction forwarded to ``train_test_split``.
        random_state: seed forwarded to ``train_test_split``.
        segment_long_texts: when true, texts are segmented if any original or
            simplified text is longer than ``max_tokens`` tokens.
        max_length: maximum words per segment, forwarded to ``adaptar_textos``.
        max_tokens: token-count threshold (measured with the ``t5-small``
            tokenizer) above which segmentation is triggered.

    Returns:
        Tuple ``(train_data, val_data)``.
    """
    with open(json_file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    print(f"Dataset cargado: {len(data)} muestras")

    # The tokenizer is only needed to decide whether to segment, so it is not
    # loaded when segmentation is disabled.
    needs_segmentation = False
    if segment_long_texts:
        tokenizer = T5Tokenizer.from_pretrained('t5-small')
        length_stats = analizar_lengths(data, tokenizer)
        needs_segmentation = (
            length_stats['max_original'] > max_tokens
            or length_stats['max_simplified'] > max_tokens
        )

    train_data, val_data = train_test_split(
        data,
        test_size=test_size,
        random_state=random_state,
        shuffle=True
    )

    # Segment AFTER splitting: segments of one document overlap, so segmenting
    # first could place near-duplicate text in both training and validation.
    if needs_segmentation:
        train_data = adaptar_textos(train_data, max_length)
        val_data = adaptar_textos(val_data, max_length)

    print(f"Datos de entrenamiento: {len(train_data)} muestras")
    print(f"Datos de validación: {len(val_data)} muestras")
    return train_data, val_data

## Clase del GAN

In [None]:
class TextSimplificationGAN:
    """Training harness coupling a ``Generator`` and a ``Discriminator``.

    Holds both models, their tokenizers and optimizers, the loss functions,
    a ROUGE scorer and the per-epoch training history.
    """

    def __init__(self, device='cuda' if torch.cuda.is_available() else 'cpu'):
        self.device = device
        print(f"Usando dispositivo: {self.device}")

        # Models
        self.generator = Generator().to(device)
        self.discriminator = Discriminator().to(device)

        # Tokenizers
        self.gen_tokenizer = T5Tokenizer.from_pretrained('t5-small')
        self.disc_tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')

        # Optimizers
        self.gen_optimizer = optim.AdamW(self.generator.parameters(), lr=2e-5)
        self.disc_optimizer = optim.AdamW(self.discriminator.parameters(), lr=2e-5)

        # Loss functions (mse_loss is not used by the methods of this class)
        self.criterion = nn.BCELoss()
        self.mse_loss = nn.MSELoss()

        # Metrics
        self.rouge_scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

        # Training history: metric name -> list with one value per epoch
        self.training_history = defaultdict(list)

    def to_device(self, tensor_dict):
        """Move a tensor, or the tensor values of a dict, to ``self.device``.

        Non-tensor dict values and any other input are returned unchanged.
        """
        if isinstance(tensor_dict, dict):
            return {k: v.to(self.device) if isinstance(v, torch.Tensor) else v for k, v in tensor_dict.items()}
        elif isinstance(tensor_dict, torch.Tensor):
            return tensor_dict.to(self.device)
        else:
            return tensor_dict

    def preparar_salida(self, original_texts, simplified_texts, is_real=True):
        """Build the discriminator input for (original, simplified) pairs.

        The two texts are encoded as a sentence pair so the tokenizer inserts
        the special tokens itself and, when the pair exceeds 512 tokens,
        truncates the longer side instead of cutting the simplification off.

        Args:
            original_texts: sequence of original texts.
            simplified_texts: sequence of simplified texts, aligned with
                ``original_texts``.
            is_real: label the pairs with 1 (real) when true, else 0 (fake).

        Returns:
            Tuple ``(input_ids, attention_mask, labels)`` on ``self.device``;
            ``labels`` has shape ``(n_pairs, 1)``.
        """
        # zip() keeps the pairing limited to the shorter of the two inputs.
        pairs = list(zip(original_texts, simplified_texts))
        originals = [orig for orig, _ in pairs]
        simplifications = [simp for _, simp in pairs]

        tokens = self.disc_tokenizer(
            originals,
            text_pair=simplifications,
            max_length=512,
            padding=True,
            truncation=True,
            return_tensors='pt'
        )

        labels = torch.ones(len(pairs), 1) if is_real else torch.zeros(len(pairs), 1)
        return tokens['input_ids'].to(self.device), tokens['attention_mask'].to(self.device), labels.to(self.device)

    def calcular_FRE(self, text):
        """Return the Flesch Reading Ease of ``text``, or 50.0 if it cannot be computed."""
        try:
            return textstat.flesch_reading_ease(text)
        except Exception:
            # Best-effort metric: fall back to a neutral score, but let
            # KeyboardInterrupt/SystemExit propagate (a bare except would not).
            return 50.0

    def calcular_rouge(self, text1, text2):
        """Return the ROUGE-L F-measure between ``text1`` and ``text2``."""
        scores = self.rouge_scorer.score(text1, text2)
        return scores['rougeL'].fmeasure

    def train_step(self, batch):
        """Run one discriminator update followed by one generator update.

        Args:
            batch: mapping with the keys ``'original_text'`` and
                ``'simplified_text'`` (sequences of strings).

        Returns:
            Dict of Python floats: ``disc_loss``, ``gen_loss``,
            ``supervised_loss``, ``adversarial_loss``, ``readability_loss``.
        """
        original_texts = batch['original_text']
        real_simplified_texts = batch['simplified_text']

        # ---- Discriminator update ----
        self.disc_optimizer.zero_grad()

        real_input_ids, real_attention_mask, real_labels = self.preparar_salida(
            original_texts, real_simplified_texts, is_real=True
        )
        real_output = self.discriminator(real_input_ids, real_attention_mask)
        real_loss = self.criterion(real_output, real_labels)

        fake_simplified_texts = [
            self.generator.generar_facilitado(orig_text) for orig_text in original_texts
        ]

        fake_input_ids, fake_attention_mask, fake_labels = self.preparar_salida(
            original_texts, fake_simplified_texts, is_real=False
        )
        fake_output = self.discriminator(fake_input_ids, fake_attention_mask)
        fake_loss = self.criterion(fake_output, fake_labels)

        disc_loss = (real_loss + fake_loss) / 2
        disc_loss.backward()
        self.disc_optimizer.step()

        # ---- Generator update ----
        self.gen_optimizer.zero_grad()

        prompt_inputs = [f"simplify: {orig_text}" for orig_text in original_texts]

        tokenized_inputs = self.gen_tokenizer(
            prompt_inputs,
            max_length=2048,
            padding=True,
            truncation=True,
            return_tensors='pt'
        )
        tokenized_inputs = {k: v.to(self.device) for k, v in tokenized_inputs.items()}

        tokenized_targets = self.gen_tokenizer(
            list(real_simplified_texts),
            max_length=2048,
            padding=True,
            truncation=True,
            return_tensors='pt'
        )
        tokenized_targets = {k: v.to(self.device) for k, v in tokenized_targets.items()}

        # Padding positions are set to -100 so the seq2seq loss ignores them;
        # otherwise the model is also trained to predict pad tokens.
        labels = tokenized_targets['input_ids'].clone()
        labels[labels == self.gen_tokenizer.pad_token_id] = -100

        gen_outputs = self.generator(
            input_ids=tokenized_inputs['input_ids'],
            attention_mask=tokenized_inputs['attention_mask'],
            labels=labels
        )
        supervised_loss = gen_outputs.loss

        # NOTE(review): the fake texts are strings decoded by
        # Generator.generar_facilitado under torch.no_grad(), so neither the
        # adversarial term nor the readability term carries gradient back to
        # the generator; only supervised_loss updates its weights. They are
        # kept in gen_loss so the reported value is unchanged. Scoring under
        # no_grad avoids building a graph through the discriminator, and the
        # token ids of the fake pairs are reused instead of re-tokenizing.
        with torch.no_grad():
            fake_discriminator_output = self.discriminator(fake_input_ids, fake_attention_mask)
            adversarial_loss = self.criterion(
                fake_discriminator_output, torch.ones_like(fake_discriminator_output)
            )

        readability_scores = [self.calcular_FRE(text) for text in fake_simplified_texts]
        readability_tensor = torch.tensor(readability_scores, dtype=torch.float32, device=self.device)
        readability_loss = -torch.mean(readability_tensor) / 100.0

        gen_loss = supervised_loss + 0.1 * adversarial_loss + 0.05 * readability_loss
        gen_loss.backward()
        self.gen_optimizer.step()

        return {
            'disc_loss': disc_loss.item(),
            'gen_loss': gen_loss.item(),
            'supervised_loss': supervised_loss.item(),
            'adversarial_loss': adversarial_loss.item(),
            'readability_loss': readability_loss.item()
        }

    def evaluate(self, dataloader):
        """Compute average validation metrics over ``dataloader``.

        For every sample a simplification is generated and compared with the
        reference (ROUGE-1/2/L F-measure) and with the original text
        (difference in Flesch Reading Ease, and ROUGE-L reported as
        ``semantic_similarity``).

        Returns:
            Dict with ``rouge1``, ``rouge2``, ``rougeL``,
            ``readability_improvement`` and ``semantic_similarity``. Every
            value is NaN when the dataloader yields no samples.
        """
        self.generator.eval()
        self.discriminator.eval()

        total_rouge1, total_rouge2, total_rougeL = 0, 0, 0
        total_readability_improvement = 0
        total_semantic_similarity = 0
        num_samples = 0

        try:
            with torch.no_grad():
                for batch in tqdm(dataloader, desc="Evaluando"):
                    original_texts = batch['original_text']
                    reference_texts = batch['simplified_text']

                    for orig_text, ref_text in zip(original_texts, reference_texts):
                        generated_text = self.generator.generar_facilitado(orig_text)

                        # ROUGE against the reference simplification
                        rouge_scores = self.rouge_scorer.score(ref_text, generated_text)
                        total_rouge1 += rouge_scores['rouge1'].fmeasure
                        total_rouge2 += rouge_scores['rouge2'].fmeasure
                        total_rougeL += rouge_scores['rougeL'].fmeasure

                        # Readability gain of the output over the original
                        orig_readability = self.calcular_FRE(orig_text)
                        gen_readability = self.calcular_FRE(generated_text)
                        total_readability_improvement += (gen_readability - orig_readability)

                        # ROUGE-L overlap with the original text
                        total_semantic_similarity += self.calcular_rouge(orig_text, generated_text)

                        num_samples += 1
        finally:
            # Restore training mode even if generation or scoring fails.
            self.generator.train()
            self.discriminator.train()

        if num_samples == 0:
            # No samples: the averages are undefined, so report NaN instead of
            # raising ZeroDivisionError.
            return {
                name: float('nan')
                for name in ('rouge1', 'rouge2', 'rougeL',
                             'readability_improvement', 'semantic_similarity')
            }

        return {
            'rouge1': total_rouge1 / num_samples,
            'rouge2': total_rouge2 / num_samples,
            'rougeL': total_rougeL / num_samples,
            'readability_improvement': total_readability_improvement / num_samples,
            'semantic_similarity': total_semantic_similarity / num_samples
        }

    def train(self, train_dataloader, val_dataloader, epochs=10):
        """Train for ``epochs`` epochs, evaluating after each one.

        Average losses and validation metrics (prefixed with ``val_``) are
        appended to ``self.training_history`` once per epoch.
        """
        print("Iniciando entrenamiento del GAN...")

        # Hugging Face from_pretrained() returns models in evaluation mode, so
        # training mode is enabled explicitly before the first epoch.
        self.generator.train()
        self.discriminator.train()

        for epoch in range(epochs):
            epoch_losses = defaultdict(list)

            for batch_idx, batch in enumerate(tqdm(train_dataloader, desc=f"Epoch {epoch+1}/{epochs}")):
                losses = self.train_step(batch)
                for key, value in losses.items():
                    epoch_losses[key].append(value)
                if batch_idx % 50 == 0:
                    print(f"Batch {batch_idx}: Gen Loss: {losses['gen_loss']:.4f}, Disc Loss: {losses['disc_loss']:.4f}")

            # Plain floats (not numpy scalars) keep the saved history made of
            # built-in types only.
            avg_losses = {key: float(np.mean(values)) for key, values in epoch_losses.items()}
            val_metrics = self.evaluate(val_dataloader)

            for key, value in avg_losses.items():
                self.training_history[key].append(value)

            for key, value in val_metrics.items():
                self.training_history[f'val_{key}'].append(value)

            # Epoch summary
            print(f"\nEpoch {epoch+1} completada:")
            print(f"  Gen Loss: {avg_losses['gen_loss']:.4f}")
            print(f"  Disc Loss: {avg_losses['disc_loss']:.4f}")
            print(f"  Val ROUGE-L: {val_metrics['rougeL']:.4f}")
            print(f"  Val Readability Improvement: {val_metrics['readability_improvement']:.2f}")
            print(f"  Val Semantic Similarity: {val_metrics['semantic_similarity']:.4f}")
            print("-" * 30)

    def save_model(self, path):
        """Save model/optimizer state dicts and the training history to ``path``."""
        torch.save({
            'generator_state_dict': self.generator.state_dict(),
            'discriminator_state_dict': self.discriminator.state_dict(),
            'gen_optimizer_state_dict': self.gen_optimizer.state_dict(),
            'disc_optimizer_state_dict': self.disc_optimizer.state_dict(),
            'training_history': dict(self.training_history)
        }, path)
        print(f"Modelo guardado en: {path}")

    def load_model(self, path):
        """Restore model/optimizer state dicts and the history from ``path``."""
        # SECURITY NOTE(review): weights_only=False unpickles arbitrary
        # objects. Only load checkpoints from a trusted source.
        checkpoint = torch.load(path, map_location=self.device, weights_only=False)
        self.generator.load_state_dict(checkpoint['generator_state_dict'])
        self.discriminator.load_state_dict(checkpoint['discriminator_state_dict'])
        self.gen_optimizer.load_state_dict(checkpoint['gen_optimizer_state_dict'])
        self.disc_optimizer.load_state_dict(checkpoint['disc_optimizer_state_dict'])
        self.training_history = defaultdict(list, checkpoint['training_history'])
        print(f"Modelo cargado desde: {path}")

## Entrenamiento

In [None]:
# Load the corpus and split it into training and validation data
train_data, val_data = dividir_corpus(
    json_file_path='500_facilitadas.json',
    test_size=0.2,
    segment_long_texts=True,
    max_length=400,
)

# Wrap both splits in datasets that share one tokenizer, then in dataloaders
tokenizer = T5Tokenizer.from_pretrained('t5-small')

train_dataset = TextSimplificationDataset(train_data, tokenizer, max_length=1024)
val_dataset = TextSimplificationDataset(val_data, tokenizer, max_length=1024)

train_dataloader = DataLoader(dataset=train_dataset, batch_size=8, shuffle=True)
val_dataloader = DataLoader(dataset=val_dataset, batch_size=8, shuffle=False)

# Train the GAN for four epochs
gan = TextSimplificationGAN()
gan.train(
    train_dataloader=train_dataloader,
    val_dataloader=val_dataloader,
    epochs=4,
)

# Persist the trained models and the training history
gan.save_model(path='e2r-gan.pth')

## Subir modelo a un nuevo repositorio Hugging Face

In [None]:
import os

from huggingface_hub import create_repo, upload_file, whoami, login

# Authenticate with the token from the HF_TOKEN environment variable so the
# secret never has to be written into the notebook. When the variable is not
# set, login(token=None) asks for the token interactively.
login(token=os.environ.get("HF_TOKEN"))

# Create the repository. exist_ok=True lets this cell be re-run after the
# repository already exists instead of failing.
repo_name = "e2r-gan"
create_repo(repo_id=repo_name, private=False, exist_ok=True)

# Upload the checkpoint to the repository of the logged-in user
user = whoami()["name"]
repo_id = f"{user}/{repo_name}"
upload_file(
    path_or_fileobj="e2r-gan.pth",
    path_in_repo="e2r_gan.pth",
    repo_id=repo_id,
    repo_type="model"
)

# Uso del modelo

Aquí tenemos un ejemplo completo de cómo utilizar el modelo para generar texto.

In [None]:
import torch
import torch.optim as optim
from huggingface_hub import hf_hub_download
from transformers import T5ForConditionalGeneration, T5Tokenizer
import torch.nn as nn



class Generator(nn.Module):
    """GAN generator: a T5 conditional-generation model plus its tokenizer."""

    def __init__(self, model_name='t5-small'):
        super().__init__()
        self.model = T5ForConditionalGeneration.from_pretrained(model_name)
        self.tokenizer = T5Tokenizer.from_pretrained(model_name)

    def _sample(self, input_ids, attention_mask):
        """Run ``generate`` with the decoding settings shared by all callers."""
        return self.model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            max_length=2048,
            num_beams=4,
            early_stopping=True,
            do_sample=True,
            temperature=0.7
        )

    def forward(self, input_ids, attention_mask, decoder_input_ids=None, labels=None):
        """Return the model outputs when ``labels`` is given, else generated ids."""
        if labels is None:
            return self._sample(input_ids, attention_mask)

        return self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            decoder_input_ids=decoder_input_ids,
            labels=labels
        )

    def generar_facilitado(self, original_text):
        """Generate a simplified version of ``original_text`` and return it as text.

        The text is prefixed with ``"simplify: "``, tokenized, moved to the
        device of the model parameters and decoded without gradient tracking.
        """
        encoded = self.tokenizer(
            f"simplify: {original_text}",
            max_length=2048,
            padding=True,
            truncation=True,
            return_tensors='pt'
        )
        target_device = next(self.model.parameters()).device
        encoded = {name: tensor.to(target_device) for name, tensor in encoded.items()}

        with torch.no_grad():
            generated = self._sample(encoded['input_ids'], encoded['attention_mask'])

        # Only the first (and only) sequence of the batch is decoded.
        return self.tokenizer.decode(generated[0], skip_special_tokens=True)



class TextSimplificationGAN:
    """Generator-only wrapper used to load a saved checkpoint for inference."""

    def __init__(self, device='cuda' if torch.cuda.is_available() else 'cpu'):
        self.device = device
        print(f"Usando dispositivo: {self.device}")

        self.generator = Generator().to(device)
        self.gen_tokenizer = T5Tokenizer.from_pretrained('t5-small')
        # The optimizer exists so that load_model can restore its saved state.
        self.gen_optimizer = optim.AdamW(self.generator.parameters(), lr=2e-5)

    def to_device(self, tensor_dict):
        """Move a tensor, or the tensor values of a dict, to ``self.device``.

        Non-tensor dict values and any other input are returned unchanged.
        """
        if isinstance(tensor_dict, torch.Tensor):
            return tensor_dict.to(self.device)
        if not isinstance(tensor_dict, dict):
            return tensor_dict

        moved = {}
        for key, value in tensor_dict.items():
            moved[key] = value.to(self.device) if isinstance(value, torch.Tensor) else value
        return moved

    def load_model(self, path):
        """Restore the generator and its optimizer from the checkpoint at ``path``."""
        # SECURITY NOTE(review): weights_only=False unpickles arbitrary
        # objects. Only load checkpoints from a trusted source.
        state = torch.load(path, map_location=self.device, weights_only=False)
        restore_plan = (
            (self.generator, 'generator_state_dict'),
            (self.gen_optimizer, 'gen_optimizer_state_dict'),
        )
        for target, key in restore_plan:
            target.load_state_dict(state[key])
        print(f"Modelo cargado desde: {path}")



def simplificar_con_gan(texto: str, hf_token: str | None = None) -> str:
    """Download the published checkpoint and simplify ``texto`` with it.

    Args:
        texto: text to simplify.
        hf_token: optional Hugging Face access token forwarded to the
            download. ``None`` leaves authentication to the hub client's
            default behaviour.

    Returns:
        The text generated by the loaded generator.
    """
    # hf_token was previously accepted but ignored; it is now forwarded so a
    # caller-supplied token is actually used for the download.
    local_ckpt = hf_hub_download(
        repo_id="Nizaress/e2r-gan",
        filename="e2r_gan.pth",
        token=hf_token,
    )
    print(f"Checkpoint descargado en: {local_ckpt}")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    gan = TextSimplificationGAN(device=device)
    gan.load_model(local_ckpt)

    return gan.generator.generar_facilitado(texto)



# Example run: simplify a short sample text and print the result
texto = "Prueba del modelo."
resultado = simplificar_con_gan(texto=texto)
print(resultado)