Universidad Torcuato Di Tella

Licenciatura en Tecnología Digital\
**Tecnología Digital VI: Inteligencia Artificial**


In [1]:
import os
import torch
import torchaudio
import tarfile
import wandb
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
from torch.utils.data import Dataset
from torchaudio.datasets import GTZAN
from torch.utils.data import DataLoader
import torchaudio.transforms as tt
from torch.utils.data import random_split
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"


# TP3: Encodeador de música



## Orden de pasos

0. Elijan GPU para que corra mas rapido (RAM --> change runtime type --> T4 GPU)
1. Descargamos el dataset y lo descomprimimos en alguna carpeta en nuestro drive.
2. Conectamos la notebook a gdrive y seteamos data_dir con el path a los archivos.
3. Visualización de los archivos
4. Clasificación
5. Evaluación




In [None]:
project_name='TP3-TD6'
username = "sansonmariano-universidad-torcuato-di-tella"
wandb.login(key="d2875c91a36209496ee81454cccd95ebe3dc948d")
wandb.init(project = project_name, entity = username)

In [3]:
random_seed = 42

torch.manual_seed(random_seed)

# Definir parámetros
samplerate = 22050
data_dir = './genres_5sec'

In [4]:
# Función para parsear géneros
def parse_genres(fname):
    parts = fname.split('/')[-1].split('.')[0]
    return parts

# Definir la clase del dataset
class MusicDataset(Dataset):
    def __init__(self, root):
        super().__init__()
        self.root = root
        self.files = []
        for c in os.listdir(root):
            self.files += [os.path.join(root, c, fname) for fname in os.listdir(os.path.join(root, c)) if fname.endswith('.wav')]
        self.classes = list(set(parse_genres(fname) for fname in self.files))

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        fpath = self.files[idx]
        genre = parse_genres(fpath)
        class_idx = self.classes.index(genre)
        audio = torchaudio.load(fpath)[0]
        return audio, class_idx

In [5]:
from sklearn.model_selection import StratifiedShuffleSplit
from torch.utils.data import DataLoader, Subset
import torch

def calculate_mean_std(dataset):
    """
    Calculate mean and standard deviation of the audio data in the dataset.
    """
    all_data = []
    for i in range(len(dataset)):
        audio, _ = dataset[i]
        all_data.append(audio)
    stacked_data = torch.cat(all_data, dim=1)
    mean = stacked_data.mean()
    std = stacked_data.std()
    return mean, std

def normalize_dataset(dataset, mean, std):
    """
    Normalize the dataset using provided mean and standard deviation.
    """
    normalized_data = []
    for i in range(len(dataset)):
        audio, label = dataset[i]
        normalized_audio = (audio - mean) / std
        normalized_data.append((normalized_audio, label))
    return normalized_data

def create_dataloaders(dataset, batch_size, test_size=0.3, val_size=0.5, random_state=42):
    """
    Splits the dataset into train, validation, and test subsets, normalizes them,
    and returns corresponding DataLoaders.
    """
    # Stratified split: train and temporary (val+test) split
    labels = [label for _, label in dataset]
    split = StratifiedShuffleSplit(n_splits=1, test_size=test_size, random_state=random_state)
    for train_idx, temp_idx in split.split(range(len(dataset)), labels):
        train_dataset = Subset(dataset, train_idx)
        temp_dataset = Subset(dataset, temp_idx)

    # Calculate mean and std on the training set only
    mean, std = calculate_mean_std(train_dataset)

    # Normalize each subset
    train_dataset = normalize_dataset(train_dataset, mean, std)
    temp_dataset = normalize_dataset(temp_dataset, mean, std)

    # Stratified split on temp data: validation and test split
    val_test_labels = [labels[i] for i in temp_idx]
    split = StratifiedShuffleSplit(n_splits=1, test_size=val_size, random_state=random_state)
    for val_idx, test_idx in split.split(temp_idx, val_test_labels):
        val_dataset = Subset(temp_dataset, val_idx)
        test_dataset = Subset(temp_dataset, test_idx)

    # Create DataLoaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True, num_workers=0)

    return train_loader, val_loader, test_loader, train_dataset, val_dataset, test_dataset

# Usage example with MusicDataset
dataset = MusicDataset(data_dir)
batch_size = 20
train_loader, val_loader, test_loader, train_dataset, val_dataset, test_dataset = create_dataloaders(dataset, batch_size)

In [6]:
list_files=os.listdir(data_dir)

classes = []

for file in list_files:

  name='{}/{}'.format(data_dir,file)

  if os.path.isdir(name):

    classes.append(file)

### 3. Visualización de los archivos

In [7]:
def audio_to_spectrogram(waveform):
    # Ensure the waveform is in the correct shape
    if len(waveform.shape) == 1:
        waveform = waveform.unsqueeze(0)
    
    # Convert the waveform to a spectrogram
    spectrogram = tt.Spectrogram()(waveform)
    return spectrogram

def process_dataloader_to_spectrograms(dataloader):
    spectrograms = []
    
    for batch in dataloader:
        # Assuming the batch is a tuple (waveforms, labels) and waveforms are the audio data
        waveforms, labels = batch
        
        # Process each waveform in the batch
        batch_spectrograms = [audio_to_spectrogram(waveform) for waveform in waveforms]
        
        # Append to the list of spectrograms
        spectrograms.append((torch.stack(batch_spectrograms), labels))
    
    return spectrograms

In [8]:
input_channels = 1  # for RGB images, or 1 for grayscale
num_classes = 10    # depends on your specific classification task

# Move the model to the appropriate device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [45]:
train_spectogram = process_dataloader_to_spectrograms(train_loader)
val_spectogram = process_dataloader_to_spectrograms(val_loader)
test_spectogram = process_dataloader_to_spectrograms(test_loader)



### Ejercicio 1

Modelo que recibe el tamaño de la capa y la cantidad de capas

In [10]:
# Ajuste en la clase del modelo
class ExperimentNN(nn.Module):
    def __init__(self, input_size, num_classes, layer_size, layers):
        super(ExperimentNN, self).__init__()
        self.fc_layers = nn.ModuleList()
        self.dropout = nn.Dropout(p=0.5)  # Dropout layer with 50% probability

        for layer in range(layers - 1):
            if layer == 0:
                self.fc_layers.append(nn.Linear(input_size, layer_size))
            else:
                self.fc_layers.append(nn.Linear(layer_size, layer_size))
        
        # Output layer
        self.fc_layers.append(nn.Linear(layer_size, num_classes))

    def forward(self, x):
        x = x.view(x.size(0), -1)  # Aplanar la onda de audio
        x = (x - x.mean()) / x.std()  # Normalización
        for fc in self.fc_layers[:-1]:  # Skip last layer
            x = self.dropout(F.relu(fc(x)))  # ReLU + Dropout
        x = self.fc_layers[-1](x)  # Output layer (no activation)
        return x

In [11]:
import gc

def train_model_architecture(model, criterion, optimizer, epochs, train_loader, val_loader, device):
    
    model_name = f"{len(model.fc_layers)} capas de {model.fc_layers[0].out_features} nodos"
    
    wandb.init(
        name = model_name,
        config = {
            "learning_rate": 0.0005,
            "epochs": epochs,
            "model": model,
        })
            
    for epoch in range(epochs):
        # Training loop
        model.train()
        train_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        train_loss /= len(train_loader)
        print(f"Epoch {epoch+1}/{epochs} - Train loss: {train_loss:.4f}")

        # Validation step
        model.eval()
        correct = 0
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                predicted = outputs.argmax(dim=1)
                correct += (predicted == labels).sum().item()
                val_loss += loss.item()
        
            val_loss /= len(val_loader)
        val_accuracy = 100 * (correct / len(val_dataset))
        print(f"\nValidation loss: {val_loss:.4f}, Accuracy: {val_accuracy:.4f}%")
        
        wandb.log({"val_loss": val_loss,
                "val_accuracy": val_accuracy,
                "train_loss": train_loss,
                "epoch": epoch})

        # Clear cache and collect garbage
        gc.collect()
        torch.cuda.empty_cache()

    wandb.finish()
    return val_loss, val_accuracy # Return the final best model


In [None]:
input_size = samplerate * 5

num_classes = len(dataset.classes)  # Número de clases (géneros musicales)

# Define nuevos valores de hiperparámetros para experimentar
layers_list = [2, 3, 5, 7]       # Pruebas con más capas
sizes_list = [32, 64, 128, 256]  # Pruebas con más unidades en cada capa

best_val_loss = float("inf")
best_val_accuracy = 0
learning_rate = 0.0005
weight_decay = 1e-4
best_model = None

# Loop de experimentación
for layers in layers_list:
    for size in sizes_list:
        print(f"Experimentando con {layers} capas y {size} unidades por capa...")
        
        # Inicializar el modelo con la configuración actual
        model = ExperimentNN(input_size,
                             num_classes,
                             size,
                             layers).to(device)

        # Definir el criterio y optimizador con los pesos de las clases
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

        # Define el scheduler
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

        # Llama a train_model pasándole el scheduler
        validation_loss, validation_accuracy = train_model_architecture(
            model, criterion, optimizer, epochs=30,
            train_loader=train_loader, val_loader=val_loader, device=device
        )

        print(f"Loss de validación para {layers} capas y {size} unidades: {validation_loss}")
        print(f"Precisión de validación: {validation_accuracy:.2f}%")

        # Guardar el modelo con mejor precisión y menor pérdida
        if validation_accuracy > best_val_accuracy or (validation_accuracy == best_val_accuracy and validation_loss < best_val_loss):
            best_val_loss = validation_loss
            best_val_accuracy = validation_accuracy
            best_model = model  # Store the model with the best configuration
            print(f"Nuevo mejor modelo encontrado con {layers} capas y {size} unidades.")

print(f"Mejor modelo: {len(best_model.fc_layers)} capas y {best_model.fc_layers[0].out_features} unidades, con precisión de validación de {best_val_accuracy:.2f}% y pérdida de validación de {best_val_loss:.4f}")


In [None]:
# Evaluación final en el conjunto de prueba
print("Evaluando el mejor modelo en el conjunto de prueba...")
best_model.eval()
test_loss = 0.0
correct = 0
total = len(test_dataset.dataset)
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = best_model(inputs)

        loss = criterion(outputs, labels)
        test_loss += loss.item()

        predicted = outputs.argmax(dim=1)
        correct += (predicted == labels).sum().item()

test_loss /= len(test_dataset)
test_accuracy = 100 * correct / total

print(f"Loss en el conjunto de prueba: {test_loss:.4f}")
print(f"Precisión en el conjunto de prueba: {test_accuracy:.2f}%")

### 4. Clasificación

In [21]:
class CNN(nn.Module):
    def __init__(self, input_channels, num_classes, conv_layers_config):
        super(CNN, self).__init__()

        # Initialize the list to hold the convolutional layers
        self.conv_layers = nn.ModuleList()

        # Initialize the number of input channels for the first layer
        in_channels = input_channels

        # Dynamically create convolutional layers based on the configuration
        for (out_channels, kernel_size, stride, padding) in conv_layers_config:
            self.conv_layers.append(nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding))
            in_channels = out_channels  # Update in_channels for the next layer

        # Calculate the size after convolution and pooling to define the fully connected layer
        # Assuming pooling reduces the size by a factor of 2 at each layer
        self.pool = nn.MaxPool2d(2, 2)
        
        # Get the final feature map size (after all conv and pooling layers)
        self.final_feature_map_size = self._get_conv_output_size(conv_layers_config)
        
        # Define 9 fully connected layers with 256 nodes each
        self.fc_layers = nn.ModuleList()
        self.fc_layers.append(nn.Linear(self.final_feature_map_size, 256))  # First fully connected layer
        for _ in range(4):  # Add 2 more fully connected layers with 256 nodes
            self.fc_layers.append(nn.Linear(256, 256))
        
        # Output layer
        self.fc_out = nn.Linear(256, num_classes)  # Output layer for classification
        
    def _get_conv_output_size(self, conv_layers_config):
        # Sample input size (height x width) to calculate the final feature map size
        # You can adjust these values based on your actual input size
        height = 201  # Replace with your actual input height
        width = 552   # Replace with your actual input width
        
        # Apply each convolutional and pooling layer
        for (out_channels, kernel_size, stride, padding) in conv_layers_config:
            height = (height + 2 * padding - kernel_size) // stride + 1
            width = (width + 2 * padding - kernel_size) // stride + 1
            height = height // 2  # Max pooling halves the height
            width = width // 2    # Max pooling halves the width
        
        # Return the total number of features after all convolutional and pooling layers
        return out_channels * height * width

    def forward(self, x):
        # Apply each convolutional layer followed by ReLU and pooling
        for conv_layer in self.conv_layers:
            x = F.relu(conv_layer(x))
            x = self.pool(x)
        
        # Flatten the output before passing it to the fully connected layers
        x = x.view(x.size(0), -1)  # Flatten the feature map

        # Apply the fully connected layers
        for fc in self.fc_layers:
            x = F.relu(fc(x))
        
        # Output layer (classification)
        x = self.fc_out(x)

        return x


In [18]:
import gc

def train_model_convolutional(model, criterion, optimizer, epochs, train_loader, val_loader, device, model_number):
    
    model_name = f"Test convolucional {model_number}"
    
    wandb.init(
        name = model_name,
        config = {
            "learning_rate": 0.0005,
            "epochs": epochs,
            "model": model,
        })
            
    for epoch in range(epochs):
        # Training loop
        model.train()
        train_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

            train_loss /= len(train_loader)

        print(f"Epoch {epoch+1}/{epochs} - Train loss: {train_loss:.4f}")

        # Validation step
        model.eval()
        correct = 0
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                predicted = outputs.argmax(dim=1)
                correct += (predicted == labels).sum().item()
                val_loss += loss.item()

                val_loss /= len(val_loader)

        val_accuracy = 100 * (correct / len(val_dataset))
        print(f"\nValidation loss: {val_loss:.4f}, Accuracy: {val_accuracy:.4f}%")
        
        wandb.log({
                "val_loss": val_loss,
                "val_accuracy": val_accuracy,
                "train_loss": train_loss,
                "epoch": epoch})

        # Clear cache and collect garbage
        gc.collect()
        torch.cuda.empty_cache()

    wandb.finish()
    return val_loss, val_accuracy # Return the final best model

def test_multiple_configurations(train, val, test, criterion, device, num_epochs=10):
    """
    Test multiple model configurations, applying the best gradients for evaluation,
    and save the best model based on accuracy or validation loss.
    """
    # Different configurations for the CNN model
    configurations = [
        [(32, 3, 1, 1), (64, 3, 1, 1), (128, 3, 1, 1)],  # Configuration 1
        #[(32, 5, 1, 2), (64, 5, 1, 2)],                  # Configuration 2
        #[(16, 3, 1, 1), (32, 3, 1, 1), (64, 3, 1, 1)],   # Configuration 3
        [(64, 3, 1, 1), (128, 3, 1, 1), (256, 3, 1, 1)]  # Configuration 4
    ]
    
    best_model = None
    best_accuracy = 0.0
    best_loss = float("inf")
    
    for idx, conv_layers_config in enumerate(configurations):
        print(f"\nTesting Configuration {idx + 1} with convolutional layers: {conv_layers_config}")
        
        # Initialize the model with the current configuration
        model = CNN(input_channels=1, num_classes=10, conv_layers_config=conv_layers_config)
        model.to(device)
        
        # Initialize optimizer (e.g., Adam) and train the model
        optimizer = torch.optim.Adam(model.parameters(), lr=0.0005)

        criterion = nn.CrossEntropyLoss()
        
        # Train the model on the training set
        val_loss, val_accuracy = train_model_convolutional(model, criterion, optimizer, num_epochs, train, val,device,idx + 1)
        
        # Print the results for the current configuration
        print(f"Configuration {idx + 1} Test Loss: {val_loss:.4f}, Test Accuracy: {val_accuracy:.2f}%")
        
        # Save the best model based on accuracy; if accuracy is the same, use loss as the tie-breaker
        if val_accuracy > best_accuracy or (val_accuracy == best_accuracy and val_loss < best_loss):
            best_accuracy = val_accuracy
            best_loss = test_loss
            best_model = model
            print(f"New best model found with accuracy: {best_accuracy:.2f}% and loss: {best_loss:.4f}")

    print(f"\nBest Model Test Accuracy: {best_accuracy:.2f}% with Loss: {best_loss:.4f}, model: {best_model}")
    return best_loss, best_model

In [22]:
criterion = nn.CrossEntropyLoss()
# Assume 'test_loader' is the DataLoader for your test set, and 'criterion' is the loss function (e.g., CrossEntropyLoss)
best_model = test_multiple_configurations(train_spectogram, val_spectogram, test_spectogram, criterion, device, 20)

Epoch 14/20 - Train loss: 11.1216


KeyboardInterrupt: 

In [None]:
print(f"Loss: {best_model[0]} - Model: {best_model[1]}")

In [None]:
cnn_best_model = best_model[1]

# Evaluación final en el conjunto de prueba
print("Evaluando el mejor modelo en el conjunto de prueba...")
cnn_best_model.eval()
test_loss = 0.0
correct = 0
total = len(test_dataset.dataset)
with torch.no_grad():
    for inputs, labels in test_spectogram:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = cnn_best_model(inputs)

        loss = criterion(outputs, labels)
        test_loss += loss.item()

        predicted = outputs.argmax(dim=1)
        correct += (predicted == labels).sum().item()

test_loss /= len(test_dataset)
test_accuracy = 100 * correct / total

print(f"Loss en el conjunto de prueba: {test_loss:.4f}")
print(f"Precisión en el conjunto de prueba: {test_accuracy:.2f}%")

In [27]:
class different_act_CNN(nn.Module):
    def __init__(self, input_channels=1, num_classes=10, activation_function="relu"):
        super(different_act_CNN, self).__init__()

        # Define the convolutional layers (fixed as per example)
        self.conv_layers = nn.ModuleList([
            nn.Conv2d(input_channels, 32, kernel_size=3, stride=1, padding=1),  # Conv layer 1
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),              # Conv layer 2
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)              # Conv layer 3
        ])
        
        # Max Pooling layer
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Calculate the output feature size after conv layers and pooling
        self.final_feature_map_size = self._get_conv_output_size()

        # Define fully connected layers (fixed as per example)
        self.fc_layers = nn.ModuleList()
        self.fc_layers.append(nn.Linear(self.final_feature_map_size, 256))  # First fully connected layer
        for _ in range(4):  # Add 2 more fully connected layers with 256 nodes
            self.fc_layers.append(nn.Linear(256, 256))

        # Capa de salida
        self.fc_out = nn.Linear(256, num_classes)

        # Set activation function
        self.activation = self._get_activation_function(activation_function)

    def _get_conv_output_size(self):
        # Sample input size (height x width)
        height, width = 201, 552  # Replace with actual input height and width

        # Pass through convolutional and pooling layers to determine final size
        for layer in self.conv_layers:
            height = (height + 2 * layer.padding[0] - layer.kernel_size[0]) // layer.stride[0] + 1
            width = (width + 2 * layer.padding[1] - layer.kernel_size[1]) // layer.stride[1] + 1
            height //= 2  # Max pooling halves the height
            width //= 2   # Max pooling halves the width

        # Output feature map size
        return height * width * 128

    def _get_activation_function(self, activation_function):
        # Map the string to the appropriate activation function
        if activation_function == "relu":
            return F.relu
        elif activation_function == "leaky_relu":
            return F.leaky_relu
        elif activation_function == "tanh":
            return torch.tanh
        elif activation_function == "sigmoid":
            return torch.sigmoid
        elif activation_function == "softmax":
            return F.softmax
        elif activation_function == "elu":
            return F.elu
        elif activation_function == "selu":
            return F.selu
        elif activation_function == "gelu":
            return F.gelu
        elif activation_function == "swish":
            return lambda x: x * torch.sigmoid(x)  # Swish activation: x * sigmoid(x)
        elif activation_function == "hard_sigmoid":
            return F.hardsigmoid
        else:
            raise ValueError(f"Unsupported activation function: {activation_function}")

    def forward(self, x):
        # Forward pass through conv layers with the selected activation function and pooling
        for conv_layer in self.conv_layers:
            x = self.activation(conv_layer(x))
            x = self.pool(x)
        
        # Flatten the output from conv layers
        x = x.view(x.size(0), -1)

        # Forward pass through fully connected layers with the selected activation function
        for fc_layer in self.fc_layers:
            x = self.activation(fc_layer(x))
        
        # Final output layer
        x = self.fc_out(x)

        return x

In [24]:
def train_model_activation(model, criterion, optimizer, epochs, train_loader, val_loader, device, act_name):
    
    model_name = f"Test de {act_name}"
    
    wandb.init(
        name = model_name,
        config = {
            "learning_rate": 0.0005,
            "epochs": epochs,
            "model": model,
        })
            
    for epoch in range(epochs):
        # Training loop
        model.train()
        train_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        train_loss /= len(train_loader)

        print(f"Epoch {epoch+1}/{epochs} - Train loss: {train_loss:.4f}")

        # Validation step
        model.eval()
        correct = 0
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                predicted = outputs.argmax(dim=1)
                correct += (predicted == labels).sum().item()
                val_loss += loss.item()

            val_loss /= len(val_loader)

        val_accuracy = 100 * (correct / len(val_dataset))
        print(f"\nValidation loss: {val_loss:.4f}, Accuracy: {val_accuracy:.4f}%")
        
        wandb.log({
                "val_loss": val_loss,
                "val_accuracy": val_accuracy,
                "train_loss": train_loss,
                "epoch": epoch})

        # Clear cache and collect garbage
        gc.collect()
        torch.cuda.empty_cache()

    wandb.finish()
    return val_loss, val_accuracy # Return the final best model


In [30]:
# Function to test different activation functions
def test_multiple_activation_functions(train, val, test, criterion, device, num_epochs=10):
    #activation_functions = ["relu", "leaky_relu", "tanh", "sigmoid", "softmax", "elu", "selu", "gelu", "swish", "hard_sigmoid"]
    activation_functions = ["elu"]
    
    best_model = None
    best_accuracy = 0.0
    best_loss = float("inf")
    
    for activation_function in activation_functions:
        print(f"\nTesting activation function: {activation_function}")
        
        model = different_act_CNN(input_channels=1, num_classes=10, activation_function=activation_function)
        model.to(device)
        
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.CrossEntropyLoss()

        val_loss, val_accuracy = train_model_activation(model, criterion, optimizer, 20, train, val, device, activation_function)
        
        
        print(f"Activation function: {activation_function} Test Loss: {val_loss:.4f}, Test Accuracy: {val_accuracy:.2f}%")
        
        if val_accuracy > best_accuracy or (val_accuracy == best_accuracy and val_loss < best_loss):
            best_accuracy = val_accuracy
            best_loss = test_loss
            best_model = model
            print(f"New best model found with accuracy: {best_accuracy:.2f}% and loss: {best_loss:.4f}")

    print(f"\nBest Model Test Accuracy: {best_accuracy:.2f}% with Loss: {best_loss:.4f}")
    return best_loss, best_model

In [31]:
criterion = nn.CrossEntropyLoss()

# Assume 'test_loader' is the DataLoader for your test set, and 'criterion' is the loss function (e.g., CrossEntropyLoss)
best_loss, best_model = test_multiple_activation_functions(train_spectogram, val_spectogram, test_spectogram, criterion, device, 10)


Testing activation function: elu


Epoch 1/20 - Train loss: 11.9935

Validation loss: 2.0347, Accuracy: 22.2973%
Epoch 2/20 - Train loss: 1.9352

Validation loss: 1.7954, Accuracy: 31.7568%
Epoch 3/20 - Train loss: 1.3312

Validation loss: 1.9604, Accuracy: 35.1351%
Epoch 4/20 - Train loss: 1.0171

Validation loss: 2.1201, Accuracy: 35.8108%
Epoch 5/20 - Train loss: 0.6685

Validation loss: 2.7892, Accuracy: 40.5405%
Epoch 6/20 - Train loss: 0.4226

Validation loss: 3.2479, Accuracy: 34.4595%
Epoch 7/20 - Train loss: 0.4022

Validation loss: 3.3494, Accuracy: 38.5135%
Epoch 8/20 - Train loss: 0.3848

Validation loss: 4.0193, Accuracy: 34.4595%
Epoch 9/20 - Train loss: 0.3797

Validation loss: 3.3629, Accuracy: 40.5405%
Epoch 10/20 - Train loss: 0.2332

Validation loss: 3.0276, Accuracy: 37.8378%
Epoch 11/20 - Train loss: 0.3337

Validation loss: 2.9943, Accuracy: 38.5135%
Epoch 12/20 - Train loss: 0.1332

Validation loss: 5.0626, Accuracy: 31.7568%
Epoch 13/20 - Train loss: 0.0840

Validation loss: 4.7872, Accuracy: 36.

0,1
epoch,▁▁▂▂▂▃▃▄▄▄▅▅▅▆▆▇▇▇██
train_loss,█▂▂▂▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁
val_accuracy,▁▄▅▆▇▅▇▅▇▆▇▄▆▅▅▇█▆▆▇
val_loss,▁▁▁▂▃▄▄▅▄▃▃▇▆█▅▅▄▆▆▇

0,1
epoch,19.0
train_loss,0.01361
val_accuracy,38.51351
val_loss,5.08911


Activation function: elu Test Loss: 5.0891, Test Accuracy: 38.51%
New best model found with accuracy: 38.51% and loss: 0.1220

Best Model Test Accuracy: 38.51% with Loss: 0.1220


In [32]:
# Evaluación final en el conjunto de prueba
print("Evaluando el mejor modelo en el conjunto de prueba...")
best_model.eval()
test_loss = 0.0
correct = 0
total = len(test_dataset.dataset)
with torch.no_grad():
    for inputs, labels in test_spectogram:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = best_model(inputs)

        loss = criterion(outputs, labels)
        test_loss += loss.item()

        predicted = outputs.argmax(dim=1)
        correct += (predicted == labels).sum().item()

test_loss /= len(test_dataset)
test_accuracy = 100 * correct / total

print(f"Loss en el conjunto de prueba: {test_loss:.4f}")
print(f"Precisión en el conjunto de prueba: {test_accuracy:.2f}%")

Evaluando el mejor modelo en el conjunto de prueba...
Loss en el conjunto de prueba: 0.2564
Precisión en el conjunto de prueba: 18.86%


In [33]:
class DifferentOptCNN(nn.Module):
    def __init__(self, optimizer_class, scheduler_fn, learning_rate, scheduler_lr, input_channels=1, num_classes=10):
        super(DifferentOptCNN, self).__init__()

        # Capas convolucionales con ajustes en kernel_size y stride para evitar reducción excesiva
        self.conv_layers = nn.ModuleList([
            nn.Conv2d(input_channels, 32, kernel_size=3, stride=1, padding=1),  # Conv layer 1
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),              # Conv layer 2
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)              # Conv layer 3
        ])
        
        # Max Pooling layer con tamaño de kernel ajustado
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Calcular el tamaño final después de las convoluciones
        self.final_feature_map_size = self._get_conv_output_size(input_channels)

        # Capas densas (sin cambios)
        self.fc_layers = nn.ModuleList()
        self.fc_layers.append(nn.Linear(self.final_feature_map_size, 256))  # First fully connected layer
        for _ in range(4):  # Add 2 more fully connected layers with 256 nodes
            self.fc_layers.append(nn.Linear(256, 256))

        # Capa de salida
        self.fc_out = nn.Linear(256, num_classes)

        # Definir función de activación ELU
        self.activation = F.elu

        # Configuración del optimizador y scheduler
        self.optimizer = optimizer_class(self.parameters(), lr=learning_rate)
        self.scheduler = scheduler_fn(self.optimizer, scheduler_lr)

    def _get_conv_output_size(self, input_channels):
        # Tamaño de entrada (ajustar si es necesario)
        height, width = 201, 552  # Reemplaza con el tamaño correcto de entrada si cambió

        # Cálculo del tamaño de salida
        for layer in self.conv_layers:
            height = (height + 2 * layer.padding[0] - layer.kernel_size[0]) // layer.stride[0] + 1
            width = (width + 2 * layer.padding[1] - layer.kernel_size[1]) // layer.stride[1] + 1
            height //= 2  # Max pooling reduce la altura a la mitad
            width //= 2   # Max pooling reduce el ancho a la mitad

        return height * width * 128  # Tamaño de la característica de salida final


    def forward(self, x):
        # Pasar por las capas convolucionales con la función de activación ELU y pooling
        for conv_layer in self.conv_layers:
            x = self.activation(conv_layer(x))
            x = self.pool(x)
        
        # Aplanar la salida de las capas convolucionales antes de pasar por las densas
        x = x.view(x.size(0), -1)

        # Pasar por las capas densas con la función de activación ELU
        for fc_layer in self.fc_layers:
            x = self.activation(fc_layer(x))
        
        # Capa de salida final
        x = self.fc_out(x)

        return x

In [35]:
def train_model_opt_sched_lr(model, criterion, optimizer, epochs, train_loader, val_loader, device, model_name):
    
    wandb.init(
        name = model_name,
        config = {
            "learning_rate": 0.0005,
            "epochs": epochs,
            "model": model,
        })
            
    for epoch in range(epochs):
        # Training loop
        model.train()
        train_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        train_loss /= len(train_loader)

        print(f"Epoch {epoch+1}/{epochs} - Train loss: {train_loss:.4f}")

        # Validation step
        model.eval()
        correct = 0
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                predicted = outputs.argmax(dim=1)
                correct += (predicted == labels).sum().item()
                val_loss += loss.item()

            val_loss /= len(val_loader)

        val_accuracy = 100 * (correct / len(val_dataset))
        print(f"\nValidation loss: {val_loss:.4f}, Accuracy: {val_accuracy:.4f}%")
        
        wandb.log({
                "val_loss": val_loss,
                "val_accuracy": val_accuracy,
                "train_loss": train_loss,
                "epoch": epoch})

        # Clear cache and collect garbage
        gc.collect()
        torch.cuda.empty_cache()

    wandb.finish()
    return val_loss, val_accuracy # Return the final best model


In [37]:
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR, ExponentialLR, ReduceLROnPlateau
import copy

def grid_search_model_configurations(train, val, test, criterion, device, num_epochs=10):
    # Hiperparámetros a probar
    optimizers = {
        "SGD": optim.SGD,
        "Adam": optim.Adam,
        "RMSprop": optim.RMSprop
    }
    schedulers = {
        "StepLR": lambda opt, lr: StepLR(opt, step_size=5, gamma=lr),
        "ExponentialLR": lambda opt, lr: ExponentialLR(opt, gamma=lr),
        "ReduceLROnPlateau": lambda opt, lr: ReduceLROnPlateau(opt, mode='min', patience=3, factor=lr)
    }
    learning_rates = [0.001, 0.0005, 0.01]

    best_model = None
    best_accuracy = 0.0
    best_loss = float("inf")
    best_config = {}


    for optimizer_name, optimizer_class in optimizers.items():
        for scheduler_name, scheduler_fn in schedulers.items():
            for lr in learning_rates:
                print(f"\nTesting config: Optimizer={optimizer_name}, Scheduler={scheduler_name}, Learning Rate={lr}")

                # Inicializar el modelo con la función de activación fija
                model = DifferentOptCNN(
                    optimizer_class=optimizer_class,
                    scheduler_fn=scheduler_fn,
                    learning_rate=lr,
                    scheduler_lr=lr,
                    input_channels=20,  # Ajuste según el tamaño de los datos
                    num_classes=10
                )
                model.to(device)

                model_name = f"Opt: {optimizer_name}, Sch: {scheduler_name}, Lr: {lr}"

                # Entrenar el modelo
                val_loss, val_accuracy = train_model_opt_sched_lr(model, criterion, model.optimizer, num_epochs, train, val, device, model_name)

                print(f"Config: Optimizer={optimizer_name}, Scheduler={scheduler_name}, LR={lr} | Test Loss: {test_loss:.4f}, Test Accuracy: {val_accuracy:.2f}%")

                # Guardar el mejor modelo según la precisión y la pérdida en el conjunto de prueba
                if val_accuracy > best_accuracy or (val_accuracy == best_accuracy and val_loss < best_loss):
                    best_accuracy = val_accuracy
                    best_loss = val_loss
                    best_model = copy.deepcopy(model)
                    best_config = {
                        "optimizer": optimizer_name,
                        "scheduler": scheduler_name,
                        "learning_rate": lr
                    }
                    print(f"New best model found with accuracy: {best_accuracy:.2f}% and loss: {best_loss:.4f}")

    print(f"\nBest Model Test Accuracy: {best_accuracy:.2f}% with Loss: {best_loss:.4f}")
    print(f"Best Configuration: {best_config}")
    return best_loss, best_model, best_config

In [38]:
# Ejemplo de uso:
_, best_model, best_config = grid_search_model_configurations(train_loader, val_loader, test_loader, criterion, device, num_epochs=10)


Testing config: Optimizer=SGD, Scheduler=StepLR, Learning Rate=0.001


RuntimeError: Given input size: (32x1x110250). Calculated output size: (32x0x55125). Output size is too small