Universidad Torcuato Di Tella

Licenciatura en Tecnología Digital\
**Tecnología Digital VI: Inteligencia Artificial**

Integrantes: Isabel Núñez, Camilo Suárez y Valentina Vitetta


In [None]:
import gc
import os
import numpy as np
import torch
import torch.nn as nn
import torchaudio
import torchaudio.transforms as tt
from google.colab import drive
from torch.utils.data import DataLoader, Dataset, random_split


# TP3: Encodeador de música

## Conectamos la notebook a gdrive y seteamos data_dir con el path a los archivos.





Modificar `data_dir` con el path adecuado que lleve a la carpeta `genres_5sec` (la que contiene una subcarpeta por género).

In [None]:
# Mount Google Drive so the files stored there are reachable from the runtime.
drive.mount('/content/drive')
data_dir='//content/drive/MyDrive/tp3tdvi/genres_5sec/'

# Every sub-directory found directly under data_dir is kept as a class name.
list_files = os.listdir(data_dir)
classes = [
    entry
    for entry in list_files
    if os.path.isdir('{}/{}'.format(data_dir, entry))
]

## Creamos una clase para manejar los audios

In [None]:
# Presumably the sample rate (Hz) of the clips; it is not referenced by the
# cells visible here -- TODO confirm.
samplerate = 22050


def parse_genres(fname):
    """Return the genre prefix encoded in a clip's file name.

    The result is the text before the first '.' of the last '/'-separated
    component of ``fname`` (e.g. ``'blues.00000.wav'`` -> ``'blues'``).
    """
    basename = fname.rpartition('/')[2]
    genre, _, _ = basename.partition('.')
    return genre

class MusicDataset(Dataset):
    """Dataset of ``.wav`` clips stored as ``root/<genre>/<file>.wav``.

    Each item is ``(audio, class_idx)``: ``audio`` is the first element
    returned by ``torchaudio.load`` for the clip and ``class_idx`` is the
    position of the clip's genre in ``self.classes``.
    """

    def __init__(self, root):
        """Index every ``.wav`` file under the genre folders of ``root``.

        The genre folder names come from the module-level ``classes`` list.
        """
        super().__init__()
        self.root = root
        # os.listdir returns entries in arbitrary order. Sorting both the
        # folders and the file names keeps the sample order stable, so a
        # seeded random_split yields the same partition on every run.
        self.files = [
            fname
            for c in sorted(classes)
            for fname in sorted(os.listdir(os.path.join(root, c)))
            if fname.endswith('.wav')
        ]
        # Sorted so the genre -> index mapping is identical across sessions;
        # iterating a plain set of strings depends on per-process hashing,
        # which would silently relabel the classes between runs.
        self.classes = sorted({parse_genres(fname) for fname in self.files})

    def __len__(self):
        """Return the number of indexed clips."""
        return len(self.files)

    def __getitem__(self, i):
        """Load clip ``i`` and return ``(audio, class_idx)``."""
        fname = self.files[i]
        # The genre is encoded in the file name and is also the folder name.
        genre = parse_genres(fname)
        fpath = os.path.join(self.root, genre, fname)
        class_idx = self.classes.index(genre)
        # torchaudio.load returns a tuple; only its first element is kept.
        audio = torchaudio.load(fpath)[0]

        return audio, class_idx

# Index the clips stored under data_dir.
dataset = MusicDataset(data_dir)

## Dividimos el conjunto de datos en entrenamiento, validación y test

In [None]:
# Seed PyTorch's global RNG so the random split below is repeatable.
random_seed = 42
torch.manual_seed(random_seed)

# Validation and test get a fixed number of clips; training gets the rest.
val_size = 100
test_size = 100
train_size = len(dataset) - (val_size + test_size)

train_ds, val_ds, test_ds = random_split(
    dataset,
    [train_size, val_size, test_size],
)
len(train_ds), len(val_ds), len(test_ds)

## Creamos los DataLoaders

In [None]:
batch_size = 20
num_workers = 2

# Training batches are reshuffled every epoch.
train_dl = DataLoader(
    train_ds,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers,
    pin_memory=True,
)
# Validation uses batches twice as large as the training ones.
valid_dl = DataLoader(
    val_ds,
    batch_size=2 * batch_size,
    num_workers=num_workers,
    pin_memory=True,
)
# Test clips are served one at a time.
test_dl = DataLoader(
    test_ds,
    batch_size=1,
    shuffle=True,
    num_workers=num_workers,
    pin_memory=True,
)

## Creamos el modelo

In [None]:
# Custom activation function that PyTorch does not ship.
class ShiftedReLU(nn.Module):
    """ReLU applied to ``x + shift`` and then moved back down by ``shift``."""

    def __init__(self, shift=0.3):
        super().__init__()
        self.shift = shift

    def forward(self, x):
        """Return ``relu(x + shift) - shift`` element-wise."""
        raised = x + self.shift
        return raised.relu() - self.shift


# Classifier: three conv/activation/pool/dropout stages followed by a stack of
# fully connected layers that ends in 10 outputs.
# NOTE: the position of each layer inside nn.Sequential determines its
# state_dict key, so reordering layers invalidates saved checkpoints.
model = nn.Sequential( # dropout: 0.1 after each conv stage, 0.2 after each hidden FC layer
    nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),  # Convolution layer 1
    nn.ELU(),
    nn.MaxPool2d(kernel_size=2, stride=2),                 # Halve both spatial dimensions
    nn.Dropout(0.1),

    nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1), # Convolution layer 2
    ShiftedReLU(),
    nn.MaxPool2d(kernel_size=2, stride=2),                 # Second halving
    nn.Dropout(0.1),

    nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),# Convolution layer 3
    nn.ELU(),
    nn.MaxPool2d(kernel_size=2, stride=2),                 # Last halving
    nn.Dropout(0.1),

    nn.Flatten(),                                          # Flatten for the fully connected layers

    # 128 channels times the spatial size left after three 2x poolings (// 8).
    # 201 x 552 is presumably the (frequency, time) size of the input
    # spectrogram -- TODO confirm against the audio length and transform used.
    nn.Linear(128 * (201 // 8) * (552 // 8), 256),         # Fully connected layer 1
    ShiftedReLU(),
    nn.Dropout(0.2),
    nn.Linear(256, 256),                                   # Fully connected layer 2
    nn.ELU(),
    nn.Dropout(0.2),
    nn.Linear(256, 256),                                   # Fully connected layer 3
    ShiftedReLU(),
    nn.Dropout(0.2),
    nn.Linear(256, 128),                                  # Fully connected layer 4
    nn.ELU(),
    nn.Dropout(0.2),

    nn.Linear(128, 10),                                    # Output layer (10 scores)
)

## Configuramos el dispositivo en el que se entrenará el modelo

In [None]:
# Use the first CUDA GPU when one is available; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

# Move the model to the chosen device and show its layers.
model.to(device)
print(model)

## Seteamos algunos hiperparámetros y comenzamos a entrenar

In [None]:
# General hyperparameters.
learning_rate = 0.0005
num_epochs = 100

loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

best_val_accuracy = 0.0

torch.cuda.empty_cache()
gc.collect()  # release host RAM before training starts

# Early stopping
patience = 20  # epochs without validation improvement before stopping
epochs_without_improvement = 0

# The transform is the same for every batch, so build it (and move it to the
# device) once instead of once per iteration.
specgram = tt.Spectrogram().to(device)

for epoch in range(num_epochs):
    losses = []

    # Train
    model.train()
    for wav, genre_index in train_dl:
        wav = wav.to(device)
        genre_index = genre_index.to(device)
        specgram_wav = specgram(wav)  # spectrogram of `wav`

        # Forward. The logits are passed to the loss as returned by the
        # model: squeezing them would also drop the batch dimension of a
        # size-1 batch and make the loss call fail.
        out = model(specgram_wav)
        loss = loss_function(out, genre_index)

        # Backward
        optimizer.zero_grad()  # clear gradients left from the previous step
        loss.backward()
        optimizer.step()
        losses.append(loss.item())

        # Drop the per-batch references so their memory can be reclaimed
        # before the next batch is loaded.
        del wav, genre_index, specgram_wav, loss, out
        torch.cuda.empty_cache()
        gc.collect()

    train_loss = np.mean(losses)

    print('Epoch: [%d/%d], Train loss: %.4f' % (epoch+1, num_epochs, train_loss))

    # Validation
    model.eval()
    y_true = []
    y_pred = []
    losses = []
    correct = 0
    # No gradients are needed to score the model; disabling autograd avoids
    # building a graph for every validation batch.
    with torch.no_grad():
        for wav, genre_index in valid_dl:
            wav = wav.to(device)
            genre_index = genre_index.to(device)
            specgram_wav = specgram(wav)

            out = model(specgram_wav)
            loss = loss_function(out, genre_index)
            losses.append(loss.item())

            pred = out.argmax(dim=-1).flatten()

            # Append labels and predictions.
            correct += pred.eq(genre_index).sum().item()
            y_true.extend(genre_index)
            y_pred.extend(pred)

            # Drop the per-batch references so their memory can be reclaimed.
            del wav, genre_index, specgram_wav, loss, out
            torch.cuda.empty_cache()
            gc.collect()

    accuracy = correct / len(valid_dl.dataset)
    valid_loss = np.mean(losses)
    print('Epoch: [%d/%d], Valid loss: %.4f, Valid accuracy: %.4f' % (epoch+1, num_epochs, valid_loss, accuracy))

    # Save the model whenever validation accuracy strictly improves.
    if accuracy > best_val_accuracy:
        print(f'Guardando el modelo en la epoch {epoch + 1}')
        torch.save(model.state_dict(), 'best_model.ckpt')
        best_val_accuracy = accuracy
        epochs_without_improvement = 0
    else:
        epochs_without_improvement += 1

    if epochs_without_improvement >= patience:  # Early stopping
        print(f"Early stopping en la epoch {epoch+1} debido a que no hubo mejor accuracy en validación por {patience} epochs.")
        break

## Evaluamos el modelo con el conjunto de test

In [None]:
# Load the best checkpoint saved during training. map_location places the
# tensors on the current device, so a checkpoint written from a GPU session
# still loads when only a CPU is available.
S = torch.load('best_model.ckpt', map_location=device)
model.load_state_dict(S)
print('modelo cargado!')

# Run evaluation
model.eval()
y_true = []
y_pred = []
correct = 0

# The transform is the same for every batch, so build it once up front.
specgram = tt.Spectrogram().to(device)

with torch.no_grad():
    for wav, genre_index in test_dl:
        wav = wav.to(device)
        genre_index = genre_index.to(device)

        specgram_wav = specgram(wav)

        out = model(specgram_wav)

        # Predicted class = index of the largest output score.
        pred = out.argmax(dim=-1).flatten()

        # Append labels and predictions.
        correct += pred.eq(genre_index).sum().item()
        y_true.extend(genre_index)
        y_pred.extend(pred)

accuracy = correct / len(test_dl.dataset)
print(f'Accuracy en testing: {accuracy}')