# Convolutional Neural Network Filter Bank for Musical Instruments Classification
## Author: Renato de Castro Rabelo Profeta
### Date: October, 2020
#### Applied Media Systems Group, Ilmenau University of Technology, Germany

## Colab Runtime Configurations


In [None]:
# Install torchaudio
!pip install torchaudio -f https://download.pytorch.org/whl/torch_stable.html

## Imports and Configuration

In [None]:
# Imports

## Numerical Computing
import numpy as np
import itertools

## File System and Files Handling
import os
from zipfile import ZipFile

## Python Serialization
import pickle

## Audio Processing
import librosa.display, librosa
import torchaudio

## Plotting and Visualization
import matplotlib.pyplot as plt
import IPython.display as ipd
from IPython.core.display import HTML, display, Image

## Machine Learning
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.metrics import recall_score, precision_score, accuracy_score
from sklearn.metrics import confusion_matrix, f1_score, classification_report
from sklearn.metrics import mean_squared_error

## Deep Learning
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

In [None]:
# Seeds for reproducibility
torch.manual_seed(0)
np.random.seed(0)

In [None]:
# Configurations
## Create Directory for Checkpoints
!mkdir -p checkpoints

## Check CPU or GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
# Configure Tensorboard
%load_ext tensorboard

In [None]:
logs_base_dir = "runs"
os.makedirs(logs_base_dir, exist_ok=True)

## Download the dataset from Dropbox

In [None]:
# Download the dataset from Dropbox
!wget -O dataset.zip https://www.dropbox.com/s/su4rvaipccm1lit/all-samples_npy_pkl.zip?dl=0

## Extract .zip 

In [None]:
with ZipFile('dataset.zip', 'r') as zipObj:
   # Extract all the contents of zip file in current directory
   zipObj.extractall()


## Get Dataset Metadata from GitHub

In [None]:
# Clone github in Google Colab
!git clone https://github.com/param1707/-Optimizing-a-neural-network-filter-bank-for-musical-instrument-classification-.git 

In [None]:
# Load Train Set
# NOTE(review): pickle.load can execute arbitrary code embedded in the file.
# These .pkl files come from the GitHub repository cloned in the previous cell,
# so only run this cell against a repository you trust.
with open('./-Optimizing-a-neural-network-filter-bank-for-musical-instrument-classification-/datasetMetadata/train_set_dataframe.pkl', 'rb') as f:
  train_set = pickle.load(f)
# Load Test Set (same pickle caveat as above)
with open('./-Optimizing-a-neural-network-filter-bank-for-musical-instrument-classification-/datasetMetadata/test_set_dataframe.pkl', 'rb') as f:
  test_set = pickle.load(f)

## Encode Labels

In [None]:
# Encode Labels Train Set
# Map the class names of the training set to integer ids.
labelencoder = LabelEncoder()
labelencoder.fit(train_set['class'].values.tolist())
print(len(labelencoder.classes_), "classes:", ", ".join(list(labelencoder.classes_)))
classes_int_train = labelencoder.transform(train_set['class'].values.tolist())

# OneHotEncoding
# scikit-learn 1.2 renamed `sparse` to `sparse_output` and later releases
# dropped the old keyword, so try the new name first and fall back for older
# installations.
try:
    encoder = OneHotEncoder(sparse_output=False, categories="auto")
except TypeError:
    encoder = OneHotEncoder(sparse=False, categories="auto")
onehot_labels_train = encoder.fit_transform(classes_int_train.reshape(len(classes_int_train), 1))

In [None]:
# Encode Labels Test Set
# Reuse the encoders fitted on the training set instead of re-fitting them on
# the test set: re-fitting would silently change the class -> id mapping (and
# the one-hot width) whenever the two sets do not contain exactly the same
# classes. A class that only exists in the test set now raises an error here.
test_class_names = test_set['class'].values.tolist()
test_classes_present = np.unique(test_class_names)
print(len(test_classes_present), "classes:", ", ".join(list(test_classes_present)))
classes_int_test = labelencoder.transform(test_class_names)

# OneHotEncoding (same column layout as the training labels)
onehot_labels_test = encoder.transform(classes_int_test.reshape(len(classes_int_test), 1))

## Create PyTorch Datasets for Training and Testing

In [None]:
# Dataset Class
class dataset(Dataset):
    """Dataset of audio signals stored as ``.npy`` files.

    Every item is loaded from disk on access, peak-normalized and returned as
    a ``(1, n_samples)`` tensor together with its label.

    Args:
        files: sequence of paths to ``.npy`` files.
        labels: sequence of labels, indexed in parallel with ``files``.

    Author: Renato de Castro Rabelo Profeta, October 2020, TU Ilmenau Germany
    """
    def __init__(self, files, labels):
        self.labels = labels
        self.files = files

    def __len__(self):
        """Return the number of files in the set."""
        return len(self.files)

    def __getitem__(self, index):
        """Load item ``index`` and return ``(X, y_labels)``.

        ``X`` is the array content reshaped to ``(1, -1)`` and scaled to a
        peak magnitude of 1; ``y_labels`` is ``labels[index]`` as a tensor.
        """
        x_numpy = np.load(self.files[index])
        X = torch.from_numpy(x_numpy)
        # Normalize. An all-zero (silent) or empty array is left untouched:
        # dividing by its zero peak would fill the tensor with NaN.
        if X.numel() > 0:
            peak = torch.abs(X).max()
            if peak > 0:
                X = X / peak
        X = torch.reshape(X, (1, -1))  # Reshape for Model
        y_labels = torch.tensor(self.labels[index])  # Labels Out
        return X, y_labels

In [None]:
# Create Sets
train_set_torch = dataset(train_set['filename'].values.tolist(), onehot_labels_train)
test_set_torch = dataset(test_set['filename'].values.tolist(), onehot_labels_test)

In [None]:
# Data Loader.
training_generator = DataLoader(train_set_torch, batch_size=1, shuffle=False, num_workers=0)
validation_generator = DataLoader(test_set_torch, batch_size=1, shuffle=False, num_workers=0)

### Testing Dataset and Dataloader

In [None]:
# Get a random audio file and label from dataset
dataiter = iter(DataLoader(train_set_torch, batch_size=1, shuffle=True, num_workers=0))
# Use the built-in next(): the DataLoader iterator follows the standard
# iterator protocol, and its `.next()` method is not available in newer
# PyTorch releases.
audio_to_test, label_to_test = next(dataiter)

In [None]:
# Calculate Spectrogram of the audio file to test
specgram = torchaudio.transforms.Spectrogram(n_fft=2048)(audio_to_test[0,0,:])


In [None]:
# Plot Spectrogram of audio to test
plt.figure(figsize=(10,6))
plt.imshow(20*specgram.log10().numpy(), cmap='gray')
plt.grid()

In [None]:
# Plot Waveform of audio to test
plt.figure(figsize=(10,6))
plt.plot(audio_to_test[0,0,:])
plt.grid()

In [None]:
# Listen to audio
ipd.Audio(audio_to_test[0,0,:], rate=44100) # play the sampled waveform; the playback rate is fixed at 44100 here

## PyTorch Analysis Filter Bank / Encoder Model

In [None]:
# Model Parameters
in_channels = 1
n_subbands = 1024
filter_length = 2048
downsampling = 1024
padding = 1024

In [None]:
# Analysis Filter Bank / Encoder
class Encoder(torch.nn.Module):
  """ Analysis filter bank built from a single strided 1-D convolution.

  The convolution maps `in_channels` input channels to `n_subbands` output
  channels with kernel size `filter_length`, stride `downsampling` and the
  given `padding`; `bias` toggles the convolution bias.

  Author: Renato de Castro Rabelo Profeta, October 2020, TU Ilmenau, Germany
  """
  def __init__(self, in_channels=1,n_subbands=1024,filter_length=2048,downsampling = 1024, padding=1024, bias=False):
    super().__init__()
    # Keep the hyper-parameters on the instance
    self.in_channels, self.n_subbands = in_channels, n_subbands
    self.filter_length, self.downsampling = filter_length, downsampling
    self.padding, self.bias = padding, bias
    # The only layer: one output channel per subband
    conv_config = dict(
        in_channels=in_channels,
        out_channels=n_subbands,
        kernel_size=filter_length,
        stride=downsampling,
        padding=padding,
        bias=bias,
    )
    self.conv1 = torch.nn.Conv1d(**conv_config)

  def forward(self, x):
    """Apply the convolution to `x` and return the result."""
    return self.conv1(x)

### Weights Initialization

In [None]:
# Weights Initialization Function
def weights_init(m):
    """Initialize the weights of a single module; meant for `model.apply`.

    Conv1d, ConvTranspose1d, Conv2d and Linear modules get Xavier-uniform
    weights. Biases of Conv2d and Linear modules are zeroed when the module
    has a bias; biases of the 1-D convolutions are left untouched. Any other
    module type is ignored.
    """
    if isinstance(m, (nn.Conv1d, nn.ConvTranspose1d, nn.Conv2d, nn.Linear)):
        torch.nn.init.xavier_uniform_(m.weight.data)
        # Layers created with bias=False have `bias is None`; skip them
        # instead of failing on `None.data`.
        if isinstance(m, (nn.Conv2d, nn.Linear)) and m.bias is not None:
            torch.nn.init.zeros_(m.bias.data)

### Testing Encoder before Training

In [None]:
# Create Analysis Filter Bank model
analysisFB = Encoder()
# Initialize weights
analysisFB.apply(weights_init)
# Send to Device
analysisFB.to(device)

In [None]:
test_encoder = analysisFB(audio_to_test.to(device))
print('Input Shape', audio_to_test.shape)
print('Output Shape', test_encoder.shape)

In [None]:
# Display Number of Trainable Parameters
pytorch_total_params = sum(p.numel() for p in analysisFB.parameters() if p.requires_grad)
print("Number of Trainable Parameters:",pytorch_total_params)

In [None]:
# Plot of encoded audio to test
plt.figure(figsize=(10,6))
plt.imshow(test_encoder[0,:,:].detach().cpu().numpy(), cmap='gray')
plt.grid()

## PyTorch Synthesis Filter Bank / Decoder

In [None]:
# Model Parameters
out_channels = 1
n_subbands = 1024
filter_length = 2048
upsampling = 1024
dilatation=1
padding = 1

In [None]:
 # Decoder
class Decoder(torch.nn.Module):
  """ Convolutional Neural Network Synthesis Filter Bank for Musical Instruments Classification

  A single transposed 1-D convolution that maps `n_subbands` input channels
  to `out_channels` output channels with kernel size `filter_length`, stride
  equal to the upsampling factor, and the given dilation and padding.

  Args:
    n_subbands: number of input channels.
    out_channels: number of output channels.
    filter_length: kernel size of the transposed convolution.
    umpsampling: upsampling factor (stride). The misspelled name is kept so
      existing keyword callers keep working.
    dilatation: dilation of the transposed convolution.
    padding: padding of the transposed convolution.
    bias: whether the transposed convolution has a bias.
    upsampling: correctly spelled alias; overrides `umpsampling` when given.

  Author: Renato de Castro Rabelo Profeta, October 2020, TU Ilmenau, Germany
  """
  def __init__(self, n_subbands=1024,out_channels=1, filter_length=2048, umpsampling=1024, dilatation=1, padding=1, bias=False, upsampling=None):
    super(Decoder, self).__init__()
    self.n_subbands = n_subbands
    self.out_channels = out_channels
    self.filter_length=filter_length
    # Take the factor from the constructor arguments. Previously this line
    # read a module-level variable named `upsampling`, so the argument was
    # ignored and the class failed when that global did not exist.
    self.upsampling = umpsampling if upsampling is None else upsampling
    self.dilatation=dilatation
    self.padding=padding
    self.bias=bias
    self.conv1 = torch.nn.ConvTranspose1d(in_channels=self.n_subbands, out_channels=self.out_channels , kernel_size=self.filter_length,
                                          stride=self.upsampling, dilation=self.dilatation, padding=self.padding, bias=self.bias)

  def forward(self, x):
    """Apply the transposed convolution to `x` and return the result."""
    x = self.conv1(x)
    return x

### Testing Decoder before Training

In [None]:
# Create Synthesis Filter Bank Model
synthesisFB = Decoder()
# Initialize weights
synthesisFB.apply(weights_init)
# Send to Device
synthesisFB.to(device)

In [None]:
# Display Number of Trainable Parameters
pytorch_total_params = sum(p.numel() for p in synthesisFB.parameters() if p.requires_grad)
print("Number of Trainable Parameters:",pytorch_total_params)

In [None]:
# Test
test_decoder = synthesisFB(test_encoder)
print('Input Shape: (Encoded)', test_encoder.shape)
print('Output Shape: (Decoded)', test_decoder.shape)
print("Audio_to_test Shape", audio_to_test.shape)

In [None]:
# Create Synthesis Filter Bank Model
synthesisFB = Decoder()
# Initialize weights
synthesisFB.apply(weights_init)
synthesisFB.to(device)

In [None]:
# Calculate Spectrogram of the reconstructed audio
specgram_decoded_audio = torchaudio.transforms.Spectrogram()(test_decoder[0,0,:].to('cpu'))

In [None]:
# Plot Spectrogram of reconstructed audio
plt.figure(figsize=(10,6))
plt.imshow(20*specgram_decoded_audio.log10().detach().numpy(), cmap='gray')
plt.grid()

In [None]:
# Plot Waveform of audio to test
plt.figure(figsize=(10,6))
plt.plot(test_decoder[0,0,:].detach().cpu().numpy())
plt.grid()

In [None]:

display(ipd.Audio(test_decoder[0,0,:].detach().cpu().numpy(), rate=44100))

## PyTorch CNN Classification Model

In [None]:
# Model Parameters
n_classes=20

In [None]:
class classificationModel(torch.nn.Module):
  """ CNN Classifier for an Analysis Filter Bank for Musical Instruments Classification

  Treats a 3-D input `(batch, rows, columns)` as a batch of single-channel
  images: two 3x3 convolutions with max-pooling and dropout, adaptive average
  pooling to 8x8, then two fully connected layers. The output is a softmax
  over `n_classes`, i.e. one probability row per batch element.

  Author: Renato de Castro Rabelo Profeta, October 2020, TU Ilmenau, Germany
  """
  def __init__(self, n_classes=20):
    super(classificationModel, self).__init__()
    self.n_classes=n_classes

    self.conv1 = torch.nn.Conv2d(1, 1, kernel_size=3, stride=1, padding=1)
    self.pool1 = torch.nn.MaxPool2d(2)
    self.dropout1 = torch.nn.Dropout(p=0.4)
    self.conv2 = torch.nn.Conv2d(1, 1, kernel_size=3, stride=1, padding=1)
    self.dropout2 = torch.nn.Dropout(p=0.4)
    self.adpavpool = torch.nn.AdaptiveAvgPool2d((8,8))
    self.fc1 = torch.nn.Linear(8*8, 32)
    self.fc2 = torch.nn.Linear(32, self.n_classes)

  def forward(self, x):
    """Return class probabilities of shape `(batch, n_classes)` for `x`."""
    # Insert the channel axis after the batch axis. Inserting it at position
    # 0 moved the batch into the channel dimension, which only worked for a
    # batch size of 1 (where both placements give the same tensor).
    x = x.unsqueeze(1)
    x = self.conv1(x)
    x = self.pool1(x)
    x = self.dropout1(x)
    x = self.conv2(x)
    x = self.dropout2(x)
    x = self.adpavpool(x)
    # Flatten the 8x8 feature map of every batch element
    x = x.view(x.size()[0], -1)
    x = torch.sigmoid(self.fc1(x))
    x = F.softmax(self.fc2(x), dim=-1)
    return x

### Test Classification Model before Training

In [None]:
# Create Classification Model
classify = classificationModel()
# Initialize weights
classify.apply(weights_init)
# Send to Device
classify.to(device)

In [None]:
# Display Number of Trainable Parameters
pytorch_total_params = sum(p.numel() for p in classify.parameters() if p.requires_grad)
print("Number of Trainable Parameters:",pytorch_total_params)

In [None]:
# Test
test_classify = classify(test_encoder)
print("Predicted Class:", np.argmax(test_classify.detach().cpu().numpy()))
print('Correct Class', np.argmax(label_to_test.detach().cpu().numpy()))
print("Output of Classifier Shape:", test_classify.shape)

## PyTorch CNN Autoencoder Model with Classification

In [None]:
# Autoencoder
class Autoencoder(nn.Module):
  """ CNN Autoencoder with an Embedded Classifier for Musical Instruments Classification

  Wraps three sub-modules: the encoder output is fed to both the classifier
  and the decoder.

  Author: Renato de Castro Rabelo Profeta, October 2020, TU Ilmenau, Germany
  """

  def __init__(self, encoder, classifier, decoder):
    super().__init__()
    # Registered under these names so they appear as sub-modules
    self.encoder, self.classifier, self.decoder = encoder, classifier, decoder

  def forward(self, x):
    """Return the tuple `(encoded, decoded, classified)` for input `x`."""
    latent = self.encoder(x)
    # The classifier runs before the decoder
    prediction = self.classifier(latent)
    reconstruction = self.decoder(latent)
    return latent, reconstruction, prediction

### Testing Autoencoder before Training

In [None]:
# Create Autoencoder Model
autoencoder = Autoencoder(analysisFB,classify,synthesisFB)
# Send to Device
autoencoder.to(device)

In [None]:
# Test
test_encoded, test_output, test_type = autoencoder(audio_to_test.to(device))
print('Encoded Shape', test_encoded.shape)
print('Decoded Shape', test_output.shape)
print('Predicted Class', np.argmax(test_type.detach().cpu().numpy()))

## Define a Loss Function

In [None]:
class categorical_cross_entropy(nn.Module):
    ''' Categorical Cross Entropy similar to Keras/TensorFlow
        "Categorical crossentropy between an output tensor and a target tensor" - https://www.tensorflow.org/api_docs/python/tf/keras/backend/categorical_crossentropy
        https://github.com/tensorflow/tensorflow/blob/r1.13/tensorflow/python/keras/backend.py
        target: A tensor of the same shape as `inputX`.
        inputX: A tensor resulting from a softmax

        The probabilities are re-normalized along the last axis, clamped to
        [eps, 1 - eps], and the per-sample cross entropy is averaged over the
        remaining (batch) axis. `inputX` is not modified.

        Ported to PyTorch by Renato Profeta, October 2020
    '''
    def __init__(self):
        super(categorical_cross_entropy, self).__init__()

    def forward(self, inputX, target):
        eps=1e-10
        # Normalize every sample on its own (last axis). Dividing by the sum
        # of the whole tensor scaled each row by 1/batch_size as soon as more
        # than one sample was passed.
        probs = inputX / torch.sum(inputX, dim=-1, keepdim=True)
        # Keep log() away from 0
        probs = torch.clamp(probs, min=eps, max=1-eps)
        return torch.mean(-torch.sum(target * torch.log(probs.double()), dim=-1), dim=-1)

## Set a Loss Function and an Optimizer for the Experiment

In [None]:
# Loss Functions
loss_classification = categorical_cross_entropy()
loss_decoder = torch.nn.MSELoss()

# Optimizer
lr=0.01
optimizer = optim.SGD(autoencoder.parameters(), lr=lr)
#optimizer = optim.Adagrad(autoencoder.parameters())


## Auxiliary Functions for Progress Monitoring

In [None]:
# https://colab.research.google.com/drive/11v_mM2ImWdKDs_4qkoB9TdsQiPgVeeo2#scrollTo=NQgIwI5WC-07        
class ProgressMonitor(object):
    """
    Custom IPython progress bar

    Renders an HTML snippet (metrics text plus a <progress> element) through
    IPython's `display` and refreshes it in place on every `update` call.
    `length` is the maximum of the progress bar; `mode` selects the train
    template when it equals "train" and the test template otherwise.
    """

    # Fixed markup: the <progress> tag had a stray comma between attributes
    # and the test template closed </p> twice.
    tmpl_train ="""
        <p> Train Epoch: {epoch} / {num_epochs} <br>
        Step: {value} / {length} - Train <br>
        Loss Classification: {loss_classify:0.4f} / Accuracy Classification: {accuracy_classify:0.4f} <br>
        Loss Decoder: {loss_decoder:0.4f} <br>
        Total Loss: {loss_total:0.4f}</p>
        <progress value='{value}' max='{length}' style='width: 100%'>{value}</progress>
        <br>"""

    tmpl_test= """
        <p>Test Epoch: {epoch} / {num_epochs} <br>
        Step: {value} / {length} - Test <br>
        Loss Classification: {loss_classify:0.4f} / Accuracy Classification: {accuracy_classify:0.4f} <br>
        Loss Decoder: {loss_decoder:0.4f} <br>
        Total Loss: {loss_total:0.4f}</p>
        <progress value='{value}' max='{length}' style='width: 100%'>{value}</progress>
        <br>"""

    def __init__(self, length, mode):
        self.length = length
        self.count = 0
        self.mode = mode
        # display_id=True returns a handle that update() uses to redraw
        self.display = display(self.html(0, 0, 0, 0, 0, 0, 0, mode), display_id=True)

    def html(self, count, epoch, num_epochs, loss_classify, accuracy_classify, loss_decoder, loss_total,mode="train"):
        """Return the filled-in template for `mode` wrapped in an `HTML` object."""
        if mode=="train":
            return HTML(self.tmpl_train.format(length=self.length, value=count, epoch=epoch, num_epochs=num_epochs,
                                               loss_classify=loss_classify, accuracy_classify=accuracy_classify,
                                               loss_decoder=loss_decoder,loss_total=loss_total))
        else:
            return HTML(self.tmpl_test.format(length=self.length, value=count, epoch=epoch, num_epochs=num_epochs,
                                               loss_classify=loss_classify, accuracy_classify=accuracy_classify,
                                               loss_decoder=loss_decoder, loss_total=loss_total))

    def update(self, count, epoch, num_epochs, loss_classify, accuracy_classify,loss_decoder,loss_total,mode="train"):
        """Advance the bar by `count` steps and redraw it with the new metrics."""
        self.count += count
        self.display.update(self.html(self.count, epoch, num_epochs, loss_classify, accuracy_classify, loss_decoder, loss_total, mode))

In [None]:
# https://colab.research.google.com/drive/1gJAAN3UI9005ecVmxPun5ZLCGu4YBtLo#scrollTo=ZvoPaJvs7Eem
class AverageBase(object):
    """Base class for the metric trackers.

    Holds a single `value` (a float, or None while nothing was recorded) and
    makes the object usable wherever a number is printed or formatted. While
    `value` is None, `str`, `format` and `float` report NaN instead of raising.
    """

    def __init__(self, value=0):
        self.value = float(value) if value is not None else None

    def _as_float(self):
        """Return `value`, or NaN when no value has been recorded yet."""
        return float("nan") if self.value is None else self.value

    def __str__(self):
        return str(round(self._as_float(), 4))

    def __repr__(self):
        # __repr__ must return a string; returning the float itself made
        # repr() raise TypeError.
        return "{}({!r})".format(type(self).__name__, self.value)

    def __format__(self, fmt):
        return self._as_float().__format__(fmt)

    def __float__(self):
        return self._as_float()
    

class RunningAverage(AverageBase):
    """
    Cumulative moving average (CMA): the mean of every value seen so far.
    """

    def __init__(self, value=0, count=0):
        super().__init__(value)
        # Number of samples already folded into `value`
        self.count = count

    def update(self, value):
        """Fold `value` into the average and return the new average."""
        running_sum = self.value * self.count + float(value)
        self.count += 1
        self.value = running_sum / self.count
        return self.value

class MovingAverage(AverageBase):
    """
    Exponentially decaying moving average (EMA) with decay factor `alpha`.
    """

    def __init__(self, alpha=0.99):
        # Start without a value; the first update seeds the average
        super().__init__(None)
        self.alpha = alpha

    def update(self, value):
        """Blend `value` into the average and return the new average."""
        sample = float(value)
        if self.value is not None:
            sample = self.alpha * self.value + (1 - self.alpha) * sample
        self.value = sample
        return self.value

## Auxiliary Functions to save Model Parameters

In [None]:
# Save a Model
def save_checkpoint(optimizer, loss_classification,loss_decoder, model, epoch, filename):
    """Write a training checkpoint to `filename` with `torch.save`.

    The checkpoint is a dict with the state dicts of the optimizer, the model
    and both loss modules, plus the epoch number, under the keys
    'optimizer', 'model', 'epoch', 'loss_classification' and 'loss_decoder'.
    """
    # The unused `global bestm` declaration was removed: the name was never
    # read or assigned.
    checkpoint_dict = {
        'optimizer': optimizer.state_dict(),
        'model': model.state_dict(),
        'epoch': epoch,
        'loss_classification': loss_classification.state_dict(),
        'loss_decoder': loss_decoder.state_dict()
    }

    torch.save(checkpoint_dict, filename)

# Load a Model
def load_checkpoint(optimizer, loss_classification, loss_decoder, model, filename, map_location=None):
    """Restore a checkpoint written by `save_checkpoint` and return its epoch.

    Loads the state dicts stored under 'model', 'loss_classification' and
    'loss_decoder' into the given objects; the optimizer state is restored
    only when `optimizer` is not None.

    `map_location` is forwarded to `torch.load`, so a checkpoint saved on a
    GPU can be opened on a CPU-only machine (e.g. `map_location="cpu"`). The
    default None keeps the previous behavior.
    """
    checkpoint_dict = torch.load(filename, map_location=map_location)
    epoch = checkpoint_dict['epoch']
    model.load_state_dict(checkpoint_dict['model'])
    loss_classification.load_state_dict(checkpoint_dict['loss_classification'])
    loss_decoder.load_state_dict(checkpoint_dict['loss_decoder'])
    if optimizer is not None:
        optimizer.load_state_dict(checkpoint_dict['optimizer'])
    return epoch

## Function to Train a Model

In [None]:
# Training Function
def trainModel(model, epoch, num_epochs, monitoring=True):
  """ Train `model` for one epoch and save a checkpoint.

  Iterates once over the notebook-level `training_generator`, optimizing the
  weighted sum `w_class * classification loss + w_decoder * decoder loss`
  with the notebook-level `optimizer`. Also relies on the globals `device`,
  `loss_classification`, `loss_decoder` and `train_set`.

  Args:
    model: module returning `(encoded, decoded, classified)` for a batch.
    epoch: current epoch number (shown in the progress bar, used in the
      checkpoint file name).
    num_epochs: total number of epochs (shown in the progress bar).
    monitoring: when True, display and update a `ProgressMonitor`.

  Returns:
    `(classification loss, accuracy, decoder loss, total loss, x_pred)`.
    The four metrics are the final values of exponentially smoothed
    `MovingAverage` trackers; `x_pred` is the list of predicted class
    indices in iteration order.

  Author: Renato de Castro Rabelo Profeta, October 2020, TU Ilmenau, Germany
  """
  if monitoring:
    progress = ProgressMonitor(length=len(train_set), mode="train")

  # Initialize Metrics
  train_loss_classify = MovingAverage()
  train_loss_decoder = MovingAverage()
  train_loss_total = MovingAverage()
  train_acc = MovingAverage()

  # Train Stage
  model.train()

  # keep track of predictions and of the number of correct ones so far
  x_pred = []
  n_correct = 0

  for batch, targets in training_generator:
    # Move the training data to the GPU
    batch = batch.to(device)
    targets = targets.to(device)

    # clear previous gradient computation
    optimizer.zero_grad()

    # forward propagation
    encoded, decoded, classified = model(batch)

    # Bring the reconstruction target to the decoder's output length:
    # zero-pad the input at the end when the output is longer, crop it
    # otherwise. Only the time axis is compared (not the whole size tuple)
    # and every element of the batch is handled.
    n_in, n_out = batch.shape[-1], decoded.shape[-1]
    if n_out > n_in:
      target = F.pad(batch, (0, n_out - n_in))
    else:
      target = batch[..., :n_out]

    # calculate the loss
    loss1 = loss_classification(classified, targets)
    loss2 = loss_decoder(decoded, target)

    loss = w_class*loss1 + w_decoder*loss2
    # backpropagate to compute gradients
    loss.backward()

    # update model weights
    optimizer.step()

    # update average loss
    train_loss_classify.update(loss1)
    train_loss_decoder.update(loss2)
    train_loss_total.update(loss)

    # save predictions
    predicted = classified.argmax(dim=-1)
    x_pred.extend(predicted.cpu().numpy())

    # Cumulative accuracy so far. The predictions are compared with the
    # labels of this very batch, so the result no longer depends on the
    # loader iterating in dataset order, and it is updated in O(batch)
    # instead of re-scanning all previous predictions.
    n_correct += (predicted == targets.argmax(dim=-1)).sum().item()
    accuracy = n_correct / len(x_pred)

    # update average accuracy
    train_acc.update(accuracy)

    # Update Progress Bar
    if monitoring:
      progress.update(batch.shape[0], epoch, num_epochs, train_loss_classify, train_acc, train_loss_decoder, train_loss_total, mode="train")
  # Save a checkpoint
  checkpoint_filename = 'checkpoints/AutoencoderInstrumetClassif-{:03d}.pkl'.format(epoch)
  save_checkpoint(optimizer, loss_classification, loss_decoder, model, epoch, checkpoint_filename)
  return train_loss_classify.value, train_acc.value , train_loss_decoder.value, train_loss_total.value, x_pred

## Function to Test a Model

In [None]:
def testModel(model, epoch, num_epochs, monitoring=True):
  """ Evaluate `model` on the validation data without updating its weights.

  Iterates once over the notebook-level `validation_generator` in eval mode
  and under `torch.no_grad()`. Also relies on the globals `device`,
  `loss_classification`, `loss_decoder`, `w_class`, `w_decoder` and
  `test_set`.

  Args:
    model: module returning `(encoded, decoded, classified)` for a batch.
    epoch: current epoch number (shown in the progress bar).
    num_epochs: total number of epochs (shown in the progress bar).
    monitoring: when True, display and update a `ProgressMonitor`.

  Returns:
    `(classification loss, decoder loss, total loss, accuracy, y_pred)`.
    Note the order differs from `trainModel`. The four metrics are the final
    values of exponentially smoothed `MovingAverage` trackers; `y_pred` is
    the list of predicted class indices in iteration order.

  Author: Renato de Castro Rabelo Profeta, October 2020, TU Ilmenau, Germany
  """
  if monitoring:
    # Create a Progress Bar
    progress = ProgressMonitor(length=len(test_set), mode="test")

  # validation phase
  model.eval()

  # Initialize Metrics
  test_loss_classify = MovingAverage()
  test_loss_decoder = MovingAverage()
  test_loss_total = MovingAverage()
  test_acc = MovingAverage()

  # keep track of predictions and of the number of correct ones so far
  y_pred = []
  n_correct = 0

  with torch.no_grad():
    for batch, targets in validation_generator:

      # Move the validation batch to the GPU
      batch = batch.to(device)
      targets = targets.to(device)

      # forward propagation
      encoded, decoded, classified = model(batch)

      # Bring the reconstruction target to the decoder's output length:
      # zero-pad the input at the end when the output is longer, crop it
      # otherwise. Only the time axis is compared (not the whole size
      # tuple) and every element of the batch is handled.
      n_in, n_out = batch.shape[-1], decoded.shape[-1]
      if n_out > n_in:
        target = F.pad(batch, (0, n_out - n_in))
      else:
        target = batch[..., :n_out]

      # calculate the loss
      loss1 = loss_classification(classified, targets)
      loss2 = loss_decoder(decoded, target)

      loss = w_class*loss1 + w_decoder*loss2

      # update average loss
      test_loss_classify.update(loss1)
      test_loss_decoder.update(loss2)
      test_loss_total.update(loss)

      # save predictions
      predicted = classified.argmax(dim=-1)
      y_pred.extend(predicted.cpu().numpy())

      # Cumulative validation accuracy so far, computed against the labels
      # of this very batch (independent of the loader order, O(batch) work).
      n_correct += (predicted == targets.argmax(dim=-1)).sum().item()
      accuracy = n_correct / len(y_pred)
      test_acc.update(accuracy)

      # Update Progress Bar
      if monitoring:
        progress.update(batch.shape[0], epoch, num_epochs, test_loss_classify, test_acc, test_loss_decoder, test_loss_total, mode="test")

  return test_loss_classify.value, test_loss_decoder.value, test_loss_total.value, test_acc.value, y_pred

## Experiment Parameters

In [None]:
# Training Parameters

n_epochs = 2

# Weights for Loss Functions
w_class = 1
w_decoder=100

In [None]:
writer = SummaryWriter()
writer.add_graph(autoencoder,audio_to_test.to(device) )
writer.flush()

In [None]:
%tensorboard --logdir {logs_base_dir}

## Function to Run an Experiment

In [None]:
def experiment(model, num_epochs = n_epochs, first_epoch = 1, monitoring=True, test_monitor=True):
  """ Run the train / validate loop and log the metrics to TensorBoard.

  For every epoch from `first_epoch` to `num_epochs` (inclusive) the model is
  trained with `trainModel` and, when `test_monitor` is True, evaluated with
  `testModel`; otherwise the validation entries of that epoch are 0. The
  per-epoch metrics are written to the notebook-level `writer`, which is
  closed at the end.

  Returns a dict of per-epoch lists with the keys "train_loss",
  "train_loss_decoder", "train_loss_total", "train_accuracies", "valid_loss",
  "valid_loss_decoder", "valid_loss_total", "valid_accuracies" and
  "valid_predictions".

  Author: Renato de Castro Rabelo Profeta, October 2020, TU Ilmenau, Germany
  """
  # One list per tracked quantity, in the order of the returned dict
  history_keys = ("train_loss", "train_loss_decoder", "train_loss_total", "train_accuracies",
                  "valid_loss", "valid_loss_decoder", "valid_loss_total", "valid_accuracies",
                  "valid_predictions")
  history = {key: [] for key in history_keys}

  for epoch in range(first_epoch, num_epochs+1):
    # Train Model (the training predictions are not kept)
    tr_class, tr_acc, tr_decoder, tr_total, _ = trainModel(model, epoch, num_epochs, monitoring)

    # Test Model, or fill in placeholders when validation is switched off
    if test_monitor:
      va_class, va_decoder, va_total, va_acc, va_pred = testModel(model, epoch, num_epochs, monitoring)
    else:
      va_class = va_decoder = va_total = va_acc = va_pred = 0

    # Store the metrics of this epoch
    epoch_record = (tr_class, tr_decoder, tr_total, tr_acc,
                    va_class, va_decoder, va_total, va_acc, va_pred)
    for key, value in zip(history_keys, epoch_record):
      history[key].append(value)

    # TensorBoard logging
    loss_scalars = {
        'Train Classification Loss': tr_class,
        'Test Classification Loss': va_class,
        'Train Decoder Loss*w': tr_decoder*w_decoder,
        'Test Decoder Loss*w': va_decoder*w_decoder,
        'Train Total Loss': tr_total,
        'Test Total Loss': va_total,
    }
    writer.add_scalars('Losses', loss_scalars, epoch)

    accuracy_scalars = {'Train Classification ': tr_acc, 'Test Classification': va_acc}
    writer.add_scalars('Classification Accuracy', accuracy_scalars, epoch)

    writer.flush()

  writer.close()
  return history

## Run an Experiment

In [None]:
# Run Experiment
hist = experiment(autoencoder, num_epochs=n_epochs, monitoring=True, test_monitor=True)

## Resume an Experiment from a saved Checkpoint

In [None]:
n_epochs=4
epoch = load_checkpoint(optimizer, loss_classification, loss_decoder, autoencoder, './checkpoints/AutoencoderInstrumetClassif-002.pkl')
print('Resuming training from epoch', epoch)
hist = experiment(autoencoder, num_epochs=n_epochs, first_epoch=epoch+1, test_monitor=True, monitoring=True)

## Experiment Results

In [None]:
epochs = range(1, len(hist["train_loss"]) + 1)

# One figure per metric:
# (training key, validation key, training legend, validation legend, y-axis label)
curve_specs = [
    ("train_loss", "valid_loss",
     'Training loss Classification', 'Validation loss CLassification', 'Loss'),
    ("train_loss_decoder", "valid_loss_decoder",
     'Training loss Decoder', 'Validation loss Decoder', 'Loss'),
    ("train_loss_total", "valid_loss_total",
     'Training loss Total', 'Validation loss Total', 'Loss'),
    ("train_accuracies", "valid_accuracies",
     'Training accuracy Classification', 'Validation accuracy Classification', 'Accuracy'),
]

for train_key, valid_key, train_label, valid_label, y_label in curve_specs:
    plt.figure(figsize=(10,6))
    plt.plot(epochs, hist[train_key], '-o', label=train_label)
    plt.plot(epochs, hist[valid_key], '-o', label=valid_label)
    plt.legend()
    plt.title('Learning curves')
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.xticks(epochs)
    plt.grid()

In [None]:
# Number of Validation Classification Errors
# hist["valid_predictions"] holds one list of predictions per epoch. Count the
# errors of the most recent epoch only; comparing the whole history summed the
# errors of every epoch while reporting them "out of" a single test set.
final_predictions = np.asarray(hist["valid_predictions"][-1])
num_errors = int(np.sum(final_predictions != np.argmax(onehot_labels_test, axis=1)))
print('Validation errors {} (out of {})'.format(num_errors, len(test_set)))

In [None]:
# Validation Classification Mistakes
# Report the mistakes of the most recent epoch. Previously the file names and
# correct classes came from the first epoch in `hist` while the "Predicted as"
# line mixed in the predictions of every epoch.
final_predictions = np.asarray(hist["valid_predictions"][-1])
error_indicator = final_predictions != np.argmax(onehot_labels_test, axis=1)
wrong_idx = np.where(error_indicator)[0]
print("Wrongly predicted Audio Files", test_set['filename'].values[wrong_idx])
print("Correct Classes:", test_set['class'].values[wrong_idx])
print("Predicted as:", labelencoder.inverse_transform(final_predictions[wrong_idx]))

In [None]:
# Back to Labels
# Convert the predictions of the most recent epoch back to class names, giving
# one label per test file (flattening the whole history concatenated the
# predictions of every epoch).
predictions_labels = labelencoder.inverse_transform(np.ravel(hist["valid_predictions"][-1]))

In [None]:
# All metrics below are computed on the predictions of the most recent epoch
# in `hist` (index -1); index 0 would report the first epoch of the run.
final_predictions = np.asarray(hist["valid_predictions"][-1])

# Recall - the ability of the classifier to find all the positive samples
print("Recall: ", recall_score(classes_int_test, final_predictions, average=None))

# Precision - The precision is intuitively the ability of the classifier not to
# label as positive a sample that is negative
print("Precision: ", precision_score(classes_int_test, final_predictions, average=None, zero_division=0))

# F1-Score - The F1 score can be interpreted as a weighted average of the precision
# and recall
print("F1-Score: ", f1_score(classes_int_test, final_predictions, average=None))

# Accuracy - fraction (normalize=True) and count (normalize=False) of correctly
# classified samples
print("Accuracy: %.2f  ," % accuracy_score(classes_int_test, final_predictions, normalize=True), accuracy_score(classes_int_test, final_predictions, normalize=False) )
print("Number of samples:",classes_int_test.shape[0])

print(classification_report(classes_int_test, final_predictions, zero_division=0))

In [None]:
# Compute confusion matrix
# Use the predictions of the most recent epoch in `hist` (index -1) rather
# than those of the first one.
cnf_matrix = confusion_matrix(classes_int_test, np.asarray(hist["valid_predictions"][-1]))
np.set_printoptions(precision=2)

In [None]:
# Function to Plot Confusion Matrix
# http://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    Plot the confusion matrix `cm` on the current matplotlib figure.

    `classes` supplies the tick labels for both axes. With `normalize=True`
    every row is divided by its sum before plotting and the cells are
    annotated with two decimals; otherwise the cells show the raw values.
    """
    if normalize:
        # Row-wise normalization. This code used to sit inside the docstring,
        # so normalize=True only changed the number format. Rows without any
        # sample stay at 0 instead of producing NaN.
        row_totals = cm.sum(axis=1, keepdims=True)
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float), where=row_totals != 0)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    # Cells darker than half of the maximum get white text for contrast
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
# Plot non-normalized confusion matrix
plt.figure(figsize=(16,12))
plot_confusion_matrix(cnf_matrix, classes=labelencoder.classes_,
                      title='Confusion matrix, without normalization')

### Playback of Audio Files after Training


In [None]:
# Test
test_encoded, test_output, test_type = autoencoder(audio_to_test.to(device))
print('Original Shape', audio_to_test.shape)
print('Encoded Shape', test_encoded.shape)
print('Decoded Shape', test_output.shape)
print('Predicted Class', np.argmax(test_type.detach().cpu().numpy()))
print('Correct Class', np.argmax(label_to_test.detach().cpu().numpy()))
print('MSE', mean_squared_error(audio_to_test[0,0,:], test_output[0,0,:audio_to_test.shape[-1]].detach().cpu()))


In [None]:
# Plot Waveform of audio to test
plt.figure(figsize=(10,6))
plt.plot(audio_to_test[0,0,:])
plt.plot(test_output[0,0,:].detach().cpu().numpy())
plt.grid()

In [None]:
display(ipd.Audio(test_output[0,0,:].detach().cpu().numpy(), rate=44100))

In [None]:
# Plot of encoded audio to test
plt.figure(figsize=(10,6))
plt.imshow(test_encoded[0,:,:].detach().cpu().numpy(), cmap='gray')
plt.grid()

In [None]:
# Get a random audio file and label from dataset
dataiter = iter(DataLoader(test_set_torch, batch_size=1, shuffle=True, num_workers=0))
# Use the built-in next(): the DataLoader iterator follows the standard
# iterator protocol, and its `.next()` method is not available in newer
# PyTorch releases.
audio_to_test, label_to_test = next(dataiter)

In [None]:
# Test
test_encoded, test_output, test_type = autoencoder(audio_to_test.to(device))
print('Original Shape', audio_to_test.shape)
print('Encoded Shape', test_encoded.shape)
print('Decoded Shape', test_output.shape)
print('Predicted Class', np.argmax(test_type.detach().cpu().numpy()))
print('Correct Class', np.argmax(label_to_test.detach().cpu().numpy()))
print('MSE', mean_squared_error(audio_to_test[0,0,:], test_output[0,0,:audio_to_test.shape[-1]].detach().cpu()))


In [None]:
# Plot Waveform of audio to test
plt.figure(figsize=(10,6))
plt.plot(audio_to_test[0,0,:])
plt.plot(test_output[0,0,:].detach().cpu().numpy())
plt.grid()

In [None]:
# Calculate Spectrogram of the audio file to test
specgram = torchaudio.transforms.Spectrogram(n_fft=2048)(audio_to_test[0,0,:])


In [None]:
# Plot Spectrogram of audio to test
plt.figure(figsize=(6,12))
plt.subplot(1,2,1)
plt.imshow(specgram.numpy(), cmap='gray')
plt.colorbar()
plt.grid()
# Plot of encoded audio to test
plt.subplot(1,2,2)
plt.imshow(test_encoded[0,:,:].detach().cpu().numpy(), cmap='gray')
plt.colorbar()
plt.grid()

In [None]:
!ls checkpoints -l


In [None]:
#from google.colab import files
#files.download('./checkpoints/basicInstrumetClassif-382.pkl') 