In [None]:
import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
from collections import Counter
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns


# Parámetros globales
BATCH_SIZE = 32
NUM_CLASSES = 5
LEARNING_RATE = 0.001
NUM_EPOCHS = 10


In [None]:
# Transformaciones aplicadas a las imágenes
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # Redimensionamos las imágenes a 128x128 píxeles
    transforms.ToTensor(),  # Convertimos las imágenes a tensores
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalizamos los canales RGB
])


In [None]:
class AudioDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir  # Directorio raíz de los datos
        self.transform = transform  # Transformaciones a aplicar
        self.samples = []  # Lista de rutas de las imágenes
        self.labels = []  # Lista de etiquetas correspondientes

        # Definir las clases y asignar índices numéricos
        self.classes = ['tones', 'chords', 'melodies', 'chord_melodies', 'superposed']
        self.class_to_idx = {cls_name: idx for idx, cls_name in enumerate(self.classes)}

        # Cargar los datos
        self._load_data()

    def _load_data(self):
        # Recorremos cada clase y cargamos las rutas de los archivos y sus etiquetas
        for cls_name in self.classes:
            cls_dir = os.path.join(self.data_dir, cls_name)
            if not os.path.isdir(cls_dir):
                continue
            for filename in os.listdir(cls_dir):
                if filename.endswith('_spectrogram.png'):
                    filepath = os.path.join(cls_dir, filename)
                    self.samples.append(filepath)
                    self.labels.append(self.class_to_idx[cls_name])

    def __len__(self):
        # Retorna el número total de muestras
        return len(self.samples)

    def __getitem__(self, idx):
        # Carga y retorna una muestra y su etiqueta correspondiente
        img_path = self.samples[idx]
        label = self.labels[idx]
        image = Image.open(img_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, label


In [None]:
# Ruta al directorio 'data' desde el directorio actual ('notebooks/')
data_dir = '../data'  # Ajusta la ruta según la estructura de tus directorios

# Verificar el directorio de trabajo actual
print(f"Directorio de trabajo actual: {os.getcwd()}")

# Verificar que el directorio 'data' existe
if not os.path.isdir(data_dir):
    raise FileNotFoundError(f"El directorio '{data_dir}' no existe. Verifica la ruta.")
else:
    print(f"El directorio '{data_dir}' ha sido encontrado.")


In [None]:
# Crear instancia del dataset
dataset = AudioDataset(data_dir=data_dir, transform=transform)

# Verificar que el dataset no esté vacío
if len(dataset) == 0:
    raise ValueError("El dataset está vacío. Asegúrate de que hay archivos '_spectrogram.png' en las subcarpetas.")
else:
    print(f"Número total de muestras en el dataset: {len(dataset)}")


In [None]:
# Contar las muestras por clase
class_counts = Counter()
for label in dataset.labels:
    class_name = dataset.classes[label]
    class_counts[class_name] += 1

print("Distribución de muestras por clase:")
for class_name, count in class_counts.items():
    print(f"Clase '{class_name}': {count} muestras")


In [None]:
# Dividir el dataset en conjuntos de entrenamiento y validación (80% entrenamiento, 20% validación)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

# Fijar una semilla para reproducibilidad
torch.manual_seed(42)
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

# Crear DataLoaders para cargar los datos en lotes durante el entrenamiento
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

# Mostrar estadísticas de los conjuntos
print(f"Muestras de entrenamiento: {len(train_dataset)}")
print(f"Muestras de validación: {len(val_dataset)}")


In [None]:
# Obtener un lote de datos para verificar
dataiter = iter(train_loader)
images, labels = next(dataiter)

print(f"Tamaño del lote de imágenes: {images.shape}")  # Debe ser [BATCH_SIZE, 3, 128, 128]
print(f"Tamaño del lote de etiquetas: {labels.shape}")  # Debe ser [BATCH_SIZE]


In [None]:
# Función para mostrar una imagen
def imshow(img, label):
    img = img / 2 + 0.5  # Desnormalizamos la imagen
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.title(f"Etiqueta: {dataset.classes[label]}")
    plt.axis('off')
    plt.show()

# Mostrar algunas imágenes del lote
for i in range(4):
    imshow(images[i], labels[i].item())


In [None]:
class CNN_LSTM(nn.Module):
    def __init__(self, num_classes):
        super(CNN_LSTM, self).__init__()
        
        # Definición de la parte CNN del modelo
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, padding=1),  # Capa de convolución
            nn.BatchNorm2d(16),  # Normalización por lotes
            nn.ReLU(),  # Función de activación
            nn.MaxPool2d(2, 2),  # Capa de pooling

            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )
        
        # Parámetros para la parte LSTM
        self.lstm_input_size = 32 * 32  # Tamaño de entrada para la LSTM
        self.hidden_size = 64  # Tamaño de las capas ocultas de la LSTM
        self.num_layers = 2  # Número de capas en la LSTM
        
        # Definición de la LSTM
        self.lstm = nn.LSTM(input_size=self.lstm_input_size, hidden_size=self.hidden_size,
                            num_layers=self.num_layers, batch_first=True)
        
        # Capa totalmente conectada para la clasificación final
        self.fc = nn.Linear(self.hidden_size, num_classes)
        
    def forward(self, x):
        batch_size = x.size(0)
        x = self.cnn(x)  # Pasamos por la CNN
        x = x.view(batch_size, -1, self.lstm_input_size)  # Remodelamos para la LSTM
        h_0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)  # Estado oculto inicial
        c_0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)  # Estado de celda inicial
        out, _ = self.lstm(x, (h_0, c_0))  # Pasamos por la LSTM
        out = out[:, -1, :]  # Tomamos la última salida de la secuencia
        out = self.fc(out)  # Pasamos por la capa final
        return out


In [None]:
# Crear una instancia del modelo
model = CNN_LSTM(num_classes=NUM_CLASSES)

# Mover el modelo al dispositivo disponible (GPU si está disponible)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Definir la función de pérdida
criterion = nn.CrossEntropyLoss()

# Definir el optimizador
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)


In [None]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs):
    # Historial para almacenar pérdidas y precisiones
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
    
    for epoch in range(num_epochs):
        # ---------- Entrenamiento ----------
        model.train()  # Ponemos el modelo en modo entrenamiento
        running_loss = 0.0
        correct = 0
        total = 0
        
        for images, labels in train_loader:
            images = images.to(device)
            labels = labels.to(device)
            
            optimizer.zero_grad()  # Limpiamos los gradientes
            outputs = model(images)  # Forward
            loss = criterion(outputs, labels)  # Calculamos la pérdida
            loss.backward()  # Backpropagation
            optimizer.step()  # Actualizamos los parámetros
            
            running_loss += loss.item() * images.size(0)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        
        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = 100 * correct / total
        history['train_loss'].append(epoch_loss)
        history['train_acc'].append(epoch_acc)
        
        # ---------- Validación ----------
        model.eval()  # Ponemos el modelo en modo evaluación
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        
        with torch.no_grad():  # Desactivamos el cálculo de gradientes
            for images, labels in val_loader:
                images = images.to(device)
                labels = labels.to(device)
                
                outputs = model(images)
                loss = criterion(outputs, labels)
                
                val_loss += loss.item() * images.size(0)
                _, predicted = torch.max(outputs.data, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()
        
        val_epoch_loss = val_loss / len(val_loader.dataset)
        val_epoch_acc = 100 * val_correct / val_total
        history['val_loss'].append(val_epoch_loss)
        history['val_acc'].append(val_epoch_acc)
        
        # Imprimimos las estadísticas de la época
        print(f'Epoch [{epoch+1}/{num_epochs}], '
              f'Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.2f}%, '
              f'Val Loss: {val_epoch_loss:.4f}, Val Acc: {val_epoch_acc:.2f}%')
    
    return history  # Retornamos el historial para análisis posterior


In [None]:
# Entrenamos el modelo y almacenamos el historial
history = train_model(model, train_loader, val_loader, criterion, optimizer, NUM_EPOCHS)


In [None]:
# Graficar la pérdida durante el entrenamiento
plt.figure(figsize=(10,5))
plt.plot(history['train_loss'], label='Entrenamiento')
plt.plot(history['val_loss'], label='Validación')
plt.title('Pérdida durante el entrenamiento')
plt.xlabel('Época')
plt.ylabel('Pérdida')
plt.legend()
plt.show()

# Graficar la precisión durante el entrenamiento
plt.figure(figsize=(10,5))
plt.plot(history['train_acc'], label='Entrenamiento')
plt.plot(history['val_acc'], label='Validación')
plt.title('Precisión durante el entrenamiento')
plt.xlabel('Época')
plt.ylabel('Precisión (%)')
plt.legend()
plt.show()


In [None]:
# Guardar el modelo entrenado para uso futuro
model_save_path = 'cnn_lstm_model.pth'
torch.save(model.state_dict(), model_save_path)
print(f"Modelo guardado en {model_save_path}")


In [None]:
# Evaluación adicional
# Obtener todas las predicciones y etiquetas reales en el conjunto de validación
all_preds = []
all_labels = []

model.eval()
with torch.no_grad():
    for images, labels in val_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Especificar todas las etiquetas posibles
all_classes = list(range(len(dataset.classes)))  # [0, 1, 2, 3, 4]

# Calcular la matriz de confusión
cm = confusion_matrix(all_labels, all_preds, labels=all_classes)

plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt='d', xticklabels=dataset.classes, yticklabels=dataset.classes, cmap='Blues')
plt.xlabel('Predicción')
plt.ylabel('Etiqueta Verdadera')
plt.title('Matriz de Confusión en el Conjunto de Validación')
plt.show()

# Mostrar reporte de clasificación
print("Reporte de Clasificación:")
print(classification_report(all_labels, all_preds, labels=all_classes, target_names=dataset.classes, zero_division=0))


In [None]:
# Importar torchinfo
from torchinfo import summary

# Mover el modelo a CPU si está en GPU
model_cpu = model.to('cpu')

# Obtener el resumen del modelo
print("Resumen del Modelo:")
summary(model_cpu, input_size=(BATCH_SIZE, 3, 128, 128))
