In [None]:
import collections
import itertools
from IPython.display import Audio, HTML, display

import matplotlib.pyplot as plt
import torch
import torch.nn.functional as F
import numpy as np

from chord_progressions.io.audio import combine_buffers
from chord_progressions.utils import is_circular_match
from chord_progressions.type_templates import get_template_from_template_str
from torch.utils.data import DataLoader
from tqdm import tqdm

from rhythmic_relationships.data import PartPairDataset

from notebook_utils import (
    get_chroma_vocab,
    get_audio_el,
    mk_voiced_chroma_buffer,
    get_voiced_hits_from_chroma,
)

# Inject a CSS rule that widens elements with the `.container` class to 90% so wide
# outputs (e.g. audio players, long hit strings) fit on one line.
display(HTML("<style>.container { width:90% !important; }</style>"))

## Load the vocabulary

In [None]:
# chord type name -> template string
CHROMA_VOCAB = get_chroma_vocab()

# template string -> chord type name
inv_vocab = {template_str: chord_type for chord_type, template_str in CHROMA_VOCAB.items()}

# template string <-> integer index (indices follow the iteration order of inv_vocab)
stoi = {template_str: idx for idx, template_str in enumerate(inv_vocab)}
itos = {idx: template_str for template_str, idx in stoi.items()}

# chord type name <-> integer index (indices follow the iteration order of CHROMA_VOCAB)
itot = dict(enumerate(list(CHROMA_VOCAB)))
ttoi = {chord_type: idx for idx, chord_type in itot.items()}

# NOTE(review): the two index spaces only line up if every chord type has a
# distinct template string — confirm against get_chroma_vocab().
vocab_size = len(CHROMA_VOCAB)
print(f"Vocab size: {vocab_size}")

def get_type_from_template(template):
    """Look up the chord type whose vocabulary template matches `template`.

    Vocabulary entries are tried in CHROMA_VOCAB order; the first one for which
    `is_circular_match` returns a truthy value wins.

    Args:
        template: the template to classify (passed straight to `is_circular_match`).

    Returns:
        The matching chord type key from CHROMA_VOCAB, or None if nothing matches.
    """
    for chord_type, template_str in CHROMA_VOCAB.items():
        candidate = get_template_from_template_str(template_str)
        if is_circular_match(template, candidate):
            return chord_type
    return None

def pclist_to_i(pclist):
    """Map a pitch class list to the integer index of its chord type.

    e.g. pclist_to_i([1,0,0,0,0,0,0,0,0,0,0,0]) -> 1

    Pitch class lists that match no vocabulary entry are mapped to the index
    of the "oov" entry.
    """
    # A falsy lookup result (no match) falls back to the out-of-vocabulary type
    chord_type = get_type_from_template(pclist) or "oov"
    return stoi[CHROMA_VOCAB[chord_type]]

## Build the dataset

In [None]:
# Load the dataset
dataset_config = {
    "dataset_name": "babyslakh_20_1bar_4res",
    "part_1": 'Guitar',
    "part_2": 'Piano',
    "repr_1": "chroma",
    "repr_2": "chroma",
}

data = PartPairDataset(**dataset_config)
loader = DataLoader(data, batch_size=1, shuffle=True)

# How many chord types we use to predict the next one
context_len = 8

# The context is split between the two parts: the first n_ys slots hold previous
# targets (part_2), the last n_xs slots hold previous inputs (part_1).
# y gets the extra slot if the context length is odd.
n_ys = (context_len + 1) // 2
n_xs = context_len // 2

silence_i = stoi[CHROMA_VOCAB['silence']]

# All input indices seen, across every segment (not used to build contexts)
prev_xs = []
X, Y = [], []

for x, y in tqdm(loader):
    # Binarize the chromas and drop the batch dimension (batch_size is 1).
    # NOTE(review): the threshold is `> 1`, not `> 0` — confirm against the chroma value range.
    x = (x > 1).to(torch.int32)[0]
    y = (y > 1).to(torch.int32)[0]

    # Every segment starts from an all-silence context...
    context = [silence_i] * context_len
    # ...and its history is tracked per segment. Slicing the global X/Y history
    # instead would leak tokens from the previous (unrelated, shuffled) segment
    # into every context after the first row.
    seg_xs, seg_ys = [], []

    for xrow, yrow in zip(x, y):
        ixx = pclist_to_i(xrow.tolist())
        ixy = pclist_to_i(yrow.tolist())

        # The context seen so far predicts the current target
        X.append(context)
        Y.append(ixy)

        prev_xs.append(ixx)
        seg_xs.append(ixx)
        seg_ys.append(ixy)

        # Next context: most recent targets followed by most recent inputs,
        # left-padded with silence until the segment has enough history
        history = seg_ys[-n_ys:] + seg_xs[-n_xs:]
        context = [silence_i] * (context_len - len(history)) + history


X = torch.tensor(X)
Y = torch.tensor(Y)
# Number of (context, target) pairs — rows of X, not its total element count
n_examples = X.shape[0]
print(f'{n_examples=}')
print(X.shape, Y.shape)

In [None]:
# Sanity check: show the first ten (context --> target) pairs as chord type names
for x, y in zip(X[:10], Y[:10]):
    context_names = [itot[ix] for ix in x.tolist()]
    print(','.join(context_names), '-->', itot[y.item()])

In [None]:
# The classes we create here are the same API as nn.Module in PyTorch
class Linear:
    """Affine layer computing `x @ weight (+ bias)`.

    The weight has shape (fan_in, fan_out) and is drawn from a standard normal
    scaled by 1/sqrt(fan_in); the optional bias starts at zero.
    """

    def __init__(self, fan_in, fan_out, bias=True):
        self.weight = torch.randn((fan_in, fan_out)) / fan_in**0.5
        self.bias = torch.zeros(fan_out) if bias else None

    def __call__(self, x):
        """Apply the layer to `x`, cache the result on `self.out` and return it."""
        projected = x @ self.weight
        if self.bias is not None:
            projected += self.bias
        self.out = projected
        return self.out

    def parameters(self):
        """Return the trainable tensors: the weight, plus the bias when present."""
        if self.bias is None:
            return [self.weight]
        return [self.weight, self.bias]


class BatchNorm1d:
    """Batch normalisation over the last (feature) dimension.

    In training mode the statistics are taken over dim 0 for 2D input and over
    dims (0, 1) for 3D input, and the running buffers are updated with an
    exponential moving average. In eval mode (`training = False`) the running
    buffers are used instead of batch statistics.

    Args:
        dim: size of the feature dimension.
        eps: constant added to the variance before the square root.
        momentum: weight of the current batch in the running-statistics update.
    """

    def __init__(self, dim, eps=1e-5, momentum=0.1):
        self.eps = eps
        self.momentum = momentum
        self.training = True
        # parameters (trained with backprop)
        self.gamma = torch.ones(dim)
        self.beta = torch.zeros(dim)
        # buffers (trained with a running 'momentum update'), always of shape (dim,)
        self.running_mean = torch.zeros(dim)
        self.running_var = torch.ones(dim)

    def __call__(self, x):
        """Normalise `x`, cache the result on `self.out` and return it.

        Raises:
            ValueError: in training mode, if `x` is neither 2D nor 3D.
        """
        if self.training:
            if x.ndim == 2:
                dim = 0
            elif x.ndim == 3:
                dim = (0, 1)
            else:
                raise ValueError(
                    f"BatchNorm1d expects a 2D or 3D input in training mode, got {x.ndim}D"
                )
            xmean = x.mean(dim, keepdim=True)  # batch mean
            xvar = x.var(dim, keepdim=True)  # batch variance
        else:
            xmean = self.running_mean
            xvar = self.running_var
        xhat = (x - xmean) / torch.sqrt(xvar + self.eps)  # normalize to unit variance
        self.out = self.gamma * xhat + self.beta
        # update the buffers
        if self.training:
            with torch.no_grad():
                # Flatten the keepdim batch statistics back to (dim,). Storing the
                # (1, dim) / (1, 1, dim) shapes would make eval-mode broadcasting
                # depend on the rank of the last training batch.
                self.running_mean = (
                    1 - self.momentum
                ) * self.running_mean + self.momentum * xmean.reshape(-1)
                self.running_var = (
                    1 - self.momentum
                ) * self.running_var + self.momentum * xvar.reshape(-1)
        return self.out

    def parameters(self):
        """Return the trainable tensors (scale and shift)."""
        return [self.gamma, self.beta]


class Tanh:
    """Element-wise hyperbolic tangent activation; has no trainable parameters."""

    def __call__(self, x):
        """Apply tanh to `x`, cache the result on `self.out` and return it."""
        activated = torch.tanh(x)
        self.out = activated
        return self.out

    def parameters(self):
        """Return an empty list: there is nothing to train."""
        return []
    
class Embedding:
    """Lookup table with one standard-normal row of size `embedding_dim` per index."""

    def __init__(self, num_embeddings, embedding_dim):
        self.weight = torch.randn((num_embeddings, embedding_dim))

    def __call__(self, IX):
        """Index the table with `IX`, cache the rows on `self.out` and return them."""
        looked_up = self.weight[IX]
        self.out = looked_up
        return self.out

    def parameters(self):
        """Return the trainable tensors (just the embedding table)."""
        return [self.weight]

class FlattenConsecutive:
    """Concatenate every `n` consecutive time steps of a (B, T, C) tensor.

    The result has shape (B, T // n, C * n); when only a single group remains
    the middle dimension is squeezed out, giving (B, C * n).
    """

    def __init__(self, n):
        self.n = n

    def __call__(self, x):
        """Regroup `x`, cache the result on `self.out` and return it."""
        batch, steps, channels = x.shape
        grouped = x.view(batch, steps // self.n, channels * self.n)
        # Collapse the time dimension once everything has been merged into one group
        self.out = grouped.squeeze(1) if grouped.shape[1] == 1 else grouped
        return self.out

    def parameters(self):
        """Return an empty list: there is nothing to train."""
        return []
    
class Sequential:
    """Container that applies a list of layers one after another."""

    def __init__(self, layers):
        self.layers = layers

    def __call__(self, x):
        """Feed `x` through every layer in order, cache and return the final output."""
        result = x
        for layer in self.layers:
            result = layer(result)
        self.out = result
        return self.out

    def parameters(self):
        """Return the parameters of all layers, concatenated in layer order."""
        collected = []
        for layer in self.layers:
            collected.extend(layer.parameters())
        return collected

In [None]:
# Seed PyTorch's global RNG so the weight initialisation (torch.randn) and the
# minibatch sampling (torch.randint) in the cells below are repeatable.
# NOTE(review): the shuffled DataLoader pass in the dataset cell runs before this
# seed is set, so the order of the training examples is not pinned by it.
torch.manual_seed(42);

In [None]:
# Bengio et al has a vocab size of 17k and they embed them in a 30-dimensional space.
# Our vocab size is much smaller, so a smaller embedding space would also work;
# the values below are the "scale up" configuration from the performance log.

n_embed = 36  # the dimensionality of the chord type embedding vectors
n_hidden = 500  # the number of neurons in each hidden layer

# Hierarchical model: each stage concatenates pairs of neighbouring positions
# (FlattenConsecutive(2)) and projects them with Linear -> BatchNorm1d -> Tanh.
# Three pairwise merges reduce 8 context positions to 1, which matches
# context_len = 8 in the dataset cell.
model = Sequential([
    Embedding(vocab_size, n_embed),
    FlattenConsecutive(2), Linear(n_embed * 2, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
    FlattenConsecutive(2), Linear(n_hidden * 2, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
    FlattenConsecutive(2), Linear(n_hidden * 2, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
    Linear(n_hidden, vocab_size),
])

# parameter init
with torch.no_grad():
    # Shrink the output layer's weights so the initial logits are small,
    # i.e. the untrained model is less confident
    model.layers[-1].weight *= 0.1 # last layer make less confident

# Collect every trainable tensor once and enable gradient tracking on it;
# the training loop iterates over this list.
parameters = model.parameters()
for p in parameters:
    p.requires_grad = True
print(f'num total params: {sum(p.nelement() for p in parameters)}')

In [None]:
# # Looking at a batch of just 4 examples
# ix = torch.randint(0, X.shape[0], (4,))
# Xb, Yb = X[ix], Y[ix]
# logits = model(Xb)
# print(Xb.shape)
# print(Xb)

# for layer in model.layers:
#     print(layer.__class__.__name__, ':', tuple(layer.out.shape))

In [None]:
max_steps = 50000
batch_size = 128
# Step at which the learning rate drops: 80% of the run (40000 for the default max_steps)
lr_decay_step = max_steps * 4 // 5
lossi = []  # log10 of the minibatch loss at every step
ud = [] # update:data ratio

# Make sure every layer is in training mode. The sampling cell at the end of the
# notebook flips `training` to False, so re-running this cell afterwards would
# otherwise train BatchNorm on its running statistics.
for layer in model.layers:
    layer.training = True

for i in range(max_steps):
    # minibatch construct
    ix = torch.randint(0, X.shape[0], (batch_size,))
    Xb, Yb = X[ix], Y[ix]  # batch X,Y

    # forward pass
    logits = model(Xb)
    loss = F.cross_entropy(logits, Yb)  # loss function

    # backward pass: clear stale gradients first
    for p in parameters:
        p.grad = None
    loss.backward()

    # update (plain SGD)
    lr = 0.1 if i < lr_decay_step else 0.01  # step learning rate decay
    for p in parameters:
        p.data += -lr * p.grad

    # track stats
    if i % 10000 == 0:
        print(f"{i:7d}/{max_steps:7d}: {loss.item():.4f}")
    lossi.append(loss.log10().item())
    with torch.no_grad():
        # log10 ratio of update size to parameter size, one value per parameter tensor
        ud.append(
            [((lr * p.grad).std() / p.data.std()).log10().item() for p in parameters]
        )


# Loss of the final minibatch
print(f"{i:7d}/{max_steps:7d}: {loss.item():.4f}")

In [None]:
# Plot the log10 training loss averaged over consecutive windows of 1000 steps.
# The view(-1, 1000) requires len(lossi) to be a multiple of 1000.
plt.plot(torch.tensor(lossi).view(-1, 1000).mean(1));

## Performance log
- original (3 character context, 200 hidden neurons, 172028 params): 1.0094
- context 3->8 (182028 params): 0.5128
- flat -> hierarchical (169209 params): 0.3639
- scale up: n_embed = 36, n_hidden = 500

### Inference

In [None]:
duration = 3  # seconds

# Get an example from the dataset for reference
x, y = next(iter(loader))

# Binarize the chromas and drop the batch dimension (batch_size is 1)
x = (x > 1).to(torch.int32)[0]
y = (y > 1).to(torch.int32)[0]

# Per-step chord type indices and names for both parts
x_t = []
x_chord_types = []

y_t = []
y_chord_types = []
for xrow, yrow in zip(x, y):
    ixx = pclist_to_i(xrow.tolist())
    ixy = pclist_to_i(yrow.tolist())

    x_t.append(ixx)
    y_t.append(ixy)

    x_chord_types.append(itot[ixx])
    y_chord_types.append(itot[ixy])

# x_t already holds the input indices; mapping the names back through ttoi
# (as was done here before) is an identity because ttoi is the inverse of itot.

# Input part: chord type name -> template string -> per-step list of ints
x_chroma = np.array([list(map(int, CHROMA_VOCAB[i])) for i in x_chord_types])
x_voiced_hits = get_voiced_hits_from_chroma(x_chroma)
x_buff = mk_voiced_chroma_buffer(x_voiced_hits, duration=duration, n_overtones=1)
# 1 where a step has any active entry, 0 otherwise
x_hits = (x_chroma.sum(axis=1) > 0).astype(np.int8)

# Target part from the dataset: index -> template string -> per-step list of ints
y_chroma = np.array([list(map(int, list(itos[i]))) for i in y_t])
y_hits = (y_chroma.sum(axis=1) > 0).astype(np.int8)
y_voiced_hits = get_voiced_hits_from_chroma(y_chroma)
y_buff = mk_voiced_chroma_buffer(y_voiced_hits, duration=duration, n_overtones=4)

# Keyed by hit pattern; each entry is [target buffer, input + target mixed]
buffs = collections.defaultdict(list)
buffs["".join(map(str, y_hits))].append([y_buff, combine_buffers([x_buff, y_buff])])


# NOTE(review): the target shown here comes straight from the dataset, yet the
# labels below say "Predicted" / "predicted" — confirm the intended wording.
# `<br>` replaces the malformed `</br>` tags; browsers render both as a line break.
html = (
    f'Input {dataset_config["part_1"]}<br>{"".join(map(str, x_hits))}<br>{get_audio_el(x_buff)}'
    + f'<br><br>Predicted {dataset_config["part_2"]}<br><br>Samples from learned distribution'
    + "".join([f"<br>{k}<br>{get_audio_el(v[0][0])}predicted<br>{get_audio_el(v[0][1])}combined" for k, v in buffs.items()])
)

HTML(html)

In [None]:
# disable gradient tracking
@torch.no_grad()
def disable_grads():
    """No-op kept for backward compatibility.

    The decorator only disables gradient tracking while this (empty) function
    runs; it has no lasting effect. The sampling loop below uses an explicit
    `with torch.no_grad()` block instead.
    """
    return

# Set layers to eval mode (BatchNorm1d then uses its running statistics)
for layer in model.layers:
    layer.training = False

# sample from the model
n_samples = 10

# Chord type names of the conditioning part; defaults to the reference example above
input_chord_types = x_chord_types

in_t = [ttoi[chord_type] for chord_type in input_chord_types]

in_chroma = np.array([list(map(int, CHROMA_VOCAB[i])) for i in input_chord_types])
in_voiced_hits = get_voiced_hits_from_chroma(in_chroma)
in_buff = mk_voiced_chroma_buffer(in_voiced_hits, duration=duration, n_overtones=1)
# 1 where a step has any active entry, 0 otherwise
in_hits = (in_chroma.sum(axis=1) > 0).astype(np.int8)

# Keyed by hit pattern; each entry is [sampled buffer, input + sampled mixed]
buffs = collections.defaultdict(list)

for _ in range(n_samples):
    out = []

    # NOTE(review): the dataset cell starts every segment from an all-silence
    # context, whereas this puts the first input in the last slot — confirm
    # which start the model should see.
    context = [silence_i] * (context_len - 1) + [in_t[0]]

    with torch.no_grad():
        for tix in range(len(in_t)):
            # Forward pass the neural net
            logits = model(torch.tensor([context]))
            probs = F.softmax(logits, dim=1)

            # sample from the distribution
            ix = torch.multinomial(probs, num_samples=1).item()
            out.append(ix)

            # Shift the context window: most recent samples followed by the most
            # recent inputs *up to the current step*, mirroring how the training
            # contexts are built. Slicing the end of the whole input
            # (in_t[-n_xs:]) would leak future steps and never advance.
            history = out[-n_ys:] + in_t[: tix + 1][-n_xs:]
            context = [silence_i] * (context_len - len(history)) + history

    out_chroma = np.array([list(map(int, list(itos[i]))) for i in out])
    out_hits = (out_chroma.sum(axis=1) > 0).astype(np.int8)
    out_voiced_hits = get_voiced_hits_from_chroma(out_chroma)
    out_buff = mk_voiced_chroma_buffer(out_voiced_hits, duration=duration, n_overtones=4)
    buffs["".join(map(str, out_hits))].append([out_buff, combine_buffers([in_buff, out_buff])])

# Only the first sample of every distinct hit pattern is rendered (v[0]).
# `<br>` replaces the malformed `</br>` tags; browsers render both as a line break.
html = (
    f'Input {dataset_config["part_1"]}<br>{"".join(map(str, in_hits))}<br>{get_audio_el(in_buff)}'
    + f'<br><br>Predicted {dataset_config["part_2"]}<br><br>Samples from model'
    + "".join([f"<br>{k}<br>{get_audio_el(v[0][0])}predicted<br>{get_audio_el(v[0][1])}combined" for k, v in buffs.items()])
)

HTML(html)