In [1]:
import numpy as np
import scipy as sp

import torch
from torch import nn
import torchinfo
import os

## Choose computation device (CPU)

In [2]:
if torch.backends.mps.is_available():
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print ("MPS device found.")
else:
    print ("MPS device not found.")

cpu_device = torch.device("cpu")
    
# Select the device for training
device = cpu_device

print(f"Using {device} device")

MPS device not found.
Using cpu device


## User inputs

In [3]:
# EDIT THIS SECTION FOR USER INPUTS
#
name = 'model_0'
in_file = '../data/ts9_test1_in_FP32.wav'
out_file = '../data/ts9_test1_out_FP32.wav'
epochs = 1

train_mode = 0     # 0 = speed training, 
                   # 1 = accuracy training 
                   # 2 = extended training

input_size = 150 
batch_size = 4096 
test_size = 0.2

if not os.path.exists('models/'+name):
    os.makedirs('models/'+name)
else:
    print("A model with the same name already exists. Please choose a new name.")
    exit

A model with the same name already exists. Please choose a new name.


## Define some helper functions

In [4]:
def save_wav(name, data):
    sp.io.wavfile.write(name, 44100, data.flatten().astype(np.float32))

def normalize(data):
    data_max = max(data)
    data_min = min(data)
    data_norm = max(data_max,abs(data_min))
    return data / data_norm

## Pre-processing the data

In [5]:
# Load and Preprocess Data ###########################################
in_rate, in_data = sp.io.wavfile.read(in_file)
out_rate, out_data = sp.io.wavfile.read(out_file)

X_all = in_data.astype(np.float32).flatten()  
X_all = normalize(X_all)
y_all = out_data.astype(np.float32).flatten() 
y_all = normalize(y_all)

# Get the last 20% of the wav data for testing and thee rest for training
X_training, X_testing = np.split(X_all, [int(len(X_all)*(1-test_size))])
y_training, y_testing = np.split(y_all, [int(len(y_all)*(1-test_size))])
print(f"X_training shape (pre-processing): {X_training.shape}")
print(f"y_training shape (pre-processing): {y_training.shape}")
print(f"X_testing shape (pre-processing): {X_testing.shape}")
print(f"y_testing shape (pre-processing): {y_testing.shape}")

# Create a new array where each element is an array of input_size samples in time order
# Each element of the new array is shifted by one sample from the previous element
indices = np.arange(input_size) + np.arange(len(X_training)-input_size+1)[:,np.newaxis]
indices = torch.from_numpy(indices)
X_training = torch.from_numpy(X_training)
X_ordered_training = torch.zeros_like(indices, dtype=torch.float32)
for i, j in enumerate(indices):
    X_ordered_training[i] = torch.gather(X_training, 0, indices[i])
X_ordered_training = X_ordered_training.unsqueeze(1)
print(f"X_ordered_training shape: {X_ordered_training.shape}")

indices = np.arange(input_size) + np.arange(len(X_testing)-input_size+1)[:,np.newaxis]
indices = torch.from_numpy(indices)
X_testing = torch.from_numpy(X_testing)
X_ordered_testing = torch.zeros_like(indices, dtype=torch.float32)
for i, j in enumerate(indices):
    X_ordered_testing[i] = torch.gather(X_testing, 0, indices[i])
X_ordered_testing = X_ordered_testing.unsqueeze(1)
print(f"X_ordered_testing shape: {X_ordered_testing.shape}")


# The input size defines the number of samples used for each prediction
# Therefore the first output value that we get is at index input_size-1
y_ordered_training = y_training[input_size-1:]
y_ordered_training = torch.from_numpy(y_ordered_training)
y_ordered_training = y_ordered_training.unsqueeze(1)
print(f"y_ordered_training shape: {y_ordered_training.shape}")

y_ordered_testing = y_testing[input_size-1:]
y_ordered_testing = torch.from_numpy(y_ordered_testing)
y_ordered_testing = y_ordered_testing.unsqueeze(1)
print(f"y_ordered_testing shape: {y_ordered_testing.shape}")

print(f"The X_ordered_training data is an array, where each element is an array of input_size samples in time order. Therefore the lenght is smaller than the original X_training array (the first {input_size} samples are grouped).")
print(f"The y_ordered_training data is an array, where each element is a single sample. This single sample is the target output for the corresponding X_random_training element, which consists of input samples.")

X_training shape (pre-processing): (6587907,)
y_training shape (pre-processing): (6587907,)
X_testing shape (pre-processing): (1646977,)
y_testing shape (pre-processing): (1646977,)
X_ordered_training shape: torch.Size([6587758, 1, 150])
X_ordered_testing shape: torch.Size([1646828, 1, 150])
y_ordered_training shape: torch.Size([6587758, 1])
y_ordered_testing shape: torch.Size([1646828, 1])
The X_ordered_training data is an array, where each element is an array of input_size samples in time order. Therefore the lenght is smaller than the original X_training array (the first 150 samples are grouped).
The y_ordered_training data is an array, where each element is a single sample. This single sample is the target output for the corresponding X_random_training element, which consists of input samples.


## Create dataloaders

In [6]:
training_dataset = torch.utils.data.TensorDataset(X_ordered_training, y_ordered_training)
training_dataloader = torch.utils.data.DataLoader(training_dataset, batch_size=batch_size, shuffle=True)

for batch, (X, y) in enumerate(training_dataloader):
    print(f"Batch: {batch}")
    print(f"Shape of X: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

testing_dataset = torch.utils.data.TensorDataset(X_ordered_testing, y_ordered_testing)
testing_dataloader = torch.utils.data.DataLoader(testing_dataset, batch_size=batch_size, shuffle=False)

for batch, (X, y) in enumerate(testing_dataloader):
    print(f"Batch: {batch}")
    print(f"Shape of X: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

Batch: 0
Shape of X: torch.Size([4096, 1, 150])
Shape of y: torch.Size([4096, 1]) torch.float32
Batch: 0
Shape of X: torch.Size([4096, 1, 150])
Shape of y: torch.Size([4096, 1]) torch.float32


## Define the model

In [7]:
'''This is a similar PyTorch implementation of the LSTM model from the paper:
    "Real-Time Guitar Amplifier Emulation with Deep Learning"
    https://www.mdpi.com/2076-3417/10/3/766/htm

    Uses a stack of two 1-D Convolutional layers, followed by LSTM, followed by 
    a Dense (fully connected) layer. Three preset training modes are available, 
    with further customization by editing the code. A PyTorch model 
    is implemented here.

    Note: RAM may be a limiting factor for the parameter "input_size". The wav data
      is preprocessed and stored in RAM, which improves training speed but quickly runs out
      if using a large number for "input_size".  Reduce this if you are experiencing
      RAM issues.
'''

if train_mode == 0:         # Speed Training
    learning_rate = 0.01 
    conv1d_strides = 12   
    conv1d_1_strides = 12
    conv1d_filters = 16
    hidden_units = 36
elif train_mode == 1:       # Accuracy Training (~10x longer than Speed Training)
    learning_rate = 0.01 
    conv1d_strides = 4
    conv1d_filters = 36
    hidden_units= 64
else:                       # Extended Training (~60x longer than Accuracy Training)
    learning_rate = 0.0005 
    conv1d_strides = 3
    conv1d_filters = 36
    hidden_units= 96

# Define model ########################################################
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.pad = nn.ConstantPad1d(padding=12, value=0)
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=conv1d_filters, kernel_size=12, stride=conv1d_strides)
        self.conv2 = nn.Conv1d(in_channels=conv1d_filters, out_channels=conv1d_filters, kernel_size=12, stride=conv1d_strides)
        self.lstm = nn.LSTM(input_size=16, hidden_size=hidden_units, batch_first = True, bias=True)
        self.linear = nn.Linear(in_features=hidden_units, out_features=1)

    def forward(self, x):
        x = self.pad(x)
        x = self.conv1(x)
        x = self.pad(x)
        x = self.conv2(x)
        x = x.permute(0, 2, 1)
        output, (hidden, cell) =  self.lstm(x)
        x = self.linear(output[:, -1, :])
        return x

model = NeuralNetwork().to(device)
summary = torchinfo.summary(model, (1, 1, 150), device=device)
print(summary)

# Define loss function and optimizer ##################################
loss_fn = nn.MSELoss(reduction='mean')
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

Layer (type:depth-idx)                   Output Shape              Param #
NeuralNetwork                            [1, 1]                    --
├─ConstantPad1d: 1-1                     [1, 1, 174]               --
├─Conv1d: 1-2                            [1, 16, 14]               208
├─ConstantPad1d: 1-3                     [1, 16, 38]               --
├─Conv1d: 1-4                            [1, 16, 3]                3,088
├─LSTM: 1-5                              [1, 3, 36]                7,776
├─Linear: 1-6                            [1, 1]                    37
Total params: 11,109
Trainable params: 11,109
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 0.04
Input size (MB): 0.00
Forward/backward pass size (MB): 0.00
Params size (MB): 0.04
Estimated Total Size (MB): 0.05


In [8]:
# Define training procedure ############################################
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Preprocess input and target data
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

# Define training procedure ############################################
def test(dataloader, model, loss_fn):
    num_batches = len(dataloader)
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
    test_loss /= num_batches
    print(f"Test Error: \n Avg loss: {test_loss:>8f} \n")

## Train the model

In [9]:
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(training_dataloader, model, loss_fn, optimizer)
    test(testing_dataloader, model, loss_fn)
print("Done!")

Epoch 1
-------------------------------
loss: 0.030668  [ 4096/6587758]
loss: 0.001876  [413696/6587758]
loss: 0.000879  [823296/6587758]
loss: 0.000736  [1232896/6587758]
loss: 0.000589  [1642496/6587758]
loss: 0.000537  [2052096/6587758]
loss: 0.000478  [2461696/6587758]
loss: 0.000447  [2871296/6587758]
loss: 0.000449  [3280896/6587758]
loss: 0.000381  [3690496/6587758]
loss: 0.000383  [4100096/6587758]
loss: 0.000381  [4509696/6587758]
loss: 0.000355  [4919296/6587758]
loss: 0.000361  [5328896/6587758]
loss: 0.000334  [5738496/6587758]
loss: 0.000329  [6148096/6587758]
loss: 0.000337  [6557696/6587758]
Test Error: 
 Avg loss: 0.000272 

Done!


In [10]:
# Save the model #######################################################
torch.save(model.state_dict(), "models/"+name+"/"+name+".pth")
print("Saved PyTorch Model State to model.pth")

Saved PyTorch Model State to model.pth


## Run predictions
### 0. Load the model

In [11]:
model.load_state_dict(torch.load("models/"+name+"/"+name+".pth", map_location=torch.device('cpu')))

<All keys matched successfully>

### 1. On the test audio data

In [12]:
# Set the model to evaluate mode #################################
model.eval()
# Run prediction ##################################################
prediction = torch.zeros(0).to(device)
# X_input = torch.zeros(0).to(device)

# for X, _ in testing_dataloader:
#     X = X.to(device)
#     for i in range(X.shape[0]):
#         X_input = torch.cat((X_input, X[i, 0, 0].flatten()), 0)

# save_wav('models/'+name+'/x_input.wav', X_input.cpu().numpy())

print("Running prediction..")
with torch.no_grad():
    for X, _ in testing_dataloader:
        X = X.to(device)
        predicted_batch = model(X)
        prediction = torch.cat((prediction, predicted_batch.flatten()), 0)

save_wav('models/'+name+'/y_pred.wav', prediction.cpu().numpy())
save_wav('models/'+name+'/x_test.wav', X_testing.numpy())
save_wav('models/'+name+'/y_test.wav', y_testing)

print("X_testing shape: ", X_testing.shape)
print("X_ordered_testing shape: ", X_ordered_testing.shape)
print("y_testing shape: ", y_testing.shape)
print("prediction shape: ", prediction.shape)

print("Note that the prediction shape is smaller than the y_testing shape. This is because the first predicted sample needs input_size samples for prediction.\n")


Running prediction..
X_testing shape:  torch.Size([1646977])
X_ordered_testing shape:  torch.Size([1646828, 1, 150])
y_testing shape:  (1646977,)
prediction shape:  torch.Size([1646828])
Note that the prediction shape is smaller than the y_testing shape. This is because the first predicted sample needs input_size samples for prediction.



### 2. On a number sequence (to control inference)

In [13]:
batch_size_test = 2

# Test the model simple number sequence to compare with inference #
X_testing_2 = np.array([], dtype=np.float64)

for i in range(0, batch_size_test * input_size):
    X_testing_2 = np.append(X_testing_2, i*0.001)

X_testing_2 = np.expand_dims(X_testing_2, axis=0)
X_testing_2 = np.expand_dims(X_testing_2, axis=0)
X_testing_2 = np.reshape(X_testing_2, (batch_size_test, 1, input_size))

X_testing_2 = torch.from_numpy(X_testing_2).double()

print(f"X_testing_2 shape: {X_testing_2.shape}")

print("Running prediction..")
model = model.float()

prediction_2 = model(X_testing_2.to(device).float())

print(f"prediction {prediction_2}")

print("X_testing_2 shape: ", X_testing_2.shape)
print("prediction_2 shape: ", prediction_2.shape)

X_testing_2 shape: torch.Size([2, 1, 150])
Running prediction..
prediction tensor([[-0.1476],
        [-0.3037]], grad_fn=<AddmmBackward0>)
X_testing_2 shape:  torch.Size([2, 1, 150])
prediction_2 shape:  torch.Size([2, 1])


## Export as pt model

In [14]:
# An example input you would normally provide to your model's forward() method.
example = torch.rand(1, 1, input_size).to(device)

# Use torch.jit.trace to generate a torch.jit.ScriptModule via tracing.
traced_script_module = torch.jit.trace(model, example)
traced_script_module.save("models/"+name+"/"+"GuitarLSTM-dynamic.pt")

In [15]:
# An example input you would normally provide to your model's forward() method.
example = torch.rand(1, 1, input_size).to(device)
filepath = "models/"+name+"/"+"GuitarLSTM"+"-libtorch-dynamic.onnx"

# Export the model
torch.onnx.export(model=model,
                  args=example,
                  f=filepath,
                  export_params=True,
                  opset_version=17,
                  do_constant_folding=True,
                  input_names = ['input'],
                  output_names = ['output'],
                  dynamic_axes={'input' : {0 : 'batch_size'}, 'output' : {0 : 'batch_size'}})

  _C._jit_pass_onnx_node_shape_type_inference(node, params_dict, opset_version)
  _C._jit_pass_onnx_graph_shape_type_inference(
  _C._jit_pass_onnx_graph_shape_type_inference(
