# pyBela Drum Synth

In this notebook we'll look at using pyBela to capture onset features from the drumsynth project. This includes a Bela project that has been updated to log onset energy and spectral centroid audio features computed at detected onsets using `Watcher`. We'll look at training a small MLP to regress synthesis parameters based on those features.

First, establish a connection with Bela and copy the project.

In [None]:
# Append the SSH host keys reported by bela.local to ~/.ssh/known_hosts.
# NOTE(review): presumably this is so the rsync cells are not blocked by a host-key prompt -- confirm.
! ssh-keyscan bela.local >> ~/.ssh/known_hosts

In [None]:
# Copy the contents of src/ into the pybela-drumsynth project on Bela, following
# symlinks (-L) and skipping main.cpp and DrumControllerInference.cpp.
# NOTE(review): the excluded files presumably belong to the inference project -- confirm.
! rsync -rvL --exclude 'main.cpp' --exclude 'DrumControllerInference.cpp' src/ root@bela.local:Bela/projects/pybela-drumsynth

## 1 - Collect dataset
To collect data, run the `pybela-drumsynth` project on Bela (you can do so from the web-based IDE).


In [None]:
from pybela import Logger
import asyncio
import os

# Create the pyBela logger and connect it to Bela.
logger = Logger()
logger.connect()

In [None]:
# Recordings collected so far, keyed by class number (filled in by the recording cell).
data = dict()
# Names of the Bela variables to log.
variables = ["onsetEnergy", "spectralCentroid"]

Record data for three different classes: run the cell below three times, setting `class_num` to 0, 1 and 2 in turn.

In [None]:
class_num = 2
read_time = 10

file_paths = logger.start_logging(variables=variables)
await asyncio.sleep(read_time)
logger.stop_logging()

# Extract the data from the binary
raw_sc = logger.read_binary_file(file_path=file_paths["local_paths"]["spectralCentroid"], timestamp_mode="sparse")
raw_oe = logger.read_binary_file(file_path=file_paths["local_paths"]["onsetEnergy"], timestamp_mode="sparse")

spectral_centroid = []
onset_energy = []

# Loop through all the buffers and each data point in each buffer.
# PyBela appends zeros to the end of buffers so disregard those if values
# in each variable are both zeros.
for sc_buffer, oe_buffer in zip(raw_sc['buffers'], raw_oe['buffers']):
    for x, y in zip(sc_buffer['data'], oe_buffer['data']):
        if x == 0 and y == 0:
            continue
        spectral_centroid.append(x)
        onset_energy.append(y)

assert len(spectral_centroid) == len(onset_energy)
print(f"Found {len(spectral_centroid)} points")

data[class_num] = {
    "sc": spectral_centroid,
    "oe": onset_energy,
}

## 2 - Visualize Data

Let's look at the audio features we collected.

In [None]:
import matplotlib.pyplot as plt

In [None]:
# One scatter series per recorded class: spectral centroid on x, onset energy on y.
fig, ax = plt.subplots()
for class_id, features in data.items():
    ax.scatter(features["sc"], features["oe"], label=f"Class {class_id}")

ax.set_xlabel("Spectral Centroid (Bin Number)")
ax.set_ylabel("Energy")
ax.set_title("Scatter Plot of Extracted Onset Features")
ax.legend()
plt.show()

## 3 - Train model

Now let's train a model to map from these values to different synth presets

In [None]:
import torch
import torch.nn as nn
import numpy as np
from tqdm import tqdm 
import pprint as pp
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
# Target synth presets, one per recorded class: presets[k] is the regression
# target for class k. Each preset holds seven values.
# NOTE(review): what each value controls is defined by the synth project on
# Bela -- confirm the parameter order against that code.
presets = [
    [0.30, 0.60, 0.30, 0.92, 0.75, 0.50, 0.50],
    [0.77, 0.35, 0.12, 0.37, 0.24, 0.76, 0.64],
    [0.16, 0.50, 0.53, 0.77, 0.20, 0.30, 0.50],
]
preset_1, preset_2, preset_3 = presets

In [None]:
class SynthOnsetDataset(Dataset):
    """
    PyTorch Dataset that returns input features and ground truth synth parameters.

    Each item is a pair ``(features, preset)``: ``features`` holds the
    ``(sc, oe)`` values of one recorded onset and ``preset`` is the synth
    preset assigned to that onset's class.

    Args:
        data: Mapping from class number to a dict with the keys ``"sc"`` and
            ``"oe"``; the two sequences are paired element-wise. Class numbers
            are used as indices into ``presets``.
        presets: One list of synth parameters per class.
        device: Device on which the feature and preset tensors are created.

    Raises:
        AssertionError: If the number of classes differs from the number of presets.
        ValueError: If a class number is not a valid index into ``presets``.
    """

    def __init__(self, data, presets, device):
        super().__init__()
        assert len(data) == len(presets), "Must have same number of classes as synth presets"

        # Class numbers index straight into the preset tensor. Reject anything
        # outside 0..len(presets)-1 up front: an out-of-range key would only fail
        # later in __getitem__, and a negative key would silently pick a preset
        # counted from the end.
        valid_classes = range(len(presets))
        invalid = [k for k in data if k not in valid_classes]
        if invalid:
            raise ValueError(
                f"Class keys {invalid} are not valid preset indices "
                f"(expected integers in 0..{len(presets) - 1})"
            )

        self.device = device
        self.presets = torch.tensor(presets, device=self.device).float()
        self.features = []
        self.classes = []

        # Flatten the per-class recordings into parallel lists of features and labels.
        for k, v in data.items():
            for features in zip(v["sc"], v["oe"]):
                self.classes.append(k)
                self.features.append(features)
        self.features = torch.tensor(self.features, device=self.device).float()

    def __len__(self):
        """Return the total number of onsets across all classes."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return the feature tensor at ``idx`` and the preset tensor of its class."""
        return self.features[idx], self.presets[self.classes[idx]]
        

In [None]:
batch_size = 8
split_seed = 0  # Fixed seed so the train/test split is the same on every run
dataset = SynthOnsetDataset(data, presets, device)

# Split dataset: 90% of the items for training, the remainder for testing.
# A dedicated, seeded generator keeps the held-out set stable between runs,
# so the reported test error is comparable.
train_count = int(0.9 * len(dataset))
test_count = len(dataset) - train_count
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset,
    (train_count, test_count),
    generator=torch.Generator().manual_seed(split_seed),
)

# Dataloaders: shuffle the training items each epoch, keep the test order fixed
train_loader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(
    test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
class MLP(nn.Module):
    """
    Small fully connected network for parameter estimation.

    The network stacks ``num_layers`` blocks of Linear -> LayerNorm (without
    learnable affine parameters) -> activation, followed by a final Linear
    layer. The output is passed through tanh and rescaled to the range [0, 1].
    """

    def __init__(
        self,
        in_size: int,  # Number of input features
        hidden_size: int,  # Width of every hidden layer
        out_size: int,  # Number of output parameters
        num_layers: int,  # How many hidden blocks to stack
        activation: torch.nn.Module = torch.nn.LeakyReLU(),  # Non-linearity applied after each LayerNorm
    ):
        super().__init__()
        self.in_size = in_size

        layers = []
        width = in_size  # input width of the next Linear layer
        for _ in range(num_layers):
            layers.append(torch.nn.Linear(width, hidden_size))
            layers.append(torch.nn.LayerNorm(hidden_size, elementwise_affine=False))
            # The same activation module instance is reused in every block.
            layers.append(activation)
            width = hidden_size

        # Output projection from the last hidden width (or the input, if there are no hidden blocks)
        layers.append(torch.nn.Linear(width, out_size))
        self.net = torch.nn.Sequential(*layers)

    def forward(self, x: torch.Tensor):
        squashed = torch.tanh(self.net(x))  # tanh constrains the values to [-1, 1]
        return (squashed + 1.0) * 0.5  # shift and scale to [0, 1]
    

In [None]:
# Size the network from one dataset item, then run a single forward pass as a
# quick check that the model produces an output.
x, y = dataset[0]
model = MLP(
    in_size=x.shape[0],
    hidden_size=32,
    out_size=y.shape[0],
    num_layers=2,
).to(device)

y_hat = model(x)
print(y_hat)

In [None]:
# L1 (mean absolute error) objective, optimized with Adam over all model parameters.
loss_fn = torch.nn.L1Loss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

In [None]:
epochs = 500

pbar = tqdm(range(epochs))
for i in pbar:
    # Per-batch losses of the current epoch
    error_log = []

    for batch_x, batch_y in train_loader:
        optimizer.zero_grad()  # clear gradients left over from the previous step

        # Forward pass, loss and backpropagation
        loss = loss_fn(model(batch_x), batch_y)
        loss.backward()

        # Update the weights and record the batch loss as a plain float
        optimizer.step()
        error_log.append(loss.item())

    # Show the mean batch loss of this epoch on the progress bar
    epoch_loss = np.mean(error_log)
    pbar.set_description(f"Epoch {i} | Train Loss: {epoch_loss:.4f}")

In [None]:
model.eval()

# Loss of every test batch, computed without tracking gradients
error_log = []
with torch.no_grad():
    for x, y in test_loader:
        error_log.append(loss_fn(model(x), y).item())

print(f"Model error: {np.mean(error_log):.4f}")

In [None]:
# Export for inference: move the model to the CPU, switch it to eval mode,
# compile it with TorchScript and write the result to disk.
path = "drum_model.jit"
script = torch.jit.script(model.to(device="cpu").eval())
script.save(path)

In [None]:
# Load the exported file back with TorchScript; the cell output shows the loaded module.
torch.jit.load(path) # check model is properly saved

In [None]:
# Copy the exported drum_model.jit into the pybela-drumsynth-inference project on Bela.
! rsync -av ./drum_model.jit root@bela.local:Bela/projects/pybela-drumsynth-inference/