In [4]:
# Import all necessary libraries

import numpy as np
import matplotlib.pyplot as plt
import math
import os
import torch
import torchaudio
import scipy.signal
import pydub
import librosa
import random
from torch.utils.data import Dataset
import pandas as pd
from torch.utils.data import DataLoader
import torchaudio.transforms as transforms
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.neighbors import KNeighborsClassifier
from IPython.display import Audio
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import init
from torch.utils.data import random_split
from torch.distributions.normal import Normal

import tools
import tests
import time





In [5]:
# Dataset locations and audio constants.
ANNOTATIONS_FILE = r'C:\shipsEar_AUDIOS\shipsEar.xlsx'
AUDIO_DIR = r'C:\shipsEar_AUDIOS'
SAMPLE_RATE = 52734
NUM_SAMPLES = 16000

# Read the annotation spreadsheet.
metadata_file = ANNOTATIONS_FILE
df = pd.read_excel(metadata_file)

# One integer id per distinct value of the "Type" column, numbered in order of
# first appearance.
type_mapping = {
    type_name: class_id
    for class_id, type_name in enumerate(df["Type"].unique())
}

# Keep only the two columns the datasets read: the file name (stored as
# "relative_path") and the integer class id.
df = df.assign(
    relative_path=df["Filename"],
    class_id=df["Type"].map(type_mapping),
)[["relative_path", "class_id"]]

df.head()



Unnamed: 0,relative_path,class_id
0,80__04_10_12_adricristuy.wav,0
1,10__10_07_13_marDeOnza_Sale.wav,1
2,14__10_07_13_piraCies_Espera.wav,1
3,15__10_07_13_radaUno_Pasa.wav,2
4,6__10_07_13_marDeCangas_Entra.wav,1


In [6]:
class AudioUtilization():
    """Stateless helpers for loading audio and turning it into mel spectrograms.

    Throughout the class an "audio" value is a ``(sig, sr)`` tuple where ``sig``
    is a 2-D tensor indexed ``[channel, sample]`` and ``sr`` is the sample rate.
    """

    @staticmethod
    def open(audio_file):
        """Load ``audio_file`` with ``torchaudio.load`` and return ``(sig, sr)``."""
        sig, sr = torchaudio.load(audio_file)
        return (sig, sr)

    @staticmethod
    def rechannel(audio, new_channel):
        """Return ``audio`` converted to ``new_channel`` channels.

        If the channel count already matches, ``audio`` is returned unchanged.
        For ``new_channel == 1`` only the first channel is kept; otherwise the
        signal is concatenated with itself along the channel axis (which turns
        a mono signal into a two-channel one).
        """
        sig, sr = audio

        if sig.shape[0] == new_channel:
            return audio

        if new_channel == 1:
            resig = sig[:1, :]
        else:
            resig = torch.cat([sig, sig])

        return (resig, sr)

    @staticmethod
    def pad_trunc(audio, max_ms):
        """Force the signal to last exactly ``max_ms`` milliseconds.

        Longer signals are cut at the end. Shorter signals are zero-padded, with
        the split between leading and trailing padding chosen at random (uses
        the ``random`` module).

        Returns:
            ``(sig, sr)`` where ``sig`` has ``sr * max_ms // 1000`` samples.
        """
        sig, sr = audio
        num_rows, sig_len = sig.shape
        # Multiply before dividing: ``sr // 1000 * max_ms`` would first round the
        # sample rate down to whole kHz (e.g. 52734 -> 52) and shorten the clip.
        max_len = int(sr * max_ms // 1000)

        if sig_len > max_len:
            sig = sig[:, :max_len]

        elif sig_len < max_len:
            pad_begin_len = random.randint(0, max_len - sig_len)
            pad_end_len = max_len - sig_len - pad_begin_len

            # Build the padding with the signal's dtype/device so the
            # concatenation does not change either.
            pad_begin = torch.zeros((num_rows, pad_begin_len), dtype=sig.dtype, device=sig.device)
            pad_end = torch.zeros((num_rows, pad_end_len), dtype=sig.dtype, device=sig.device)

            sig = torch.cat((pad_begin, sig, pad_end), 1)

        return (sig, sr)

    @staticmethod
    def time_shift(audio, shift_limit):
        """Circularly shift the signal in time by a random amount.

        The shift is ``int(random.random() * shift_limit * sig_len)`` samples.
        """
        sig, sr = audio
        _, sig_len = sig.shape
        shift_amt = int(random.random() * shift_limit * sig_len)
        # Roll along the sample axis only. Without ``dims`` torch rolls the
        # flattened tensor, which moves samples from one channel into another.
        return (sig.roll(shift_amt, dims=1), sr)

    @staticmethod
    def spectro_gram(audio, n_mels = 64, n_fft = 1024, hop_len = None):
        """Return the mel spectrogram of ``audio`` converted to decibels.

        ``n_mels``, ``n_fft`` and ``hop_len`` are forwarded to
        ``transforms.MelSpectrogram``; the result is passed through
        ``transforms.AmplitudeToDB`` with ``top_db=100``.
        """
        sig, sr = audio
        top_db = 100

        spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)

        # Conversion to decibels:
        spec = transforms.AmplitudeToDB(top_db=top_db)(spec)

        return (spec)

    @staticmethod
    def spectro_augmen(spec, max_mask_pct = 0.1, n_freq_masks = 1, n_time_masks = 1):
        """Apply frequency and time masking to a 3-D spectrogram.

        ``n_freq_masks`` frequency masks and ``n_time_masks`` time masks are
        applied in sequence. The mask parameter handed to torchaudio is
        ``max_mask_pct`` times the size of the masked axis, and masked cells are
        filled with the mean of ``spec``.
        """
        _, n_mels, n_steps = spec.shape
        mask_value = spec.mean()
        aug_spec = spec

        freq_mask_param = max_mask_pct * n_mels

        for _ in range(n_freq_masks):
            aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

        time_mask_param = max_mask_pct * n_steps

        for _ in range(n_time_masks):
            aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

        return aug_spec

    @staticmethod
    def pad_spectrogram(spec, max_len):
        """Make the last axis of a 3-D spectrogram exactly ``max_len`` long.

        Shorter inputs are right-padded with the constant 0; inputs that are
        already ``max_len`` or longer are cut to ``max_len``.
        """
        _, _, n_steps = spec.shape
        if n_steps < max_len:
            pad_amount = max_len - n_steps
            pad_spec = torch.nn.functional.pad(spec, (0, pad_amount), "constant", 0)
        else:
            pad_spec = spec[:, :, :max_len]
        return pad_spec

In [7]:
class AudioDataset(Dataset):
    """Dataset yielding ``(augmented mel spectrogram, class_id)`` pairs.

    Args:
        df: DataFrame with a ``relative_path`` column (file name relative to
            ``data_path``) and a ``class_id`` column.
        data_path: Directory containing the audio files.
        duration: Clip length in milliseconds that every recording is padded or
            truncated to. Defaults to 4000.
    """

    def __init__(self, df, data_path, duration=4000):
        self.df = df
        self.data_path = str(data_path)
        self.duration = duration
        self.sr = 52734
        self.channel = 2
        self.shift_pct = 0.4

    def __len__(self):
        """Number of rows in the annotation frame."""
        return len(self.df)

    def _resolve_item(self, idx):
        """Return ``(audio_file_path, class_id)`` for the row at position ``idx``.

        Positional (``iloc``) access is used because samplers and
        ``random_split`` hand out positions ``0 .. len-1``; a label lookup would
        fail or pick the wrong row whenever the frame's index is not the default
        range index (for example after filtering rows).
        """
        rel_path = self.df["relative_path"].iloc[idx]
        class_id = self.df["class_id"].iloc[idx]
        return os.path.join(self.data_path, rel_path), class_id

    def __getitem__(self, idx):
        """Load, preprocess and augment the recording at position ``idx``.

        Pipeline: load -> force ``self.channel`` channels -> pad/truncate to
        ``self.duration`` ms -> random circular time shift -> mel spectrogram in
        dB -> two frequency masks and two time masks.
        """
        audio_file, class_id = self._resolve_item(idx)

        audio = AudioUtilization.open(audio_file)
        re_channel = AudioUtilization.rechannel(audio, self.channel)
        dur_audio = AudioUtilization.pad_trunc(re_channel, self.duration)
        shift_audio = AudioUtilization.time_shift(dur_audio, self.shift_pct)
        spectrogram = AudioUtilization.spectro_gram(shift_audio, n_mels=64, n_fft=1024, hop_len=None)
        aug_spectrogram = AudioUtilization.spectro_augmen(spectrogram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)

        return aug_spectrogram, class_id

In [8]:
class AudioDataset_longer(Dataset):
    """Dataset yielding ``(augmented mel spectrogram, class_id)`` pairs from 10 s clips.

    Args:
        df: DataFrame with a ``relative_path`` column (file name relative to
            ``data_path``) and a ``class_id`` column.
        data_path: Directory containing the audio files.
        duration: Clip length in milliseconds that every recording is padded or
            truncated to. Defaults to 10000.
    """

    def __init__(self, df, data_path, duration=10000):
        self.df = df
        self.data_path = str(data_path)
        self.duration = duration
        self.sr = 52734
        self.channel = 2
        self.shift_pct = 0.4

    def __len__(self):
        """Number of rows in the annotation frame."""
        return len(self.df)

    def _resolve_item(self, idx):
        """Return ``(audio_file_path, class_id)`` for the row at position ``idx``.

        Positional (``iloc``) access is used because samplers and
        ``random_split`` hand out positions ``0 .. len-1``; a label lookup would
        fail or pick the wrong row whenever the frame's index is not the default
        range index (for example after filtering rows).
        """
        rel_path = self.df["relative_path"].iloc[idx]
        class_id = self.df["class_id"].iloc[idx]
        return os.path.join(self.data_path, rel_path), class_id

    def __getitem__(self, idx):
        """Load, preprocess and augment the recording at position ``idx``.

        Pipeline: load -> force ``self.channel`` channels -> pad/truncate to
        ``self.duration`` ms -> random circular time shift -> mel spectrogram in
        dB -> two frequency masks and two time masks.
        """
        audio_file, class_id = self._resolve_item(idx)

        audio = AudioUtilization.open(audio_file)
        re_channel = AudioUtilization.rechannel(audio, self.channel)
        dur_audio = AudioUtilization.pad_trunc(re_channel, self.duration)
        shift_audio = AudioUtilization.time_shift(dur_audio, self.shift_pct)
        spectrogram = AudioUtilization.spectro_gram(shift_audio, n_mels=64, n_fft=1024, hop_len=None)
        aug_spectrogram = AudioUtilization.spectro_augmen(spectrogram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)

        return aug_spectrogram, class_id

In [13]:
# Neural network classifier

class AudioClassifier(nn.Module):
    """``nn.Module`` subclass that currently defines no layers and no ``forward``.

    At present it only hosts ``custom_collate_fn``, the collate function the
    data loaders in this notebook pass to ``DataLoader``.
    """

    def __init__(self):
        super().__init__()

    # Declared static: the function takes no ``self``, so without the decorator
    # it only works when looked up on the class and breaks on an instance.
    @staticmethod
    def custom_collate_fn(batch):
        """Collate ``(spectrogram, label)`` pairs into a single batch.

        Every spectrogram is brought to the longest last-axis length found in
        the batch via ``AudioUtilization.pad_spectrogram`` and the results are
        stacked.

        Args:
            batch: Sequence of ``(spectrogram, label)`` items; spectrograms are
                3-D tensors whose last axis may differ in length.

        Returns:
            ``(specs, labels)``: the stacked spectrogram tensor and a tensor of
            the labels.
        """
        max_len = max(item[0].shape[2] for item in batch)
        specs = [AudioUtilization.pad_spectrogram(item[0], max_len) for item in batch]
        labels = torch.tensor([item[1] for item in batch])
        return torch.stack(specs), labels



In [10]:
# Data loaders for the shorter (4 s) spectrograms

data_path = r"C:\shipsEar_AUDIOS"
myds = AudioDataset(df, data_path)

# Seed torch's global RNG before the split is drawn.
torch.manual_seed(42)

# 80:20 random split: the training share is rounded, validation takes the rest.
num_items = len(myds)
num_train = round(0.8 * num_items)
num_val = num_items - num_train
train_ds, val_ds = random_split(myds, [num_train, num_val])

# Only the training loader shuffles; both pad each batch to a common length
# through the custom collate function.
train_dl = DataLoader(
    train_ds,
    batch_size=16,
    shuffle=True,
    collate_fn=AudioClassifier.custom_collate_fn,
)
val_dl = DataLoader(
    val_ds,
    batch_size=16,
    shuffle=False,
    collate_fn=AudioClassifier.custom_collate_fn,
)

In [11]:
# Data loaders for the longer (10 s) spectrograms

data_path = r"C:\shipsEar_AUDIOS"
myds = AudioDataset_longer(df, data_path)

# 80:20 random split: the training share is rounded, validation takes the rest.
# No reseeding happens in this cell, so the split is drawn from whatever state
# torch's global RNG is in when the cell runs.
num_items = len(myds)
num_train = round(0.8 * num_items)
num_val = num_items - num_train
train_ds_long, val_ds_long = random_split(myds, [num_train, num_val])

# Only the training loader shuffles; both pad each batch to a common length
# through the custom collate function.
train_dl_long = DataLoader(
    train_ds_long,
    batch_size=16,
    shuffle=True,
    collate_fn=AudioClassifier.custom_collate_fn,
)
val_dl_long = DataLoader(
    val_ds_long,
    batch_size=16,
    shuffle=False,
    collate_fn=AudioClassifier.custom_collate_fn,
)

In [None]:
# define model hyperparameters
LR = 0.001
PATIENCE = 2
# NOTE(review): this assignment had no value, which is a SyntaxError and stops
# the cell from running. 64 is a placeholder equal to the n_mels the datasets
# pass to spectro_gram; the Encoder/Decoder in this notebook size their linear
# layers from IMAGE_SIZE // 8, so the value should be a multiple of 8.
# TODO confirm the intended input size.
IMAGE_SIZE = 64
CHANNELS = 2
BATCH_SIZE = 16
EMBEDDING_DIM = 13
EPOCHS = 100
# (channels, height, width) of the feature map the decoder reshapes to.
SHAPE_BEFORE_FLATTENING = (128, IMAGE_SIZE // 8, IMAGE_SIZE // 8)

In [None]:
def vae_gaussian_kl_loss(mu, logvar):
    """KL divergence between N(mu, exp(logvar)) and the standard normal.

    Closed form from Appendix B of Kingma and Welling, "Auto-Encoding
    Variational Bayes", ICLR 2014 (https://arxiv.org/abs/1312.6114).
    The divergence is summed over dimension 1 and averaged over what remains.
    """
    per_dim = 1 + logvar - mu.pow(2) - logvar.exp()
    per_sample = -0.5 * per_dim.sum(dim=1)
    return torch.mean(per_sample)

def reconstruction_loss(x_reconstructed, x):
    """Mean binary cross-entropy between the reconstruction and the target."""
    return nn.functional.binary_cross_entropy(x_reconstructed, x, reduction="mean")

def vae_loss(y_pred, y_true, recon_weight=500):
    """Weighted sum of reconstruction loss and KL divergence for a VAE.

    Args:
        y_pred: Tuple ``(mu, logvar, recon_x)``: latent mean, latent log
            variance and the reconstruction.
        y_true: Target the reconstruction is compared against.
        recon_weight: Multiplier applied to the reconstruction term before the
            KL term is added. Defaults to 500.

    Returns:
        ``recon_weight * reconstruction_loss + kl_loss``.
    """
    mu, logvar, recon_x = y_pred
    recon_loss = reconstruction_loss(recon_x, y_true)
    kld_loss = vae_gaussian_kl_loss(mu, logvar)
    return recon_weight * recon_loss + kld_loss

In [12]:
# https://pyimagesearch.com/2023/10/02/a-deep-dive-into-variational-autoencoders-with-pytorch/

# VARIATIONAL AUTOENCODER

class Sampling(nn.Module):
    """Draw a latent sample with the reparameterization trick."""

    def forward(self, z_mean, z_log_var):
        """Return ``z_mean + exp(0.5 * z_log_var) * epsilon`` with ``epsilon ~ N(0, 1)``.

        Args:
            z_mean: Mean of the latent distribution.
            z_log_var: Log variance of the latent distribution; must broadcast
                against ``z_mean``.

        Returns:
            A sample with the shape of ``z_mean``.
        """
        # randn_like creates the noise with z_mean's shape, dtype and device
        # directly, so nothing is sampled on the CPU and copied over, and the
        # input is not required to be exactly 2-D.
        epsilon = torch.randn_like(z_mean)
        # exp(0.5 * log_var) is the standard deviation.
        return z_mean + torch.exp(0.5 * z_log_var) * epsilon

In [None]:
class Encoder(nn.Module):
    """Convolutional VAE encoder: three stride-2 convolutions, then two linear heads.

    Args:
        image_size: Spatial size of the input. Either a single integer for
            square inputs or a ``(height, width)`` pair.
        embedding_dim: Size of the latent vector.
        in_channels: Number of input channels. Defaults to 1.
    """

    def __init__(self, image_size, embedding_dim, in_channels=1):
        super(Encoder, self).__init__()
        # Accept a scalar (square input) as well as a (height, width) pair.
        try:
            height, width = image_size
        except TypeError:
            height = width = image_size
        # convolutional layers for downsampling and feature extraction
        self.conv1 = nn.Conv2d(in_channels, 32, 3, stride=2, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 3, stride=2, padding=1)
        self.conv3 = nn.Conv2d(64, 128, 3, stride=2, padding=1)
        # flattens the feature map before the fully connected heads
        self.flatten = nn.Flatten()
        # Size of the flattened feature map. Computed from the real output size
        # of the conv stack instead of ``size // 8``, which is only right when
        # the size is a multiple of 8.
        flat_features = 128 * self._downsampled(height) * self._downsampled(width)
        # fully connected heads producing the latent mean and log variance
        self.fc_mean = nn.Linear(flat_features, embedding_dim)
        self.fc_log_var = nn.Linear(flat_features, embedding_dim)
        # sampling layer implementing the reparameterization trick
        self.sampling = Sampling()

    @staticmethod
    def _downsampled(size, n_layers=3):
        """Length of one spatial axis after ``n_layers`` of the stride-2 convolutions.

        With kernel 3, stride 2 and padding 1 each layer maps ``n`` to
        ``ceil(n / 2)``.
        """
        for _ in range(n_layers):
            size = (size + 1) // 2
        return size

    def forward(self, x):
        """Encode ``x`` and return ``(z_mean, z_log_var, z)``.

        ``z`` is a sample produced by the sampling layer from ``z_mean`` and
        ``z_log_var``.
        """
        # convolutional layers with relu activation
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        # flatten the tensor
        x = self.flatten(x)
        # mean and log variance of the latent distribution
        z_mean = self.fc_mean(x)
        z_log_var = self.fc_log_var(x)
        # sample a latent vector using the reparameterization trick
        z = self.sampling(z_mean, z_log_var)
        return z_mean, z_log_var, z

In [None]:
class Decoder(nn.Module):
    """Convolutional VAE decoder: a linear layer, then three stride-2 transposed convolutions.

    Args:
        embedding_dim: Size of the latent vector.
        shape_before_flattening: ``(channels, height, width)`` of the feature
            map the latent vector is projected back to.
        out_channels: Number of channels in the reconstruction. Defaults to 1.
    """

    def __init__(self, embedding_dim, shape_before_flattening, out_channels=1):
        super(Decoder, self).__init__()
        # Stored so ``reshape`` can be an ordinary method. A lambda kept on the
        # module cannot be pickled, which breaks saving the whole model.
        self.shape_before_flattening = tuple(shape_before_flattening)
        channels, height, width = self.shape_before_flattening
        # fully connected layer projecting the latent vector back to the
        # flattened feature-map size
        self.fc = nn.Linear(embedding_dim, channels * height * width)
        # transposed convolutions; each one doubles the spatial size
        self.deconv1 = nn.ConvTranspose2d(
            channels, 64, 3, stride=2, padding=1, output_padding=1
        )
        self.deconv2 = nn.ConvTranspose2d(
            64, 32, 3, stride=2, padding=1, output_padding=1
        )
        self.deconv3 = nn.ConvTranspose2d(
            32, out_channels, 3, stride=2, padding=1, output_padding=1
        )

    def reshape(self, x):
        """View ``x`` as ``(-1, *shape_before_flattening)``."""
        return x.view(-1, *self.shape_before_flattening)

    def forward(self, x):
        """Decode latent vectors ``x`` into reconstructions with values in [0, 1]."""
        # pass the latent vector through the fully connected layer
        x = self.fc(x)
        # reshape to the feature map
        x = self.reshape(x)
        # transposed convolutions with relu activation
        x = F.relu(self.deconv1(x))
        x = F.relu(self.deconv2(x))
        # final transposed convolution with a sigmoid activation
        x = torch.sigmoid(self.deconv3(x))
        return x

In [None]:
class VAE(nn.Module):
    """Variational autoencoder that chains a given encoder and decoder."""

    def __init__(self, encoder, decoder):
        super().__init__()
        # keep both halves of the model
        self.encoder, self.decoder = encoder, decoder

    def forward(self, x):
        """Return ``(z_mean, z_log_var, reconstruction)`` for input ``x``."""
        # the encoder yields the latent statistics and a sampled latent vector
        mean, log_var, latent = self.encoder(x)
        # the decoder rebuilds the input from the sampled latent vector
        return mean, log_var, self.decoder(latent)