In [None]:
import os
import torch
import torchvision
import torchaudio
from torch.utils.data import Dataset
import pandas as pd
from torch import nn
import torch.nn.functional as F
from torchvision.datasets.utils import download_url
from torch.utils.data import DataLoader
from torchvision import transforms
from torch.utils.data import random_split
from torchmetrics import Accuracy
import torch.optim as optim
import matplotlib
import matplotlib.pyplot as plt
from tqdm import tqdm
import torchvision.models as models

In [None]:
# Multiclass accuracy metric configured for the 5 target classes.
accuracy = Accuracy(num_classes=5, task="multiclass")

# ResNet-34 backbone (no pretrained weights are requested here), adapted in two places:
#   * conv1 is replaced so the network accepts a single input channel,
#   * fc is replaced by dropout followed by a 5-way linear classifier.
model = models.resnet34()
model.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=7, stride=2, padding=3, bias=False)
model.fc = nn.Sequential(nn.Dropout(p=0.5), nn.Linear(512, 5, bias=True))

In [None]:
def get_default_device(num):
    """Return the CUDA device with index ``num`` when CUDA is available, otherwise the CPU device.

    Args:
        num: Index of the CUDA device to select; ignored when CUDA is unavailable.

    Returns:
        A ``torch.device`` for ``cuda:<num>`` or ``cpu``.
    """
    device_name = f"cuda:{num}" if torch.cuda.is_available() else "cpu"
    return torch.device(device_name)

def to_device(data, device):
    """Move a tensor, or a (possibly nested) list/tuple of tensors, to ``device``.

    Lists and tuples are processed element by element and always come back as a
    list; anything else is moved via its ``.to(device, non_blocking=True)`` method.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)
    return [to_device(item, device) for item in data]

class DeviceDataLoader():
    """Thin wrapper around a dataloader that moves every batch to a device."""

    def __init__(self, dl, device):
        # Keep both the wrapped loader and the destination device for later iteration.
        self.dl, self.device = dl, device

    def __len__(self):
        """Return the number of batches reported by the wrapped loader."""
        return len(self.dl)

    def __iter__(self):
        """Lazily yield each batch of the wrapped loader after moving it to the device."""
        yield from (to_device(batch, self.device) for batch in self.dl)

In [None]:
import torchaudio.transforms as T

# Spectrogram masking used as training-time augmentation.
#
# torchaudio.transforms does not provide a `Compose` class; its transforms are
# nn.Module objects, so nn.Sequential is used to chain them.
#
# T.TimeStretch is deliberately left out of the chain: it operates on complex-valued
# spectrograms and needs a stretch rate, whereas the Spectrogram transform used in this
# notebook is built with its default settings (real-valued output) and no rate was given.
# To use it, build the spectrogram with power=None, stretch with a fixed rate, and take
# the magnitude before masking.
data_augmentation_transforms = nn.Sequential(
    T.TimeMasking(time_mask_param=30),      # mask up to 30 consecutive time frames
    T.FrequencyMasking(freq_mask_param=15), # mask up to 15 consecutive frequency bins
)


In [None]:
class ShipsEarDataset(Dataset):
    """Audio dataset driven by a CSV annotation table.

    Each item is loaded from disk, brought to a common sample rate, mixed down to one
    channel, trimmed or zero-padded to a fixed number of samples, passed through
    ``transformation`` and, when configured, through ``augmentations``.

    Column usage in the annotation table (by position): column 0 is the file name,
    column 1 is the sub-folder under ``audio_dir``, column 5 is the label.
    """

    def __init__(self, annotation_file, audio_dir, transformation, target_sample_rate, num_samples, augmentations=None):
        self.annotations = pd.read_csv(annotation_file)
        self.audio_dir = audio_dir
        self.transformation = transformation
        self.target_sample_rate = target_sample_rate
        self.num_samples = num_samples
        self.augmentations = augmentations

    def __len__(self):
        """Number of rows in the annotation table."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Return ``(features, label)`` for the annotation row at ``index``."""
        path = self._get_audio_sample_path(index)
        label = self._get_audio_sample_label(index)
        waveform, source_rate = torchaudio.load(path)

        # Waveform conditioning: resample -> mono -> trim -> pad, in that order.
        waveform = self._resample_if_necessary(waveform, source_rate)
        for step in (self._mix_down_if_necessary, self._cut_if_necessary, self._right_pad_if_necessary):
            waveform = step(waveform)

        features = self.transformation(waveform)

        # Optional augmentation is applied to the transformed signal, not the raw waveform.
        if self.augmentations:
            features = self.augmentations(features)

        return features, label

    def _cut_if_necessary(self, signal):
        """Keep only the first ``num_samples`` samples when the signal is longer than that."""
        too_long = signal.shape[1] > self.num_samples
        return signal[:, :self.num_samples] if too_long else signal

    def _right_pad_if_necessary(self, signal):
        """Zero-pad the end of the last dimension when the signal is shorter than ``num_samples``."""
        missing = self.num_samples - signal.shape[1]
        if missing <= 0:
            return signal
        return torch.nn.functional.pad(signal, (0, missing))

    def _resample_if_necessary(self, signal, sr):
        """Resample from ``sr`` to ``target_sample_rate`` unless the rates already match."""
        if sr == self.target_sample_rate:
            return signal
        return torchaudio.transforms.Resample(sr, self.target_sample_rate)(signal)

    def _mix_down_if_necessary(self, signal):
        """Average over dimension 0 when it holds more than one channel."""
        if signal.shape[0] <= 1:
            return signal
        return torch.mean(signal, dim=0, keepdim=True)

    def _get_audio_sample_path(self, index):
        """Build ``audio_dir/<column 1>/<column 0>`` for the row at ``index``."""
        file_name = self.annotations.iloc[index, 0]
        subdir = f"{self.annotations.iloc[index, 1]}"
        return os.path.join(self.audio_dir, subdir, file_name)

    def _get_audio_sample_label(self, index):
        """Return the value stored in column 5 of the row at ``index``."""
        return self.annotations.iloc[index, 5]

In [None]:
# Input locations: annotation table and the directory holding the audio files.
ANNOTATIONS_FILE = "Acoustic signal classification/AutoEncoder/AnnotationsAutoEncoder.csv"
AUDIO_DIR = "Acoustic signal classification/AutoEncoder/Denoised_Audio50"

# Every clip is brought to SAMPLE_RATE and trimmed/padded to five seconds' worth of samples.
SAMPLE_RATE = 44100
NUM_SAMPLES = SAMPLE_RATE * 5
BATCH_SIZE = 30

# Waveform -> spectrogram transform applied to every clip (1024-point FFT).
spectrogram = torchaudio.transforms.Spectrogram(n_fft=1024)

In [None]:
# Base dataset over all annotated clips. No augmentations are passed here, so items
# come back as plain spectrograms.
dataset = ShipsEarDataset(
    annotation_file=ANNOTATIONS_FILE,
    audio_dir=AUDIO_DIR,
    transformation=spectrogram,
    target_sample_rate=SAMPLE_RATE,
    num_samples=NUM_SAMPLES,
)


In [None]:
# 10% validation, 10% test, remainder for training.
val_size = int(0.1 * len(dataset))
test_size = int(0.1 * len(dataset))
train_size = len(dataset) - val_size - test_size

# Seed the split so the held-out sets stay the same across kernel restarts.
split_generator = torch.Generator().manual_seed(42)
train_ds, val_ds, test_ds = random_split(
    dataset, [train_size, val_size, test_size], generator=split_generator
)

# random_split returns Subset views that all point at the SAME underlying dataset
# object, so assigning `.dataset.augmentations` on one subset changes every subset.
# The training subset therefore gets its own dataset instance with augmentations
# enabled, reusing the training indices; validation and test keep the un-augmented one.
train_ds = torch.utils.data.Subset(
    ShipsEarDataset(
        ANNOTATIONS_FILE,
        AUDIO_DIR,
        spectrogram,
        SAMPLE_RATE,
        NUM_SAMPLES,
        augmentations=data_augmentation_transforms,
    ),
    train_ds.indices,
)

# Sanity checks: the split covers every clip and the held-out sets are not augmented.
assert len(train_ds) + len(val_ds) + len(test_ds) == len(dataset)
assert val_ds.dataset.augmentations is None and test_ds.dataset.augmentations is None

# Never ask for more worker processes than the machine has CPUs.
NUM_WORKERS = min(32, os.cpu_count() or 1)
loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, pin_memory=True)

train_dl = DataLoader(train_ds, shuffle=True, **loader_kwargs)
val_dl = DataLoader(val_ds, **loader_kwargs)
test_dl = DataLoader(test_ds, **loader_kwargs)

In [None]:
# Target device: first CUDA device when available, otherwise the CPU.
device = get_default_device(0)

# Wrap each loader so its batches are moved to `device` as they are iterated.
train_loader, val_loader, test_loader = (
    DeviceDataLoader(loader, device) for loader in (train_dl, val_dl, test_dl)
)

# The metric object is moved to the same device as the batches.
accuracy = accuracy.to(device)

In [None]:
def train_and_validate(model, train_loader, valid_loader, num_epochs, device, clip_value=1.0):
    """Train ``model`` with Adam and evaluate it on ``valid_loader`` after every epoch.

    The model is moved to ``device`` before training. Each epoch runs one pass over
    ``train_loader`` with gradient-norm clipping, one pass over ``valid_loader``
    without gradients, steps a ReduceLROnPlateau scheduler on the mean validation
    loss, saves the weights to ``model_weights_<epoch>.pth`` in the working directory,
    and prints a summary line.

    Args:
        model: Network to train; modified in place.
        train_loader: Iterable of ``(inputs, targets)`` batches supporting ``len()``.
        valid_loader: Iterable of ``(inputs, targets)`` batches supporting ``len()``.
        num_epochs: Number of epochs to run.
        device: Device on which the model and the batches are placed.
        clip_value: Maximum gradient norm passed to ``clip_grad_norm_``.

    Returns:
        ``(train_errors, valid_errors)``: per-epoch lists of ``1 - accuracy``.

    Raises:
        ValueError: If either loader yields no batches.
    """
    if len(train_loader) == 0 or len(valid_loader) == 0:
        raise ValueError("train_loader and valid_loader must each yield at least one batch")

    # Batches and weights must live on the same device, otherwise the forward pass fails.
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
    # `verbose` is deprecated in recent PyTorch releases; learning-rate changes are
    # reported explicitly after scheduler.step() below instead.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)

    train_errors = []
    valid_errors = []

    for epoch in tqdm(range(num_epochs)):
        model.train()
        total_train_loss = 0.0
        correct_train = 0
        total_train = 0

        for inputs, targets in tqdm(train_loader):
            # No-op when the loader already delivers batches on `device`.
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), clip_value)  # Gradient clipping
            optimizer.step()

            total_train_loss += loss.item()
            _, predicted = outputs.max(1)
            total_train += targets.size(0)
            correct_train += predicted.eq(targets).sum().item()

        # Mean per-batch loss, so it is comparable with the validation loss below.
        train_loss = total_train_loss / len(train_loader)
        train_accuracy = correct_train / total_train
        train_errors.append(1 - train_accuracy)

        model.eval()
        total_valid_loss = 0.0
        correct_valid = 0
        total_valid = 0

        with torch.no_grad():
            for inputs, targets in valid_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                total_valid_loss += loss.item()
                _, predicted = outputs.max(1)
                total_valid += targets.size(0)
                correct_valid += predicted.eq(targets).sum().item()

        valid_loss = total_valid_loss / len(valid_loader)
        valid_accuracy = correct_valid / total_valid
        valid_errors.append(1 - valid_accuracy)

        lr_before = optimizer.param_groups[0]['lr']
        scheduler.step(valid_loss)
        lr_after = optimizer.param_groups[0]['lr']
        if lr_after != lr_before:
            print(f"Epoch [{epoch+1}/{num_epochs}] learning rate reduced from {lr_before:.2e} to {lr_after:.2e}")

        torch.save(model.state_dict(), f'model_weights_{epoch+1}.pth')
        print(f"Epoch [{epoch+1}/{num_epochs}] Train Loss: {train_loss:.4f} Train Accuracy: {train_accuracy:.4f} Valid Loss: {valid_loss:.4f} Valid Accuracy: {valid_accuracy:.4f}")

    return train_errors, valid_errors

In [None]:
# Run training for 100 epochs and collect the per-epoch error curves.
train_errors, valid_errors = train_and_validate(
    model=model,
    train_loader=train_loader,
    valid_loader=val_loader,
    num_epochs=100,
    device=device,
)