In [51]:
import io

import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torchaudio
import torchaudio.transforms as aT
import torchvision.models as models
import torchvision.transforms as vT
from torch.utils.data import DataLoader, random_split
from torch.utils.tensorboard import SummaryWriter
from torchvision.datasets import DatasetFolder
from tqdm import tqdm

from models.__init__ import DenseNet, ResNet


In [55]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

writer = SummaryWriter(comment="audio_classification_densenet")


cuda


## Load dataset

In [22]:
def audio_loader(path, max_length_in_seconds=4):
    waveform, sample_rate = torchaudio.load(path)
    num_channels, num_frames = waveform.shape
    max_frames = sample_rate * max_length_in_seconds

    # ? Pad audio with zeros if too short or cut audio if too long
    if num_frames < max_frames:
        waveform = torch.nn.functional.pad(waveform, (0, max_frames - num_frames))
    elif num_frames > max_frames:
        waveform = waveform.narrow(dim=1, start=0, length=max_frames)

    return waveform

n_mels = 320
original_rate = 44100
sample_rate = 22050

transforms = nn.Sequential(
    aT.Resample(original_rate, sample_rate),
    # aT.MFCC(sample_rate=44100, n_mfcc=64),
    aT.MelSpectrogram(sample_rate=sample_rate, n_mels=n_mels),
    aT.AmplitudeToDB(),
)

dataset = DatasetFolder(root='./dataset', loader=audio_loader, extensions='wav', transform=transforms)
print(f"Input shape: {tuple(dataset[0][0].shape)}")

classes = dataset.classes
print("Classes:", classes)

# unique, counts = np.unique([dataset[i][0].shape for i in range(len(dataset))], return_counts=True)
# print(dict(zip(unique, counts)))



Input shape: (1, 320, 442)
Classes: ['1', '2', '3', '4', '5', '6', '7']


In [12]:
train_dataset, valid_dataset = random_split(
    dataset,
    lengths=[int(0.9 * len(dataset)), len(dataset) - int(0.9 * len(dataset))]
)

# ? dataloader
train_dataloader = DataLoader(
    train_dataset.dataset,
    batch_size=4,
    shuffle=False,
    # pin_memory=True,
    # num_workers=2,
)

valid_dataloader = DataLoader(
    valid_dataset.dataset,
    batch_size=4,
    shuffle=True,
    # pin_memory=False,
    # num_workers=2,
)


In [54]:
# bands = 60
# frames = 41
# num_features = 1 # 2
# hidden_size = 5300
# num_layers = 1

# input_size = bands * frames * num_features
# model = AudioRNN(input_size, hidden_size, num_layers, num_classes)
# model = AudioRNN(input_size, 5300, 1, len(dataset.classes))

# model = AudioCNN(num_classes)

num_classes = len(classes)

# model = ResNet(num_classes)
model = DenseNet(num_classes)
model.to(device)


DenseNet(
  (model): DenseNet(
    (features): Sequential(
      (conv0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (norm0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu0): ReLU(inplace=True)
      (pool0): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (denseblock1): _DenseBlock(
        (denselayer1): _DenseLayer(
          (norm1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (relu1): ReLU(inplace=True)
          (conv1): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (norm2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (relu2): ReLU(inplace=True)
          (conv2): Conv2d(128, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        )
        (denselayer2): _DenseLayer(
          (norm1): BatchNorm2d(96, eps=1e-05, momentum=0.1,

In [36]:
# ? hyperparams

lr = 0.001
num_epochs = 20


In [37]:
optimizer = optim.Adam(model.parameters(), lr=lr)

scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=num_epochs//3, gamma=0.1)
criterion = nn.CrossEntropyLoss()


In [32]:
def train(model, train_loader, criterion, optimizer, epoch, log_interval=10, debug_interval=25):
    model.train()
    train_loss = 0
    num_correct = 0

    for batch_idx, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        # ? zero the parameter gradients
        optimizer.zero_grad()

        # ? forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        _, predicted = torch.max(outputs, 1)

        # ? backward pass
        loss.backward()

        # ? optimize
        optimizer.step()

        train_loss += loss.item()
        num_correct += (predicted == labels).sum().item()

        # ? print training stats
        iteration = epoch * len(train_loader) + batch_idx

        if (batch_idx + 1) % log_interval == 0:
            writer.add_scalar('training loss/loss', loss, iteration)
            writer.add_scalar('learning rate/lr', optimizer.param_groups[0]['lr'], iteration)
            writer.add_scalar('training accuracy/acc', num_correct / (batch_idx * len(labels)), iteration)

        #     print(
        #         f'Epoch: {epoch}\tLoss: {loss:.6f}'
        #         f'[{batch_idx * len(inputs)}/{len(train_loader.dataset)} '
        #         f'({100. * batch_idx / len(train_loader):.0f}%)]'
        #     )

        # ? report debug image every `debug_interval` mini-batches
        # if batch_idx % debug_interval == 0:
        #     for n, (inp, pred, label) in enumerate(zip(inputs, predicted, labels)):
        #         series = (
        #             f'label_{classes[label.cpu()]}'
        #             f'_pred_{classes[pred.cpu()]}'
        #         )

        #         writer.add_image(
        #             f'Train MelSpectrogram samples/{batch_idx}_{n}_{series}',
        #             plot_signal(inp.cpu().numpy().squeeze(), series, 'hot'),
        #             iteration
        #         )

    train_loss /= len(train_loader)
    accuracy = 100 * num_correct / len(train_loader.dataset)

    return train_loss, accuracy

def valid(model, valid_loader, criterion, debug_interval=25):
    model.eval()
    valid_loss = 0
    num_correct = 0

    with torch.no_grad():
        for batch_idx, (inputs, labels) in enumerate(valid_loader):
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            _, predicted = torch.max(outputs, 1)

            valid_loss += loss.item()
            num_correct += (predicted == labels).sum().item()

            # iteration = (epoch + 1) * len(valid_loader)
            # if batch_idx % debug_interval == 0:
            #     for n, (inp, pred, label) in enumerate(zip(inputs, predicted, labels)):
            #         series = f'label_{classes[label.cpu()]}_pred_{classes[pred.cpu()]}'

            #         writer.add_image(
            #             f'Test MelSpectrogram samples/{batch_idx}_{n}_{series}', 
            #             plot_signal(inp.cpu().numpy().squeeze(), series, 'hot'), iteration
            #         )

    valid_loss /= len(valid_loader)
    accuracy = 100 * num_correct / len(valid_loader.dataset)

    return valid_loss, accuracy


def predict(model, data):
    model.eval()

    with torch.no_grad():
        data = data.unsqueeze(1).to(device)
        output = model(data)
        accuracy, [predicted] = torch.max(output.data, 1)
        accuracy = 100 * accuracy.item()
        predicted = classes[predicted.item()]

        return predicted, accuracy


In [38]:
train_losses, train_accuracies = [], []
valid_losses, valid_accuracies = [], []

for epoch in range(num_epochs):
    train_loss, train_accuracy = train(model, train_dataloader, criterion, optimizer, epoch=epoch)
    valid_loss, valid_accuracy = valid(model, valid_dataloader, criterion)
    scheduler.step()

    print(
        f'Epoch: {epoch+1}/{num_epochs}\t'
        f'Training (Loss, Accuracy): ({train_loss:.6f}, {train_accuracy:.2f}%)\t'
        f'Validation (Loss, Accuracy): ({valid_loss:.6f}, {valid_accuracy:.2f}%)\t'
    )

    train_losses.append(train_loss), train_accuracies.append(train_accuracy)
    valid_losses.append(valid_loss), valid_accuracies.append(valid_accuracy)


Epoch: 1/20	Training (Loss, Accuracy): (2.072977, 13.45%)	Validation (Loss, Accuracy): (2.785317, 11.76%)	
Epoch: 2/20	Training (Loss, Accuracy): (1.961813, 8.40%)	Validation (Loss, Accuracy): (2.037674, 13.45%)	
Epoch: 3/20	Training (Loss, Accuracy): (1.944716, 14.29%)	Validation (Loss, Accuracy): (2.013979, 10.92%)	
Epoch: 4/20	Training (Loss, Accuracy): (1.935470, 15.97%)	Validation (Loss, Accuracy): (2.024942, 10.08%)	
Epoch: 5/20	Training (Loss, Accuracy): (1.925825, 18.49%)	Validation (Loss, Accuracy): (2.035478, 10.08%)	
Epoch: 6/20	Training (Loss, Accuracy): (1.925341, 14.29%)	Validation (Loss, Accuracy): (2.106212, 11.76%)	
Epoch: 7/20	Training (Loss, Accuracy): (1.868344, 26.89%)	Validation (Loss, Accuracy): (2.024980, 12.61%)	
Epoch: 8/20	Training (Loss, Accuracy): (1.860165, 28.57%)	Validation (Loss, Accuracy): (2.023788, 12.61%)	
Epoch: 9/20	Training (Loss, Accuracy): (1.852530, 29.41%)	Validation (Loss, Accuracy): (2.019714, 12.61%)	
Epoch: 10/20	Training (Loss, Accuracy)

In [26]:
sample = valid_dataloader.dataset[-1]
predicted, accuracy = predict(model, sample[0])

print(f"Target: {classes[sample[1]]}, Prediction: {predicted}")


Target: 7, Prediction: 4
