# IPYNB controller

We provide a few switches that activate or deactivate parts of the IPYNB notebook.
These controls are useful when the notebook is run from a shell command, or when using the full-run ("Run all") button of Jupyter/Colaboratory.

---

To run this notebook in the background, you can use
```bash
runipy mynotebook.ipynb
```
or
```bash
jupyter nbconvert --to notebook --execute mynotebook.ipynb --output mynotebook.ipynb
```

In [None]:
# Global switches read by the cells below.
# NOTE(review): `update_git` is not read anywhere in the visible part of the notebook.
build_network = False # Is the notebook run by the network building script ?
activate_tensorboard = False # Show tensorboard in the cell
prepare_dataset = True # Should we prepare the datasets if they do not exist ?
erase_present_datasets = False # Should we erase and download datasets again ?
install_with_pip = False # Should we install dependencies with pip ?
update_git = True # Should we update the ssh keys and git repo ?
reduce_dataset_size = False # Should we reduce dataset size for debug ?
show_graphs = True # Should we display some of the data as pyplot graphs ?

# Environment setup

Select the GPU

In [None]:
import os
# Enumerate GPUs in PCI bus order and expose only the device with index 1.
os.environ.update({
    "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
    "CUDA_VISIBLE_DEVICES": "1",
})

Set up image display for this notebook

In [None]:
# Render matplotlib figures inline in the notebook output
%matplotlib inline

# Enlarge the default size of the pyplot figures
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = [12, 5]

In [None]:
# Optionally (re)install and start a tensorboard server reading the "./runs" directory.
# Nothing is done when the notebook is run by the network building script.
if activate_tensorboard:
  if not build_network:
    # Replace any previously installed tensorboard flavour by a fresh install
    !yes | pip3 uninstall tb-nightly tensorboardX tensorboard
    !pip3 install tensorboard pytorch-lightning
    
    # Stop a tensorboard server that may still be running
    !pkill tensorboard
    # NOTE(review): the ngrok archive is downloaded, but the unzip and tunnel
    # commands below are commented out, so the archive is currently unused.
    !yes | wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
    #!yes | unzip ngrok-stable-linux-amd64.zip
    LOG_DIR = "./runs"
    # Start tensorboard in the background, listening on every interface (port 6006)
    get_ipython().system_raw(
      'tensorboard --logdir {} --host 0.0.0.0 --port 6006 &'
      .format(LOG_DIR)
     )
    #get_ipython().system_raw('./ngrok http 6006 &')
    #! curl -s http://localhost:4040/api/tunnels | python3 -c \
    #  "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"
    # Print the URL of the tensorboard server, built from the first address of this host
    ! hostname -I | awk '{print "http://"$1":6006"}'

##### Download datasets
Download the 3 datasets if not already available on this machine.

In [None]:
import os.path

# Configure dataset names and directory
voice_dataset = "MonophonicVoiceDataset"
sampled_dataset = "MonophonicSampleBasedDataset"
synth_dataset = "MonophonicSynthDataset"
voice_dataset_directory = "../" + voice_dataset
sampled_dataset_directory = "../" + sampled_dataset
synth_dataset_directory = "../" + synth_dataset

# Set the build_id to retrieve datasets
build_id = "20191226-d5d7e5763e7cf589fd2c21aafe82f7223cce48a8-6bebfdcd32c948860969c251c36e0ab6d26d6b89"
#build_id = "20191227-e6c7f7f42cda7f8f2ab053b4a06c7d52c4d68c9e-6bebfdcd32c948860969c251c36e0ab6d26d6b89"


if prepare_dataset:
  dataset_list = [voice_dataset, sampled_dataset, synth_dataset]

  if erase_present_datasets:
    for dataset_name in dataset_list:
      !rm -rf $dataset_name $dataset_name".zip"

  for dataset_name in dataset_list:
    if not os.path.isdir("../" + dataset_name):
      if not os.path.isfile("../" + dataset_name + ".zip"):
        print("No ", dataset_name + ".zip")
        !cp "../dataset_builds/"$dataset_name"-"$build_id".zip" "../"$dataset_name".zip"
      !yes | unzip "../"$dataset_name".zip" -d "../"$dataset_name
    else:
      print(f"{dataset_name} already available :)")


Import and install required libraries

In [None]:
# Optionally install the Python and system dependencies of the notebook.
if install_with_pip:
    !python3 -m pip install Cython
    !python3 -m pip install torchinfo pytorch-lightning "torchvision>=0.4" "torch>=1.4"
    # System package installed through aptitude (requires sudo rights)
    !yes | sudo aptitude install libsndfile1-dev

import torch
import torch.nn as nn
import torch.nn.functional as fn
from torch.utils.data import DataLoader
import pytorch_lightning as pl
import numpy as np
import math

from torchinfo import summary

# Import path
import sys
sys.path.append('../')

# Import pitchnet
import pitchnet
from pitchnet import *
from pitchnet.dataset import *
from pitchnet.io import *
from pitchnet.model import *
from pitchnet.preprocess import *
from pitchnet.visualization import *

# Allow auto-reload of the pitchnet library at each cell
%reload_ext autoreload
# %autoreload 1
# %aimport pitchnet
# %aimport pitchnet.io
%autoreload 2

# Torch version:
print("torch.__version__ = ", torch.__version__)
print("Cuda = ", torch.cuda.is_available())


# Create and display dataset

The dataset contains 4 layers of information (called frames) as input.
The output is the segmentation followed by a one-hot vector of probabilities for the MIDI note number. The dimension is Batch x Time x (1 + 128) = 129.


In [None]:
def split_dataset(dataset, ratio):
    """
    Cut a dataset into two consecutive (non-shuffled) parts.

    The first `int(ratio * len(dataset))` items form the first part and the
    remaining items form the second part.

    Returns a pair of `torch.utils.data.Subset` objects: (head, tail).
    """
    from torch.utils.data import Subset

    indices = range(len(dataset))
    cut = int(ratio * len(dataset))
    head = Subset(dataset, indices[:cut])
    tail = Subset(dataset, indices[cut:])
    return head, tail

# Load the three datasets from their directories.
# From here on, the `*_dataset` names refer to dataset objects and no longer
# to the dataset names (strings) they were bound to before this point.
synth_dataset = import_dataset(synth_dataset_directory)
voice_dataset = import_dataset(voice_dataset_directory)
sampled_dataset = import_dataset(sampled_dataset_directory)

# Truncate the length of the synth dataset to the original 3000
synth_dataset.length = 3000

# Debug mode: only keep the first 20% of each dataset for faster training
if reduce_dataset_size:
    voice_dataset = split_dataset(voice_dataset, 0.2)[0]
    sampled_dataset = split_dataset(sampled_dataset, 0.2)[0]
    synth_dataset = split_dataset(synth_dataset, 0.2)[0]

# Merge the datasets into a single one
def merge_datasets(dt_list, separated_test=True):
    """
    Build the train / validation / test sets out of several datasets.

    Each dataset of `dt_list` is cut in order: its first half goes to the
    training set, and the remaining half is cut in two again to give the
    validation part and the test part. The parts of the same kind are then
    concatenated across datasets.

    Returns (train, val, test, test_list) when `separated_test` is true,
    `test_list` being the list of per-dataset test parts (same order as
    `dt_list`); returns (train, val, test) otherwise.
    """
    train_parts, val_parts, test_parts = [], [], []

    # Cut every dataset into its three parts
    for dataset in dt_list:
        train_part, remainder = split_dataset(dataset, 0.5)
        val_part, test_part = split_dataset(remainder, 0.5)
        train_parts.append(train_part)
        val_parts.append(val_part)
        test_parts.append(test_part)

    # Glue the parts of the same kind together
    concat = torch.utils.data.ConcatDataset
    train = concat(train_parts)
    test = concat(test_parts)
    val = concat(val_parts)

    merged = (train, val, test)
    return merged + (test_parts,) if separated_test else merged

# Split the three datasets and keep the per-dataset test parts for the reports
training_set, validation_set, test_set, test_list = merge_datasets(
    [synth_dataset, voice_dataset, sampled_dataset]
)
test_synth, test_voice, test_sampled = test_list


# Data loaders: the training loader is shuffled, the validation loader is not
params = dict(batch_size=16, shuffle=True, num_workers=6)

training_generator = DataLoader(training_set, **params)
validation_generator = DataLoader(validation_set, **{**params, 'shuffle': False})

for label, subset in (("Training", training_set),
                      ("Validation", validation_set),
                      ("Test", test_set)):
    print(f"{label} set length:", len(subset))

In [None]:
len(synth_dataset), len(voice_dataset), len(sampled_dataset)

## Visualize data

Display the 3 channels of the input, and the expected midi notes.

In [None]:
def show_image_data(xy):
  """
  Display one dataset item: its input frames with the expected pitch on top.

  `xy` is an (input, target) pair as returned by the datasets. Columns 0 and 1
  of the target hold the segmentation width and offset (not displayed here);
  the remaining columns are converted to one pitch per time step with
  `onehot_to_pitch`.

  Side effect: sets the global pyplot figure size to [15, 8].
  """
  x, y = xy
  x = np.array(x)
  y = np.array(y)

  # One pitch value per time step
  pitch = [onehot_to_pitch(frame) for frame in y[:, 2:]]

  plt.rcParams['figure.figsize'] = [15, 8]

  def normalize(frames):
    # Apply the normalization layer of the model to a single (un-batched) input
    from pitchnet.model import NormalizeCTS
    return NormalizeCTS()(torch.Tensor(frames)[None]).numpy()[0]

  # The first two axes of x are swapped before display.
  # NOTE(review): presumably x is (time, channel, spectrum) and the display
  # expects (channel, time, spectrum) -- confirm against the dataset.
  visualize_frames(signal=pitch, frames=normalize(x.transpose(1, 0, 2)), min_norm_amplitude=1, apply_log=False)

def show_batch_data(dataset, number=2, seed=22):
  """
  Display `number` items of `dataset`, drawn at random with a fixed seed.

  The numpy random generator is re-seeded with `seed`, so the same items are
  shown on every call with the same arguments.
  """
  np.random.seed(seed)
  rand_idx = [np.random.randint(len(dataset)) for _ in range(number)]
  for i in rand_idx:
      xy = dataset.__getitem__(i)
      # show_image_data only accepts the item itself: the former `show=False`
      # keyword made this call raise a TypeError.
      show_image_data(xy)

def show_batch_data_paper():
    """
    Display the first item of each of the three datasets, preceded by its name.
    """
    def show_first(label, dataset):
        print(label)
        show_image_data(dataset.__getitem__(0))

    show_first("Synth", synth_dataset)
    show_first("Voice", voice_dataset)
    show_first("Sample Based", sampled_dataset)

# Figure size used for the sample display
plt.rcParams['figure.figsize'] = [12, 8]
if show_graphs:
    # Alternative: random items of a single dataset
    # show_batch_data(voice_dataset, number=4, seed=7)
    show_batch_data_paper()

# Create model

Create a custom `model` inspired from resnet. The blocks are defined in the `pitchnet` library.

The model does NOT output the exact same format as the dataset!

* The output of the network is a log-probability, whereas the output of the dataset is a probability.
* The model outputs a confidence and a presence factor that aren't in the target data (since they are fast to compute).

In [None]:
# Shape of one input batch: 16 items, followed by the shape of a single input
inshape = [16, *training_set[0][0].shape]
print("Input  shape:", inshape)

# Run the garbage collector before building the model, to release memory
import gc
gc.collect()

# Build the model on the GPU when one is available, on the CPU otherwise
from pitchnet.model import build_model
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = build_model(ablation_autocorrelation=True)
model = model.to(device)
summary(model, inshape, depth=3)


### Loss function:

The loss takes as pitch a log-probability (the prediction) and a probability or one-hot vector (the target).

It computes the Kullback–Leibler divergence between the distributions and the MSE of the segmentations.

In [None]:
from pitchnet.loss import *

# Sanity check of the loss function:
# evaluate it between the targets of two unrelated samples of the training set
# (a batch dimension of size 1 is added in front of each target).
y1 = training_set[0][1][None,]
y2 = training_set[3][1][None,]

total_loss = loss_fn(y1, y2)[0]
print("Loss between two samples:", total_loss.item())
del total_loss
del y1, y2

# Model Training

First, we write a function that displays graphs and curves to monitor the training.

The following function logs the layers of the current model into TensorBoard.

Convolutional network weights are turned into pictures. The distributions of both weights and biases are also available.

In [None]:
def log_model(model, tensorboard, step=0):
    """
    Log the convolution kernels of `model` into tensorboard.

    For every convolution found in the body of the model (`model[2]`) and for
    the `pitch` layer of its head (`model[3]`), the kernels are logged as
    images and the weights / biases as histograms.

    Parameters:
        model: indexable model; index 2 is used as body and index 3 as head.
        tensorboard: summary writer providing `add_image`, `add_images` and
            `add_histogram`.
        step: global step attached to everything that is logged.

    Changes compared to the previous version:
        * biases are read from the layer (they were read from the weight
          tensor, which has no `bias` attribute, so they were never logged);
        * `Conv1d` layers of the body no longer raise a TypeError (the helper
          was called with an index it did not accept);
        * each layer is logged once per call (the recursive helper was also
          driven by `Module.apply`, which already visits every sub-module).
    """
    body = model[2]
    head = model[3]

    def norm(patchs):
        """Absolute value of the patches, each scaled by its maximum over the last two dims."""
        patchs = torch.abs(patchs)
        max_val = patchs.max(dim=3, keepdim=True)[0].max(dim=2, keepdim=True)[0]
        # Avoid 0/0 (NaN images) for all-zero kernels
        return patchs / max_val.clamp(min=1e-12)

    def log_bias(summarywritter, layer, tag):
        """Log the bias histogram of `layer` when it has a bias."""
        bias = getattr(layer, "bias", None)
        if bias is not None:
            summarywritter.add_histogram(tag + "/Bias_distribution", bias.reshape(-1), global_step=step)

    def log_conv2d(summarywritter, layer, name, idx=0):
        """Log a 2d convolution layer under the tag `<name>_<idx>`."""
        tag = "%s_%d" % (name, idx)
        weight = layer.weight
        c_out, c_in, w, h = weight.shape
        # Log kernels as images
        if c_in == 3:
            # Three input channels: one image set per channel, plus a combined one
            patchs = weight.reshape(c_out, c_in, w, h)
            summarywritter.add_images(tag + "/Patches_RED", norm(patchs[:, 0:1, :, :]), global_step=step, dataformats='NCWH')
            summarywritter.add_images(tag + "/Patches_GREEN", norm(patchs[:, 1:2, :, :]), global_step=step, dataformats='NCWH')
            summarywritter.add_images(tag + "/Patches_BLUE", norm(patchs[:, 2:3, :, :]), global_step=step, dataformats='NCWH')
            summarywritter.add_images(tag + "/Patches_RGB", norm(patchs), global_step=step, dataformats='NCWH')
        elif w == 1:
            # Kernels of width 1: one image per output channel (input channel x height)
            patchs = weight.reshape(c_out, 1, c_in, h)
            summarywritter.add_images(tag + "/Patches", norm(patchs), global_step=step, dataformats='NCWH')
        else:
            # General case: one image per (output channel, input channel) pair
            patchs = weight.reshape(c_out * c_in, 1, w, h)
            summarywritter.add_images(tag + "/Patches", norm(patchs), global_step=step, dataformats='NCWH')
        # Log kernels distribution
        summarywritter.add_histogram(tag + "/Weight_distribution", patchs.reshape(-1), global_step=step)
        log_bias(summarywritter, layer, tag)

    def log_conv1d(summarywritter, layer, name="model", idx=None):
        """Log a 1d convolution layer under the tag `<name>` or `<name>_<idx>`."""
        tag = name if idx is None else "%s_%d" % (name, idx)
        # One image per kernel tap: (kernel size, 1, c_out, c_in)
        patchs = layer.weight.permute(2, 0, 1)[:, None, :, :]
        summarywritter.add_image(tag + "/Patches", norm(patchs), global_step=step, dataformats='NCWH')
        # Log kernels distribution
        summarywritter.add_histogram(tag + "/Weight_distribution", patchs.reshape(-1), global_step=step)
        log_bias(summarywritter, layer, tag)

    # Running indexes of the layers met so far (Conv1d and Conv2d share one counter)
    indexes = {
        "bottleneck_idx": 0,
        "conv_idx": 0,
    }

    def log_module(module):
        """Log `module` when it is a known layer type, then visit its children."""
        if type(module) is BottleneckBlock:
            indexes["bottleneck_idx"] += 1
            log_conv2d(tensorboard, module.layer[0], "BottleneckBlock", indexes["bottleneck_idx"])

        if type(module) is nn.Conv2d:
            indexes["conv_idx"] += 1
            log_conv2d(tensorboard, module, "Conv2d", indexes["conv_idx"])

        if type(module) is nn.Conv1d:
            indexes["conv_idx"] += 1
            log_conv1d(tensorboard, module, "Conv1d", indexes["conv_idx"])

        for child in module.children():
            log_module(child)

    # Log body of model (single traversal)
    log_module(body)

    # Log the pitch layer of the head
    log_conv1d(tensorboard, head.pitch, name="Head")

In [None]:
class PitchnetModule(pl.LightningModule):
    """
    Lightning wrapper training `model` with `loss_fn`.

    `loss_fn(prediction, target)` must return 8 values: the total loss followed
    by the pitch, segmentation width, segmentation offset, segmentation
    confidence, segmentation presence, IOU and onset losses. All of them are
    logged with a `train_` or `val_` prefix.

    The data loaders are the notebook-level `training_generator` and
    `validation_generator`.
    """

    def __init__(self, model, loss_fn):
        super().__init__()

        self.model = model
        self.loss_fn = loss_fn
        self.lr = 1e-3
        self.use_cuda = torch.cuda.is_available()

        # Move the module on the right device; the wrapped model is a
        # sub-module, so it is moved along with it.
        self.to(torch.device("cuda:0" if self.use_cuda else "cpu"))

        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr)

    def forward(self, x):
        return self.model(x.to(self.device))

    def _shared_step(self, batch, prefix):
        """Run the model on `batch`, log every loss as `<prefix>_<name>` and return the total loss."""
        x, y = batch
        y_hat = self.forward(x)
        (loss, pitch_loss, seg_width_loss, seg_offset_loss,
         seg_confidence_loss, seg_presence_loss, iou_loss,
         onset_loss) = self.loss_fn(y_hat, y.to(self.device))
        tensorboard_logs = {
            prefix + '_loss': loss,
            prefix + '_pitch_loss': pitch_loss,
            prefix + '_seg_width_loss': seg_width_loss,
            prefix + '_seg_offset_loss': seg_offset_loss,
            prefix + '_seg_confidence_loss': seg_confidence_loss,
            prefix + '_seg_presence_loss': seg_presence_loss,
            prefix + '_iou_loss': iou_loss,
            prefix + '_onset_loss': onset_loss,
            }
        self.log_dict(tensorboard_logs)
        return loss

    def training_step(self, batch, batch_idx):
        # Run the garbage collector at every step (kept from the original code)
        import gc
        gc.collect()
        # Log model internals every 10 batches. The hook receives this module
        # itself (it used to receive the notebook global `pitchnet_module`,
        # which may be another instance, or not exist at all).
        if batch_idx % 10 == 0:
            display_hook(self, logger=self.logger)

        return self._shared_step(batch, 'train')

    def validation_step(self, batch, batch_nb):
        return self._shared_step(batch, 'val')

    def validation_end(self, outputs):
        # NOTE(review): this looks like a hook name from early Lightning
        # releases; it is probably never called by the version in use, and it
        # expects dict outputs while validation_step returns a tensor -- confirm
        # and remove if unused.
        # Log validation mean
        val_loss_mean = torch.stack([x['val_loss'] for x in outputs]).mean()
        self.log_dict({'val_loss_mean': val_loss_mean})

    def configure_optimizers(self):
        # The optimizer is created once in __init__
        return self.optimizer

    def train_dataloader(self):
        return training_generator

    def val_dataloader(self):
        return validation_generator


In [None]:
def output_to_pitch(produced):
    """
    Convert a network output into one pitch per row.

    The last 128 columns of each row are exponentiated (they are
    log-probabilities) before being passed to `onehot_to_pitch`.
    """
    pitches = []
    for log_probabilities in produced[:, -128:]:
        pitches.append(onehot_to_pitch(np.exp(log_probabilities)))
    return np.array(pitches)
def label_to_pitch(expected):
    """
    Convert a target into one pitch per row.

    The last 128 columns of each row are passed as-is to `onehot_to_pitch`.
    """
    pitches = []
    for probabilities in expected[:, -128:]:
        pitches.append(onehot_to_pitch(probabilities))
    return np.array(pitches)

def display_hook(module, logger):
    """
    Plot the prediction of the model on a random validation sample and log it.

    The figure (segmentation width, segmentation offset, pitch, pitch
    distribution, IOU) is shown, added to tensorboard under
    "Display/Validation", and the model kernels are logged with `log_model`.
    The step used for logging is the counter `display_hook.iter`, which is
    incremented at each call.

    Parameters:
        module: object exposing `model` and `device` (the Lightning module).
        logger: Lightning logger whose `experiment` is a tensorboard writer.

    Changes compared to the previous version: the forward pass runs in
    evaluation mode (the previous training mode of the model is restored
    afterwards), and the pyplot figure size is restored even when an exception
    is raised.
    """
    import random

    # Select a random sample from the validation set
    x, y = validation_set.__getitem__(random.randint(0, len(validation_set) - 1))
    # DEBUG: Display only on sampled dataset
    #x, y = sampled_dataset.__getitem__(random.randint(0,len(sampled_dataset)-1))

    # Forward pass in evaluation mode and without gradient, so that displaying a
    # sample neither uses training-only behaviour nor touches training statistics
    was_training = module.model.training
    module.model.eval()
    try:
        with torch.no_grad():
            out = module.model(x[None, :, :, :].to(module.device))
    finally:
        module.model.train(was_training)

    produced = np.array(out.cpu().detach())[0]
    expected = np.array(y.cpu().detach())

    step = getattr(display_hook, "iter", 0)

    # Change PLT size for this figure only
    prev_plt_size = plt.rcParams['figure.figsize']
    plt.rcParams['figure.figsize'] = [14, 15]
    try:
        fig = plt.figure()

        # Display segmentation width (expected in green, produced in blue)
        ax = plt.subplot(511)
        ax.title.set_text('Seg Width')
        plt.plot(expected[:, 0], color="green")
        plt.plot(produced[:, 0], color="blue")

        # Display segmentation offset; the produced offset is rescaled with
        # x * 0.5 + 0.5 before being compared to the expected one
        ax = plt.subplot(512)
        ax.title.set_text('Seg Offset')
        plt.plot(expected[:, 1], color="green")
        plt.plot((produced[:, 1]) * 0.5 + 0.5, color="blue")
        plt.plot(produced[:, 3], color="red")

        # Display pitch (expected in blue, produced in red)
        ax = plt.subplot(513)
        ax.title.set_text('Pitch')
        plt.plot(label_to_pitch(expected), color="blue")
        plt.plot(output_to_pitch(produced), color="red")

        # Display the produced pitch distribution over time
        plt.subplot(514)
        plt.imshow(produced[:, -128:].T, origin='lower', aspect='auto')

        # Display IOU between produced and expected notes
        ax = plt.subplot(515)
        ax.title.set_text('IOU')
        p2 = (expected[:, 0] != 0) * 1.0
        seg1_width, seg1_offset = produced[:, 0], produced[:, 1]
        seg2_width, seg2_offset = expected[:, 0], expected[:, 1]
        c_pitch1 = onehot_to_pitch_torch2(torch.exp(torch.Tensor(produced[:, -128:]))) * p2
        c_pitch1 = c_pitch1.numpy()
        c_pitch2 = onehot_to_pitch_torch2(torch.Tensor(expected[:, -128:])) * p2
        c_pitch2 = c_pitch2.numpy()
        iou = calculate_iou(seg1_width, seg1_offset * 0.5 + 0.5, c_pitch1, seg2_width, seg2_offset, c_pitch2)
        plt.plot(iou, label='iou')
        plt.plot(produced[:, 2], color="orange")
        ax.legend()

        # Loss curves, only when the module keeps track of them
        if hasattr(module, "train_losses") and hasattr(module, "valid_losses"):
            x_train = np.linspace(0, len(module.train_losses), len(module.train_losses))
            x_valid = np.linspace(0, len(module.train_losses), len(module.valid_losses))
            plt.plot(x_train, module.train_losses)
            plt.plot(x_valid, module.valid_losses)

        plt.show()

        logger.experiment.add_figure("Display/Validation", fig, step)
        log_model(module.model, logger.experiment, step)
        display_hook.iter = step + 1
    finally:
        # Restore PLT size
        plt.rcParams['figure.figsize'] = prev_plt_size

# Create the step counter of display_hook on first run only, so that
# re-running this cell does not reset it.
if not hasattr(display_hook, "iter"):
    display_hook.iter = 0

We now train the network while monitoring and logging the losses.

In [None]:
# Create the Lightning module wrapping the model and its loss
import gc
pitchnet_module = PitchnetModule(model, loss_fn)
# To resume from a checkpoint instead:
# pitchnet_module = PitchnetModule.load_from_checkpoint("runs/pitchnet-pitch+presence+dil3i2-nopresence/version_1/checkpoints/epoch=131-step=17556.ckpt", model=model, loss_fn=loss_fn)

# Init logger
from pytorch_lightning.loggers import TensorBoardLogger
logger = TensorBoardLogger("runs", name="pitchnet-pitchd23w2+3datasets+ablation_autocorrelation")

# Display model after initialization
# print("Model before training:")
# display_hook(pitchnet_module, logger=logger)

# Keep the 3 best checkpoints according to the validation loss, plus the last one
checkpoint_callback = pl.callbacks.ModelCheckpoint(
    save_top_k=3,
    save_last=True,
    monitor='val_loss',
    dirpath=None,
    verbose=True,
)

# Train on the GPU when one is available and fall back to the CPU otherwise,
# like the rest of the notebook does when it selects its device
trainer = pl.Trainer(max_epochs=300,
                     limit_train_batches=1.0, logger=logger,
                     log_every_n_steps=1,
                     accelerator='gpu' if torch.cuda.is_available() else 'cpu',
                     callbacks=[checkpoint_callback])

In [None]:
trainer.fit(pitchnet_module)

logger.finalize("success")

In [None]:
!ls runs/pitchnet-pitchd23w2+3datasets+ablation_autocorrelation/version_0/checkpoints/

In [None]:
!cp runs/pitchnet-pitchd23w2+3datasets+ablation_autocorrelation/version_0/checkpoints/epoch=20-step=4914.ckpt pitchnet-ablation-pitchd23w2+3datasets.ckpt

In [None]:
!cp runs/pitchnet-pitchd23w2+3datasets/version_3/checkpoints/epoch=23-step=5616.ckpt pitchnet-pitchd23w2+3datasets.ckpt

In [None]:
# First good model:
# pitchnet_module = PitchnetModule.load_from_checkpoint('runs/pitchnet-pitchd23w2+3datasets/version_3/checkpoints/epoch=23-step=5616.ckpt', model=model, loss_fn=loss_fn)

# `load_from_checkpoint` is a classmethod: call it on the class. Calling it on
# an instance gives the same result on old Lightning releases but is rejected
# by recent ones.
pitchnet_module = PitchnetModule.load_from_checkpoint('runs/pitchnet-pitchd23w2+3datasets+ablation_autocorrelation/version_0/checkpoints/epoch=21-step=5148.ckpt', model=model, loss_fn=loss_fn)




In [None]:
# Recover model
model = pitchnet_module.model

# Visualization of training result

Display some nice graph of what happened during learning.

In [None]:
def show_curve_plotly(signal, title=None, **kwargs):
  """
  Plot one curve, or a tuple of curves, in an interactive (zoomable) plotly figure.

  Each curve is drawn against its sample index. `kwargs` are forwarded to every
  `go.Scatter` trace. When `title` is not None it is set as figure title.
  """
  curves = signal if type(signal) == tuple else (signal,)

  fig = go.Figure()
  for curve in curves:
    sample_index = list(range(len(curve)))
    fig.add_trace(go.Scatter(x=sample_index, y=curve, **kwargs))

  if title is not None:
    figure_title = go.layout.Title(text=title, xref="paper", x=0)
    fig.update_layout(title=figure_title)
  fig.show()

def show_curve(signal, title=None, **kwargs):
    """
    Plot one curve, or a tuple of curves, in a new pyplot figure and show it.

    `title` is used as figure title when it is not empty. `kwargs` is accepted
    but not used.
    """
    plt.figure()
    if title:
        plt.title(title)

    curves = signal if type(signal) == tuple else (signal,)
    for curve in curves:
        plt.plot(curve)
    plt.show()


# Export Model

We first export the build_id of the datasets used

In [None]:
# Record the build id of the datasets next to the exported models
with open("datasets_build_id", 'w') as build_id_file:
    build_id_file.write(build_id)

## Export in pytorch format

In [None]:
input_shape = (1, 500, 4, 513)

Export directly by saving the pytorch model with code.

In [None]:
from datetime import datetime
# Make sure the output directory exists
!mkdir -p models
# Save the whole model object twice: under a fixed name (overwritten at each
# export) and under a name carrying the current date and time
torch.save(model, "models/monophonic_net.pt")
torch.save(model, "models/monophonic_net_"+ datetime.now().strftime("%Y_%m_%d-%H_%M_%S") + ".pt")

Export after compiling with the JIT library. This allows loading/running the model without any code from this file.

In [None]:
# Compile network and save it
timestr = datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
name = 'models/monophonic_net_jit_' + timestr + '.pt'

# Trace in evaluation mode: tracing records the operations that are actually
# executed, so training-only behaviour must be switched off for an inference
# export. The previous mode is restored afterwards.
was_training = model.training
model.eval()
jit_model = torch.jit.trace(model.cpu(), torch.randn(input_shape))
jit_model.save(name)
del jit_model
model.train(was_training)

import gc
gc.collect()

# The model was moved to the CPU for tracing: move it back to the GPU when there is one
if torch.cuda.is_available():
    model.cuda()

## Export to ONNX format

We start by patching the local ONNX exporter of the installed PyTorch with a version that finally supports log_softmax. This should be removed once the standard PyTorch version in use contains this commit.

In [None]:
# Download a working ONNX 9 opcode implementation
# (file taken from a fixed commit of the pytorch repository)
!curl -O https://raw.githubusercontent.com/pytorch/pytorch/3ada2e0d64b40622e823b8135d2bbbc74e6526b9/torch/onnx/symbolic_opset9.py
# Overwrite the file of the installed torch package with the downloaded one.
# NOTE(review): both destination paths are hard-coded for specific Python
# installations (python3.6 dist-packages, anaconda python3.7) -- confirm they
# match the environment in use before running.
!cp symbolic_opset9.py /usr/local/lib/python3.6/dist-packages/torch/onnx/
!sudo cp symbolic_opset9.py /opt/anaconda3/lib/python3.7/site-packages/torch/onnx/

In [None]:
# You may need to restart your python runtime
import torch.onnx

In [None]:
# Create a dummy input on the same device as the model, so that the export
# works wherever the model lives (it used to be forced on the GPU)
dummy_input = torch.rand(input_shape).to(next(model.parameters()).device)
gc.collect()
# Define input / output names
input_names = ["network_input"]
output_names = ["network_output"]

# Convert the PyTorch model to ONNX
timestr = datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
onnx_filename = 'models/monophonic_net_' + timestr + '.onnx'
torch.onnx.export(model,
                  dummy_input,
                  onnx_filename,
                  verbose=True,
                  input_names=input_names,
                  output_names=output_names)

## Visualization

In [None]:
plt.rcParams['figure.figsize'] = [12, 5]

Display the trained model on a random sample from validation set.

In [None]:
import gc
import random

gc.collect()

# Pick a random sample of the validation set
sample_index = random.randint(0, len(validation_set) - 1)
x, y = validation_set[sample_index]
# Alternatives:
# x, y = voice_dataset.__getitem__(random.randint(0,len(validation_set) - 1))
# x, y = voice_dataset.__getitem__(0)
# x, y = synth_dataset.__getitem__(random.randint(0,len(validation_set) - 1))
# x, y = synth_dataset.__getitem__(len(validation_set) - 1)

# Forward pass without gradient, on a batch holding this single sample
with torch.no_grad():
    batch = x[None, :, :, :].to(pitchnet_module.device)
    out = model(batch)

# Network output (first item of the batch) and target, as numpy arrays
produced = np.array(out.cpu().detach())[0]
expected = np.array(y.cpu().detach())

# show_curve((expected[:, 0], produced[:, 0]), title='Width')
# show_curve((expected[:, 1], produced[:, 1] * 0.5 + 0.5), title='Offset')
# show_curve(((expected[:, 0] != 0) * 1.0, produced[:, 2]), title='Presence')
# show_curve(([onehot_to_pitch(v) for v in expected[:, -128:]], [onehot_to_pitch(np.exp(v)) for v in produced[:, -128:]]), title='Pitch')


In [None]:
# Work on a copy of the network output and exponentiate its last 128 columns
# (log-probabilities -> probabilities) before displaying it
ppp = produced.copy()
ppp[:, -128:] = np.exp(ppp[:, -128:])
visualize_onehot_segmentation(ppp)

In [None]:
# Display the raw network output (its last 128 columns are not exponentiated
# here), then the target
visualize_onehot_segmentation(produced)
visualize_onehot_segmentation(expected)

# Measure performances

In [None]:
def test_step(model, batch):
    """
    Evaluate `model` on one batch, log every loss and return the total loss.

    `model` must expose `forward`, `loss_fn`, `device` and `log_dict`, like the
    Lightning module of this notebook. The losses are logged under the `val_`
    prefix.

    The previous version referred to an undefined `self` and raised a
    NameError: everything is now read from `model`.
    """
    x, y = batch
    y_hat = model.forward(x)
    (loss, pitch_loss, seg_width_loss, seg_offset_loss,
     seg_confidence_loss, seg_presence_loss, iou_loss,
     onset_loss) = model.loss_fn(y_hat, y.to(model.device))
    tensorboard_logs = {
        'val_loss': loss,
        'val_pitch_loss': pitch_loss,
        'val_seg_width_loss': seg_width_loss,
        'val_seg_offset_loss': seg_offset_loss,
        'val_seg_confidence_loss': seg_confidence_loss,
        'val_seg_presence_loss': seg_presence_loss,
        'val_iou_loss': iou_loss,
        'val_onset_loss': onset_loss,
        }
    model.log_dict(tensorboard_logs)
    return loss


In [None]:
pitchnet_module.freeze()

In [None]:
def evaluate_perf(dataset, batch_size=16, num_workers=8, accuracy_threshold=50, verbose=True):
    """
    Measure the pitch accuracy and pitch error of `pitchnet_module` on `dataset`.

    Only the frames whose label carries a pitch are taken into account, i.e.
    frames where the last 128 columns of the target have some mass outside of
    their first bin. Two measures are computed:

    * the frame-to-frame one (printed as "10ms");
    * a tolerant one (printed as "30ms"), where the estimate of a frame is
      compared to the label of the same frame and of its two neighbours, and
      the smallest error is kept.

    Parameters:
        dataset: dataset yielding (input, target) pairs.
        batch_size, num_workers: forwarded to the data loader.
        accuracy_threshold: maximal error of an accurate frame, expressed in
            cents (100 cents = 1 semitone).
        verbose: print the accuracies of every batch.

    Returns:
        (errors, accuracy): the absolute pitch error (in semitones) of every
        pitched frame as a 1-D numpy array, and the frame-to-frame accuracy
        over the whole dataset (NaN when the dataset has no pitched frame).

    Changes compared to the previous version: `accuracy_threshold` is honoured
    (it was ignored and 50 cents was always used); the first and last frames no
    longer get a zero 30ms error for free; batches without any pitched frame
    are skipped instead of producing NaN; the returned accuracy is the total
    one instead of the one of the last batch.
    """
    loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    # Threshold converted from cents to semitones
    threshold = accuracy_threshold / 100

    def get_quantiles(error):
        error = torch.Tensor(error)
        mean = error.mean()
        q25, q75 = torch.quantile(error, 0.25), torch.quantile(error, 0.75)
        q99 = torch.quantile(error, 0.99)
        q50 = error.median()
        return q25, q50, q75, q99, mean

    errors = []
    errors_30ms = []
    accuracies = []
    accuracies_30ms = []
    weights = []
    with torch.no_grad():
        for x, y in loader:
            z = pitchnet_module(x)

            pitch1 = z[:, :, -128:]
            pitch2 = y[:, :, -128:]
            pitch_exists = pitch2[:, :, 1:].sum(dim=2) > 0  # NxT

            # Number of pitched frames in this batch
            weight = pitch_exists.sum().item()
            if weight == 0:
                # Nothing to measure in this batch
                continue

            pitch_estimated = onehot_to_pitch_torch2(torch.exp(pitch1)).cpu() * pitch_exists  # NxT
            pitch_label = onehot_to_pitch_torch2(pitch2).cpu() * pitch_exists  # NxT

            # Error against the label of the same frame
            error = (pitch_estimated - pitch_label).abs()
            # Error against the label of the previous / next frame. Frames
            # without such a neighbour keep the error of the frame itself.
            error_p1 = error.clone()
            error_m1 = error.clone()
            error_p1[:, 1:] = (pitch_estimated[:, 1:] - pitch_label[:, :-1]).abs()
            error_m1[:, :-1] = (pitch_estimated[:, :-1] - pitch_label[:, 1:]).abs()
            error_30ms = torch.minimum(error, torch.minimum(error_p1, error_m1))

            # Only keep the pitched frames
            error = error[pitch_exists]
            error_30ms = error_30ms[pitch_exists]
            errors.append(error.numpy())
            errors_30ms.append(error_30ms.numpy())
            weights.append(weight)

            # Proportion of pitched frames below the threshold
            accuracy = (error < threshold).float().mean().item()
            accuracy_30ms = (error_30ms < threshold).float().mean().item()
            accuracies.append(accuracy)
            accuracies_30ms.append(accuracy_30ms)

            if verbose:
                print('Accuracy 10ms:', accuracy, 'and 30ms:', accuracy_30ms)

    if not weights:
        print("No pitched frame found: nothing to measure")
        return np.array([]), float('nan')

    accuracies = np.array(accuracies)
    accuracies_30ms = np.array(accuracies_30ms)
    weights = np.array(weights)
    errors = np.concatenate(errors)
    errors_30ms = np.concatenate(errors_30ms)

    # Total accuracy: batch accuracies weighted by their number of pitched frames
    total_accuracy = (accuracies * weights).sum() / weights.sum()
    print(f"Total accuracy: {total_accuracy}")

    # Total accuracy 30ms
    total_accuracy_30ms = (accuracies_30ms * weights).sum() / weights.sum()
    print(f"Total accuracy 30ms: {total_accuracy_30ms}")

    # Total errors
    q25, q50, q75, q99, mean = get_quantiles(errors.reshape(-1))
    print(f"Error in semitones: mean:{mean}, 25th:{q25}, median:{q50}, 75th:{q75}, 99th:{q99}")

    q25, q50, q75, q99, mean = get_quantiles(errors_30ms.reshape(-1))
    print(f"Error in semitones (30ms): mean:{mean}, 25th:{q25}, median:{q50}, 75th:{q75}, 99th:{q99}")

    return errors, total_accuracy

# Report the performances on the test part of each dataset, one after the other
print("-"*20)
print("- Generate Report")
print("-"*20)
print("Synth")
evaluate_perf(test_synth, batch_size=16, num_workers=8)
print("*"*10)
print("Voice")
evaluate_perf(test_voice, batch_size=16, num_workers=8)
print("*"*10)
print("Sampled")
evaluate_perf(test_sampled, batch_size=16, num_workers=8)

## Synthetic dataset

In [None]:
def evaluate_perf(dataset, batch_size=16, num_workers=8, accuracy_threshold=50, verbose=True):
    """
    Measure the pitch accuracy and pitch error of `pitchnet_module` on `dataset`.

    Only the frames whose label carries a pitch are taken into account, i.e.
    frames where the last 128 columns of the target have some mass outside of
    their first bin. Two measures are computed:

    * the frame-to-frame one (printed as "10ms");
    * a tolerant one (printed as "30ms"), where the estimate of a frame is
      compared to the label of the same frame and of its two neighbours, and
      the smallest error is kept.

    Parameters:
        dataset: dataset yielding (input, target) pairs.
        batch_size, num_workers: forwarded to the data loader.
        accuracy_threshold: maximal error of an accurate frame, expressed in
            cents (100 cents = 1 semitone).
        verbose: print the accuracies of every batch.

    Returns:
        (errors, accuracy): the absolute pitch error (in semitones) of every
        pitched frame as a 1-D numpy array, and the frame-to-frame accuracy
        over the whole dataset (NaN when the dataset has no pitched frame).

    Changes compared to the previous version: `accuracy_threshold` is honoured
    (it was ignored and 50 cents was always used); the first and last frames no
    longer get a zero 30ms error for free; batches without any pitched frame
    are skipped instead of producing NaN; the returned accuracy is the total
    one instead of the one of the last batch.
    """
    loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    # Threshold converted from cents to semitones
    threshold = accuracy_threshold / 100

    def get_quantiles(error):
        error = torch.Tensor(error)
        mean = error.mean()
        q25, q75 = torch.quantile(error, 0.25), torch.quantile(error, 0.75)
        q99 = torch.quantile(error, 0.99)
        q50 = error.median()
        return q25, q50, q75, q99, mean

    errors = []
    errors_30ms = []
    accuracies = []
    accuracies_30ms = []
    weights = []
    with torch.no_grad():
        for x, y in loader:
            z = pitchnet_module(x)

            pitch1 = z[:, :, -128:]
            pitch2 = y[:, :, -128:]
            pitch_exists = pitch2[:, :, 1:].sum(dim=2) > 0  # NxT

            # Number of pitched frames in this batch
            weight = pitch_exists.sum().item()
            if weight == 0:
                # Nothing to measure in this batch
                continue

            pitch_estimated = onehot_to_pitch_torch2(torch.exp(pitch1)).cpu() * pitch_exists  # NxT
            pitch_label = onehot_to_pitch_torch2(pitch2).cpu() * pitch_exists  # NxT

            # Error against the label of the same frame
            error = (pitch_estimated - pitch_label).abs()
            # Error against the label of the previous / next frame. Frames
            # without such a neighbour keep the error of the frame itself.
            error_p1 = error.clone()
            error_m1 = error.clone()
            error_p1[:, 1:] = (pitch_estimated[:, 1:] - pitch_label[:, :-1]).abs()
            error_m1[:, :-1] = (pitch_estimated[:, :-1] - pitch_label[:, 1:]).abs()
            error_30ms = torch.minimum(error, torch.minimum(error_p1, error_m1))

            # Only keep the pitched frames
            error = error[pitch_exists]
            error_30ms = error_30ms[pitch_exists]
            errors.append(error.numpy())
            errors_30ms.append(error_30ms.numpy())
            weights.append(weight)

            # Proportion of pitched frames below the threshold
            accuracy = (error < threshold).float().mean().item()
            accuracy_30ms = (error_30ms < threshold).float().mean().item()
            accuracies.append(accuracy)
            accuracies_30ms.append(accuracy_30ms)

            if verbose:
                print('Accuracy 10ms:', accuracy, 'and 30ms:', accuracy_30ms)

    if not weights:
        print("No pitched frame found: nothing to measure")
        return np.array([]), float('nan')

    accuracies = np.array(accuracies)
    accuracies_30ms = np.array(accuracies_30ms)
    weights = np.array(weights)
    errors = np.concatenate(errors)
    errors_30ms = np.concatenate(errors_30ms)

    # Total accuracy: batch accuracies weighted by their number of pitched frames
    total_accuracy = (accuracies * weights).sum() / weights.sum()
    print(f"Total accuracy: {total_accuracy}")

    # Total accuracy 30ms
    total_accuracy_30ms = (accuracies_30ms * weights).sum() / weights.sum()
    print(f"Total accuracy 30ms: {total_accuracy_30ms}")

    # Total errors
    q25, q50, q75, q99, mean = get_quantiles(errors.reshape(-1))
    print(f"Error in semitones: mean:{mean}, 25th:{q25}, median:{q50}, 75th:{q75}, 99th:{q99}")

    q25, q50, q75, q99, mean = get_quantiles(errors_30ms.reshape(-1))
    print(f"Error in semitones (30ms): mean:{mean}, 25th:{q25}, median:{q50}, 75th:{q75}, 99th:{q99}")

    return errors, total_accuracy

# Report the performances on the test part of each dataset, one after the other.
# NOTE(review): this cell and the definition above it repeat the previous
# report cell for all three datasets -- confirm whether only the synthetic
# dataset was meant to be evaluated here.
print("-"*20)
print("- Generate Report")
print("-"*20)
print("Synth")
evaluate_perf(test_synth, batch_size=16, num_workers=8)
print("*"*10)
print("Voice")
evaluate_perf(test_voice, batch_size=16, num_workers=8)
print("*"*10)
print("Sampled")
evaluate_perf(test_sampled, batch_size=16, num_workers=8)