# Timbre Encoder
This is the notebook used to train the timbre encoder of the Vocal Pitch Modulator.

This notebook makes use of the data to train our timbre encoder.

There are two models here. The first is a vowel classifier that takes in an MFCC and outputs a vowel. The second is a VAE that takes in an MFCC, reduces its dimensionality, and attempts to reconstruct the provided MFCC.

## Global variables/Imports
Run these cells before running either of the following sections.

In [None]:
%load_ext autoreload
%autoreload 1

import os
import csv

import scipy.io as sio
from scipy.io import wavfile
from scipy.io.wavfile import write

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.pyplot import subplots

import time
import math

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, mean_squared_error, log_loss

from tqdm.notebook import trange, tqdm

from IPython.display import HTML
import warnings
from sklearn.preprocessing import OneHotEncoder

import torch
warnings.filterwarnings('ignore')
import torch.nn as nn
import torch.nn.functional as F
from torch import optim

%aimport VPM
from VPM import *
%aimport Utils
from Utils import *
%aimport ANN
from ANN import *

In [None]:
# Dataset dimensions - change these only together with the dataset itself
n_pitches = 16
n_vowels = 12
n_people = 3

# Lookup tables, kept mainly as a human-readable reference
# Vowel words in label order: position i is the word for label i
label_to_vowel = dict(enumerate([
    "bed",  "bird",  "boat",  "book",
    "cat",  "dog",   "feet",  "law",
    "moo",  "nut",   "pig",   "say",
]))

# Inverse mapping: vowel word -> label
vowel_to_label = { vowel: label for label, vowel in label_to_vowel.items() }

# Pitch names in note-index order: position i is the pitch for note index i
noteidx_to_pitch = dict(enumerate([
    "A2",   "Bb2",  "B2",   "C3",
    "Db3",  "D3",   "Eb3",  "E3",
    "F3",   "Gb3",  "G3",   "Ab3",
    "A3",   "Bb3",  "B3",   "C4",
]))

### Constants
Used to tune the data generation and ANN.

In [None]:
# Sizes passed to the mel/MFCC extraction (n_mels, n_mfcc)
n_mels, n_mfcc = 128, 20

### Data Generation
This is all the code that was explained in the Data Walkthrough. It generates data structures to hold all wav file data, spectrograms, mel spectra and MFCC data for all wav files.

For diagram-visualization of the data set, refer to the [readme](https://github.com/zioul123/VocalPitchModulator/blob/master/README.md).

Note that MFCC are normalized BY ROW to `[0, 1]`, to allow the VAE to output it.

In [None]:
# ---- File references ----
data_ref_list = create_data_ref_list(
    os.path.join("Data", 'dataset_files.csv'), n_pitches, n_vowels, n_people)
# Flattened references; index with
# flat_data_ref_list[flat_ref_idx(vowel, pitch, person)]
flat_data_ref_list = flatten_3d_array(
    data_ref_list, n_vowels, n_pitches, n_people)

# ---- File reference accessors ----
def flat_ref_idx(vowel, pitch, person):
    """Return the flat reference index for a (vowel, pitch, person) triple."""
    return flat_3d_array_idx(
        vowel, pitch, person, n_vowels, n_pitches, n_people)

def nd_ref_idx(idx):
    """Return (vowel, pitch, person) for a flat reference index."""
    return nd_array_idx(idx, n_vowels, n_pitches, n_people)

# Data-label pairs for pitch-shift training - not used here
# data_label_pairs, _ = create_data_label_pairs(n_pitches)

# ---- Per-file audio features, in flat_data_ref_list order ----
# wav_data:     (576, ~29400)  (n_wavs, n_samples)
# spectrograms: (576, 513, 58) (n_wavs, n_freq_bins, n_windows)
# mels:         (576, 128, 58) (n_wavs, n_mels, n_windows)
# mfccs:        (576, 20, 58)  (n_wavs, n_mfcc, n_windows)
all_wav_data = load_wav_files(
    os.path.join("Data", "dataset"), flat_data_ref_list)
all_spectrograms = np.array(
    [ stft(waveform, plot=False) for waveform in all_wav_data ])
# ffts_to_mel yields a (mel, mfcc) pair per file; regroup the pairs into
# one array of all mels and one array of all mfccs
all_mels, all_mfcc = (
    np.array(list(group))
    for group in zip(*[ ffts_to_mel(ffts, n_mels = n_mels, n_mfcc = n_mfcc)
                        for ffts in all_spectrograms ]))

# Rescale each mfcc (i.e. each row of the all_mfcc array) into [0,1]
all_mfcc = normalize_rows(all_mfcc, NormMode.REAL_TO_ZERO_ONE)

### Data-Label Structuring
This puts together the actual data-label pairs to be fed into the ANN.

Generate `data` and `labels` from `all_mfcc` and using `nd_ref_idx`.

In [None]:
n_files, n_mfcc_dummy, n_windows = all_mfcc.shape

# vowel_labels: (576) (n_wavs)
# nd_ref_idx gives (vowel, pitch, person); only the vowel is the class label
all_vowel_labels = np.array([ vowel for vowel, _, _ in
                              map(nd_ref_idx, range(len(flat_data_ref_list))) ])

# Returns a flat 2d idx, given a wavfile index and a window index
def flat_data_idx(wav_idx, win_idx):
    """Return the row of `data`/`labels` for window `win_idx` of wav `wav_idx`."""
    return flat_2d_array_idx(wav_idx, win_idx, n_files, n_windows)

# data:   (33408, 20) (n_wavs * n_windows, n_mfcc)
# labels: (33408) (n_wavs * n_windows)
# Vectorized instead of a Python-level loop over every (wav, window) pair:
# moving the window axis ahead of the mfcc axis and flattening the first two
# axes gives one row per window, ordered by wav file and then by window.
data = np.transpose(all_mfcc, (0, 2, 1)).reshape(n_files * n_windows,
                                                 n_mfcc_dummy)
# Every window of a wav file carries that file's vowel label
labels = np.repeat(all_vowel_labels, n_windows)

In [None]:
# Sanity check of the flattened layout: rows must be ordered by
# (wav_idx, win_idx), each row holding the mfcc features of that window,
# and each label must be the vowel label of the row's wav file.
for wav_idx in range(n_files):
    for win_idx in range(n_windows):
        row_idx = flat_data_idx(wav_idx, win_idx)
        # All mfcc features of this window match the source column
        assert np.array_equal(data[row_idx], all_mfcc[wav_idx][:, win_idx])
        # The label of this window matches its wav file's vowel
        assert labels[row_idx] == all_vowel_labels[wav_idx]

Split the data into `train` and `val` (validation) sets, and convert them to Torch tensors of the correct types. Run **only one of these cells.**

First method (**not recommended**, simple): random sampling into train and validation sets

In [None]:
# X_train, Y_train: (25056, 20) (25056) 
# X_val, Y_val:     (8352, 20) (8352)
# Stratified random split with a fixed seed
X_train, X_val, Y_train, Y_val = train_test_split(
    data, labels, stratify=labels, random_state=0)

# Inputs are stored as float tensors (the default tensor type)
X_train = torch.tensor(X_train).float()
X_val = torch.tensor(X_val).float()
# Labels are stored as long tensors because they are used as indices
Y_train = torch.tensor(Y_train).long()
Y_val = torch.tensor(Y_val).long()

Second method **(recommended)**: for each vowel/pitch combination, one randomly chosen person's recording is held out as the validation data

In [None]:
# X_train, Y_train: (22272, 20) (22272,) 
# X_val, Y_val:     (11136, 20) (11136,)
# Seeded generator so the held-out people (and therefore the validation
# results) are the same on every run
split_rng = np.random.default_rng(0)

# Row indices into `data`/`labels` for each side of the split
train_rows = []; val_rows = []
for vow_idx in range(n_vowels):
    for pit_idx in range(n_pitches):
        # Choose the person for this pitch/vowel to be used as validation data
        # (drawn from n_people rather than a hard-coded count)
        test_pid = int(split_rng.integers(n_people))
        for pid_idx in range(n_people):
            wav_idx = flat_ref_idx(vow_idx, pit_idx, pid_idx)
            target_rows = val_rows if pid_idx == test_pid else train_rows
            # Every window of this wav file goes to the same side
            target_rows.extend(flat_data_idx(wav_idx, win_idx)
                               for win_idx in range(n_windows))

# Gather the rows with one indexing operation per array instead of building
# tensors from Python lists of individual rows
# Default tensor is float
X_train = torch.tensor(data[train_rows]).float()
X_val = torch.tensor(data[val_rows]).float()
# Used as index, so it is long
Y_train = torch.tensor(labels[train_rows]).long()
Y_val = torch.tensor(labels[val_rows]).long()

### Timbre-Encoder - MFCC -> Vowel
This takes MFCC (and mel-spectrograms in future?), and tries to identify the vowel spoken.

**Results:**
```
| epochs | n_mfcc | n_hid | n_timb | Val acc |
|   5000 |     20 |    12 |      4 | 0.77898 |
```

In [None]:
# Hyperparameters for the vowel classifier
n_hid = 12
n_timb = 4
lr = 0.2
n_epochs = 5000

# Model to be trained
model = TimbreEncoder(n_mfcc=n_mfcc, n_hid=n_hid, n_timb=n_timb, n_vowels=n_vowels)
# Classification loss: cross entropy
loss_fn = F.cross_entropy

In [None]:
# Report whether CUDA can be used for training
if torch.cuda.is_available():
    print("GPU Available")
else:
    print("GPU Not available")

In [None]:
# Pick the compute device: CUDA when available, CPU otherwise
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Place all four data tensors on that device
X_train, Y_train, X_val, Y_val = (
    tensor.to(device) for tensor in (X_train, Y_train, X_val, Y_val))

# Place the network on the same device, then create its SGD optimizer
model.to(device)
opt = optim.SGD(model.parameters(), lr=lr)

# Train the model and time the whole fit
tic = time.time()
loss = model.train_func(X_train, Y_train, X_val, Y_val, model, opt,
                        loss_fn, epochs=n_epochs, print_graph=True)
toc = time.time()
print('Final loss: {}\nTime taken: {}'.format(loss, toc - tic))

Saving the model

In [None]:
# List every entry of the state_dict with its tensor size
print("Model's state_dict:")
for param_name, param_value in model.state_dict().items():
    print(param_name, "\t", param_value.size())

# The file name records the hyperparameters and the final loss of this run
model_path = os.path.join("model_data", "TimbreEncoder_{}_{}_{}_{}_{}_{}.pt"
                          .format(lr, n_epochs, n_mfcc, n_hid, n_timb, loss))
# torch.save does not create missing directories, so make sure the target
# directory exists (no-op when it is already there)
os.makedirs(os.path.dirname(model_path), exist_ok=True)
torch.save(model.state_dict(), model_path)
print("Model saved at {}".format(model_path))

Load the saved model and, as an example, use it to predict the vowel for every window of the whole dataset

In [None]:
model = TimbreEncoder(n_mfcc=n_mfcc, n_hid=n_hid, n_timb=n_timb, n_vowels=n_vowels)
# map_location="cpu": this evaluation runs on the CPU, and a checkpoint saved
# from a CUDA model could otherwise not be loaded on a machine without a GPU
model.load_state_dict(torch.load(model_path, map_location="cpu"))
model.eval()
# model.to(device)

data_tensor = torch.tensor(data).float()
label_tensor = torch.tensor(labels).long()
# data_tensor = data_tensor.to(device); label_tensor = label_tensor.to(device)

# Overall and per-vowel tallies of correct/wrong predictions
correct = 0; wrong = 0
corrects = np.zeros(n_vowels); wrongs = np.zeros(n_vowels)
# predictions[v][p]: how often windows of vowel v were predicted as vowel p
predictions = np.zeros((n_vowels, n_vowels))

# Pure inference: no gradients are needed, so skip autograd bookkeeping
with torch.no_grad():
    for vowel_idx in range(n_vowels):
        for pitch_idx in range(n_pitches):
            for pid_idx in range(n_people):
                wav_idx = flat_ref_idx(vowel_idx, pitch_idx, pid_idx)
                for win_idx in range(n_windows):
                    data_idx = flat_data_idx(wav_idx, win_idx)
                    label = (label_tensor[data_idx]).item()
                    # Predicted vowel = index of the largest model output
                    pred = (torch.argmax(model(data_tensor[data_idx]))).item()

                    predictions[vowel_idx][pred] += 1
                    if label == pred:
                        correct += 1
                        corrects[vowel_idx] += 1
                    else:
                        wrong += 1
                        wrongs[vowel_idx] += 1

print("Total Accuracy: {}"
      .format(correct / (wrong + correct)))
for vowel_idx in range(n_vowels):
    print("Vowel: {}. Accuracy: {}. Most common pred: {}"
          .format(label_to_vowel[vowel_idx],
                  corrects[vowel_idx] / (wrongs[vowel_idx] + corrects[vowel_idx]),
                  label_to_vowel[np.argmax(predictions[vowel_idx])]))


### Timbre-VAE - MFCC -> MFCC
This takes MFCC, reduces dimensionality to a `n_timb` latent space, and attempts to recreate the MFCC.

In [None]:
# Hyperparameters for the VAE
n_hid = 12
n_timb = 4
lr = 1e-3
n_epochs = 5000
# Equals the X_train row count noted for the per-person split (22272)
batch_size = 22272

# Model to be trained
model = TimbreVAE(n_mfcc=n_mfcc, n_hid=n_hid, n_timb=n_timb, n_vowels=n_vowels)

# VAE loss, following the pytorch VAE example.
def loss_fn(recon_x, x, mu, logvar):
    """Return the summed reconstruction loss plus the KL divergence term.

    recon_x: reconstruction produced by the model (values in [0, 1])
    x:       target the reconstruction is compared against (values in [0, 1])
    mu:      mean term used in the KL divergence
    logvar:  log-variance term used in the KL divergence
    """
    # Binary cross entropy, summed over all elements
    reconstruction_term = F.binary_cross_entropy(recon_x, x, reduction='sum')
    # -0.5 * sum(1 + logvar - mu^2 - exp(logvar))
    kl_inner = 1 + logvar - mu.pow(2) - logvar.exp()
    kl_term = -0.5 * kl_inner.sum()
    return reconstruction_term + kl_term

In [None]:
# Report whether CUDA can be used for training
if torch.cuda.is_available():
    print("GPU Available")
else:
    print("GPU Not available")

In [None]:
# Pick the compute device: CUDA when available, CPU otherwise
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Place the input tensors on that device (the VAE is trained on inputs only)
X_train, X_val = (tensor.to(device) for tensor in (X_train, X_val))

# Place the network on the same device, then create its optimizer
model.to(device)
# opt = optim.SGD(model.parameters(), lr=lr)
opt = optim.Adam(model.parameters(), lr=lr)

# Train the model and time the whole fit
tic = time.time()
loss = model.train_func(X_train, X_val, model, opt, loss_fn, batch_size=batch_size,
                        epochs=n_epochs, print_graph = True)
toc = time.time()
print('Final loss: {}\nTime taken: {}'.format(loss, toc - tic))

Saving the model

In [None]:
# List every entry of the state_dict with its tensor size
print("Model's state_dict:")
for param_name, param_value in model.state_dict().items():
    print(param_name, "\t", param_value.size())

# The file name records the hyperparameters and the final loss of this run
model_path = os.path.join("model_data", "TimbreVAE_{}_{}_{}_{}_{}_{}_{}.pt"
                          .format(lr, n_epochs, n_mfcc, n_hid, n_timb, batch_size, loss))
# torch.save does not create missing directories, so make sure the target
# directory exists (no-op when it is already there)
os.makedirs(os.path.dirname(model_path), exist_ok=True)
torch.save(model.state_dict(), model_path)
print("Model saved at {}".format(model_path))

Load the saved model and, as an example, run it on a single MFCC window

In [None]:
model = TimbreVAE(n_mfcc=n_mfcc, n_hid=n_hid, n_timb=n_timb, n_vowels=n_vowels)
# map_location="cpu": this example runs on the CPU, and a checkpoint saved
# from a CUDA model could otherwise not be loaded on a machine without a GPU
model.load_state_dict(torch.load(model_path, map_location="cpu"))
model.eval()

data_tensor = torch.tensor(data).float()

# Example sample: vowel 5 ("dog"), pitch 5 ("D3"), person 1, window 30
wav_idx = flat_ref_idx(5, 5, 1)
data_idx = flat_data_idx(wav_idx, 30)
# The target of the VAE is the input MFCC itself
label = data_tensor[data_idx]
# Pure inference: no gradients are needed, so skip autograd bookkeeping
with torch.no_grad():
    pred = model(data_tensor[data_idx])