# Estudio del impacto del reemplazo de la función de pooling en una CNN

In [31]:
# PyTorch
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F  # Commonly used functions like relu, softmax
from torch.nn import Unfold 

# Data loading
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler

# Auxiliary functions
from torch.utils.tensorboard import SummaryWriter  # Used for Tensorboard logging
import os
import numpy as np
import matplotlib.pyplot as plt
from math import floor, ceil
import datetime

# Directorios

In [33]:
# Configuración de directorios para la organización del proyecto

# Directorio raíz del proyecto
base_dir = 'cnn_project'

# Subdirectorios
data_dir = os.path.join(base_dir, 'data')                 # Para datasets
reports_dir = os.path.join(base_dir, 'reports')            # Para resultados y modelos
models_dir = os.path.join(reports_dir, 'models')           # Para guardar modelos entrenados
results_dir = os.path.join(reports_dir, 'results')         # Para los resultados de pruebas en el conjunto de test
runs_dir = os.path.join(reports_dir, 'runs')               # Para logs de Tensorboard

# Crear los directorios si no existen
os.makedirs(data_dir, exist_ok=True)
os.makedirs(models_dir, exist_ok=True)
os.makedirs(results_dir, exist_ok=True)
os.makedirs(runs_dir, exist_ok=True)

writer = SummaryWriter(log_dir=runs_dir)

print("Estructura de carpetas creada:")
print(f"Data directory: {data_dir}")
print(f"Reports directory: {reports_dir}")
print(f"Models directory: {models_dir}")
print(f"Results directory: {results_dir}")
print(f"Runs directory: {runs_dir}")

Estructura de carpetas creada:
Data directory: cnn_project/data
Reports directory: cnn_project/reports
Models directory: cnn_project/reports/models
Results directory: cnn_project/reports/results
Runs directory: cnn_project/reports/runs


## Cuda o CPU

In [41]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cpu


## Carga de datos: Datasets y Dataloaders

In [43]:
# Proporción de entrenamiento
train_proportion = 0.9  
num_train = 5000
indices = list(range(num_train))
split = int(np.floor(train_proportion * num_train))

np.random.shuffle(indices) 

train_idx, val_idx = indices[:split], indices[split:]

# Generar samplers para el conjunto de entrenamiento y validación
train_sampler = SubsetRandomSampler(train_idx)
val_sampler = SubsetRandomSampler(val_idx)

In [45]:
batch_size = 128
num_workers =  2
KERNEL_SIZE = 3
STRIDE = 3
PADDING = 1

In [47]:
# Transformaciones
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # Normalización para CIFAR-10
])

# Descarga del dataset CIFAR-10
train_dataset = datasets.CIFAR10(root=data_dir, train=True, transform=transform, download=True)
val_dataset = datasets.CIFAR10(root=data_dir, train=True, transform=transform, download=True)
test_dataset = datasets.CIFAR10(root=data_dir, train=False, transform=transform, download=True)

# Dataloaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler, num_workers=num_workers)
val_loader = DataLoader(val_dataset, batch_size=batch_size, sampler=val_sampler, num_workers=num_workers)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

print("Datasets cargados y dataloaders creados.")
print(f"Tamaño del conjunto de entrenamiento: {len(train_loader.sampler)}")
print(f"Tamaño del conjunto de validación: {len(val_loader.sampler)}")
print(f"Tamaño del conjunto de prueba: {len(test_dataset)}")

Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Datasets cargados y dataloaders creados.
Tamaño del conjunto de entrenamiento: 4500
Tamaño del conjunto de validación: 500
Tamaño del conjunto de prueba: 10000


## AggPoolingLayer

In [84]:
class AggPoolingLayer(nn.Module):
    def __init__(self, kernel_size, stride=None, padding=0, agg_function='mean', keepdims=False):
        super(AggPoolingLayer, self).__init__()
        self.kernel_size = kernel_size
        self.stride = stride or kernel_size
        self.padding = padding
        self.agg_function = agg_function
        self.keepdims = keepdims
        self.owa_weights = nn.Parameter(torch.ones(self.kernel_size * self.kernel_size))  # Define con el tamaño del bloque

    def forward(self, x):
        # Aplicar im2col para dividir el tensor en bloques
        unfolded_x = F.unfold(x, kernel_size=self.kernel_size, stride=self.stride, padding=self.padding)
        unfolded_x = unfolded_x.view(x.size(0), x.size(1), self.kernel_size * self.kernel_size, -1)

        # Normalización min-max
        min_vals = unfolded_x.min(dim=2, keepdim=True)[0]
        max_vals = unfolded_x.max(dim=2, keepdim=True)[0]
        unfolded_x = (unfolded_x - min_vals) / (max_vals - min_vals + 1e-6)
        
        # Selección de la función de agregación
        if self.agg_function == 'mean':
            agg_result = unfolded_x.mean(dim=2)
        elif self.agg_function == 'max':
            agg_result = unfolded_x.max(dim=2)[0]
        elif self.agg_function == 'owa':
            # Ordenar los valores en cada bloque y aplicar los pesos de OWA
            sorted_vals, _ = unfolded_x.sort(dim=2, descending=True)
            # Asegura que `owa_weights` tenga el mismo tamaño que cada bloque
            owa_weights_expanded = self.owa_weights.view(1, 1, -1, 1)
            agg_result = (sorted_vals * owa_weights_expanded).sum(dim=2)
        elif self.agg_function == 'choquet':
            sorted_vals, _ = unfolded_x.sort(dim=2, descending=True)
            choquet_vals = sorted_vals * torch.cumsum(sorted_vals, dim=2)
            agg_result = choquet_vals.sum(dim=2)
        elif self.agg_function == 'tnorm_min':
            agg_result = unfolded_x.min(dim=2)[0]
        elif self.agg_function == 'tnorm_product':
            agg_result = unfolded_x.prod(dim=2)
        elif self.agg_function == 'tnorm_lukasiewicz':
            agg_result = torch.clamp(unfolded_x.sum(dim=2) - 1, min=0)
        elif self.agg_function == 'tnorm_hamacher':
            agg_result = (unfolded_x.prod(dim=2)) / (unfolded_x.sum(dim=2) - unfolded_x.prod(dim=2) + 1e-6)
        elif self.agg_function == 'tconorm_max':
            agg_result = unfolded_x.max(dim=2)[0]
        elif self.agg_function == 'tconorm_lukasiewicz':
            agg_result = torch.clamp(unfolded_x.sum(dim=2), max=1)
        elif self.agg_function == 'tconorm_hamacher':
            agg_result = (unfolded_x.sum(dim=2) - unfolded_x.prod(dim=2)) / (1 - unfolded_x.prod(dim=2) + 1e-6)
        elif self.agg_function == 'uninorm_minmax':
            agg_result = torch.where(unfolded_x <= 0.5, unfolded_x.min(dim=2)[0], unfolded_x.max(dim=2)[0])
        elif self.agg_function == 'uninorm_ll':
            agg_result = torch.where(unfolded_x <= 0.5, torch.clamp(unfolded_x.sum(dim=2) - 1, min=0), torch.clamp(unfolded_x.sum(dim=2), max=1))
        else:
            raise ValueError(f"Función de agregación desconocida: {self.agg_function}")

        # Desnormalización
        agg_result = agg_result * (max_vals.squeeze(2) - min_vals.squeeze(2) + 1e-6) + min_vals.squeeze(2)
        
        # Cambiar la forma a [batch_size, num_channels, output_height, output_width]
        output_size = int((x.size(2) + 2 * self.padding - self.kernel_size) / self.stride + 1)
        agg_result = agg_result.view(x.size(0), x.size(1), output_size, output_size)
        
        return agg_result


In [21]:
def hamacher_tconorm(tensor, keepdim=False, dim=-1):
    tensor_shape = list(tensor.shape)
    tensor_shape.pop(dim)
    out_tensor = tensor.new_zeros(tensor_shape)
    ones = tensor.new_ones(tensor_shape)
    # Indexar la Ãºtlima dimensiÃ³n facilita la legibilidad del cÃ³digo (tendrÃ­amos que usar torch.index_select en caso contrario)
    if (dim == -1) or (dim == len(tensor.shape)-1):
        out_tensor = tensor[..., 0] # Tensor auxiliar donde acumularemos la salida (harÃ¡ las veces de x)
        for i in range(1, tensor.shape[dim]):  # La t-conorma es asociativa: Trataremos los elementos de 2 en 2
            # Dado que la t-conorma es asociativa, trataremos los elementos de 2 en 2. En cada iteraciÃ³n:
            x = out_tensor
            y = tensor[..., i]
            diff_indices = torch.where(torch.abs(torch.mul(x, y)-1) > 1e-9) # Devuelve los Ã­ndices de los elementos para los cuÃ¡les x*y-1 > 0 (condiciÃ³n de la funciÃ³n por partes)
            # Asignamos los valores en funciÃ³n de las condiciones
            # if ab == 1 -> T(a, b) = 1
            out_tensor = ones  # Por defecto, asumimos que todos los valores caen en el caso x*y-1=0
            # otherwise -> T(a, b) = (2ab - a - b) / (ab - 1)
            out_tensor[diff_indices] = (
                2 * torch.mul(x[diff_indices], y[diff_indices]) - x[diff_indices] - y[diff_indices]) / (
                torch.mul(x[diff_indices], y[diff_indices]) - 1)  # Corregimos los valores para los cuÃ¡les x*y-1>0 (los que corresponden a los Ã­ndices de diff_indices)
    else:
        # El cÃ³digo serÃ­a idÃ©ntico, sustituyendo tensor[..., 0] por torch.index_select(tensor, dim, tensor.new_tensor([0], dtype=torch.int)).squeeze(dim)
        # torch.index_select(tensor, dim, tensor.new_tensor([0], dtype=torch.int)).squeeze(dim) indexa todos los elementos de la dimensiÃ³n dim
        # NO HACE FALTA IMPLEMENTARLO
        raise Exception('Utilizar la versiÃ³n con dim=-1')
    if keepdim:
        torch.unsqueeze(out_tensor, dim=dim)
    return out_tensor

## Definición del modelo

In [98]:
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = AggPoolingLayer(kernel_size=3, stride=1, padding=0, agg_function='mean')
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = AggPoolingLayer(kernel_size=3, stride=1, padding=0, agg_function='mean')
        
        # Calcular el tamaño de la salida después de las capas de convolución y pooling
        self._to_linear = None
        self.convs(torch.randn(1, 3, 32, 32))
        
        # Definir capas totalmente conectadas
        self.fc = nn.Linear(self._to_linear, 128)  # Capa intermedia
        self.fc2 = nn.Linear(128, 10)              # Capa final para 10 clases de salida
        
    def convs(self, x):
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.pool2(x)
        if self._to_linear is None:
            self._to_linear = x.view(x.size(0), -1).shape[1]
        return x
    
    def forward(self, x):
        x = self.convs(x)         # Pasa por las capas convolucionales y de pooling
        x = x.view(x.size(0), -1) # Aplana el tensor antes de la capa totalmente conectada
        x = self.fc(x)            # Primera capa totalmente conectada
        x = F.relu(x)             # Activación ReLU para la primera capa totalmente conectada
        x = self.fc2(x)           # Segunda capa totalmente conectada para la salida final
        return x


## Configuración de TensorBoard

In [75]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, writer):

    for epoch in range(num_epochs):
        model.train()  # Poner el modelo en modo de entrenamiento
        train_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Reiniciar los gradientes
            optimizer.zero_grad()

            # Paso hacia adelante
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Paso hacia atrás y optimización
            loss.backward()
            optimizer.step()

            # Acumular la pérdida y la precisión
            train_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # Calcular la pérdida y precisión de la época
        epoch_loss = train_loss / len(train_loader)
        epoch_accuracy = 100 * correct / total

        # Registrar en TensorBoard
        writer.add_scalar('Pérdida de entrenamiento', epoch_loss, epoch)
        writer.add_scalar('Precisión de entrenamiento', epoch_accuracy, epoch)

        # Validación del modelo
        validate_model(model, val_loader, criterion, epoch, writer)

        print(f'Epoca [{epoch + 1}/{num_epochs}], Pérdida: {epoch_loss:.4f}, Precisión: {epoch_accuracy:.2f}%')

    return model

def validate_model(model, val_loader, criterion, epoch, writer):
    model.eval()  # Poner el modelo en modo de evaluación
    val_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():  # No calcular gradientes en validación
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Acumular la pérdida y la precisión
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # Calcular la pérdida y precisión de validación
    epoch_val_loss = val_loss / len(val_loader)
    epoch_val_accuracy = 100 * correct / total

    # Registrar en TensorBoard
    writer.add_scalar('Pérdida de validación', epoch_val_loss, epoch)
    writer.add_scalar('Precisión de validación', epoch_val_accuracy, epoch)

    print(f'Validación - Pérdida: {epoch_val_loss:.4f}, Precisión: {epoch_val_accuracy:.2f}%')


In [None]:
model = SimpleCNN()

# Inicialización de los parámetros
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
num_epochs = 20


# Entrena el modelo
trained_model = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, writer)
writer.close()

Validación - Pérdida: 2.2791, Precisión: 22.20%
Epoca [1/20], Pérdida: 2.2952, Precisión: 13.67%
Validación - Pérdida: 2.2280, Precisión: 24.60%
Epoca [2/20], Pérdida: 2.2577, Precisión: 23.91%
Validación - Pérdida: 2.1456, Precisión: 27.20%
Epoca [3/20], Pérdida: 2.1876, Precisión: 25.13%
