# Multichoice Question Answering



*   Elian Paniagua
*   Luciana Huertas
*   Sebastian Linares





In [1]:
# Install tqdm quietly (IPython shell escape; only valid inside a notebook cell).
!pip install tqdm -q

## Importación de librerías y carga de datos

In [None]:
import pandas as pd
import torch
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from transformers import RobertaTokenizer, RobertaForMultipleChoice
from torch.utils.data import Dataset, DataLoader
from torch.cuda.amp import GradScaler, autocast
from tqdm import tqdm

# Load the dataset file (tab-separated despite the .csv extension)
file_path = 'train.csv'
df = pd.read_csv(file_path, sep='\t')

# Split the data into training and validation sets (80/20, fixed seed)
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)

# Initialise the tokenizer
tokenizer = RobertaTokenizer.from_pretrained('roberta-large')

## Definición del Dataset Personalizado

In [None]:
# Pads every item of a batch to a common sequence length so that the items can
# be stacked into tensors of identical shape.
def collate_fn(batch):
    """Collate per-example dicts into a single padded batch.

    Each item supplies 2-D ``input_ids`` and ``attention_mask`` tensors plus a
    ``labels`` entry. Both tensors are right-padded along dim 1 up to the
    widest ``input_ids`` in the batch (token ids with the module-level
    ``tokenizer.pad_token_id``, masks with 0) and stacked along a new leading
    dim; the labels are gathered into one ``long`` tensor.
    """
    target_width = max(entry['input_ids'].size(1) for entry in batch)

    def right_pad(matrix, fill_value):
        # Append a (rows, missing_columns) block of ``fill_value`` on the right.
        missing_columns = target_width - matrix.size(1)
        filler = torch.full((matrix.size(0), missing_columns), fill_value, dtype=torch.long)
        return torch.cat([matrix, filler], dim=1)

    padded_ids = [right_pad(entry['input_ids'], tokenizer.pad_token_id) for entry in batch]
    padded_masks = [right_pad(entry['attention_mask'], 0) for entry in batch]
    label_tensor = torch.tensor([entry['labels'] for entry in batch], dtype=torch.long)

    return {
        'input_ids': torch.stack(padded_ids),
        'attention_mask': torch.stack(padded_masks),
        'labels': label_tensor,
    }

# Custom dataset for five-option (A-E) multiple-choice questions.
class CustomRaceAnsweringModel(Dataset):
    """Turn rows of a question DataFrame into multiple-choice model inputs.

    Every row must provide the columns ``text``, ``question``, ``A`` .. ``E``,
    ``reason`` and ``answer`` (one of the letters A-E).

    Args:
        data: DataFrame holding the examples; its index is reset on entry.
        tokenizer: Callable tokenizer exposing ``bos_token`` and ``sep_token``.
        max_length: Maximum tokenized length. It also drives the
            character-level pre-truncation of context, question and reason.
    """

    def __init__(self, data, tokenizer, max_length=512):
        self.data = data.reset_index(drop=True)
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.label_mapping = {label: i for i, label in enumerate(["A", "B", "C", "D", "E"])}

    def __len__(self):
        return len(self.data)

    @staticmethod
    def _as_text(value):
        """Return *value* as ``str``, mapping missing values (None/NaN) to ''.

        pandas stores missing cells as NaN, which is truthy, so the idiom
        ``value or ""`` does not catch it and a later slice would raise
        ``TypeError``.
        """
        if pd.isna(value):
            return ""
        return str(value)

    def __getitem__(self, idx):
        """Return ``input_ids``/``attention_mask`` (one row per option) and the label.

        Raises:
            KeyError: If the ``answer`` cell is not one of the letters A-E.
        """
        example = self.data.iloc[idx]
        context = self._as_text(example['text'])
        question = self._as_text(example['question'])
        reason = self._as_text(example['reason'])
        # Missing options become empty strings instead of the literal "nan".
        options = [self._as_text(example[letter]) for letter in ("A", "B", "C", "D", "E")]
        label = self.label_mapping[example['answer']]

        # Text pre-processing: these slices cut characters, not tokens.
        context = context[:self.max_length // 3]
        question = question[:self.max_length // 6]
        reason = reason[:self.max_length // 6]

        # Build one candidate sequence per option. The instance tokenizer is
        # used (not a module-level global) so the dataset is self-contained.
        bos_token = self.tokenizer.bos_token
        sep_token = self.tokenizer.sep_token
        c_plus_q_r = f"{context} {bos_token} {question} {sep_token} {reason} {sep_token}"
        c_plus_q_r_5 = [c_plus_q_r + f" {option}" for option in options]

        tokenized_examples = self.tokenizer(
            c_plus_q_r_5,
            max_length=self.max_length,
            padding="longest",
            truncation=True,
            return_tensors="pt",
        )

        return {
            'input_ids': tokenized_examples['input_ids'],
            'attention_mask': tokenized_examples['attention_mask'],
            'labels': torch.tensor(label, dtype=torch.long)
        }

# Build the training and validation datasets
train_dataset = CustomRaceAnsweringModel(train_df, tokenizer)
val_dataset = CustomRaceAnsweringModel(val_df, tokenizer)

# Build the dataloaders that serve the data in batches of 4
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, pin_memory=True, num_workers=2, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, pin_memory=True, num_workers=2, collate_fn=collate_fn)

## Inicialización del modelo y optimizador

In [None]:
# Load the pretrained roberta-large checkpoint into the multiple-choice model class
model = RobertaForMultipleChoice.from_pretrained('roberta-large')

# Use the GPU when one is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# Define the optimizer
optimizer = optim.AdamW(model.parameters(), lr=3e-5)

# Initialise the gradient scaler used for FP16 training
scaler = GradScaler()

model.safetensors:   0%|          | 0.00/1.42G [00:00<?, ?B/s]

Some weights of RobertaForMultipleChoice were not initialized from the model checkpoint at roberta-large and are newly initialized: ['classifier.bias', 'classifier.weight', 'roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


## Funciones de Entrenamiento y Validación

In [None]:
def train_model(model, train_loader, val_loader, optimizer, device, scaler, epochs=6, accumulation_steps=4):
    """Fine-tune *model* with mixed precision and gradient accumulation.

    After every epoch the model is evaluated with ``validate_model``; when all
    epochs are done the loss/accuracy curves are drawn with ``plot_metrics``.

    Args:
        model: Model whose forward pass returns an object with a ``loss``.
        train_loader: Iterable of batches with ``input_ids``,
            ``attention_mask`` and ``labels`` tensors; must support ``len()``.
        val_loader: Validation batches handed to ``validate_model``.
        optimizer: Optimizer stepped through ``scaler``.
        device: Device (``torch.device`` or string) the batches are moved to.
        scaler: Gradient scaler used for the scaled backward pass.
        epochs: Number of passes over ``train_loader``.
        accumulation_steps: Micro-batches accumulated per optimizer step.
    """
    # ``torch.cuda.amp.autocast`` does not accept a ``device_type`` keyword, so
    # the device-generic ``torch.autocast`` is used; mixed precision is only
    # switched on when running on CUDA.
    device_type = torch.device(device).type
    use_amp = device_type == 'cuda'
    num_batches = len(train_loader)

    val_accuracies = []
    train_losses = []

    for epoch in range(epochs):
        total_loss = 0
        model.train()
        # Start each epoch from clean gradients.
        optimizer.zero_grad()
        print(f"\nIniciando epoch {epoch + 1}/{epochs}")

        # Progress bar to monitor training
        progress_bar = tqdm(enumerate(train_loader), total=num_batches, desc=f"Epoch {epoch + 1}/{epochs}")

        for i, batch in progress_bar:
            # Move the tensors to the target device
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            with torch.autocast(device_type=device_type, enabled=use_amp):
                outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
                # Scale down so the accumulated gradient matches one large batch.
                loss = outputs.loss / accumulation_steps

            scaler.scale(loss).backward()

            # Step on every full accumulation window and also on the last batch,
            # so a trailing partial window is neither dropped nor carried over
            # into the next epoch.
            if (i + 1) % accumulation_steps == 0 or (i + 1) == num_batches:
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()

            # Undo the accumulation scaling so the reported loss is per batch.
            total_loss += loss.item() * accumulation_steps
            progress_bar.set_postfix({"Training Loss": total_loss / (i + 1)})

        # Average loss per epoch (guarded against an empty loader)
        avg_loss = total_loss / max(num_batches, 1)
        train_losses.append(avg_loss)
        print(f"Epoch {epoch+1}/{epochs} completado, Pérdida promedio: {avg_loss:.4f}")

        # Evaluate the model on the validation set
        val_accuracy = validate_model(model, val_loader, device)
        val_accuracies.append(val_accuracy)
        print(f"Epoch {epoch+1}/{epochs} completado, Precisión de validación: {val_accuracy:.4f}")

    # Plot the results
    plot_metrics(train_losses, val_accuracies, epochs)

def validate_model(model, val_loader, device):
    """Return the accuracy of *model* over all batches of *val_loader*.

    The model is put in eval mode and run without gradient tracking; the
    predicted class of each example is the argmax of the logits along dim 1.
    """
    model.eval()
    gold = []
    guessed = []

    with torch.no_grad():
        for batch in val_loader:
            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['labels'].to(device)

            # Forward pass without labels: only the logits are needed here.
            logits = model(input_ids=ids, attention_mask=mask).logits
            best_option = torch.argmax(logits, dim=1)

            # Accumulate ground truth and predictions on the CPU.
            gold += list(targets.cpu().numpy())
            guessed += list(best_option.cpu().numpy())

    return accuracy_score(gold, guessed)

# Plot the training loss and the validation accuracy
def plot_metrics(train_losses, val_accuracies, epochs):
    """Draw training loss (left) and validation accuracy (right) per epoch.

    Both series are plotted against the epoch numbers ``1..epochs``.
    """
    epoch_axis = range(1, epochs + 1)
    panels = (
        (train_losses, 'blue', 'Training Loss', 'Training Loss Over Epochs'),
        (val_accuracies, 'green', 'Validation Accuracy', 'Validation Accuracy Over Epochs'),
    )

    plt.figure(figsize=(14, 6))

    for position, (series, colour, y_label, heading) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(epoch_axis, series, marker='o', color=colour)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.title(heading)

    plt.tight_layout()
    plt.show()

## Configuración y ejecución del entrenamiento

In [None]:
# Report the selected device, then run the training loop for 6 epochs.
print(f"El entrenamiento se realizará en: {device}")
train_model(model, train_loader, val_loader, optimizer, device, scaler, epochs=6)
print("Entrenamiento completado exitosamente.")

El entrenamiento se realizará en: cuda
Iniciando el entrenamiento...

Iniciando epoch 1/8


Epoch 1/8: 100%|██████████| 1406/1406 [12:17<00:00,  1.91it/s, Training Loss=1.46]

Epoch 1/8 completado, Pérdida promedio: 1.4621





Precisión de validación: 0.5669
Epoch 1/8 completado, Precisión de validación: 0.5669

Iniciando epoch 2/8


Epoch 2/8: 100%|██████████| 1406/1406 [12:15<00:00,  1.91it/s, Training Loss=1.08]

Epoch 2/8 completado, Pérdida promedio: 1.0798





## Guardar el modelo en Google Drive (Funciones y Configuración)

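Si vas a usar esta sección, no es necesario ejecutar las celdas anteriores de entrenamiento que se listan abajo; sí deben haberse ejecutado las celdas de importación de librerías, creación de los dataloaders e inicialización del modelo y del optimizador, porque el código de esta sección los utiliza.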
- Funciones ...
- Personalización ...

In [None]:
import os
import torch
from google.colab import drive

# Mount Google Drive so the model can be stored there
drive.mount('/content/drive')

# Google Drive folder where the model will be saved
model_save_path = '/content/drive/My Drive/Multichoice Question'

# Create the folder if it does not exist
os.makedirs(model_save_path, exist_ok=True)

# Persist the model whenever it beats the best validation accuracy seen so far
def save_best_model(model, val_accuracy, model_save_path, best_accuracy):
    """Save the model's state dict if *val_accuracy* improves on *best_accuracy*.

    The checkpoint is written to ``best_model.pt`` inside *model_save_path*.

    Returns:
        The new best accuracy: *val_accuracy* when a checkpoint was written,
        otherwise the unchanged *best_accuracy*.
    """
    # Guard clause: nothing to do unless the score strictly improved.
    if not val_accuracy > best_accuracy:
        return best_accuracy

    checkpoint_path = os.path.join(model_save_path, "best_model.pt")
    torch.save(model.state_dict(), checkpoint_path)
    print(f"Nuevo mejor modelo guardado con precisión de validación: {val_accuracy:.4f}")
    return val_accuracy

# Evaluate the model on the validation set
def validate_model(model, val_loader, device):
    """Compute, print and return the accuracy of *model* on *val_loader*.

    The model is switched to eval mode and run without gradient tracking; the
    predicted class of each example is the argmax of the logits along dim 1.
    """
    model.eval()
    gold = []
    guessed = []

    with torch.no_grad():
        for batch in val_loader:
            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['labels'].to(device)

            # Forward pass without labels: only the logits are needed here.
            logits = model(input_ids=ids, attention_mask=mask).logits
            best_option = torch.argmax(logits, dim=1)

            gold += list(targets.cpu().numpy())
            guessed += list(best_option.cpu().numpy())

    acc = accuracy_score(gold, guessed)
    print(f"Precisión de validación: {acc:.4f}")
    return acc

# Training with per-epoch validation and checkpointing of the best model
def train_model(model, train_loader, val_loader, optimizer, device, scaler, epochs=6, accumulation_steps=4):
    """Fine-tune *model*, keeping the checkpoint with the best validation accuracy.

    After every epoch the model is evaluated with ``validate_model`` and
    ``save_best_model`` stores it under the module-level ``model_save_path``
    when the accuracy improved. The curves are finally drawn with
    ``plot_training_validation_metrics``.

    Args:
        model: Model whose forward pass returns an object with a ``loss``.
        train_loader: Iterable of batches with ``input_ids``,
            ``attention_mask`` and ``labels`` tensors; must support ``len()``.
        val_loader: Validation batches handed to ``validate_model``.
        optimizer: Optimizer stepped through ``scaler``.
        device: Device (``torch.device`` or string) the batches are moved to.
        scaler: Gradient scaler used for the scaled backward pass.
        epochs: Number of passes over ``train_loader``.
        accumulation_steps: Micro-batches accumulated per optimizer step.
    """
    # ``torch.cuda.amp.autocast`` does not accept a ``device_type`` keyword, so
    # the device-generic ``torch.autocast`` is used; mixed precision is only
    # switched on when running on CUDA.
    device_type = torch.device(device).type
    use_amp = device_type == 'cuda'
    num_batches = len(train_loader)

    best_val_accuracy = 0.0
    train_losses = []
    val_accuracies = []

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        # Start each epoch from clean gradients.
        optimizer.zero_grad()

        print(f"\nIniciando epoch {epoch + 1}/{epochs}")
        progress_bar = tqdm(enumerate(train_loader), total=num_batches, desc=f"Epoch {epoch + 1}/{epochs}")

        for i, batch in progress_bar:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            with torch.autocast(device_type=device_type, enabled=use_amp):
                outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
                # Scale down so the accumulated gradient matches one large batch.
                loss = outputs.loss / accumulation_steps

            scaler.scale(loss).backward()

            # Step on every full accumulation window and also on the last batch,
            # so a trailing partial window is neither dropped nor carried over
            # into the next epoch.
            if (i + 1) % accumulation_steps == 0 or (i + 1) == num_batches:
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()

            # Undo the accumulation scaling so the reported loss is per batch.
            total_loss += loss.item() * accumulation_steps
            progress_bar.set_postfix({"Training Loss": total_loss / (i + 1)})

        # Average loss per epoch (guarded against an empty loader)
        avg_loss = total_loss / max(num_batches, 1)
        train_losses.append(avg_loss)
        print(f"Epoch {epoch+1}/{epochs} completado, Pérdida promedio: {avg_loss:.4f}")

        # Validate the model and save it if it is the best so far
        val_accuracy = validate_model(model, val_loader, device)
        val_accuracies.append(val_accuracy)
        best_val_accuracy = save_best_model(model, val_accuracy, model_save_path, best_val_accuracy)

    plot_training_validation_metrics(train_losses, val_accuracies, epochs)

# Plot the training and validation metrics
def plot_training_validation_metrics(train_losses, val_accuracies, epochs):
    """Draw training loss (left) and validation accuracy (right) per epoch.

    Both series are plotted against the epoch numbers ``1..epochs``.
    """
    epoch_axis = range(1, epochs + 1)
    panels = (
        (train_losses, 'blue', 'Training Loss', 'Training Loss Over Epochs'),
        (val_accuracies, 'green', 'Validation Accuracy', 'Validation Accuracy Over Epochs'),
    )

    plt.figure(figsize=(14, 6))

    for position, (series, colour, y_label, heading) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(epoch_axis, series, marker='o', color=colour)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.title(heading)

    plt.tight_layout()
    plt.show()

In [None]:
# Pick the GPU when one is available and move the model onto it
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"El entrenamiento se realizará en: {device}")
model.to(device)

# Initialise the gradient scaler used for FP16 training
scaler = GradScaler()

# Train the model
print("Iniciando el entrenamiento...")
train_model(model, train_loader, val_loader, optimizer, device, scaler, epochs=25)
print("Entrenamiento completado exitosamente.")

## TEST

No es necesario correr ninguna celda anterior

### Importación de librerías y configuración del dataloader

In [1]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import RobertaTokenizer, RobertaForMultipleChoice

# collate_fn that pads the items of a batch so they can be stacked
def collate_fn(batch):
    """Collate unlabelled per-example dicts into a single padded batch.

    Each item supplies 2-D ``input_ids`` and ``attention_mask`` tensors. Both
    are right-padded along dim 1 up to the widest ``input_ids`` in the batch
    (token ids with the module-level ``tokenizer.pad_token_id``, masks with 0)
    and stacked along a new leading dim.
    """
    # Longest sequence in the batch
    target_width = max(entry['input_ids'].size(1) for entry in batch)

    def right_pad(matrix, fill_value):
        # Append a (rows, missing_columns) block of ``fill_value`` on the right.
        missing_columns = target_width - matrix.size(1)
        filler = torch.full((matrix.size(0), missing_columns), fill_value, dtype=torch.long)
        return torch.cat([matrix, filler], dim=1)

    padded_ids = [right_pad(entry['input_ids'], tokenizer.pad_token_id) for entry in batch]
    padded_masks = [right_pad(entry['attention_mask'], 0) for entry in batch]

    return {
        'input_ids': torch.stack(padded_ids),
        'attention_mask': torch.stack(padded_masks),
    }

# Dataset class that prepares the unlabelled test questions
class CustomRaceAnsweringModel(Dataset):
    """Turn rows of a test DataFrame into multiple-choice model inputs.

    Every row must provide the columns ``text``, ``question`` and ``A`` ..
    ``E``. No label is produced.

    Args:
        data: DataFrame holding the examples; its index is reset on entry.
        tokenizer: Callable tokenizer exposing ``bos_token`` and ``sep_token``.
        max_length: Maximum tokenized length. It also drives the
            character-level pre-truncation of context and question.
    """

    def __init__(self, data, tokenizer, max_length=512):
        self.data = data.reset_index(drop=True)
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    @staticmethod
    def _as_text(value):
        """Return *value* as ``str``, mapping missing values (None/NaN) to ''.

        pandas stores missing cells as NaN, which is truthy, so the idiom
        ``value or ""`` does not catch it and a later slice would raise
        ``TypeError``.
        """
        if pd.isna(value):
            return ""
        return str(value)

    def __getitem__(self, idx):
        """Return ``input_ids`` and ``attention_mask`` with one row per option."""
        example = self.data.iloc[idx]
        context = self._as_text(example['text'])
        question = self._as_text(example['question'])
        # Missing options become empty strings instead of the literal "nan".
        options = [self._as_text(example[letter]) for letter in ("A", "B", "C", "D", "E")]

        # Pre-truncate the text: these slices cut characters, not tokens.
        context = context[:self.max_length // 3]
        question = question[:self.max_length // 6]

        # Join context and question, then append each option in turn
        sep_token = self.tokenizer.sep_token or ""
        bos_token = self.tokenizer.bos_token or ""
        c_plus_q = f"{context} {bos_token} {question} {sep_token}"
        c_plus_q_options = [c_plus_q + f" {option}" for option in options]

        # Tokenize the five candidate sequences together
        tokenized_examples = self.tokenizer(
            c_plus_q_options,
            max_length=self.max_length,
            padding="longest",
            truncation=True,
            return_tensors="pt",
        )

        return {
            'input_ids': tokenized_examples['input_ids'],
            'attention_mask': tokenized_examples['attention_mask']
        }

# Initialise the tokenizer
tokenizer = RobertaTokenizer.from_pretrained('roberta-large')

# Load the test file (tab-separated despite the .csv extension)
test_file_path = 'test.csv'  # Path where the test.csv file was uploaded
test_df = pd.read_csv(test_file_path, sep='\t')

# Build the dataset for test.csv
test_dataset = CustomRaceAnsweringModel(test_df, tokenizer)

# Build the dataloader
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, pin_memory=True, num_workers=2, collate_fn=collate_fn)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


### Montar Google Drive

In [2]:
# Mount Google Drive at /content/drive (the trained checkpoint is read from there)
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Inicialización del modelo y carga de pesos entrenados

In [3]:
# Instantiate the model from the roberta-large checkpoint; the fine-tuned
# weights are loaded over it below.
model = RobertaForMultipleChoice.from_pretrained('roberta-large')

# Use the GPU when one is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Load the previously trained weights.
# map_location remaps tensors that were saved from a GPU onto the current
# device, so the checkpoint also loads on a CPU-only runtime.
# weights_only=True restricts unpickling to tensors and plain containers,
# which is all a state dict needs.
model_save_path = '/content/drive/My Drive/Multichoice Question/best_model.pt'
model.load_state_dict(torch.load(model_save_path, map_location=device, weights_only=True))
model.eval()

Some weights of RobertaForMultipleChoice were not initialized from the model checkpoint at roberta-large and are newly initialized: ['classifier.bias', 'classifier.weight', 'roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  model.load_state_dict(torch.load(model_save_path))


RobertaForMultipleChoice(
  (roberta): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(50265, 1024, padding_idx=1)
      (position_embeddings): Embedding(514, 1024, padding_idx=1)
      (token_type_embeddings): Embedding(1, 1024)
      (LayerNorm): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0-23): 24 x RobertaLayer(
          (attention): RobertaAttention(
            (self): RobertaSelfAttention(
              (query): Linear(in_features=1024, out_features=1024, bias=True)
              (key): Linear(in_features=1024, out_features=1024, bias=True)
              (value): Linear(in_features=1024, out_features=1024, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): RobertaSelfOutput(
              (dense): Linear(in_features=1024, out_features=1024, bias=True)
         

### Predicción sobre test.csv y generación de archivo con respuestas

In [None]:
def predict_and_save(model, test_loader, device, output_file='test.txt'):
    """Predict an answer letter for every test example and write them to a file.

    Args:
        model: Model whose forward pass returns an object with ``logits``.
        test_loader: Iterable of batches with ``input_ids`` and
            ``attention_mask`` tensors.
        device: Device the batch tensors are moved to.
        output_file: Destination path; one letter (A-E) is written per line,
            in the order the examples were iterated.
    """
    index_to_letter = dict(enumerate("ABCDE"))
    model.eval()
    answers = []

    with torch.no_grad():
        for batch in test_loader:
            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)

            logits = model(input_ids=ids, attention_mask=mask).logits

            # Highest-scoring option per example, mapped to its letter
            for choice in torch.argmax(logits, dim=1).tolist():
                answers.append(index_to_letter[choice])

    # Write the predictions to the text file, one per line
    with open(output_file, 'w') as f:
        f.writelines(f"{letter}\n" for letter in answers)

    print(f"Predicciones guardadas en {output_file}")

# Run the prediction and write the answers file
predict_and_save(model, test_loader, device, output_file='test.txt')