## Load Model

In [None]:
from wavenet_model import *
from audio_data import WavenetDataset

# Select the tensor constructors once: CUDA variants when a GPU is visible,
# plain CPU variants otherwise.
use_cuda = torch.cuda.is_available()
if use_cuda:
    print('use gpu')
    dtype, ltype = torch.cuda.FloatTensor, torch.cuda.LongTensor
else:
    dtype, ltype = torch.FloatTensor, torch.LongTensor

In [None]:
# Build a WaveNet with the architecture hyper-parameters used for this demo.
# NOTE: this instance is rebound by the snapshot load on the next statement.
model = WaveNetModel(
    layers=10,
    blocks=3,
    dilation_channels=32,
    residual_channels=32,
    skip_channels=1024,
    end_channels=512,
    output_length=16,
    dtype=dtype,
    bias=True,
)

# Replace it with the most recent snapshot found in the 'snapshots' directory.
model = load_latest_model_from('snapshots', use_cuda=use_cuda)

# Keep the model's tensor type and device placement in line with the detected device.
model.dtype = dtype
if not use_cuda:
    model.cpu()
else:
    model.cuda()

# Summarize what was loaded.
print('model: ', model)
print('receptive field: ', model.receptive_field)
print('parameter count: ', model.parameter_count())

In [None]:
# Each dataset item spans the receptive field plus the extra samples needed to
# produce `output_length` targets.
data = WavenetDataset(
    dataset_file='train_samples/bach_chaconne/dataset.npz',
    item_length=model.receptive_field + model.output_length - 1,
    target_length=model.output_length,
    file_location='train_samples/bach_chaconne',
    test_stride=500,
)
print(f'the dataset has {len(data)} items')

## Deployment

In [None]:
import torch
import torch.nn as nn
import os
import onnxruntime
import numpy as np
import onnx
import shutil
from timeit import default_timer as timer
import vai_q_onnx

### PyTorch (CPU)

In [None]:
# Random example input reused by the later ONNX Runtime benchmarks.
input_data = torch.rand(1, 256, 3070)

# Time a fixed number of forward passes and report the mean per-call latency.
num_runs = 100
start = timer()
for _ in range(num_runs):
    model(input_data)
pytorch_total = timer() - start

print(f"Inference Time: {pytorch_total / num_runs}")

In [None]:
def prog_callback(step, total_steps):
    """Print generation progress as a floored percentage of `total_steps`."""
    percent = 100 * step // total_steps
    print(f"{percent}% generated")

# Seed the generator with an item from the data set, turning its one-hot
# vectors into integer class indices (argmax along dimension 0).
start_data = data[260000][0]
start_data = torch.max(start_data, 0).indices

# Time a full generation run with the PyTorch model.
start = timer()
generated = model.generate(
    num_samples=160000,
    first_samples=start_data,
    temperature=1.0,
)
pytorch_total = timer() - start

print(f"Generation Time: {pytorch_total}")

### ONNX Runtime (CPU)

In [None]:
# Prep for ONNX export.
# The example input is passed positionally (as a one-element tuple). A dict here
# is interpreted by torch.onnx.export as keyword arguments, which only works if
# the key matches the name of the model's forward() parameter.
inputs = (torch.rand(1, 256, 3070),)
input_names = ['input']
output_names = ['output']
# Only the batch dimension is exported as dynamic.
dynamic_axes = {'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}}
model_path = "models/wavenet.onnx"

# Make sure the output directory exists before the exporter writes the file.
os.makedirs(os.path.dirname(model_path), exist_ok=True)

# Call export function
torch.onnx.export(
        model,
        inputs,
        model_path,
        export_params=True,
        opset_version=13,  # Recommended opset
        input_names=input_names,
        output_names=output_names,
        dynamic_axes=dynamic_axes,
    )

In [None]:
# Load the exported ONNX model from disk.
model_path = r'./models/wavenet.onnx'
onnx_model = onnx.load(model_path)

# Build an inference session that runs the model on the CPU execution provider.
cpu_options = onnxruntime.SessionOptions()
cpu_session = onnxruntime.InferenceSession(
    onnx_model.SerializeToString(),
    sess_options=cpu_options,
    providers=['CPUExecutionProvider'],
)

# Time a fixed number of runs and report the mean per-call latency.
num_runs = 100
start = timer()
for _ in range(num_runs):
    cpu_results = cpu_session.run(None, {"input": input_data.numpy()})
cpu_total = timer() - start

print(f"Inference Time: {cpu_total / num_runs}")

In [None]:
def generate(model,
             num_samples,
             first_samples=None,
             temperature=1.,
             session=None):
    """Generate samples autoregressively, using an ONNX Runtime session for the forward pass.

    At every step the last `model.receptive_field` samples are one-hot encoded
    into a (1, model.classes, model.receptive_field) float tensor, fed to
    `session` under the input name "input", and the values at the last position
    of the first output are used to pick the next sample.

    Args:
        model: Object providing `classes`, `receptive_field`, `scope`, `eval()`
            and `train()`. Its weights are not used here; the forward pass is
            done by `session`.
        num_samples: Number of new samples to generate.
        first_samples: 1-D tensor of integer class indices used as the seed.
            Defaults to a single zero sample.
        temperature: If > 0, the outputs are divided by it and the next sample
            is drawn from the resulting softmax distribution. Otherwise the
            arg-max class is taken.
        session: ONNX Runtime inference session. Required despite the `None`
            default, which is kept for signature compatibility.

    Returns:
        The result of `mu_law_expansion` applied to the seed followed by the
        generated samples, rescaled from class indices to [-1, 1).

    Raises:
        ValueError: If `session` is None.
    """
    if session is None:
        raise ValueError("generate() requires an ONNX Runtime InferenceSession passed as `session`")

    model.eval()
    try:
        if first_samples is None:
            first_samples = torch.zeros(1, dtype=torch.long)
        # scatter_ needs int64 indices, and everything below runs on the CPU.
        generated = first_samples.long().cpu()

        receptive_field = model.receptive_field
        num_pad = receptive_field - generated.size(0)
        if num_pad > 0:
            # NOTE(review): `model.scope` is used as the pad target while the check
            # above uses `model.receptive_field` -- confirm the two agree.
            generated = constant_pad_1d(generated, model.scope, pad_start=True)
            print("pad zero")

        # Pre-allocate the whole output so each step writes one element instead
        # of re-copying the sequence with torch.cat.
        seed_len = generated.size(0)
        samples = torch.empty(seed_len + num_samples, dtype=torch.long)
        samples[:seed_len] = generated

        # One-hot input buffer, reused (and cleared) on every step.
        one_hot = torch.zeros(1, model.classes, receptive_field)
        for step in range(num_samples):
            end = seed_len + step
            window = samples[max(0, end - receptive_field):end]
            one_hot.zero_()
            one_hot.scatter_(1, window.view(1, -1, receptive_field), 1.)

            outputs = session.run(None, {"input": one_hot.numpy()})
            logits = torch.tensor(outputs[0])[:, :, -1].squeeze()

            if temperature > 0:
                # Sample the next class from the temperature-scaled distribution.
                prob = F.softmax(logits / temperature, dim=0)
                samples[end] = int(np.random.choice(model.classes, p=prob.numpy()))
            else:
                # Greedy choice; kept as an integer index so it can be stored
                # alongside the other samples.
                samples[end] = torch.max(logits, 0)[1]

        # Map class indices to [-1, 1) before mu-law expansion.
        generated = (samples / model.classes) * 2. - 1
        mu_gen = mu_law_expansion(generated, model.classes)
    finally:
        # Always leave the model in training mode, even if generation failed.
        model.train()
    return mu_gen

# Time a full generation run driven by the CPU ONNX Runtime session.
start = timer()
generated = generate(
    model,
    160000,
    first_samples=start_data,
    temperature=1.0,
    session=cpu_session,
)
cpu_total = timer() - start

print(f"Generation Time: {cpu_total}")

### ONNX Runtime (NPU)

In [None]:
# We want to make sure we compile every time, otherwise the tools will use the cached version.
# The cache path is assembled with os.path.join from the same directory and key
# that are passed to the session below, so the directory being removed is found
# on every platform (a hard-coded backslash is only a separator on Windows).
current_directory = os.getcwd()
cache_key = 'wavenet_cache'
cache_directory = os.path.join(current_directory, 'cache')
directory_path = os.path.join(cache_directory, cache_key)

# Check if the directory exists and delete it if it does.
if os.path.exists(directory_path):
    shutil.rmtree(directory_path)
    print("Directory deleted successfully. Starting Fresh.")
else:
    print(f"Directory '{directory_path}' does not exist.")

# Point to the config file path used for the VitisAI Execution Provider
config_file_path = "vaip_config.json"

aie_options = onnxruntime.SessionOptions()

aie_session = onnxruntime.InferenceSession(
    onnx_model.SerializeToString(),
    providers=['VitisAIExecutionProvider'],
    sess_options=aie_options,
    provider_options=[{'config_file': config_file_path,
                       'cacheDir': cache_directory,
                       'cacheKey': cache_key}]
)

# Run Inference
# One untimed run first, so the measured loop below excludes the first call.
npu_results = aie_session.run(None, {"input": input_data.numpy()})
start = timer()
for _ in range(100):
    npu_results = aie_session.run(None, {"input": input_data.numpy()})
npu_total = timer() - start

print(f"Inference Time: {npu_total / 100}")

In [None]:
# Time a full generation run driven by the VitisAI ONNX Runtime session.
start = timer()
generated = generate(
    model,
    160000,
    first_samples=start_data,
    temperature=1.0,
    session=aie_session,
)
npu_total = timer() - start

print(f"Generation Time: {npu_total}")

## Generate Audio

In [None]:
import IPython.display as ipd

# Render an inline audio player for the generated clip at 16000 Hz.
ipd.Audio(data=generated, rate=16000)

In [None]:
import soundfile as sf

# Save the generated clip to disk at 16000 Hz.
sf.write(file='wav/generated_clip1.wav', data=generated, samplerate=16000)