Universidad Torcuato Di Tella

Licenciatura en Tecnología Digital\
**Tecnología Digital VI: Inteligencia Artificial**

Integrantes: Isabel Núñez, Camilo Suárez y Valentina Vitetta


In [None]:
import gc
import IPython
import matplotlib.pyplot as plt
import numpy as np
import os
import soundfile as sf
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
import torchaudio.transforms as tt
from google.colab import drive
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from torch.utils.data import DataLoader, Dataset, random_split
from pydub import AudioSegment


# TP4: Encodeador de música

# Ejercicio 1

## Conectamos la notebook a gdrive y seteamos data_dir con el path a los archivos.





Modificar data_dir con el path adecuado que lleve a la carpeta genres

In [None]:
drive.mount('/content/drive')
data_dir='//content/drive/MyDrive/tp3tdvi/genres_5sec/'
list_files=os.listdir(data_dir)
classes=[]
for file in list_files:
  name='{}/{}'.format(data_dir,file)
  if os.path.isdir(name):
    classes.append(file)

## Creamos una clase para manejar los audios

In [None]:
samplerate=22050
def parse_genres(fname):
    parts = fname.split('/')[-1].split('.')[0]
    return parts

class MusicDataset(Dataset):
    def __init__(self, root):
        super().__init__()
        self.root = root
        self.files =[]
        for c in classes:
          self.files = self.files + [fname for fname in os.listdir(os.path.join(root,c)) if fname.endswith('.wav')]
        self.classes = list(set(parse_genres(fname) for fname in self.files))

    def __len__(self):
        return len(self.files)

    def __getitem__(self, i):
        fname = self.files[i]
        genre = parse_genres(fname)
        fpath = os.path.join(self.root,genre, fname)
        class_idx = self.classes.index(genre)
        audio = torchaudio.load(fpath)[0]

        return audio, class_idx

dataset = MusicDataset(data_dir)

## Dividimos el conjunto de datos en entrenamiento, validación y test

In [None]:
random_seed = 42 # Semilla para reproducibilidad
torch.manual_seed(random_seed)
val_size = 100
test_size = 100
train_size = len(dataset) - val_size - test_size

train_ds, val_ds, test_ds = random_split(dataset, [train_size, val_size, test_size])
len(train_ds),len(val_ds),len(test_ds)

## Creamos los DataLoaders

In [None]:
batch_size = 20
num_workers = 2

train_dl = DataLoader(train_ds, batch_size, shuffle=True, num_workers=num_workers, pin_memory=True)
valid_dl = DataLoader(val_ds, batch_size*2, num_workers=num_workers, pin_memory=True)
test_dl = DataLoader(test_ds, 1, shuffle=True, num_workers=num_workers, pin_memory=True)

## Creamos el modelo

In [None]:
# Para el ejercicio 2 de análisis, cambiar la cantidad de FM en el vector latente
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()

        # Encoder
        self.enc_conv1 = nn.Conv1d(1, 64, kernel_size=1024, padding=512, stride=512) # Espacio latente = 64 x 216

        # Decoder
        self.dec_conv1 = nn.ConvTranspose1d(64, 1, kernel_size=1194, padding=512, stride=512)

    def forward_encoder(self, x):
        x = self.enc_conv1(x)
        return x

    def forward_decoder(self, x):
        x = torch.tanh(self.dec_conv1(x))
        return x

    def forward(self, x):
        x = self.forward_encoder(x)
        x = self.forward_decoder(x)
        return x

## Configuramos el dispositivo en el que se entrenará el modelo

In [None]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

autoencoder = Autoencoder().to(device)
print(autoencoder)

## Seteamos algunos hiperparámetros y comenzamos a entrenar

In [None]:
learning_rate = 0.0005
num_epochs = 30
loss_function = nn.MSELoss().to(device)
optimizer = torch.optim.Adam(autoencoder.parameters(), lr=learning_rate)

torch.cuda.empty_cache()
gc.collect() #importante para ir liberando memoria ram

for epoch in range(num_epochs):
    train_losses = []

    for wav, genre_index in train_dl:
        wav = wav.to(device)

        # Forward
        out = autoencoder(wav)

        loss = loss_function(out, wav)

        # Backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())

        del wav #importante para ir liberando memoria ram
        del genre_index #importante para ir liberando memoria ram
        del loss #importante para ir liberando memoria ram
        del out  #importante para ir liberando memoria ram
        torch.cuda.empty_cache()  #importante para ir liberando memoria ram
        gc.collect() #importante para ir liberando memoria ram

    train_loss = np.mean(train_losses)

    print('Epoch: [%d/%d], Train loss: %.4f' % (epoch+1, num_epochs, train_loss))

    # Validation
    val_losses = []
    with torch.no_grad():
        for wav, genre_index in valid_dl:
            wav = wav.to(device)

            out = autoencoder(wav)

            loss = loss_function(out, wav)

            val_losses.append(loss.item())

            del wav #importante para ir liberando memoria ram
            del genre_index #importante para ir liberando memoria ram
            del loss #importante para ir liberando memoria ram
            del out  #importante para ir liberando memoria ram
            torch.cuda.empty_cache()  #importante para ir liberando memoria ram
            gc.collect() #importante para ir liberando memoria ram

    valid_loss = np.mean(val_losses)
    print('Epoch: [%d/%d], Valid loss: %.4f' % (epoch+1, num_epochs, valid_loss))

## Guardamos el modelo

In [None]:
torch.save(autoencoder.state_dict(), '64FM.ckpt')

## Con el modelo entrenado, probamos reconstruyendo alguna canción

In [None]:
# Cargar algun ejemplo del dataset
waveform, label = dataset[22]

# Mover el waveform al mismo dispositivo que el modelo (GPU o CPU)
waveform = waveform.to(device)

# Reproducir el audio original
IPython.display.Audio(waveform.cpu().numpy(), rate=samplerate)


In [None]:
# Ahora con
with torch.no_grad():
    outputs = autoencoder(waveform)  # La salida también estará en el mismo dispositivo
    latent_space = autoencoder.forward_encoder(waveform)

print(f'tamaño espacio latente: {latent_space.shape}')
# Mover las salidas a la CPU si están en la GPU para reproducción
outputs = outputs.cpu().numpy()

# Reproducir el audio descomprimido
IPython.display.Audio(outputs, rate=samplerate)

## Evaluamos el modelo con el conjunto de test

In [None]:
# Cargamos el modelo si es necesario
S = torch.load('64FM.ckpt')
autoencoder.load_state_dict(S)

# Test
autoencoder.eval()
test_losses = []
with torch.no_grad():
    for wav, genre_index in test_dl:
        wav = wav.to(device)

        out = autoencoder(wav)

        loss = loss_function(out, wav)

        test_losses.append(loss.item())

        del wav #importante para ir liberando memoria ram
        del genre_index #importante para ir liberando memoria ram
        del loss #importante para ir liberando memoria ram
        del out  #importante para ir liberando memoria ram
        torch.cuda.empty_cache()  #importante para ir liberando memoria ram
        gc.collect() #importante para ir liberando memoria ram

test_loss = np.mean(test_losses)
print(f"Test loss: {test_loss}")

# Ejercicio 2

## Cargamos los pesos de nuestro modelo entrenado

In [None]:
autoencoder.load_state_dict(torch.load('64FM.ckpt'))

In [None]:
num_samples = len(train_ds)  # Número de canciones
latent_vector_dim = 64 * 216  # Dimensión del vector latente aplanado

latent_vectors = np.zeros((num_samples, latent_vector_dim))

## Generamos los vectores latentes

In [None]:
i = 0
with torch.no_grad():
    for wav, genre_index in train_ds:
        wav = wav.to(device)

        # Forward_encoder
        latent_vector = autoencoder.forward_encoder(wav)

        latent_vectors[i, :] = latent_vector.flatten().cpu().numpy()

        i += 1

        del wav #importante para ir liberando memoria ram
        del genre_index #importante para ir liberando memoria ram
        del latent_vector  #importante para ir liberando memoria ram
        torch.cuda.empty_cache()  #importante para ir liberando memoria ram
        gc.collect() #importante para ir liberando memoria ram

## Método del codo

In [None]:
inertia = []
k_values = range(1, 14)

for k in k_values:
    kmeans = KMeans(n_clusters=k, random_state=42)
    kmeans.fit(latent_vectors)
    inertia.append(kmeans.inertia_)

In [None]:
plt.figure(figsize=(8, 5))
plt.plot(k_values, inertia, marker='o', linestyle='--')
plt.xlabel('Número de clusters (k)')
plt.ylabel('Inercia (SSE)')
plt.title('Método del Codo')
plt.grid(True)
plt.show()


## Clustering

In [None]:
n_clusters = 8  # Número de clusters

kmeans = KMeans(n_clusters=n_clusters, random_state=42)
clusters = kmeans.fit_predict(latent_vectors)

#Cantidad de puntos en cada cluster
unique, counts = np.unique(clusters, return_counts=True)
for cluster_id, count in zip(unique, counts):
    print(f"Cluster {cluster_id}: {count} puntos")


## PCA

In [None]:
flattened_latent_vectors = latent_vectors.reshape(latent_vectors.shape[0], -1)

# Aplicar PCA
pca = PCA(n_components=2)
reduced_latent_vectors = pca.fit_transform(flattened_latent_vectors)

In [None]:
# Graficar los datos de PCA
plt.scatter(reduced_latent_vectors[:, 0], reduced_latent_vectors[:, 1], cmap='viridis')
plt.title(f'PCA')
plt.xlabel('Componente Principal 1')
plt.ylabel('Componente Principal 2')

plt.show()

## Buscamos los outliers del gráfico y los reproducimos

In [None]:
# EJ: Encontrar el índice del valor máximo en el primer componente (componente 0 en PCA)
max_index = np.argmax(reduced_latent_vectors[:, 0])

print(f"Índice del vector con el valor máximo en el primer componente: {max_index}")
print(f"Valor máximo en el primer componente: {reduced_latent_vectors[max_index, 0]}")

In [None]:
waveform, genre_index = train_ds[303]  # Usamos el indice obtenido para reproducir la canción
IPython.display.display(IPython.display.Audio(waveform.cpu().numpy(), rate=samplerate))

# Ejercicio 3

## Creamos una clase de dataset personalizado para cargar música nueva

In [None]:
# Dataset personalizado para cargar archivos WAV
class AudioDataset(Dataset):
    def __init__(self, audio_dir, transform=None):
        self.audio_dir = audio_dir
        self.audio_files = [os.path.join(audio_dir, f) for f in os.listdir(audio_dir) if f.endswith(".wav")]
        self.transform = transform

    def __len__(self):
        return len(self.audio_files)

    def __getitem__(self, idx):
        # Cargar el archivo de audio
        audio_path = self.audio_files[idx]
        audio = AudioSegment.from_file(audio_path).set_frame_rate(16000).set_channels(1)
        audio_data = np.array(audio.get_array_of_samples(), dtype=np.float32) / 32768.0  # Normalizar a [-1, 1]
        audio_data = torch.tensor(audio_data).unsqueeze(0)  # Convertir a tensor (1, L)

        if self.transform:
            audio_data = self.transform(audio_data)

        return audio_data, audio_path  # Devolvemos también el path para referencia si es necesario

## Cargamos audios nuevos

In [None]:
# Ruta al dataset
dataset_path = '/content/drive/MyDrive/Dataset/audios_wav'

# Crear dataset y DataLoader
val_dataset = AudioDataset(dataset_path)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)


## Reconstruimos y comparamos

In [None]:
from IPython.display import Audio, display

with torch.no_grad():
    for audio, audio_path in val_loader:
        audio = audio.to(device)

        # Pasar por el autoencoder
        reconstructed = autoencoder(audio)

        # Convertir de tensores a numpy
        original_audio = audio.squeeze().cpu().numpy()
        reconstructed_audio = reconstructed.squeeze().cpu().numpy()

        print(f"Reconstruyendo: {audio_path[0]}")

        # Reproducir el audio original
        print("Audio Original:")
        display(Audio(original_audio, rate=16000))

        # Reproducir el audio reconstruido
        print("Audio Reconstruido:")
        display(Audio(reconstructed_audio, rate=16000))



# Ejercicio 4

## Obtenemos los vectores latentes de dos canciones, que serán utilizados para generar música

In [None]:
with torch.no_grad():
    audio1, _ = dataset[555]  # Primera canción
    audio2, _ = dataset[443]  # Segunda canción

    audio1 = audio1.unsqueeze(0).to(device)  # Agregar batch dimension
    audio2 = audio2.unsqueeze(0).to(device)

    latent1 = autoencoder.forward_encoder(audio1)
    latent2 = autoencoder.forward_encoder(audio2)


## Probamos combinando los audios por mitades

In [None]:
midpoint = latent1.size(2) // 2

# Combinar la mitad inicial de la primera canción con la mitad final de la segunda
mixed_latent = torch.cat((latent1[:, :, :midpoint], latent2[:, :, midpoint:]), dim=2)


In [None]:
with torch.no_grad():
    mixed_audio = autoencoder.forward_decoder(mixed_latent)
    mixed_audio = mixed_audio.squeeze().cpu().numpy()  # Convertir a numpy para guardar o reproducir


In [None]:
# Reproducir el audio
display(Audio(mixed_audio, rate=16000))

# Guardar el audio combinado
sf.write("mixed_song.wav", mixed_audio, samplerate=16000)


## Probamos con interpolación

In [None]:
import IPython.display as ipd

# Interpolación lineal entre dos vectores latentes
alpha_values = torch.linspace(0, 1, steps=5)
interpolated_audios = []

with torch.no_grad():
    for alpha in alpha_values:
        # Generar interpolación en el espacio latente
        interpolated_latent = (1 - alpha) * latent1 + alpha * latent2

        # Decodificar el vector interpolado
        interpolated_audio = autoencoder.forward_decoder(interpolated_latent)
        interpolated_audios.append(interpolated_audio.squeeze().cpu().numpy())

# Reproducir cada audio generado
for i, audio in enumerate(interpolated_audios):
    print(f"Interpolación {i+1}/{len(interpolated_audios)}:")
    ipd.display(ipd.Audio(audio, rate=samplerate))

## Probamos alterando una dimensión del vector latente

In [None]:
# Alterar una dimensión específica del vector latente
latent_modified = latent1.clone()  # Copiar el vector latente original
latent_modified[:, :, 0] += 2.0  # Incrementar la primera dimensión en 2

# Reconstrucción
with torch.no_grad():
    modified_audio = autoencoder.forward_decoder(latent_modified)
    modified_audio = modified_audio.squeeze().cpu().numpy()

# Reproducir el audio modificado
print("Audio modificado tras alterar el vector latente:")
ipd.display(ipd.Audio(modified_audio, rate=samplerate))


## Probamos agregando ruido al vector latente

In [None]:
# Agregar ruido aleatorio al vector latente
noise_factor = 0.1
noise = torch.randn_like(latent1) * noise_factor

latent_with_noise = latent1 + noise  # Vector latente modificado con ruido

# Decodificación
with torch.no_grad():
    audio_with_noise = autoencoder.forward_decoder(latent_with_noise)
    audio_with_noise = audio_with_noise.squeeze().cpu().numpy()

# Reproducir el audio con ruido añadido
print("Audio generado con ruido añadido al vector latente:")
ipd.display(ipd.Audio(audio_with_noise, rate=samplerate))


## Probamos generando un vector latente aleatorio

In [None]:
# Generar un vector latente aleatorio con el mismo tamaño que los vectores latentes originales
random_latent = torch.randn_like(latent1).to(device)  # Asegúrate de que esté en el mismo dispositivo que el modelo

# Decodificación
with torch.no_grad():
    random_audio = autoencoder.forward_decoder(random_latent)
    random_audio = random_audio.squeeze().cpu().numpy()

# Reproducir el audio generado aleatoriamente
print("Audio generado completamente aleatorio:")
ipd.display(ipd.Audio(random_audio, rate=samplerate))  # Asegúrate de usar el mismo `samplerate`
