# This notebook is not meant to be run stand-alone; it is %run from *all* the other analysis files in this directory

It assumes that any notebook that %runs this file defines <br>
pickledParamFile <br>
before running it. pickledParamFile is the parameter file stored by RNNControl; it includes all the information necessary to reconstruct and run the saved models. <br>
Note that this file defines a generate() function that can synthesize sound under continuously changing conditioning parameters.

In [None]:
import numpy as np
import math
import time
from datetime import datetime
import os

import torch
import torch.nn as nn
import torchvision.transforms as transform

import audioDataloader.dataloader as dataloader
from audioDataloader.transforms import mulawnEncode,mulaw,array2tensor,dic2tensor,injectNoise,normalizeDim
from paramManager import paramManager

import matplotlib.pylab as plt
%matplotlib inline

import librosa

import pickle

from platform import python_version
print("python version {}".format(python_version()))

import torch
print("torch version {}".format(torch.__version__))

### Data Params  
<a id="dataparams"></a>

In [None]:
import pprint

# pickledParamFile must already be defined by the notebook that %runs this file.
assert(pickledParamFile)
print("will read params from {}".format(pickledParamFile))

modelDir = os.path.dirname(pickledParamFile)
print("modelDir is {}".format(modelDir))

# NOTE(security): pickle.load can execute arbitrary code, so only point this at
# parameter files you trust.
# The file handle is deliberately NOT called `input`: this cell runs at notebook
# top level, so that name would replace the builtin input() in every notebook
# that %runs this file.
with open(pickledParamFile, 'rb') as paramFileHandle:
    params = pickle.load(paramFileHandle)

print("\n Saved Parameters:\n")
pprint.pprint(params)

# If loadModelFileName is not defined (or is None/empty) by the calling notebook,
# fall back to the model recorded in the saved parameters.
try: loadModelFileName
except NameError: loadModelFileName = None

# os.path.join (rather than modelDir + '/' + name) keeps the path relative when
# pickledParamFile has no directory component; plain concatenation would produce
# '/<name>', i.e. a path at the filesystem root.
loadModelFile = os.path.join(modelDir, loadModelFileName if loadModelFileName else params['savedModel'])
print("setting loadModelFile to {}".format(loadModelFile))

In [None]:
from utils.myUtils import octaves

# Mapping used to specify parameters when generating tones.
# NOTE(review): per the original author's note, octaves() by default maps midi
# note numbers to frequency and normalized [0,1] values to [p1, p2] -- confirm
# against utils.myUtils.
midi = octaves(params['lowNote'], params['hiNote'])

# param2freq() results for the lowest and highest configured notes
# (presumably cycles per second, going by the k_*cps names).
k_mincps, k_maxcps = (midi.param2freq(params[noteKey]) for noteKey in ('lowNote', 'hiNote'))

In [None]:

# Root name for output.
k_rootoutname = './output'

# Sample rate is fixed here; the sequence length comes from the saved parameters.
sr = 16000
seqLen = params['seqLen']

# Generation parameters
#*************************************
# Default number of samples to synthesize: 48 sequence lengths.
max_length = 48 * seqLen

# Cuda
#*************************************
# Use the first GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

In [None]:
# Verify that every conditioning property named in the saved parameters is
# present in the parameter data of the first data file.
#*************************************
pm = paramManager.paramManager(params['datadir'], params['paramdir'])
datafiles = pm.filenames(params['datadir'])
paramfile = pm.getParams(datafiles[0])

# Collect requested props that the parameter file does not provide.
notpresent = list(filter(lambda propName: propName not in paramfile.keys(), params['props']))
assert not notpresent, "props {} are not in the dataparam files".format(notpresent)

In [None]:
# Set up the encoders, the test dataset and its loader
#*************************************
audiocoding = mulawnEncode(256, 0, 1)   # mu-law encoding applied to audio inputs
targetcoding = mulaw(256)               # used by generate() to turn class indices into sample values
rescalePitch = normalizeDim('midiPitch', params['lowNote'], params['hiNote'])
# Defined for callers; not part of the param_transform below.
rescalePressure1 = normalizeDim('pressure1', 0, 0.9)

# The data source is either a csv file listing (when the saved params name one)
# or every 'wav' file in datadir -- never both.
if 'csvfile' in params:
    testdataset = dataloader.AudioDataset(
        sr, params['seqLen'], params['stride'],
        csvfile=params['csvfile'],
        datadir=None,
        extension=None,
        paramdir=params['paramdir'], prop=params['props'],
        transform=transform.Compose([array2tensor(torch.FloatTensor)]),
        param_transform=transform.Compose([rescalePitch, dic2tensor(torch.FloatTensor)]),
        target_transform=transform.Compose([array2tensor(torch.LongTensor)]))
else:
    testdataset = dataloader.AudioDataset(
        sr, params['seqLen'], params['stride'],
        csvfile=None,
        datadir=params['datadir'],
        extension='wav',
        paramdir=params['paramdir'], prop=params['props'],
        transform=transform.Compose([array2tensor(torch.FloatTensor)]),
        param_transform=transform.Compose([rescalePitch, dic2tensor(torch.FloatTensor)]),
        target_transform=transform.Compose([array2tensor(torch.LongTensor)]))

# One shuffled sequence per batch; generate() draws its default primer from here.
test_loader = torch.utils.data.DataLoader(
    testdataset,
    batch_size=1,
    shuffle=True,
    num_workers=4,
    drop_last=True,
)

In [None]:
# Specify the model
#*************************************
class RNN(nn.Module):
    """Linear input layer -> stacked GRU -> linear decoder, stepped one sequence element at a time.

    Args:
        input_size: number of signal inputs per step.
        cond_size: number of conditioning inputs per step (concatenated with the
            signal inputs before being passed to forward()).
        hidden_size: width of the input layer output and of every GRU layer.
        output_size: number of decoder outputs.
        n_layers: number of stacked GRU layers.
    """

    def __init__(self, input_size, cond_size, hidden_size, output_size, n_layers=1):
        super().__init__()
        self.input_size = input_size
        self.cond_size = cond_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers #no. of stacked GRU layers

        self.i2h = nn.Linear(input_size+cond_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, n_layers, batch_first=True)
        self.decoder = nn.Linear(hidden_size, output_size)

    def forward(self, input, hidden, batch_size=1):
        """Advance the network by a single sequence element.

        Args:
            input: one step of signal+conditioning values, with last dimension
                input_size+cond_size and batch_size rows.
            hidden: GRU hidden state, e.g. from init_hidden() or a previous call.
            batch_size: number of sequences processed in parallel.

        Returns:
            (output, hidden): decoder output of shape (batch_size, output_size)
            and the updated GRU hidden state.
        """
        h1 = self.i2h(input)
        # The GRU is batch_first, so present this step as a length-1 sequence.
        h_out, hidden = self.gru(h1.view(batch_size, 1, -1), hidden)
        output = self.decoder(h_out.view(batch_size, -1))
        return output, hidden

    def init_hidden(self, batch_size=1):
        """Return a zero hidden state of shape (n_layers, batch_size, hidden_size).

        The state is allocated on the device holding the model's parameters, so
        it always matches the model rather than depending on a module-level
        `device` variable.
        """
        model_device = next(self.parameters()).device
        return torch.zeros(self.n_layers, batch_size, self.hidden_size, dtype=torch.float, device=model_device)

In [None]:

def generate(model,max_length,primer=None,paramvect=None, returnHiddenSequence=False):
    """Generate a signal sample-by-sample using the provided model.

    The model's hidden state is first built up by running it over the primer,
    then max_length new samples are produced, each fed back as the next input.

    @param model network exposing init_hidden() and model(input, hidden) -> (output, hidden)
    @param max_length number of samples to synthesize after the primer
    @param primer a torch.tensor of shape (batch=1, primer_signal=primer_length,
           num_inputs=1+num_cond_parameters); it is copied, never modified. If None,
           the first batch drawn from test_loader is used instead.
    @param paramvect conditioning parameters (a function of i, the sample number);
           if None the conditioning values of the primer's last step are kept
    @param returnHiddenSequence if True, also return the hidden states
    @return the generated signal as a 1-D numpy array (the primer's signal channel
            followed by the synthesized samples); when returnHiddenSequence is True,
            a tuple (signal, hs) where hs is a list holding the squeezed initial
            hidden state followed by the hidden state after each synthesized sample
    """
    hs = [] if returnHiddenSequence else None

    model.eval()
    with torch.no_grad():
        if primer is not None:
            # Private CPU copy: the caller's primer must not change, and the
            # numpy conversion below needs CPU memory. The data loader is not
            # touched at all in this case.
            p_inp = primer.detach().clone().cpu()
        else:
            p_inp, _ = next(iter(test_loader))

        # Signal channel of the primer, before audio encoding.
        seq = p_inp[0, :, 0].detach().cpu().numpy().copy()

        # Replace the signal channel with its audiocoding()-encoded version, so we
        # hold both the original samples (seq) and the encoded model input (p_inp).
        seq_mu = audiocoding(seq)
        p_inp[0, :, 0] = array2tensor(torch.FloatTensor)(seq_mu)

        hidden = model.init_hidden()
        # Keep the inputs on the same device as the hidden state.
        p_inp = p_inp.to(hidden.device)
        if returnHiddenSequence:
            hs.append(torch.squeeze(hidden).cpu().numpy())

        # Build up the hidden state over all primer steps but the last. The
        # actual primer length is used (not the global seqLen) so primers of
        # any length are consumed completely.
        primer_length = p_inp.shape[1]
        for j in range(primer_length - 1):
            _, hidden = model(p_inp[:, j, :], hidden)
        inp = p_inp[:, -1, :]  # the last primer step seeds the actual generation

        # Collect the pieces and join them once at the end; appending to a numpy
        # array every iteration re-copies the whole signal each time.
        pieces = [np.ravel(seq)]
        for i in range(max_length):
            outputs, hidden = model(inp, hidden)

            outputs = nn.functional.log_softmax(outputs, dim=1)
            _, topi = outputs.detach().topk(1)  # choose the strongest activation
            predicted_sample = targetcoding.index2float(topi)

            pieces.append(np.ravel(predicted_sample))

            # Feed the prediction back as the next input. Created on inp's own
            # device so this also works without CUDA.
            inp[:, 0] = torch.as_tensor(audiocoding([predicted_sample]),
                                        dtype=torch.float, device=inp.device)
            if paramvect is not None:
                inp[:, 1:] = torch.as_tensor(paramvect(i), dtype=inp.dtype, device=inp.device)

            if returnHiddenSequence:
                hs.append(torch.squeeze(hidden).cpu().numpy())

        generated = np.concatenate(pieces)

        if returnHiddenSequence:
            return generated, hs
        return generated
        

In [None]:
# Instantiate the network and restore its saved weights
#*************************************
rnn = RNN(input_size=1,cond_size=len(params['props']),hidden_size=params['hiddenSize'],output_size=256,n_layers=params['nLayers']).to(device)
print("**************************************************************************************************.")
print("Will load model: {}".format(loadModelFile))
# map_location remaps the stored tensors onto the device selected above, so a
# checkpoint saved on a GPU machine still loads when only a CPU is available.
# NOTE(security): torch.load unpickles the file; only load checkpoints you trust.
rnn.load_state_dict(torch.load(loadModelFile, map_location=device))