# Introducción

En este documento se presenta el desarrollo completo del proceso de entrenamiento y validación de un modelo diseñado para clasificar imagenes marinas en buena y mala calidad. 

In [1]:

import os
import random
from PIL import Image,  ImageOps
import numpy as np
import tensorflow as tf
from tensorflow.python.keras.layers import Input, Dense
import time
import torch
import torchvision
import torch.nn as nn
import torchvision.datasets as datasets
from torch.utils.data import Subset
from torchvision import datasets, models, transforms
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import matplotlib.pyplot as plt
import torch.optim as optim
import copy
from sklearn.metrics import confusion_matrix
from torch.utils.data import RandomSampler
import pandas as pd
import shutil
from PIL import ImageFile



In [2]:
if torch.cuda.is_available():
    device = torch.device("cuda")
    print(f"Usando GPU: {torch.cuda.get_device_name(0)}")
else:
    device = torch.device("cpu")
    print("GPU no disponible, usando CPU")

Usando GPU: NVIDIA GeForce GTX 1060 6GB


In [8]:
transform = transforms.Compose([
    transforms.Resize((299,299)),#Las transforma a imagenes de 299x299
    transforms.ToTensor() #Transforma las imagenes a tensor
])


transform1 = transforms.Compose([ 
    transforms.RandomRotation(degrees=(60, 140)),#Realiza una rotacion entre 60/140 grados a todas las imagenes
    transforms.Resize((299,299)),   
    transforms.CenterCrop(180), #Se centra para eliminar las esquinas negras que aparecen
    transforms.Resize((299,299)), #Se vuelve al tamaño 299x299
    transforms.ToTensor()
])
transform2 = transforms.Compose([
    transforms.Resize((299,299)),
    transforms.RandomRotation(degrees=(200, 340)), #Realiza una rotacion entre 200/340 grados a todas las imagenes
    transforms.CenterCrop(180),
    transforms.Resize((299,299)),
    transforms.ToTensor()
])


transform3 = transforms.Compose([
    transforms.Resize((299,299)),
    transforms.RandomHorizontalFlip(p=1), #Realiza un flip horizontal a todas las imagenes
    transforms.ToTensor()
])

transform4 = transforms.Compose([
    transforms.Resize((299,299)),
    transforms.RandomVerticalFlip(p=1), #Realiza un flip vertical a todas las imagenes
    transforms.ToTensor()
])



# Constructor del dataset.
class goodBadDataset(Dataset):
    def __init__(self, image_paths, transform):
      super().__init__()
      self.paths = image_paths
      self.len = len(self.paths)
      self.transform = transform

    def __len__(self):
      return self.len

    def __getitem__(self, index):
      path = self.paths[index]
      image = Image.open(path)
      image = self.transform(image)
      label = 0 if 'Buenas-Reference' in path or 'Buena_Calidad' in path else 1 #Si en el path se encuentra el texto buena calidad se le coloca la etiqueta 0 si no un 1.
      return (image,label,path) #Devuelve la imagen, su etiqueta y su path

class goodBadDataset2(Dataset):
    def __init__(self, image_paths, transform):
      super().__init__()
      self.paths = image_paths
      self.len = len(self.paths)
      self.transform = transform

    def __len__(self):
      return self.len

    def __getitem__(self, index):
      path = self.paths[index]
      image = Image.open(path)
      image = self.transform(image)
      label = 0 if 'Buena_Calidad' in path else 1 #Si en el path se encuentra el texto buena calidad se le coloca la etiqueta 0 si no un 1.
      return (image,label,path) #Devuelve la imagen, su etiqueta y su path

# Algoritmo de entreno

In [12]:
def train(model, device, train_loader, optimizer, epoch, log_interval=10, verbose=True):
    
    model.train() #Se pone en estado de entreno

    loss_v = 0

    correct=0
    
    for batch_idx, (data, target,_) in enumerate(train_loader): #Por cada batch en el dataset
    
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)

        loss = nn.CrossEntropyLoss(reduction='sum')(output, target) 
            
        loss.backward()
        optimizer.step()
        if verbose:
            if len(data)==20:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}, Average: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                    100. * batch_idx / len(train_loader), loss.item(), loss.item()/ len(data)))
            else:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}, Average: {:.6f}'.format(
                    epoch,len(train_loader.dataset)-len(data), len(train_loader.dataset),
                    100. * batch_idx / len(train_loader), loss.item(), loss.item()/ len(data)))
        
        
        pred = output.argmax(dim=1, keepdim=True) 
        correct += pred.eq(target.view_as(pred)).sum().item() #Si se ha predecido bien se suma.
        loss_v += loss.item()

    loss_v /= len(train_loader.dataset)
    accuracy=correct / len(train_loader.dataset)
    print('\nTrain set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        loss_v, correct, len(train_loader.dataset),
        100. * correct / len(train_loader.dataset)))
    
 
    
    return loss_v,accuracy # Se devuelve la perdida y la precision conseguida


def test(model, device, test_loader):
    model.eval() #Se pone en estado de evaluacion
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target,_ in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)

            test_loss += nn.CrossEntropyLoss(reduction='sum')(output, target) 
            
            
            pred = output.argmax(dim=1, keepdim=True) 
            correct += pred.eq(target.view_as(pred)).sum().item() #Se suma si se ha predecido de forma correcta la etiqueta
 
  
    test_loss /= len(test_loader.dataset) #Se calcula la media de la perdida

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
    
    
    
    accuracy=correct / len(test_loader.dataset)
    return test_loss,accuracy # Se devuelve la perdida y la precision conseguida

# Definicion del modelo

In [13]:
def set_parameter_requires_grad(model, feature_extracting):
    if feature_extracting:
        for param in model.parameters():
            param.requires_grad = False

In [14]:

    
def initialize_model2(num_classes):
    #Los pesos si se entrenan
    model_ft = models.inception_v3(weights=False) #Se carga el modelo inception_v3 sin sus pesos preentrenados.
    # False en las redes convencionales, se entrenaran
    set_parameter_requires_grad(model_ft, False)
    # Se modifica la red auxiliar
    num_ftrs = model_ft.AuxLogits.fc.in_features
    model_ft.AuxLogits.fc = nn.Linear(num_ftrs, num_classes)

    # Se modifica la red principal para que sea de dos clases con una salida softmax y coger la con mas probabilidad.
    model_ft.dropout= nn.Dropout(0.5)
    num_ftrs = model_ft.fc.in_features
    model_ft.fc = nn.Sequential(
        nn.Linear(num_ftrs,num_classes),
        nn.Dropout(0.3),
        nn.Softmax(dim=1)
    )


    input_size=299 #Tamaño que usa la red
    return model_ft,input_size



In [56]:
def especifidad(MatrizConfusion):
    espeficifadM=(MatrizConfusion[1][1]/(MatrizConfusion[1][1]+MatrizConfusion[1][0]))*100
    return round(espeficifadM)

In [57]:
def recall(MatrizConfusion):
    recallM=(MatrizConfusion[0][0]/(MatrizConfusion[0][0]+MatrizConfusion[0][1]))*100
    return round(recallM)

In [58]:
def PrecisionClasePositiva(MatrizConfusion):
    PreP=(MatrizConfusion[0][0]/(MatrizConfusion[0][0]+MatrizConfusion[1][0]))*100
    return round(PreP)

In [59]:
def PrecisionClaseNegativa(MatrizConfusion):
    PreN=(MatrizConfusion[1][1]/(MatrizConfusion[1][1]+MatrizConfusion[0][1]))*100
    return round(PreN)

In [60]:
def exactitud(MatrizConfusion):
    exactitudM=((MatrizConfusion[0][0]+MatrizConfusion[1][1])/(MatrizConfusion[0][0]+MatrizConfusion[0][1]+MatrizConfusion[1][1]+MatrizConfusion[1][0]))*100
    return round(exactitudM)

# Entreno EUPV

## Dataloader de entreno

In [17]:
Buena_calidad=os.listdir('..\Imagenes\Dataset 2 modificado\Entreno\Buena_Calidad/')
##Buena_calidad = list(filter(lambda x: x != 'Imagen', Buena_calidad))
Buena_calidad= list(map(lambda  p: f"..\Imagenes\Dataset 2 modificado\Entreno\Buena_Calidad/{p}",Buena_calidad))
ImageFile.LOAD_TRUNCATED_IMAGES = True

Mala_calidad=os.listdir('..\Imagenes\Dataset 2 modificado\Entreno\Mala_Calidad/')
##Mala_calidad = list(filter(lambda x: x != 'Imagen', Mala_calidad))
Mala_calidad= list(map(lambda  p: f"..\Imagenes\Dataset 2 modificado\Entreno\Mala_Calidad/{p}",Mala_calidad))

print("Total de imagenes de Buena calidad en el dataset de entreno", len(Buena_calidad))
print("Total de imagenes de Mala calidad en el dataset de entreno", len(Mala_calidad))

img_files = Buena_calidad + Mala_calidad

DatasetEntrenodMod=goodBadDataset(img_files,transform)+goodBadDataset(img_files,transform1)+goodBadDataset(img_files,transform2)+goodBadDataset(img_files,transform3)+goodBadDataset(img_files,transform4) # #Se genera un dataset de entreno con todas las transformaciones expuestas anteriormente a las imagenesy se pasan a tensor.


#Randomizar las imagenes del dataset de entreno con las imagenes transformadas
random_train_idx = np.random.choice(np.array(range(len(DatasetEntrenodMod))),replace=False, size=DatasetEntrenodMod.__len__())
train_subset = Subset(DatasetEntrenodMod, random_train_idx)
DataloaderEntrenoMod=DataLoader(train_subset,batch_size=20,shuffle=True)

print("Longitud del dataset de entreno con transformaciones: ",len(DatasetEntrenodMod))
print("Cantidad de batches de 20 del dataloader de entreno con transformaciones: ",len(DataloaderEntrenoMod))


Total de imagenes de Buena calidad en el dataset de entreno 2540
Total de imagenes de Mala calidad en el dataset de entreno 2605
Longitud del dataset de entreno con transformaciones:  25725
Cantidad de batches de 20 del dataloader de entreno con transformaciones:  1287


## Dataloader de validación

In [18]:
ValBuena_calidad=os.listdir('..\Imagenes\Dataset 2 modificado\Validacion\Buena_Calidad/')
##ValBuena_calidad = list(filter(lambda x: x != 'Imagen', ValBuena_calidad))
ValBuena_calidad= list(map(lambda  p: f"..\Imagenes\Dataset 2 modificado\Validacion\Buena_Calidad/{p}",ValBuena_calidad))

ValMala_calidad=os.listdir('..\Imagenes\Dataset 2 modificado\Validacion\Mala_Calidad/')
##ValMala_calidad = list(filter(lambda x: x != 'Imagen', ValMala_calidad))
ValMala_calidad= list(map(lambda  p: f"..\Imagenes\Dataset 2 modificado\Validacion\Mala_Calidad/{p}",ValMala_calidad))

print("Total de imagenes de Buena calidad en el dataset de validacion", len(ValBuena_calidad))
print("Total de imagenes de Mala calidad en el dataset de validacion", len(ValMala_calidad))

Val_Imagenes = ValBuena_calidad + ValMala_calidad

val_dsMod = goodBadDataset(Val_Imagenes, transform) #Se carga el dataset de validacion con solo imagenes pasadas a tensor y de tamaño 299x299
val_dlMod = DataLoader(val_dsMod, batch_size=4) #Se carga el dataloader de validacion.

print("Longitud del dataset de validacion: ",len(val_dsMod))


Total de imagenes de Buena calidad en el dataset de validacion 600
Total de imagenes de Mala calidad en el dataset de validacion 600
Longitud del dataset de validacion:  1200


## Modelo con datos Ampliados

In [19]:
modelModft,_= initialize_model2(2)
modelMod=modelModft.to(device)

modelModft.aux_logits=False

params_to_update = modelModft.parameters()
print("Params to learn:")
params_to_update = []
for name,param in modelModft.named_parameters():
    
    if (param.requires_grad == True) & (not ('AuxLogits' in name)):
        params_to_update.append(param)
        print("\t",name)


print(modelModft.fc)


parametrosMod=sum(p.numel() for p in params_to_update)



Params to learn:
	 Conv2d_1a_3x3.conv.weight
	 Conv2d_1a_3x3.bn.weight
	 Conv2d_1a_3x3.bn.bias
	 Conv2d_2a_3x3.conv.weight
	 Conv2d_2a_3x3.bn.weight
	 Conv2d_2a_3x3.bn.bias
	 Conv2d_2b_3x3.conv.weight
	 Conv2d_2b_3x3.bn.weight
	 Conv2d_2b_3x3.bn.bias
	 Conv2d_3b_1x1.conv.weight
	 Conv2d_3b_1x1.bn.weight
	 Conv2d_3b_1x1.bn.bias
	 Conv2d_4a_3x3.conv.weight
	 Conv2d_4a_3x3.bn.weight
	 Conv2d_4a_3x3.bn.bias
	 Mixed_5b.branch1x1.conv.weight
	 Mixed_5b.branch1x1.bn.weight
	 Mixed_5b.branch1x1.bn.bias
	 Mixed_5b.branch5x5_1.conv.weight
	 Mixed_5b.branch5x5_1.bn.weight
	 Mixed_5b.branch5x5_1.bn.bias
	 Mixed_5b.branch5x5_2.conv.weight
	 Mixed_5b.branch5x5_2.bn.weight
	 Mixed_5b.branch5x5_2.bn.bias
	 Mixed_5b.branch3x3dbl_1.conv.weight
	 Mixed_5b.branch3x3dbl_1.bn.weight
	 Mixed_5b.branch3x3dbl_1.bn.bias
	 Mixed_5b.branch3x3dbl_2.conv.weight
	 Mixed_5b.branch3x3dbl_2.bn.weight
	 Mixed_5b.branch3x3dbl_2.bn.bias
	 Mixed_5b.branch3x3dbl_3.conv.weight
	 Mixed_5b.branch3x3dbl_3.bn.weight
	 Mixed_5b.b

## Entreno con Early stopping

In [None]:
# Variables para el seguimiento del early stopping
best_val_loss = float('inf')
patience = 5  # Número de épocas sin mejora antes de detener el entrenamiento
counter = 0

epochs=40
lr=0.001
EpocasTotalesEUPV = epochs

acurracytrainEUPV = []
acurracvalEUPV = []
losstrainEUPV = []
lossvalEUPV = []


print("Parameters ",parametrosMod)
optimizer = optim.Adam(params_to_update, lr=lr)


# Loop de entrenamiento
for epoch in range(epochs):
    # Entrenamiento
    train_loss, train_accuracy = train(modelMod, device, DataloaderEntrenoMod, optimizer, epoch, verbose=True)
    
    # Evaluación en el conjunto de validación
    val_loss, val_accuracy = test(modelMod, device, val_dlMod)
    
    acurracytrainEUPV.append(train_accuracy)
    acurracvalEUPV.append(val_accuracy)
    losstrainEUPV.append(train_loss)
    lossvalEUPV.append(val_loss)
    
    
    
    # Verifica si la pérdida de validación ha mejorado
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_epoch = epoch
        counter = 0  # Reinicia el contador de paciencia
        ruta_base = r'..\Pesos\Dataset 2 Modificado\modeltranformaciones'
        # Convierte el entero a una cadena y concatena todo
        PATH = f"{ruta_base}Epoca{epoch}.pt"      
        torch.save(modelMod.state_dict(), PATH)
    else:
        counter += 1
    
    # Aplica early stopping si se alcanza la paciencia máxima
    if counter >= patience:  # Ajusta el número de épocas de paciencia según tus necesidades
        print(f'Early stopping after epoch {epoch} (Best epoch: {best_epoch}).')
        EpocasTotalesEUPV = epoch + 1
        break

# Descripcion de como Cargar un modelo

Se debe tener una instancia del modelo creada, para ello primero se tiene que definir el modelo. Con el siguiente codigo se define el modelo utilizado:

    def initialize_model2(num_classes):
        #Los pesos si se entrenan
        model_ft = models.inception_v3(weights=False) #Se carga el modelo inception_v3 sin sus pesos preentrenados.
        # False en las redes convencionales, se entrenaran
        set_parameter_requires_grad(model_ft, False)
        # Se modifica la red auxiliar
        num_ftrs = model_ft.AuxLogits.fc.in_features
        model_ft.AuxLogits.fc = nn.Linear(num_ftrs, num_classes)

        #Se modifica la red principal para que sea de dos clases con una salida softmax y coger la con mas probabilidad.
        model_ft.dropout= nn.Dropout(0.5)
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Sequential(
            nn.Linear(num_ftrs,num_classes),
            nn.Dropout(0.3),
            nn.Softmax(dim=1)
        )

        input_size=299 #Tamaño que usa la red
        return model_ft,input_size
    


Y con la siguiente funcion:

    def set_parameter_requires_grad(model, feature_extracting):
        if feature_extracting:
            for param in model.parameters():
                param.requires_grad = False
            
            

Y a continuacion se instancia un modelo. Para hacer eso se deben utilizar las siguientes lineas de codigo:
Device cpu si no se dispone de cuda y si no  device = torch.device("cuda:0") con el indice deseado.

    device = torch.device("cpu")
    Modeloft,_= initialize_model2(2)
    Modelo=Modeloft.to(device)

    #Para inutilizar la capa auxiliar.
    Modeloft.aux_logits=False



Para poder cargar los pesos del modelo se hace con la siguiente linea:

    PATH = '..\Pesos\modeltranformaciones.pt'
    Modelo.load_state_dict(torch.load(PATH))
    


Y para finalizar para poder usarlo en modo prediccion hay que ponerlo en modo eval:

    Modelo.eval()

# Puntuaciones a los diferentes datasets

In [7]:
class UnlabeledImageDataset(Dataset):
    def __init__(self, image_paths, transform=None):
        self.image_paths = image_paths
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, index):
        path = self.image_paths[index]
        image = Image.open(path)
        
        if self.transform:
            image = self.transform(image)
        
        return image,path

# Recuento y Valorado de imagenes

In [12]:
Modelos = [
    "Dataset 2 Modificado\modeltranformacionesEpoca38",
    "Dataset unidos\modeltranformacionesEpoca26"  
]
RutasImagenes = [
    "../Imagenes/Pruebas/challenging-60/",
    "../Imagenes/Pruebas/Paired/underwater_scenes/Buena_Calidad/",
    "../Imagenes/Pruebas/Paired/underwater_scenes/Mala_Calidad/",
    "../Imagenes/Pruebas/Paired/underwater_imagenet/Buena_Calidad/",
    "../Imagenes/Pruebas/Paired/underwater_imagenet/Mala_Calidad/",
    "../Imagenes/Pruebas/Paired/underwater_dark/Buena_Calidad/",
    "../Imagenes/Pruebas/Paired/underwater_dark/Mala_Calidad/",
    "../Imagenes/Dataset 2 modificado/Validacion/Buena_Calidad/",
    "../Imagenes/Dataset 2 modificado/Validacion/Mala_Calidad/",
    "../Imagenes/Dataset 1/Validacion/Buenas-Reference/",
    "../Imagenes/Dataset 1/Validacion/Malas-Raw/" 
]
carpetasMo = [
    "Dataset 2 mod",
    "Dataset Unido"
]
archivosPunt = [
    "PuntuacionesChallenging",
    "PuntuacionesPairedScenesGood",
    "PuntuacionesPairedScenesBad",
    "PuntuacionesPairedNetGood",
    "PuntuacionesPairedNetBad",
    "PuntuacionesPairedDarkGood",
    "PuntuacionesPairedDarkBad",
    "Dataset 2 modificado Good",
    "Dataset 2 modificado Bad",
    "Dataset 1 Good",
    "Dataset 1 Bad"     
]
NombreColImagenes = [
    "Challenging",
    "Scenes Buenas",
    "Scenes Malas",
    "Net Buenas",
    "Net Malas",
    "Dark Buenas",
    "Dark Malas",
    "Dataset 2 Buenas",
    "Dataset 2 Malas",
    "Dataset 1 Buenas",
    "Dataset 1 Malas"  
]

In [13]:
print(RutasImagenes[1])

../Imagenes/Pruebas/Paired/underwater_scenes/Buena_Calidad/


In [14]:

modeloIndice = 0

for ruta in Modelos:
    # Crear un nuevo modelo
    Modeloft, _ = initialize_model2(2)
    Modelo = Modeloft.to(device)
    Modeloft.aux_logits = False

    # Construir la ruta completa al archivo
    PATH = f'..\\Pesos\\{ruta}.pt'

    # Cargar los pesos del modelo desde el archivo
    Modelo.load_state_dict(torch.load(PATH))
    Modelo.eval()
    print(f"Modelo cargado desde {PATH}")
    dfRecuentoImg = pd.DataFrame(columns=['Imagenes','Buenas', 'Malas']) 
      
    indice = 0
    for carpeta in RutasImagenes:
        
        Imagenes=os.listdir(carpeta)
        Imagenes= list(map(lambda  p: f"{carpeta}{p}",Imagenes))
        print("Longitud: " , len(Imagenes))
        Dataset = UnlabeledImageDataset(Imagenes,transform)
        buena_calidad = 0
        mala_calidad = 0
        dfMetricasImagenes = pd.DataFrame(columns=['Nombre Imagen', '% Buena calidad'])
        for i in range(len(Dataset)):
            img,path= Dataset.__getitem__(i)
            tensor_imagen = img.unsqueeze(0)
            tensor_imagen = tensor_imagen.to(device)
            with torch.no_grad():
                prediccion = Modelo(tensor_imagen)
                clase_mas_probable = torch.argmax(prediccion, dim=1)
                if clase_mas_probable == 0:
                    buena_calidad = buena_calidad + 1
                    destino = f'..\\Valores de calidad\\{carpetasMo[modeloIndice]}\\{NombreColImagenes[indice]}\\Buenas'
                else:
                    mala_calidad = mala_calidad + 1
                    destino = f'..\\Valores de calidad\\{carpetasMo[modeloIndice]}\\{NombreColImagenes[indice]}\\Malas'
    
                nombre_archivo = os.path.basename(path)
                destino_path = os.path.join(destino, nombre_archivo)
                shutil.copyfile(path, destino_path)

                #Prediccion[0][1] es el % de mala calidad que tiene la imagen
                dfMetricasImagenes.loc[i] = [path, '{:.6f}'.format(prediccion[0][0].item()* 100)]
        dfMetricasImagenes.to_csv(f'..\\Valores de calidad\\{carpetasMo[modeloIndice]}\\{archivosPunt[indice]}.csv', index=False)
        dfRecuentoImg.loc[indice] = [NombreColImagenes[indice],buena_calidad,mala_calidad]        
        indice = indice + 1
    dfRecuentoImg.to_csv(f'..\\Valores de calidad\\{carpetasMo[modeloIndice]}\\recuento{carpetasMo[modeloIndice]}.csv', index=False)    
    modeloIndice = modeloIndice + 1




Modelo cargado desde ..\Pesos\Dataset 2 Modificado\modeltranformacionesEpoca38.pt
Longitud:  60
Longitud:  2185
Longitud:  2185
Longitud:  3700
Longitud:  3700
Longitud:  5550
Longitud:  5550
Longitud:  600
Longitud:  600
Longitud:  190
Longitud:  190




Modelo cargado desde ..\Pesos\Dataset unidos\modeltranformacionesEpoca26.pt
Longitud:  60
Longitud:  2185
Longitud:  2185
Longitud:  3700
Longitud:  3700
Longitud:  5550
Longitud:  5550
Longitud:  600
Longitud:  600
Longitud:  190
Longitud:  190
