In [119]:
import torch
import muspy as mp
import sys
import matplotlib.pyplot as plt
import numpy as np
import torch.utils.tensorboard as tb

sys.path.append(sys.path[0] + "/..")
import lib.prd as prd

In [3]:
ds_muspy = mp.datasets.EssenFolkSongDatabase("../data")
len(ds_muspy)


ds_prd = []
fail_counter = 0
nr = 9
for i, music in enumerate(ds_muspy):
    try:
        track_prd = prd.from_muspy(music)
    except Exception:
        continue
    else:
        ds_prd.append(torch.tensor(track_prd))
print(f"successfully converted: {len(ds_prd)}/{len(ds_muspy)}")

successfully converted: 9010/9034


In [4]:
class Codec:

    def __init__(self, raw: np.ndarray):
        self.code2raw = raw
        self.raw2code = {elem: i for i, elem in enumerate(raw)}

        self._encode = np.vectorize(lambda x: self.raw2code[x])
        self._decode = np.vectorize(lambda x: self.code2raw[x])
    
    def encode(self, raw: np.ndarray) -> np.ndarray:
        return self._encode(raw)
    
    def decode(self, encoded: np.ndarray) -> np.ndarray:
        return self._decode(encoded)

In [5]:
cat_prd = torch.concat(ds_prd)


min_pitch = torch.min(torch.concat(ds_prd)[:, 0])

temp = torch.concat(ds_prd)[:, 0]
max_pitch = torch.max(temp[temp != 128])

durations = torch.unique(cat_prd[:, 1])

print(min_pitch, max_pitch)

tensor(48) tensor(85)


In [6]:
dur_codec = Codec(durations.detach().numpy())
pitch_codec = Codec([128] + list(range(min_pitch, max_pitch + 1)))

In [7]:
pitch_codec.encode(cat_prd[:, 0])

array([27, 22, 25, ..., 21, 22, 24])

In [8]:
from collections import Counter

def pairs_from_tensor(tensor: torch.Tensor) -> list[torch.Tensor]:
    brange = range(0, len(tensor) - 2)
    trange = range(2, len(tensor))
    return [tensor[b:t] for b, t in zip(brange, trange)]

def settify_pairs_of_tensors(note_pairs: list[torch.Tensor]):
    return (torch.sort(pair, dim=0)[0] for pair in note_pairs)

def note_pairs_from_music(tune: torch.Tensor) -> list[torch.Tensor]:
    return list(settify_pairs_of_tensors(pairs_from_tensor(tune)))

pairs = []
for tune in ds_prd:
    pairs.extend(pairs_from_tensor(tune))
len(pairs)

463714

In [9]:
ds_enc = []
for tune in ds_prd:
    p = torch.from_numpy(pitch_codec.encode(tune[:, 0]))
    d = torch.from_numpy(dur_codec.encode(tune[:, 1]))
    ds_enc.append(torch.stack([p, d], dim=1))
len(ds_enc), len(ds_prd)

(9010, 9010)

In [10]:
def cbowify(tensor: torch.Tensor, window_size=2):
    ws = window_size

    mid_range = range(ws, len(tensor) - ws)

    cbows = []
    for i in mid_range:
        note = tensor[i:i+1]

        pre_notes = tensor[i-ws:i]
        post_notes = tensor[i+1:i+1+ws]

        adjacent_notes = torch.concat([pre_notes, post_notes])
    
        cbows.append((note, adjacent_notes))
    return cbows

note_and_adjacent = []
for tune in ds_enc:
    note_and_adjacent.extend(cbowify(tune))

len(note_and_adjacent)

445694

In [11]:
class NoteEmbeddingDataset(torch.utils.data.Dataset):

    def __init__(self, note_and_adjacent: list[tuple[torch.Tensor, torch.Tensor]]):
        self.note_and_adjacent = note_and_adjacent

    def __len__(self):
        return len(self.note_and_adjacent)
    
    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        x, y = self.note_and_adjacent[idx]
        return x.squeeze(), y

ds = NoteEmbeddingDataset(note_and_adjacent)
ds[10]


(tensor([22,  9]),
 tensor([[22,  6],
         [20, 12],
         [25,  3],
         [27,  6]]))

In [12]:
import torch.utils.data as torch_data

dataloader = torch_data.DataLoader(ds, batch_size=3)

In [13]:
x, y = next(iter(dataloader))
x.shape, y.shape

(torch.Size([3, 2]), torch.Size([3, 4, 2]))

In [14]:
class Embedder(torch.nn.Module):

    def __init__(self, pitch_count: int, duration_count: int, pitch_embed_dim: int, duration_embed_dim: int, shared_embed_dim: int):
        super().__init__()
        self.pitch_embedding = torch.nn.Sequential(
            torch.nn.Embedding(pitch_count, pitch_embed_dim),
            torch.nn.Flatten(),
            torch.nn.Sigmoid(),
        )

        self.duration_embedding = torch.nn.Sequential(
            torch.nn.Embedding(duration_count, duration_embed_dim),
            torch.nn.Flatten(),
            torch.nn.Sigmoid(),
        )

        self.shared_embedding = torch.nn.Sequential(
            torch.nn.Linear(pitch_embed_dim + duration_embed_dim, shared_embed_dim),
            torch.nn.Sigmoid(),
        )
    
    def forward(self, bag_of_words: torch.Tensor) -> torch.Tensor:
        bon = (bag_of_words[:, i, :] for i in range(bag_of_words.shape[1]))
        embedded = [self.single_forward(n) for n in bon]
        return torch.stack(embedded).mean(dim=0)

    def single_forward(self, x: torch.Tensor) -> torch.Tensor:
        pitch = x[:, 0:1]
        duration = x[:, 1:2]

        pitch = self.pitch_embedding(pitch)
        duration = self.duration_embedding(duration)

        shared_embedded = self.shared_embedding(torch.concat([pitch, duration], dim=1))

        return shared_embedded


embedder = Embedder(len(pitch_codec.code2raw), len(dur_codec.code2raw), 5, 7, 17)
note, bon = next(iter(dataloader))
embedder.forward(bon)

tensor([[0.3548, 0.3800, 0.4124, 0.5124, 0.3894, 0.6194, 0.2991, 0.6040, 0.5396,
         0.3275, 0.4495, 0.6351, 0.5357, 0.5912, 0.3874, 0.5917, 0.5027],
        [0.3560, 0.3473, 0.4168, 0.4941, 0.3685, 0.5863, 0.3123, 0.5728, 0.5400,
         0.3275, 0.4790, 0.6313, 0.5185, 0.6027, 0.3788, 0.5815, 0.4588],
        [0.3680, 0.3632, 0.4205, 0.4717, 0.3593, 0.6028, 0.3266, 0.5732, 0.5337,
         0.3348, 0.4683, 0.6359, 0.5071, 0.6102, 0.3764, 0.5642, 0.4810]],
       grad_fn=<MeanBackward1>)

In [15]:
class NoteParingModel(torch.nn.Module):

    def __init__(self, embedding_dim: int, pitch_embed_dim: int, duration_embed_dim: int, pitch_count: int, duration_count: int):
        super().__init__()

        self.pitch_prediction = torch.nn.Sequential(
            torch.nn.Linear(embedding_dim, pitch_embed_dim),
            torch.nn.Sigmoid(),
            torch.nn.Linear(pitch_embed_dim, pitch_count),
            torch.nn.Softmax(dim=1),
        )

        self.duration_prediction = torch.nn.Sequential(
            torch.nn.Linear(embedding_dim, duration_embed_dim),
            torch.nn.Sigmoid(),
            torch.nn.Linear(duration_embed_dim, duration_count),
            torch.nn.Softmax(dim=1),
        )

    def forward(self, embedded_space: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        pitch_distribution = self.pitch_prediction(embedded_space)
        duration_distribution = self.duration_prediction(embedded_space)
        return pitch_distribution, duration_distribution


embedder = Embedder(len(pitch_codec.code2raw), len(dur_codec.code2raw), 5, 7, 17)
pairer = NoteParingModel(17, 5, 7, len(pitch_codec.code2raw), len(dur_codec.code2raw))

In [16]:
pitch_count = len(pitch_codec.code2raw)
dur_count = len(dur_codec.code2raw)
pitch_emb_dim = 12
dur_emb_dim = 8
shared_emb_dim = 16
print(pitch_count, dur_count)

embedder = Embedder(pitch_count, dur_count, pitch_emb_dim, dur_emb_dim, shared_emb_dim)
pairer = NoteParingModel(shared_emb_dim, pitch_emb_dim, dur_emb_dim, pitch_count, dur_count)

lr = 0.001

train_dataloader = torch_data.DataLoader(ds, batch_size=16, shuffle=True)
criterion = torch.nn.CrossEntropyLoss()
embedder_optimizer = torch.optim.Adam(embedder.parameters(), lr=lr)
pairer_optimizer = torch.optim.Adam(pairer.parameters(), lr=lr)


global_step = 0
writer = tb.SummaryWriter("../runs")

for epoch in range(5):
    print(epoch)
    for y, x in train_dataloader:
        embedder.train()
        pairer.train()

        embedder.zero_grad()
        pairer.zero_grad()

        emb = embedder.forward(x)
        p_pred, d_pred = pairer.forward(emb)

        loss_pitch = criterion(p_pred, y[:, 0])
        loss_duration = criterion(d_pred, y[:, 1])

        loss: torch.Tensor = (loss_pitch + loss_duration) / 2

        loss.backward()

        embedder_optimizer.step()
        pairer_optimizer.step()

        if global_step % 100 == 0:
            writer.add_scalar("loss/pitch", loss_pitch, global_step)
            writer.add_scalar("loss/duration", loss_duration, global_step)
            writer.add_scalar("loss/average", loss, global_step)

        global_step += 1

39 42
0
1
2
3
4


In [17]:
deembedder = NoteParingModel(shared_emb_dim, pitch_emb_dim, dur_emb_dim, pitch_count, dur_count)

lr = 0.001

train_dataloader = torch_data.DataLoader(ds, batch_size=16, shuffle=True)
criterion = torch.nn.CrossEntropyLoss()
deembedder_optimizer = torch.optim.Adam(deembedder.parameters(), lr=lr)


global_step = 0
writer = tb.SummaryWriter("../runs")

for epoch in range(5):
    print(epoch)
    for x, y in train_dataloader:
        deembedder.train()
        deembedder.zero_grad()

        with torch.no_grad():
            emb = embedder.single_forward(x)

        p_pred, d_pred = pairer.forward(emb)

        loss_pitch = criterion(p_pred, x[:, 0])
        loss_duration = criterion(d_pred, x[:, 1])
        loss: torch.Tensor = (loss_pitch + loss_duration) / 2

        loss.backward()
        deembedder_optimizer.step()

        writer.add_scalar("loss/pitch", loss_pitch, global_step)
        writer.add_scalar("loss/duration", loss_duration, global_step)
        writer.add_scalar("loss/average", loss, global_step)

        global_step += 1

0
1
2
3
4


In [76]:
x, y = next(iter(train_dataloader))
p, d = pairer(embedder(y))

print(pitch_codec.decode(x[:, 0]))
print(dur_codec.decode(x[:, 1]))
pitch_decode = pitch_codec.decode(p.topk(5)[1])
print(pitch_decode)
# print(dur_codec.decode(d.topk(5)[0]))

[74 50 67 79 72 67 72 65 74 74 69 67 72 71 69 62]
[ 6 24 12  6  6  6 24 36 12  6 24 12 48 12  6 24]
[[69 67 64 72 71]
 [67 64 69 62 72]
 [69 67 64 72 62]
 [69 67 64 72 71]
 [69 67 64 72 71]
 [69 67 64 72 71]
 [69 67 64 72 71]
 [67 69 64 62 72]
 [69 67 64 72 71]
 [69 67 64 72 71]
 [69 67 64 72 71]
 [69 67 64 72 71]
 [69 67 64 72 71]
 [69 67 64 72 71]
 [69 67 64 72 71]
 [69 67 64 72 71]]


In [116]:
def metric_goals(distr, k=5):
    k = 5

    topk = distr.topk(k)[1]

    goals = (topk == x[:, 0].unsqueeze(dim=1)).int()

    weight = 1 / torch.arange(1, k+1)
    weight_matrix = torch.ones_like(topk) * weight
    return (goals * weight_matrix).mean()

metric_goals(p)

tensor(0.0556)

In [118]:
import itertools as it
def repetitiveness(tunes: torch.Tensor) -> float:
    """measures repetitions in a batch of tunes"""
    length = tunes.shape[1]
    tune_count = tunes.shape[0]
    combination_count = tune_count * (tune_count - 1) // 2

    same_cell_counter = 0
    for tune_1, tune_2 in it.combinations(tunes, 2):
        same_cell_counter += (tune_1 == tune_2).int().sum().item()

    return same_cell_counter / length / combination_count
# repetitiveness(p.topk(5)[1])
p.topk(5)[1]

tensor([[22, 20, 17, 25, 24],
        [20, 17, 22, 15, 25],
        [22, 20, 17, 25, 15],
        [22, 20, 17, 25, 24],
        [22, 20, 17, 25, 24],
        [22, 20, 17, 25, 24],
        [22, 20, 17, 25, 24],
        [20, 22, 17, 15, 25],
        [22, 20, 17, 25, 24],
        [22, 20, 17, 25, 24],
        [22, 20, 17, 25, 24],
        [22, 20, 17, 25, 24],
        [22, 20, 17, 25, 24],
        [22, 20, 17, 25, 24],
        [22, 20, 17, 25, 24],
        [22, 20, 17, 25, 24]])

In [79]:
from lib.metrics import repetitiveness

repetitiveness(pitch_decode)

IndexError: tuple index out of range

In [40]:
pairer.forward(y)

RuntimeError: mat1 and mat2 must have the same dtype

In [36]:
emb = embedder.single_forward(x)
p, d = deembedder(emb)
# p, d = pairer(emb)
# p.topk(5)
# d.topk(1)
# p.topk(5)
d.topk(10)

torch.return_types.topk(
values=tensor([[0.0431, 0.0398, 0.0393, 0.0393, 0.0393, 0.0352, 0.0344, 0.0340, 0.0323,
         0.0300],
        [0.0422, 0.0393, 0.0392, 0.0389, 0.0384, 0.0366, 0.0352, 0.0333, 0.0329,
         0.0298],
        [0.0422, 0.0393, 0.0392, 0.0389, 0.0384, 0.0366, 0.0352, 0.0333, 0.0329,
         0.0298]], grad_fn=<TopkBackward0>),
indices=tensor([[13, 10, 35, 22, 40, 20,  1,  6, 15,  5],
        [13, 35, 40, 22, 10, 20,  1, 15,  6,  5],
        [13, 35, 40, 22, 10, 20,  1, 15,  6,  5]]))

In [None]:
torch.concat(pairs)

[tensor([[69, 12],
         [74, 24]]),
 tensor([[69, 12],
         [72, 12]]),
 tensor([[72, 12],
         [74, 24]]),
 tensor([[74, 24],
         [74, 24]]),
 tensor([[69, 18],
         [74, 24]]),
 tensor([[69,  6],
         [72, 18]]),
 tensor([[72,  6],
         [74, 12]]),
 tensor([[74, 12],
         [79, 12]]),
 tensor([[72, 12],
         [79, 12]]),
 tensor([[69, 12],
         [72, 12]]),
 tensor([[67, 12],
         [69, 24]]),
 tensor([[67, 18],
         [69, 24]]),
 tensor([[69,  6],
         [72, 18]]),
 tensor([[72,  6],
         [74, 12]]),
 tensor([[74, 12],
         [79, 12]]),
 tensor([[74, 12],
         [79, 12]]),
 tensor([[72, 12],
         [74, 24]]),
 tensor([[69, 12],
         [72, 24]]),
 tensor([[69, 12],
         [72, 18]]),
 tensor([[69,  6],
         [72, 18]]),
 tensor([[67,  6],
         [69, 12]]),
 tensor([[64, 12],
         [67, 12]]),
 tensor([[62, 12],
         [64, 48]]),
 tensor([[62, 12],
         [67, 48]]),
 tensor([[66, 12],
         [67, 12]]),


In [None]:
{torch.tensor([1, 2]), torch.tensor([1, 2])}

{tensor([1, 2]), tensor([1, 2])}

In [None]:
463714
pair_set

{tensor([[65, 12],
         [67, 24]]),
 tensor([[74,  6],
         [74,  6]]),
 tensor([[75, 12],
         [77, 12]]),
 tensor([[68, 12],
         [71, 12]]),
 tensor([[69, 12],
         [71, 12]]),
 tensor([[72, 18],
         [74, 24]]),
 tensor([[60, 12],
         [60, 12]]),
 tensor([[ 70,  12],
         [128,  24]]),
 tensor([[69, 24],
         [70, 24]]),
 tensor([[69, 12],
         [71, 36]]),
 tensor([[62,  6],
         [64,  6]]),
 tensor([[72,  6],
         [74, 72]]),
 tensor([[76, 12],
         [76, 12]]),
 tensor([[69, 12],
         [72, 36]]),
 tensor([[67,  8],
         [69,  8]]),
 tensor([[72,  6],
         [72,  6]]),
 tensor([[65,  6],
         [67, 12]]),
 tensor([[67,  8],
         [67,  8]]),
 tensor([[74, 48],
         [74, 96]]),
 tensor([[65, 48],
         [67, 96]]),
 tensor([[72, 12],
         [72, 24]]),
 tensor([[64, 12],
         [67, 48]]),
 tensor([[55, 12],
         [55, 12]]),
 tensor([[76,  6],
         [78, 18]]),
 tensor([[72, 12],
         [74, 12]

In [None]:
pair_tensor = torch.concat(list(pair_set))

tensor([[65, 12],
        [67, 24],
        [74,  6],
        [74,  6],
        [75, 12],
        [77, 12],
        [68, 12],
        [71, 12],
        [69, 12],
        [71, 12]])

In [121]:
pitch_codec.decode([27, 25, 15, 15, 22, 22])

array([74, 72, 62, 62, 69, 69])