### Imports

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torchvision.transforms as transforms
import torch.utils.data as data
import matplotlib.pyplot as plt
import IPython.display
from IPython.display import Audio
import torch.optim as optim
from types import SimpleNamespace
import scipy as sc

from trainDataset import TrainDataset
from testDataset import TestDataset

### Parameters

In [None]:
# Validation fraction; defined here but not yet applied (see the validation
# split todo further down).
validation_split = 0.2

# Hyper-parameters bundled into one namespace so they can be handed to the
# train/test helpers as a single object.
args = SimpleNamespace(
    batch_size=64,
    test_batch_size=64,
    epochs=3,
    lr=0.01,
    momentum=0.5,
    seed=1,
    log_interval=200,
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# The data lives in different places on the GPU machine and on a CPU-only
# machine; adapt those paths on other machines.
if device.type == "cuda":
    print('with cuda')
    path_train = './../../../../challenge_data/kaggle-train/'
    path_test = './../../../../challenge_data/kaggle-test/'
else:
    print('no cuda')
    path_train = './../data/train-small/'
    path_test = './../data/test/kaggle-test/'

# Audio sample rate (samples per second), used for playback and the STFT.
sample_rate = 16000

### Datasets and data loaders

In [None]:
# todo add in the classes the features and the fft data

# Transform that divides every raw sample by the largest int16 value
# (presumably the clips are stored as int16 PCM -- confirm in the dataset classes).
toFloat = transforms.Lambda(
    lambda raw: raw / np.iinfo(np.int16).max
)

# Training set; its length is printed as a quick sanity check.
trainDataset = TrainDataset(
    path_train,
    transform=toFloat,
)
print(len(trainDataset))

# Test set, built with the same scaling transform.
testDataset = TestDataset(
    path_test,
    transform=toFloat,
)
print(len(testDataset))

In [None]:
# Length of the first element of the first training item.
# NOTE(review): presumably the number of audio samples per clip -- confirm
# against TrainDataset.__getitem__.
input_size = len(trainDataset[0][0])
print('input size: ',input_size)

In [None]:
# todo do a validation split here

# Shuffled mini-batch loader over the training set.
train_loader = data.DataLoader(trainDataset, batch_size=args.batch_size, shuffle=True)

# Sanity check on the first batch only: shapes, raw targets, value range and
# the decoded instrument-family names.
for samples, instrument_family_target in train_loader:
    print(samples.shape, instrument_family_target.shape,
          instrument_family_target.data)
    print(torch.min(samples), torch.max(samples))
    print(trainDataset.transformInstrumentsFamilyToString(instrument_family_target.data))
    break

In [None]:
# shuffle must stay False: the submission ids written later are the positions
# of the samples in the test dataset.
test_loader = data.DataLoader(testDataset, batch_size=args.test_batch_size, shuffle=False)

# Sanity check on the first batch only: shape and value range.
for samples in test_loader:
    print(samples.shape)
    print(torch.min(samples), torch.max(samples))
    break

### Look at Data

In [None]:
# how many instruments are there?
# Tally the samples per label; 20 slots are reserved as an upper bound.
dummy_count = np.zeros(20)
for sample in trainDataset:
    dummy_count[sample[1]] += 1

# Keep only the slots that were actually hit, in label order.
labels_count = list(dummy_count[dummy_count != 0])

print(labels_count)

In [None]:
# labels_count holds one entry per label that occurred at least once, so its
# length is the number of classes present in the training data.
nmbr_classes = len(labels_count)
print('nmbr_classes: ', nmbr_classes)

In [None]:
# Plot the per-class sample counts as star markers to eyeball class balance.
plt.plot(labels_count, '*')

In [None]:
# plot one of each

# Collect the first sample encountered for every class label.
# done[label] flips to 1 once an example for that label has been stored.
done = np.zeros(nmbr_classes)
examples = []

for sample in trainDataset:
    if done[sample[1]] != 0:
        continue
    examples.append(sample)
    done[sample[1]] = 1
    # Stop as soon as every class is covered instead of loading the rest of
    # the dataset for nothing.
    if done.all():
        break

In [None]:
# Draw the waveform of each collected example on a 4x3 grid. At most ten
# panels are drawn (as before); fewer examples simply leave panels empty
# instead of raising an IndexError.
for panel, example in enumerate(examples[:10], start=1):
    plt.subplot(4, 3, panel)
    plt.plot(example[0])

plt.show()

In [None]:
### todo plot spectrogram

In [None]:
# Render an audio player for every collected example. The explicit
# IPython.display.display call does not depend on the `display` name that
# IPython injects into notebook sessions.
for sample in examples:
    IPython.display.display(Audio(sample[0], rate=sample_rate))

### CNN Model

In [None]:
def logMagStft(numpyArray, sample_rate, n_fft, eps=np.e**-10):
    """Return the log-magnitude short-time Fourier transform of a signal.

    The STFT uses segments of ``n_fft`` samples with 50% overlap.

    Args:
        numpyArray: Audio samples (array-like) to transform.
        sample_rate: Sampling frequency passed to SciPy as ``fs``.
        n_fft: Segment length (``nperseg``) of the STFT.
        eps: Small offset added to the magnitude before the logarithm so that
            silent bins do not produce ``-inf``. Defaults to ``e**-10``, which
            floors the output at about -10.

    Returns:
        ``numpy.ndarray`` with ``log(|STFT| + eps)``.
    """
    # `import scipy as sc` alone does not guarantee that the `signal`
    # submodule is loaded, so import it explicitly here.
    from scipy import signal

    _, _, sx = signal.stft(numpyArray, fs=sample_rate, nperseg=n_fft,
                           noverlap=n_fft // 2)
    return np.log(np.abs(sx) + eps)

In [None]:
# NN architecture (three conv and two fully connected layers)
class Net(nn.Module):
    """CNN classifier working on log-magnitude spectrograms of raw audio.

    ``forward`` takes a batch of raw waveforms, converts each one to a
    log-magnitude STFT with ``logMagStft`` and feeds the result through three
    conv/max-pool stages and two fully connected layers.

    Args:
        num_classes: Number of output classes (size of the last layer).
            Defaults to 10.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        self.first_conv = nn.Conv2d(1, 20, 5, 1)
        self.second_conv = nn.Conv2d(20, 50, 5, 2)
        self.third_conv = nn.Conv2d(50, 50, 5, 2)
        # fc1 expects the (50, 6, 6) feature map produced for 256x252 spectrograms.
        self.fc1 = nn.Linear(50*6*6, 500)
        self.fc2 = nn.Linear(500, num_classes)

    def forward(self, x):
        """Return per-class log-probabilities for a batch of waveforms.

        Args:
            x: Tensor with one waveform per row.

        Returns:
            Tensor of shape (batch_size, num_classes) with log-softmax scores.
        """
        n_fft = 510

        # The STFT is computed with SciPy on the CPU, one clip at a time;
        # stacking lets the spectrogram size follow the actual clip length.
        spectrograms = np.stack(
            [logMagStft(audio, 16000, n_fft) for audio in x.detach().cpu().numpy()]
        )

        # Add the channel axis and move the batch to wherever the weights
        # live, so the model does not depend on a global `device`.
        weights_device = self.first_conv.weight.device
        x = torch.from_numpy(spectrograms[:, np.newaxis, :, :]).to(weights_device).float()

        # x.size is (batch_size, 1, 256, 252) for 64000-sample clips
        x = F.relu(self.first_conv(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.second_conv(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.third_conv(x))
        x = F.max_pool2d(x, 2, 2)
        # x.size is (batch_size, 50, 6, 6)
        # Flatten per sample; a wrong spectrogram size then fails loudly in
        # fc1 instead of silently mixing samples across the batch.
        x = x.view(x.size(0), -1)

        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

### Train Function

In [None]:
# This function trains the model for one epoch
def train(args, model, device, train_loader, optimizer, epoch):
    """Run one pass over ``train_loader`` and update ``model`` in place.

    Args:
        args: Namespace providing ``log_interval`` (batches between log lines).
        model: Network returning log-probabilities (trained with NLL loss).
        device: Device the batches are moved to before the forward pass.
        train_loader: Loader yielding ``(data, target)`` batches; its
            ``dataset`` length is used for the progress output.
        optimizer: Optimizer stepping the model parameters.
        epoch: Epoch number, used only in the log line.
    """
    model.train()

    for batch_idx, batch in enumerate(train_loader):
        inputs, labels = batch
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Standard step: clear gradients, forward, loss, backward, update.
        optimizer.zero_grad()
        loss = F.nll_loss(model(inputs), labels)
        loss.backward()
        optimizer.step()

        if batch_idx % args.log_interval != 0:
            continue

        # Progress is approximated from the batch index and current batch size.
        seen = batch_idx * len(inputs)
        total = len(train_loader.dataset)
        percent = 100. * batch_idx / len(train_loader)
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, seen, total, percent, loss.item()))

### Test Function

In [None]:
# This function evaluates the model on the test data
def test(args, model, device, test_loader, epoch):
    """Predict the test set and write the predictions to a submission CSV.

    The predicted class indices are converted to instrument-family names with
    the module-level ``trainDataset.transformInstrumentsFamilyToString`` and
    written to ``NN-submission-<epoch>.csv`` with the columns ``Id`` (position
    in the test dataset) and ``Expected``.

    Args:
        args: Unused; kept for a signature parallel to ``train``.
        model: Trained network returning per-class scores.
        device: Device the batches are moved to before the forward pass.
        test_loader: Unshuffled loader yielding batches of samples only.
        epoch: Epoch number, used in the output file name.
    """
    # csv is not imported at module level in this file.
    import csv

    model.eval()
    num_samples = len(test_loader.dataset)
    # Builtin int replaces the np.int alias that was removed in NumPy 1.24.
    familyPredictions = np.zeros(num_samples, dtype=int)

    # Running offset instead of index * len(batch): the last batch may be
    # smaller than the others, which would misplace its predictions.
    offset = 0
    with torch.no_grad():
        for samples in test_loader:
            samples = samples.to(device)
            # get the index of the max log-probability
            batch_predictions = model(samples).max(1)[1].cpu().numpy()
            familyPredictions[offset:offset + len(batch_predictions)] = batch_predictions
            offset += len(batch_predictions)

    familyPredictionStrings = trainDataset.transformInstrumentsFamilyToString(familyPredictions)

    with open('NN-submission-' +str(epoch)+'.csv', 'w', newline='') as writeFile:
        fieldnames = ['Id', 'Expected']
        writer = csv.DictWriter(writeFile, fieldnames=fieldnames, delimiter=',',
                                quotechar='|', quoting=csv.QUOTE_MINIMAL)
        writer.writeheader()
        for index in range(num_samples):
            writer.writerow({'Id': index, 'Expected': familyPredictionStrings[index]})
    print('saved predictions')

### Main

In [None]:
# Main
# Build the network on the selected device and optimise it with SGD + momentum.
model = Net()
model.to(device)
optimizer = optim.SGD(
    model.parameters(),
    lr=args.lr,
    momentum=args.momentum,
)

# Every epoch: one training pass, then a submission file for that epoch.
for epoch in range(1, args.epochs + 1):
    train(args=args, model=model, device=device,
          train_loader=train_loader, optimizer=optimizer, epoch=epoch)
    test(args=args, model=model, device=device,
         test_loader=test_loader, epoch=epoch)
