In [None]:
from transformers import YolosImageProcessor, AutoModelForObjectDetection
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from PIL import Image
import json
import os
from torch.utils.data import  DataLoader
from sklearn.model_selection import train_test_split
from torch.optim import AdamW
from transformers import get_scheduler
import numpy as np
import matplotlib.pyplot as plt
import copy

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Pretrained YOLOS-small detector, requested with a single label.
# NOTE(review): ignore_mismatched_sizes=True is presumably needed because the
# single-label head does not match the pretrained head shape -- confirm.
model = AutoModelForObjectDetection.from_pretrained(
    "hustvl/yolos-small",
    num_labels=1,
    ignore_mismatched_sizes=True,
)
model.to(device)

# Image processor that belongs to the same pretrained checkpoint.
processor = YolosImageProcessor.from_pretrained("hustvl/yolos-small")

# Count only the parameters that will receive gradients.
num_params = sum(
    weight.numel() for weight in model.parameters() if weight.requires_grad
)
print(f"Parametros entrenables de la red {num_params}")

In [None]:
def convert_bboxes_to_width_height(data):
    """
    Convert bounding boxes from corner format to width/height format.

    Every annotation's 'bbox' is changed from [x_min, y_min, x_max, y_max] to
    [x_min, y_min, width, height] and its 'category_id' is set to 0.

    The input is never modified: a new list of new dictionaries is returned.
    This matters because the function is called on the same stored annotations
    every time a sample is fetched; converting in place would re-convert an
    already converted box on the next access and corrupt it.

    Args:
        data (list): List of dictionaries, each with a 'bbox' key in
            [x_min, y_min, x_max, y_max] format.

    Returns:
        list: New list of dictionaries (shallow copies of the inputs) whose
            'bbox' is in [x_min, y_min, width, height] format and whose
            'category_id' is 0.
    """
    converted = []
    for item in data:
        x_min, y_min, x_max, y_max = item['bbox']
        new_item = dict(item)  # shallow copy so the caller's dict stays untouched
        new_item['bbox'] = [x_min, y_min, x_max - x_min, y_max - y_min]
        # Original author's note: category_id was changed from 1 to 0 because
        # the model seemed to interpret class 1 as "no object".
        new_item['category_id'] = 0
        converted.append(new_item)
    return converted

class CustomDataset(Dataset):
    """
    Detection dataset backed by a JSON label file and a directory of images.

    The JSON file must contain a list of records. Each record is read through
    the keys "image_id", "image_name" (file name inside ``images_dir``) and
    "annotations" (a list of dictionaries with a 'bbox' in
    [x_min, y_min, x_max, y_max] format).

    Args:
        json_file (str): Path to the JSON label file.
        images_dir (str): Directory that contains the image files.
        processor: Callable invoked as
            ``processor(images=..., annotations=..., return_tensors="pt")``;
            its result must expose "pixel_values" and "labels".
        transform (callable, optional): Applied to the loaded image only.
            NOTE(review): the annotations are not passed through the
            transform, so a geometric transform would leave the boxes out of
            sync with the image -- confirm before using one.
    """

    def __init__(self, json_file, images_dir, processor,transform=None):
        # JSON text is UTF-8; do not depend on the platform's default encoding.
        with open(json_file, 'r', encoding='utf-8') as file:
            self.data = json.load(file)

        self.images_dir = images_dir
        self.transform = transform
        self.processor = processor

    def __len__(self):
        """Return the number of records in the label file."""
        return len(self.data)

    def __getitem__(self, idx):
        """
        Load and preprocess one sample.

        Returns:
            dict: "pixel_values" (the processor output without its leading
                batch dimension) and "labels" (the first entry of the
                processor's "labels" output).
        """
        image_data = self.data[idx]
        image_id = image_data["image_id"]
        image_path = os.path.join(self.images_dir, image_data["image_name"])

        # Open inside a context manager so the file handle is released, and
        # force three channels so grayscale or RGBA files are handled as well.
        with Image.open(image_path) as img:
            image = img.convert("RGB")

        # Convert a private copy of the annotations. The stored records must
        # stay in their original corner format, otherwise fetching the same
        # sample again (every epoch) would convert the boxes a second time.
        annotations = convert_bboxes_to_width_height(
            copy.deepcopy(image_data["annotations"])
        )

        if self.transform:
            image = self.transform(image)

        inputs = self.processor(
            images=image,
            annotations={"image_id": image_id, "annotations": annotations},
            return_tensors="pt",
        )

        return {
            "pixel_values": inputs["pixel_values"].squeeze(0),  # drop the batch dimension
            "labels": inputs["labels"][0]
        }
        
def guarda_ckpt(ckptpath, modelo, epoca, opt):
    """
    Save a training checkpoint.

    The checkpoint is a dictionary with the keys 'epoch', 'model_state_dict'
    and 'optimizer_state_dict'.

    Args:
        ckptpath: Destination passed to ``torch.save``. When it is a path
            (str or os.PathLike) its parent directory is created if missing.
        modelo: Model whose ``state_dict()`` is stored.
        epoca: Epoch value stored under 'epoch'.
        opt: Optimizer whose ``state_dict()`` is stored.
    """
    # torch.save does not create missing directories; do it here so the first
    # save does not fail on a fresh working directory. Non-path destinations
    # (e.g. file-like objects) are passed through unchanged.
    if isinstance(ckptpath, (str, os.PathLike)):
        carpeta = os.path.dirname(ckptpath)
        if carpeta:
            os.makedirs(carpeta, exist_ok=True)

    estado_modelo = {'epoch': epoca,
                     'model_state_dict': modelo.state_dict(),
                     'optimizer_state_dict': opt.state_dict()}
    torch.save(estado_modelo, ckptpath)
    
def carga_ckpt(ckptpath, modelo, opt):
    """
    Restore a model and optimizer from a checkpoint written by ``guarda_ckpt``.

    Args:
        ckptpath: Checkpoint location passed to ``torch.load``.
        modelo: Model that receives the stored 'model_state_dict'.
        opt: Optimizer that receives the stored 'optimizer_state_dict'.

    Returns:
        The value stored under 'epoch' (the epoch at which the checkpoint
        was saved).
    """
    # Load the tensors onto the CPU first so a checkpoint written on a GPU can
    # still be opened on a machine without CUDA. load_state_dict then copies
    # the values into the model's own parameters.
    checkpoint = torch.load(ckptpath, map_location="cpu")

    modelo.load_state_dict(checkpoint['model_state_dict'])
    opt.load_state_dict(checkpoint['optimizer_state_dict'])

    return checkpoint['epoch']

In [None]:
# Settings shared by the three loaders and by both splits.
BATCH_SIZE = 32
SPLIT_SEED = 42


def collate_batch(samples):
    """
    Merge a list of dataset samples into one batch.

    Args:
        samples (list): Dictionaries with "pixel_values" and "labels" keys.

    Returns:
        dict: "pixel_values" stacked into a single tensor and "labels" kept
            as a list with one entry per sample.

    torch.stack requires every "pixel_values" tensor in the batch to have the
    same shape.
    NOTE(review): this assumes all images come out of the processor with the
    same size -- confirm for the dataset in use.
    """
    return {
        "pixel_values": torch.stack([sample["pixel_values"] for sample in samples]),
        "labels": [sample["labels"] for sample in samples],  # list of dictionaries
    }


dataset = CustomDataset(json_file="labels.json", images_dir="frames", processor= processor)
# Work on a list of dataset indices so the splits can be wrapped in Subset.
indices = list(range(len(dataset)))

# First split: training vs. the rest (validation + test).
train_indices, temp_indices = train_test_split(indices, test_size=0.2, random_state=SPLIT_SEED)

# Second split: divide the rest evenly between validation and test.
val_indices, test_indices = train_test_split(temp_indices, test_size=0.5, random_state=SPLIT_SEED)

# Datasets restricted to each set of indices.
train_dataset = torch.utils.data.Subset(dataset, train_indices)
val_dataset = torch.utils.data.Subset(dataset, val_indices)
test_dataset = torch.utils.data.Subset(dataset, test_indices)

# A single named collate function replaces three identical lambdas; unlike a
# lambda it can also be pickled if worker processes are enabled later.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=True,
                          collate_fn=collate_batch)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_batch)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_batch)
print(len(train_dataset),len(val_dataset), len(test_dataset))

In [None]:
# ---- Experiment configuration ----
EPOCAS = 15                 # number of training epochs
numeroExperimento = "11"    # tag used in checkpoint / output file names
PATH = f'checkpoint/checkpoint{numeroExperimento}.pt'

optimizer = AdamW(params=model.parameters(), lr=5e-5)

# Optional linear learning-rate schedule (currently disabled):
#   num_training_steps = len(train_loader) * EPOCAS
#   scheduler = get_scheduler("linear", optimizer, num_warmup_steps=0,
#                             num_training_steps=num_training_steps)

In [None]:
# Per-epoch loss history: one slot per epoch for training and validation.
historial = {'perdida_ent': np.zeros(EPOCAS, dtype = np.float32),
             'perdida_val': np.zeros(EPOCAS, dtype = np.float32)}

# Lowest validation loss seen so far and a copy of the model that achieved it.
perdidaMinima = torch.inf
mejorModelo = copy.deepcopy(model)

for e in range(EPOCAS):
    # ---------------- training ----------------
    model.train()
    trainLoss = 0.0
    trainSamples = 0
    for i, batch in enumerate(train_loader):
        print(f"Va en la epoca:{e + 1 } , en el lote de entrenamiento: {i} ")
        # Move the batch to the training device.
        pixel_values = batch["pixel_values"].to(device)
        labels = [{k: v.to(device) for k, v in t.items()} for t in batch["labels"]]
        optimizer.zero_grad()
        # Passing the labels makes the model return the loss.
        outputs = model(pixel_values=pixel_values, labels=labels)
        loss = outputs.loss

        # Backpropagation
        loss.backward()
        optimizer.step()
        #scheduler.step()

        # outputs.loss is one scalar per batch, so weight it by the batch size
        # and divide by the number of samples actually processed. Dividing the
        # plain sum of batch losses by len(dataset) shrank the reported value
        # by roughly the batch size, and with drop_last=True the loader does
        # not even visit every sample of the dataset.
        batchSize = pixel_values.size(0)
        trainLoss += loss.item() * batchSize
        trainSamples += batchSize

    trainLoss /= max(trainSamples, 1)
    historial['perdida_ent'][e] = trainLoss

    # ---------------- validation ----------------
    model.eval()
    valLoss = 0.0
    valSamples = 0
    with torch.no_grad():
        for i, batch in enumerate(val_loader):
            print(f"Va en la epoca:{e + 1} , en el lote de validacion: {i} ")
            # Move the batch to the training device.
            pixel_values = batch["pixel_values"].to(device)
            labels = [{k: v.to(device) for k, v in t.items()} for t in batch["labels"]]

            outputs = model(pixel_values=pixel_values, labels=labels)
            loss = outputs.loss

            # Same sample-weighted average as in training, so both curves are
            # on the same scale even when the last batch is smaller.
            batchSize = pixel_values.size(0)
            valLoss += loss.item() * batchSize
            valSamples += batchSize

    valLoss /= max(valSamples, 1)
    historial['perdida_val'][e] = valLoss

    # Keep the weights (and a resumable checkpoint) of the best epoch so far.
    if historial['perdida_val'][e] < perdidaMinima:
        mejorModelo.load_state_dict(model.state_dict())
        guarda_ckpt(PATH, model, e, optimizer)
        perdidaMinima = historial['perdida_val'][e]
        torch.save(mejorModelo.state_dict(), f"YOLOS_small_mejorModelo_entrenamiento_{numeroExperimento}_covid.pt")
        print(f"Guardo el mejor modelo en la epoca {e + 1}")

    print(f"Epoca {e+1}/{EPOCAS}")
    print(f"Train Loss de la epoca {e+1}: {trainLoss:.12f}")
    print(f"Validation Loss de la epoca {e+1}: {valLoss:.12f}")

In [None]:
# Store the weights currently held by `model` under the experiment's tag.
torch.save(
    model.state_dict(),
    f"YOLOS_small_entrenamiento_{numeroExperimento}_covid.pt",
)

In [None]:
# Training vs. validation loss per epoch, drawn on an explicit Axes object.
fig, ax = plt.subplots()
ax.plot(historial['perdida_ent'], label='Entrenamiento')
ax.plot(historial['perdida_val'], label='Validación')
ax.set_title(f'Perdida para el entrenamiento {numeroExperimento}')
ax.set_xlabel('Época')
ax.set_ylabel('Pérdida')
ax.legend()
plt.show()

In [None]:
import pickle

# Write the loss history to disk as a pickle file named after the experiment.
with open(f'historial_{numeroExperimento}.pkl', 'wb') as archivo_pickle:
    archivo_pickle.write(pickle.dumps(historial))

# Read it back to check the round trip. Only unpickle files you trust; this
# one was written by the lines just above.
with open(f'historial_{numeroExperimento}.pkl', 'rb') as f:
    history = pickle.loads(f.read())

# Show the contents of the restored dictionary.
print(history)

In [None]:
#model.load_state_dict(torch.load('YOLOS_entrenamiento_1_covid.pt'))