In [1]:
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os


In [2]:
device = torch.device('mps')


In [3]:
class SpectrogramDataset(Dataset):
    def __init__(self, csv_file, target_length=10, sample_rate=80):
        """
        Args:
            csv_file (string): Path to the csv file with annotations and file paths.
            target_length (int): Desired length of the spectrograms in seconds.
            sample_rate (int): Sample rate of the spectrograms.
        """
        self.annotations = pd.read_csv(csv_file)
        self.target_length = target_length
        self.sample_rate = sample_rate

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        spectrogram_path = self.annotations.iloc[idx]['spectrogram_filepath']
        spectrogram = np.load(f"../{spectrogram_path}")

        # Calculate the number of samples for target length
        num_samples = self.target_length * self.sample_rate

        # Pad or cut the spectrogram
        if spectrogram.shape[2] > num_samples:
            # Cut the spectrogram
            spectrogram = spectrogram[:, :, :num_samples]
        elif spectrogram.shape[2] < num_samples:
            # Pad the spectrogram with zeros
            padding_size = num_samples - spectrogram.shape[2]
            spectrogram = np.pad(spectrogram, ((0, 0), (0, 0), (0, padding_size)), mode='constant')

        spectrogram_tensor = torch.from_numpy(spectrogram).type(torch.float32)
        return spectrogram_tensor


In [4]:
def create_data_loader(csv_file, batch_size, target_length=10, sample_rate=80):
    dataset = SpectrogramDataset(csv_file=csv_file, target_length=target_length, sample_rate=sample_rate)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Example usage
train_data_loader = create_data_loader('../train.csv', batch_size=32)
test_data_loader = create_data_loader('../test.csv', batch_size=32)

print(len(train_data_loader))
print(len(test_data_loader))

print(next(iter(train_data_loader)).shape)


376
99
torch.Size([32, 1, 201, 800])


In [7]:
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        # Define the layers
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1)  # Output: 16 x 201 x 800
        self.pool1 = nn.MaxPool2d(2, 2)  # Output: 16 x 100 x 400
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)  # Output: 32 x 100 x 400
        self.pool2 = nn.MaxPool2d(2, 2)  # Output: 32 x 50 x 200
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)  # Output: 64 x 50 x 200
        self.pool3 = nn.MaxPool2d(2, 2)  # Output: 64 x 25 x 100
        self.conv4 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)  # Output: 128 x 25 x 100
        self.pool4 = nn.MaxPool2d(2, 2)  # Output: 128 x 12 x 50
        self.conv5 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)  # Output: 256 x 12 x 50
        self.pool5 = nn.MaxPool2d(2, 2)  # Output: 256 x 6 x 25

        # Fully connected layer to get to the bottleneck
        self.fc = nn.Linear(256 * 6 * 25, 512)

    def forward(self, x):
        x = self.pool1(torch.relu(self.conv1(x)))
        x = self.pool2(torch.relu(self.conv2(x)))
        x = self.pool3(torch.relu(self.conv3(x)))
        x = self.pool4(torch.relu(self.conv4(x)))
        x = self.pool5(torch.relu(self.conv5(x)))

        # Flatten and pass through the fully connected layer
        x = x.view(x.size(0), -1)  # Flatten the tensor
        x = self.fc(x)
        return x


In [9]:
def test_Encoder_shapes():
    encoder = Encoder()

    # Create a random batch of input data with the shape (batch_size, channels, height, width)
    # Assuming the input spectrograms are single-channel (grayscale)
    x = torch.randn(32, 1, 201, 800)  # Batch size of 32
    y = encoder(x)

    # The expected output shape is (batch_size, bottleneck_size)
    expected_shape = torch.Size([32, 512])
    assert y.shape == expected_shape, f"Bad y.shape: {y.shape}"
    print('Success')

test_Encoder_shapes()


Success


In [10]:
class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        # Fully connected layer to expand from the bottleneck
        self.fc = nn.Linear(512, 256 * 6 * 25)

        # Transposed convolution layers and upsampling
        self.conv_transpose1 = nn.ConvTranspose2d(256, 128, kernel_size=3, stride=1, padding=1)
        self.upsample1 = nn.Upsample(scale_factor=2, mode='nearest')  # Output: 128 x 12 x 50
        self.conv_transpose2 = nn.ConvTranspose2d(128, 64, kernel_size=3, stride=1, padding=1)
        self.upsample2 = nn.Upsample(scale_factor=2, mode='nearest')  # Output: 64 x 24 x 100
        self.conv_transpose3 = nn.ConvTranspose2d(64, 32, kernel_size=3, stride=1, padding=1)
        self.upsample3 = nn.Upsample(scale_factor=2, mode='nearest')  # Output: 32 x 48 x 200
        self.conv_transpose4 = nn.ConvTranspose2d(32, 16, kernel_size=3, stride=1, padding=1)
        self.upsample4 = nn.Upsample(scale_factor=2, mode='nearest')  # Output: 16 x 96 x 400
        self.conv_transpose5 = nn.ConvTranspose2d(16, 1, kernel_size=3, stride=1, padding=1)
        self.upsample5 = nn.Upsample(size=(201, 800), mode='nearest')  # Output: 1 x 201 x 800

    def forward(self, x):
        x = self.fc(x)
        x = x.view(-1, 256, 6, 25)  # Reshape to match the output shape of the encoder's last pooling layer

        x = self.upsample1(torch.relu(self.conv_transpose1(x)))
        x = self.upsample2(torch.relu(self.conv_transpose2(x)))
        x = self.upsample3(torch.relu(self.conv_transpose3(x)))
        x = self.upsample4(torch.relu(self.conv_transpose4(x)))
        x = self.upsample5(torch.relu(self.conv_transpose5(x)))

        return x


In [11]:
def test_Decoder_shapes():
    decoder = Decoder()

    # Create a random batch of input data with the shape (batch_size, bottleneck_size)
    z = torch.randn(3, 512)  # Batch size of 3
    y = decoder(z)

    # The expected output shape is (batch_size, channels, height, width)
    expected_shape = torch.Size([3, 1, 201, 800])
    assert y.shape == expected_shape, "Bad shape of y: y.shape={}".format(y.shape)
    print('Success')

test_Decoder_shapes()


Success


In [12]:
encoder = Encoder()
encoder.to(device)

decoder = Decoder()
decoder.to(device)

num_encoder_parameters = sum(p.numel() for p in encoder.parameters() if p.requires_grad)
print(f'Number of parameters in the encoder: {num_encoder_parameters}')

num_decoder_parameters = sum(p.numel() for p in decoder.parameters() if p.requires_grad)
print(f'Number of parameters in the decoder: {num_decoder_parameters}')


Number of parameters in the encoder: 20053632
Number of parameters in the decoder: 20091265


In [16]:
learning_rate = 0.001
epochs = 50

optimizer = torch.optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=learning_rate)
criterion = nn.MSELoss()

train_losses = []
test_losses = []

for epoch in range(epochs):
    train_loss = 0
    for batch_idx, data in enumerate(train_data_loader):
          data = data.to(device)
          optimizer.zero_grad()

          z = encoder(data)
          output = decoder(z)

          loss = criterion(output, data)

          loss.backward()
          optimizer.step()

          train_loss += loss.item()

    train_loss /= len(train_data_loader)
    train_losses.append(train_loss)

    test_loss = 0
    with torch.no_grad():
        for batch_idx, data in enumerate(test_data_loader):
            data = data.to(device)

            z = encoder(data)
            output = decoder(z)

            loss = criterion(output, data)
            test_loss += loss.item()

    test_loss /= len(test_data_loader)
    test_losses.append(test_loss)

    print(f'Epoch {epoch + 1}, Train Loss: {train_loss:.2f}, Test Loss: {test_loss:.2f}')


Epoch 1, Train Loss: 4.51, Test Loss: 3.85
Epoch 2, Train Loss: 4.51, Test Loss: 3.84
Epoch 3, Train Loss: 4.51, Test Loss: 3.85
Epoch 4, Train Loss: 4.51, Test Loss: 3.85
Epoch 5, Train Loss: 4.51, Test Loss: 3.86
Epoch 6, Train Loss: 4.51, Test Loss: 3.87
Epoch 7, Train Loss: 4.51, Test Loss: 3.85
Epoch 8, Train Loss: 4.51, Test Loss: 3.84
Epoch 9, Train Loss: 4.51, Test Loss: 3.84
Epoch 10, Train Loss: 4.51, Test Loss: 3.85
Epoch 11, Train Loss: 4.51, Test Loss: 3.85
Epoch 12, Train Loss: 4.51, Test Loss: 3.84
Epoch 13, Train Loss: 4.51, Test Loss: 3.85
Epoch 14, Train Loss: 4.51, Test Loss: 3.87
Epoch 15, Train Loss: 4.51, Test Loss: 3.84
Epoch 16, Train Loss: 4.51, Test Loss: 3.87
Epoch 17, Train Loss: 4.51, Test Loss: 3.83
Epoch 18, Train Loss: 4.51, Test Loss: 3.84
Epoch 19, Train Loss: 4.51, Test Loss: 3.84
Epoch 20, Train Loss: 4.51, Test Loss: 3.84
Epoch 21, Train Loss: 4.51, Test Loss: 3.84
Epoch 22, Train Loss: 4.51, Test Loss: 3.84
Epoch 23, Train Loss: 4.51, Test Loss: 3.

KeyboardInterrupt: 