# 3. Distinguir géneros musicales utilizando modelos de aprendizaje

## Importaciones

In [1]:
import torch
import torchaudio

import numpy as np
import os
import csv

# import pandas as pd
# import torch
# from torch.utils.data import DataLoader, TensorDataset
# from torch import nn, optim
# import torch.nn.functional as F
# from sklearn.preprocessing import LabelEncoder, StandardScaler
# from sklearn.metrics import accuracy_score, f1_score

____

## 3.2. Modelos de aprendizaje basados en espectograma

In [2]:
BATCH_SIZE = 32
LEARNING_RATE = 0.001
EPOCHS = 10

### 3.2.1. Creación del corpus de espectogramas

In [3]:
# Clase para la creación del corpus

class SpectrogramDataset(torch.utils.data.Dataset):

    def __init__(self, annotations_file, audios_dir, transformations=None):
        super().__init__
        self.annotations = self._leer_csv(annotations_file)
        self.audios_dir = audios_dir
        self.transformations = transformations
        
    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, index):
        signal, sr = self._recupera_signal(index)
        label = self._recupera_label(index)
        if self.transformations:
            signal = self.transformations(signal, sr)
        return signal, label
        
    def mapa_label_classes(self):
        pares=set([(id_label,label) for [_,id_label,label] in self.annotations])
        mapa={}
        for (a,b) in pares:
            mapa[a]=b
        return mapa
    
    def label_class(self, index):
        return self.annotations[index][2]
    
    def get_audio_file(self, index): 
        name = self.annotations[index][0]
        return os.path.join(name)

    def _leer_csv(self,annotations_file):
        with open(annotations_file, 'r', encoding='utf-8') as f:
            lector = csv.reader(f)
            next(lector) 
            data = [ (file_name, classID, classLabel)  
                for file_name, classID, classLabel in lector]
        return data

    def _recupera_signal(self,index=0):
        audio_file=self.get_audio_file(index)        
        signal, sr = torchaudio.load(audio_file)
        return signal, sr
    
    def _recupera_label(self, index):
        return torch.tensor(int(self.annotations[index][1]))

In [4]:
def get_spectrogram(signal, sr):
    spectrogram_transform = torchaudio.transforms.Spectrogram(power=2)
    spec_amplitud_to_db_transform = torchaudio.transforms.AmplitudeToDB()
    spect = spectrogram_transform(signal)
    spect=spec_amplitud_to_db_transform(spect)
    return spect

In [5]:
ccmusic_train = SpectrogramDataset(annotations_file='./ccmusic/train/annotations.csv', 
                                    audios_dir='./ccmusic/train/audios', 
                                    transformations=get_spectrogram)
ccmusic_train_dataloader = torch.utils.data.DataLoader(ccmusic_train, 
                                                       batch_size=BATCH_SIZE, 
                                                       shuffle=True)

ccmusic_validation = SpectrogramDataset(annotations_file='./ccmusic/validation/annotations.csv',
                                        audios_dir='./ccmusic/validation/audios',
                                        transformations=get_spectrogram)
ccmusic_validation_dataloader = torch.utils.data.DataLoader(ccmusic_validation, 
                                                            batch_size=BATCH_SIZE, 
                                                            shuffle=True)

ccmusic_test = SpectrogramDataset(annotations_file='./ccmusic/test/annotations.csv',
                                    audios_dir='./ccmusic/test/audios',
                                    transformations=get_spectrogram)
ccmusic_test_dataloader = torch.utils.data.DataLoader(ccmusic_test, 
                                                      batch_size=BATCH_SIZE, 
                                                      shuffle=True)

In [6]:
ccmusic2_train = SpectrogramDataset(annotations_file='./ccmusic2/train/annotations.csv',
                                    audios_dir='./ccmusic2/train/audios',
                                    transformations=get_spectrogram)
ccmusic2_train_dataloader = torch.utils.data.DataLoader(ccmusic2_train,
                                                            batch_size=BATCH_SIZE,
                                                            shuffle=True)

ccmusic2_validation = SpectrogramDataset(annotations_file='./ccmusic2/validation/annotations.csv',
                                        audios_dir='./ccmusic2/validation/audios',
                                        transformations=get_spectrogram)
ccmusic2_validation_dataloader = torch.utils.data.DataLoader(ccmusic2_validation,
                                                            batch_size=BATCH_SIZE,
                                                            shuffle=True)

ccmusic2_test = SpectrogramDataset(annotations_file='./ccmusic2/test/annotations.csv',
                                    audios_dir='./ccmusic2/test/audios',
                                    transformations=get_spectrogram)
ccmusic2_test_dataloader = torch.utils.data.DataLoader(ccmusic2_test,
                                                            batch_size=BATCH_SIZE,
                                                            shuffle=True)

In [7]:
espectograma, etiqueta = ccmusic_train[0]
print(espectograma.shape)
print(etiqueta)

SPECTOGRAM_H = 201
SPECTOGRAM_W = 3308

torch.Size([1, 201, 3308])
tensor(1)


### 3.2.2. Definición del modelo CNN para clasificación de espectogramas

In [8]:
def calcula_tam_capa_lineal1(spect_height, spect_width):

    def pool_size(size, kernel_size, stride):
        return (size - kernel_size) // stride + 1
    
    def conv_size(size, kernel_size, stride, padding):
        return (size + 2*padding - kernel_size) // stride + 1

    height_salida = spect_height
    width_salida = spect_width

    # Bloque convolucional: kernel_size=3, stride=1, padding=1

    for _ in range(1, 4):
        height_salida = pool_size(conv_size(height_salida, 3, 1, 1), 2, 2)
        width_salida = pool_size(conv_size(width_salida, 3, 1, 1), 2, 2)

    return height_salida * width_salida

In [9]:
class CNNModel(torch.nn.Module):

    def __init__(self, spect_height, spect_width, num_labels):
        super(CNNModel, self).__init__()

        # Definir bloque convolucional
        self.conv_block1 = torch.nn.Sequential(
            torch.nn.Conv2d(in_channels=1, out_channels=32, 
                            kernel_size=3, 
                            stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.25),
            torch.nn.MaxPool2d(kernel_size=2, stride=2)
        )

        self.conv_block2 = torch.nn.Sequential( 
            torch.nn.Conv2d(in_channels=32, out_channels=64,
                            kernel_size=3, 
                            stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=2, stride=2)
        )

        self.conv_block3 = torch.nn.Sequential(
            torch.nn.Conv2d(in_channels=64, out_channels=128,
                            kernel_size=3, 
                            stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=2, stride=2)
        )

        height_width_salida = calcula_tam_capa_lineal1(spect_height, spect_width)

        self.mlp_block = torch.nn.Sequential(
            torch.nn.Flatten(1, -1),
            torch.nn.Linear( 128 * height_width_salida, 512), 
            torch.nn.ReLU(),
            torch.nn.Dropout(0.5),  
            torch.nn.Linear(512, num_labels if num_labels > 2 else 1),    
            torch.nn.Softmax(dim=1) if num_labels > 2 else torch.nn.Sigmoid()
        )

    def forward(self, input):
        x = self.conv_block1(input)
        x = self.conv_block2(x)
        x = self.conv_block3(x)
        predictions = self.mlp_block(x)
        return predictions

In [12]:
def train_single_epoch(model, dataloader, loss_fn, optimizer):
    model.train()
    for inputs, targets in dataloader:
        print('.', end='')
        predictions = model(inputs)
        loss = loss_fn(predictions, targets.unsqueeze(1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    return(loss.item())


def train(model, dataloader, loss_fn, optimizer, epochs):
    print("Inicio del entrenamiento")
    for i in range(epochs):
        print(f"Época {i+1} " ,end='') 
        print('.', end='')
        loss=train_single_epoch(model, dataloader, loss_fn, optimizer)
        print(f"\nLoss: {loss}")
    print("Fin del entrenamiento")

### 3.3.3. Entrenamiento e inferencia

In [None]:
modelo = CNNModel(spect_height=SPECTOGRAM_H, spect_width=SPECTOGRAM_W, num_labels=2)
loss_fn = torch.nn.BCELoss()
optimizer = torch.optim.Adam(modelo.parameters(), 
                             lr=LEARNING_RATE)
train(modelo, ccmusic_train_dataloader, loss_fn, optimizer, EPOCHS)

In [None]:
modelo = CNNModel(spect_height=SPECTOGRAM_H, spect_width=SPECTOGRAM_W, num_labels=9)
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(modelo.parameters(), 
                             lr=LEARNING_RATE)
train(modelo, ccmusic2_train_dataloader, loss_fn, optimizer, EPOCHS)

____

## 3.1. Modelos de aprendizaje basados en extracción de características (*bag of songs*)

In [None]:
# Cargar los datasets
train_df = pd.read_csv('ccmusic/train/features.csv')
test_df = pd.read_csv('ccmusic/test/features.csv')
validation_df = pd.read_csv('ccmusic/validation/features.csv')

# Eliminar columnas no necesarias
train_df.drop(["audio_file"], axis=1, inplace=True)
test_df.drop(["audio_file"], axis=1, inplace=True)
validation_df.drop(["audio_file"], axis=1, inplace=True)
# Codificar las etiquetas
label_encoder = LabelEncoder()
train_df['label'] = label_encoder.fit_transform(train_df['label'])
test_df['label'] = label_encoder.transform(test_df['label'])
validation_df['label'] = label_encoder.transform(validation_df['label'])

# Separar las características y las etiquetas
X_train = train_df.drop('label', axis=1).values
y_train = train_df['label'].values
X_test = test_df.drop('label', axis=1).values
y_test = test_df['label'].values
X_val = validation_df.drop('label', axis=1).values
y_val = validation_df['label'].values

# Normalizar los datos
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
X_val = scaler.transform(X_val)

# Convertir a tensores de PyTorch
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val, dtype=torch.long)

# Crear DataLoaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

In [None]:
# Definición del modelo
class Classifier(nn.Module):
    def __init__(self, num_features, num_classes):
        super(Classifier, self).__init__()
        self.fc1 = nn.Linear(num_features, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, num_classes)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

num_features = X_train.shape[1]
num_classes = len(np.unique(y_train))
model = Classifier(num_features, num_classes)
print(model)

# Configuración de la función de pérdida y el optimizador
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

Classifier(
  (fc1): Linear(in_features=21, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=64, bias=True)
  (fc3): Linear(in_features=64, out_features=2, bias=True)
)


In [None]:
# Función de entrenamiento con early stopping
def train_model(model, train_loader, val_loader, optimizer, criterion, num_epochs, patience=5):
    best_loss = float('inf')
    patience_counter = 0

    for epoch in range(num_epochs):
        model.train()
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        model.eval()
        val_loss = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
        val_loss /= len(val_loader)

        print(f'Epoch {epoch + 1}, Training Loss: {loss.item()}, Validation Loss: {val_loss}')

        if val_loss < best_loss:
            best_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), 'best_model.pth')
        else:
            patience_counter += 1
        if patience_counter >= patience:
            print('Stopping early due to no improvement')
            break

train_model(model, train_loader, val_loader, optimizer, criterion, num_epochs=50)

# Cargar el mejor modelo para evaluación
model.load_state_dict(torch.load('best_model.pth'))

Epoch 1, Training Loss: 0.19214338064193726, Validation Loss: 0.24798855185508728
Epoch 2, Training Loss: 0.09617410600185394, Validation Loss: 0.1489162395397822
Epoch 3, Training Loss: 0.06668626517057419, Validation Loss: 0.125129667421182
Epoch 4, Training Loss: 0.06241731345653534, Validation Loss: 0.11546906580527623
Epoch 5, Training Loss: 0.06514333933591843, Validation Loss: 0.10933728764454524
Epoch 6, Training Loss: 0.061320483684539795, Validation Loss: 0.10551605621973674
Epoch 7, Training Loss: 0.07443602383136749, Validation Loss: 0.10577847560246785
Epoch 8, Training Loss: 0.05509546771645546, Validation Loss: 0.09459401667118073
Epoch 9, Training Loss: 0.048574186861515045, Validation Loss: 0.09193692977229755
Epoch 10, Training Loss: 0.052683793008327484, Validation Loss: 0.09190963084499042
Epoch 11, Training Loss: 0.040546663105487823, Validation Loss: 0.08386822541554768
Epoch 12, Training Loss: 0.0399075448513031, Validation Loss: 0.08391079927484195
Epoch 13, Tra

<All keys matched successfully>

In [None]:
def evaluate_model(loader):
    model.eval()  # Poner el modelo en modo evaluación
    predictions, labels_list = [], []

    with torch.no_grad():
        for inputs, labels in loader:
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            predictions.extend(predicted.cpu().numpy())  # Guarda las predicciones para calcular el F1
            labels_list.extend(labels.cpu().numpy())  # Guarda las etiquetas verdaderas

    accuracy = accuracy_score(labels_list, predictions) * 100
    f1 = f1_score(labels_list, predictions, average='weighted')  # Puedes cambiar 'weighted' por 'macro' o 'micro' según tus necesidades

    print(f'Accuracy on test set: {accuracy:.2f}%')
    print(f'F1 Score on test set: {f1:.2f}')

# Llamamos a la función de evaluación con el conjunto de prueba
evaluate_model(test_loader)

Accuracy on test set: 98.26%
F1 Score on test set: 0.98
