In [None]:
import numpy as np
import sys
import os
import time
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset

import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# True when PyTorch can see a CUDA device; later cells read this flag to pick
# the compute device and the DataLoader settings.
cuda = torch.cuda.is_available()
cuda  # bare expression so the notebook displays the flag

In [None]:
# Width of the network's output layer (one score per label).
NUM_LABELS = 138
# Placeholder paths: replace them with the real data locations before running.
TRAIN_DATA_PATH = 'insert/path/to/data'
TRAIN_LABELS_PATH = 'insert/path/to/data'
TEST_DATA_PATH = 'insert/path/to/data'
# Background DataLoader workers are only used when a GPU is available.
num_workers = 4 if cuda else 0
# Passed to KSpectrograms as `k`; the network input width is 40 * (2 * k + 1).
# Presumably the number of context frames on each side of a frame — TODO confirm.
k = 12

In [None]:
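# The KSpectrograms dataset class is intentionally omitted from this notebook
# because it may be private course material for future DL students.
# Define or import KSpectrograms here before running the cells below.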

In [None]:
# Training set: built from both the data path and the labels path.
train_dataset = KSpectrograms(TRAIN_DATA_PATH, TRAIN_LABELS_PATH, k=k)
# Test set: built from the data path only (presumably yields unlabeled samples — TODO confirm).
test_dataset = KSpectrograms(TEST_DATA_PATH, k=k)

In [None]:
# DataLoader settings shared by the GPU and CPU cases. `num_workers` is already
# 0 when no GPU is available, and `pin_memory` follows the CUDA flag: pinned
# host memory only speeds up host-to-GPU copies, and requesting it without a
# GPU merely triggers a warning.
train_loader_args = dict(shuffle=True, batch_size=256, num_workers=num_workers,
                         pin_memory=cuda, drop_last=True)
# Test batches hold a single sample and keep the dataset order.
test_loader_args = dict(shuffle=False, batch_size=1, num_workers=num_workers,
                        pin_memory=cuda)
train_loader = DataLoader(train_dataset, **train_loader_args)
test_loader = DataLoader(test_dataset, **test_loader_args)

In [None]:
class MLP(nn.Module):
    """Fully connected classifier built from Linear -> BatchNorm1d -> PReLU stages.

    The input width is ``40 * (2 * k + 1)`` and the output width is
    ``NUM_LABELS``; both ``k`` and ``NUM_LABELS`` are read from module-level
    names at construction time.
    """

    def __init__(self):
        super(MLP, self).__init__()
        self.net = nn.Sequential(
            nn.Linear((40*(2*k+1)), 4096),
            nn.BatchNorm1d(4096),
            nn.PReLU(),
            nn.Linear(4096, 2048),
            nn.BatchNorm1d(2048),
            nn.PReLU(),
            nn.Linear(2048, 2048),
            # The BatchNorm width must equal the preceding Linear's output width.
            nn.BatchNorm1d(2048),
            nn.PReLU(),
            nn.Linear(2048, 1024),
            nn.BatchNorm1d(1024),
            nn.PReLU(),
            # No activation after the last layer: the network returns raw scores.
            nn.Linear(1024, NUM_LABELS)
        )

    def forward(self, x):
        """Return the raw class scores for a batch ``x``.

        Assumes ``x`` is a float tensor of shape (batch, 40 * (2 * k + 1)).
        """
        return self.net(x)

In [None]:
# Width of one network input (MLP computes the same expression from `k`).
input_size =  40 * (2 * k + 1)

# Pick the device first so the model can be created and placed in one step.
device = torch.device("cuda" if cuda else "cpu")
mlp = MLP().to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(mlp.parameters(), betas=(0.9,0.999),lr=1e-4, weight_decay=0)
# ExponentialLR multiplies the learning rate by `gamma` on every scheduler.step().
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.7)

print(mlp)

In [None]:
def train(model, train_loader, criterion, optimizer):
    """Run one training epoch and return the mean per-batch loss.

    Args:
        model: network to optimise; it is switched to training mode.
        train_loader: sized iterable yielding ``(data, target)`` batches.
        criterion: loss callable invoked as ``criterion(outputs, target)``.
        optimizer: optimizer holding ``model``'s parameters.

    Returns:
        The sum of the batch losses divided by ``len(train_loader)``.

    Notes:
        Reads the module-level ``device`` and ``scheduler``. The scheduler is
        advanced once per call, after the last batch of the epoch.
    """
    model.train()
    running_loss = 0.0
    start_time = time.time()
    for batch, (data, target) in enumerate(train_loader):
        if batch % 5000 == 0:
            print("Current batch ", batch)
        optimizer.zero_grad()
        data = data.to(device)
        target = target.to(device)

        outputs = model(data)
        # squeeze() drops size-1 dimensions from the targets before the loss.
        loss = criterion(outputs, target.squeeze())

        running_loss += loss.item()
        loss.backward()
        # Apply the gradients; without this call the weights never change.
        optimizer.step()
    # Decay the learning rate once per epoch, after the optimizer updates,
    # rather than once per batch.
    scheduler.step()
    end_time = time.time()
    running_loss /= len(train_loader)
    print('Training loss: ', running_loss, 'Time: ', end_time - start_time, 's')
    return running_loss

In [None]:
def test_model(model, test_loader, criterion):
    with torch.no_grad():
        model.eval()

        running_loss = 0.0
        total_predictions = 0.0
        correct_predictions = 0.0

        for batch_idx, (data, target) in enumerate(test_loader):   
            data = data.to(device)
            target = target.to(device)

            outputs = model(data)

            predicted = torch.max(outputs.data, 0)[1]
            total_predictions += target.size(0)
            correct_predictions += (predicted == target).sum().item()

            loss = criterion(outputs, predicted)
            running_loss += loss.item()
        running_loss /= len(test_loader)
        acc = (correct_predictions/total_predictions)*100.0
        print('Testing Loss: ', running_loss)
        print('Testing Accuracy: ', acc, '%')
        return running_loss, acc

In [None]:
# Number of passes over the training set; 5-10 is the suggested range.
epochs = 8
# Per-epoch history, one independent list per metric.
train_losses, test_losses, test_accs = [], [], []

In [None]:
# Train for up to `epochs` epochs, recording the mean training loss of each.
for epoch in range(epochs):
    print("Epoch ", epoch + 1)
    train_loss = train(mlp, train_loader, criterion, optimizer)
    train_losses.append(train_loss)
    # Early stop: after the first three epochs, break when the newest loss is
    # no lower than either of the two losses before it. Negative indices are
    # used so the check still looks at the latest entries if the history list
    # already held values from an earlier run of this cell.
    if epoch > 2 and train_losses[-1] >= max(train_losses[-3:-1]):
        print("loss has not decreased, break")
        break
    print('=' * 20)

In [None]:
# Leftover `%debug` post-mortem magic removed so the notebook runs top to bottom;
# run `%debug` by hand right after an exception if a post-mortem session is needed.

In [None]:
def predict(model, test_loader):
    """Collect the predicted class indices for every batch of ``test_loader``.

    The model is put in evaluation mode and run without gradient tracking.
    Each batch yielded by ``test_loader`` is moved to the module-level
    ``device``, and the index of the largest score along dim 1 is kept.

    Returns:
        A list with one index tensor per batch, in loader order.
    """
    model.eval()
    with torch.no_grad():
        batch_scores = (model(inputs.to(device)) for inputs in test_loader)
        # torch.max over dim 1 returns (values, indices); keep the indices.
        return [torch.max(scores.data, 1)[1].data for scores in batch_scores]

In [None]:
# Predicted class indices for the test set: one tensor per test_loader batch.
results = predict(mlp, test_loader)

In [None]:
# Flatten the per-batch prediction tensors into one list of Python ints.
# tolist() handles any batch size, whereas .item() only accepts one-element
# tensors and would fail if the test loader's batch size were ever raised.
values = [label for batch_preds in results for label in batch_preds.tolist()]