In [1]:
import numpy as np
import torch
from torch.utils.data import random_split, Dataset, DataLoader
from tqdm.autonotebook import tqdm

import glob
import os
import random

from sklearn.preprocessing import StandardScaler, LabelEncoder

In [2]:
from google.colab import drive
drive.mount("/bebra")

Mounted at /bebra


In [3]:
np.random.seed(42)
torch.random.manual_seed(42)
torch.cuda.manual_seed(42)

In [4]:
!tar -xzf /bebra/MyDrive/data_for_ANN_ML/kurs_train/gtzan-jukebox.tar.gz
!ls features

blues.00000.npy      country.00050.npy	jazz.00000.npy	 pop.00050.npy
blues.00001.npy      country.00051.npy	jazz.00001.npy	 pop.00051.npy
blues.00002.npy      country.00052.npy	jazz.00002.npy	 pop.00052.npy
blues.00003.npy      country.00053.npy	jazz.00003.npy	 pop.00053.npy
blues.00004.npy      country.00054.npy	jazz.00004.npy	 pop.00054.npy
blues.00005.npy      country.00055.npy	jazz.00005.npy	 pop.00055.npy
blues.00006.npy      country.00056.npy	jazz.00006.npy	 pop.00056.npy
blues.00007.npy      country.00057.npy	jazz.00007.npy	 pop.00057.npy
blues.00008.npy      country.00058.npy	jazz.00008.npy	 pop.00058.npy
blues.00009.npy      country.00059.npy	jazz.00009.npy	 pop.00059.npy
blues.00010.npy      country.00060.npy	jazz.00010.npy	 pop.00060.npy
blues.00011.npy      country.00061.npy	jazz.00011.npy	 pop.00061.npy
blues.00012.npy      country.00062.npy	jazz.00012.npy	 pop.00062.npy
blues.00013.npy      country.00063.npy	jazz.00013.npy	 pop.00063.npy
blues.00014.npy      country.00064

In [5]:
batch_size = 16
epochs = 20

input_dim = 4800
embed_dim = 512
output_dim = 10

In [6]:
class EmbedMusic(torch.nn.Module):
  def __init__(self, input_dim=input_dim, embed_dim=embed_dim):
    super().__init__()
    self.embed_dim = embed_dim
    self.input_dim = input_dim
    self.embedding_layers = torch.nn.ModuleList([torch.nn.Linear(input_dim, embed_dim), torch.nn.Linear(embed_dim, embed_dim)])

  def forward(self, x):
    for emb in self.embedding_layers:
      x = emb(x)
    return x

class ShallowClassifier(torch.nn.Module):
  def __init__(self, input_dim=input_dim, embed_dim=embed_dim, output_dim=output_dim):
    super().__init__()
    self.embed_dim = embed_dim
    self.input_dim = input_dim
    self.output_dim = output_dim 

    self.embedder = EmbedMusic(input_dim, embed_dim)
    self.classifier = torch.nn.Linear(embed_dim, output_dim)

  def embed(self, x):
    return self.embedder(x)

  def forward(self, x):
    y = self.embedder(x)
    y = self.classifier(y)
    return y 

In [7]:
class GTZANDataset(Dataset):
  def __init__(self, features_path):
    self.npy_paths = sorted(glob.glob(features_path + '*.npy'))
    assert len(self.npy_paths) == 1000
    random.seed(0)
    random.shuffle(self.npy_paths)

    self.X = torch.tensor([np.load(p) for p in self.npy_paths])

    self.le = LabelEncoder()
    self.y = self.le.fit_transform(np.array([os.path.split(p)[1].split('.')[0] for p in self.npy_paths]))
    self.y = torch.tensor(self.y)


  def __getitem__(self, idx):
    return self.X[idx], self.y[idx]


  def get_genre_sample(self, genre, idx):
    genre_encoded = list(self.le.classes_).index(genre)
    genre_samples = self.y == genre_encoded
    item_idx = [i for i, n in enumerate(genre_samples) if n][idx]
    return self.X[item_idx]


  def __len__(self):
    return self.X.shape[0]

In [8]:
train_dataset, test_dataset = random_split(GTZANDataset("features/"), [0.8, 0.2])

  self.X = torch.tensor([np.load(p) for p in self.npy_paths])


In [9]:
train_dataset[0][0].shape

torch.Size([4800])

In [16]:
model = ShallowClassifier()
loss_function = torch.nn.CrossEntropyLoss()
optim = torch.optim.Adam(model.parameters(), lr=1e-5, weight_decay=1e-5)

best_model_state_dict = None
best_accuracy = 0

for epoch in tqdm(range(epochs)):
  train_data, val_data =  random_split(train_dataset, [0.8, 0.2])
  train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
  val_loader = DataLoader(val_data, shuffle=False, batch_size=batch_size)

  # training stage
  loss_agg = torch.tensor([])
  iter = tqdm(train_loader)
  iter.set_description(f"Training, epoch: {epoch}")
  for batch in iter:
    X, y = batch

    y_hat = model(X)
    optim.zero_grad()
    loss = loss_function(y_hat, y)
    loss_agg = torch.cat((loss_agg, torch.tensor([loss.item()])))
    loss.backward()
    optim.step()
    iter.set_postfix({"loss": loss_agg.mean()})
  
  # validation stage
  with torch.no_grad():
    accuracy_agg = torch.tensor([])
    iter = tqdm(val_loader)
    iter.set_description(f"Validating, epoch: {epoch}")
    for batch in iter:
      X, y = batch
      y_hat = model(X)
      accuracy_agg = torch.cat((accuracy_agg, torch.tensor([(y == torch.argmax(y_hat, dim=1)).sum() / y.shape[0]])))
      iter.set_postfix({"accuracy": accuracy_agg.mean()})

    # memory save best model
    if best_accuracy < accuracy_agg.mean():
      best_model_state_dict = model.state_dict()
      best_accuracy = accuracy_agg.mean()

model.load_state_dict(best_model_state_dict)
predicted = torch.tensor([]).reshape(0, output_dim)
y = torch.tensor([]).reshape(0, 1)
with torch.no_grad():
  for test_batch in tqdm(DataLoader(test_dataset, batch_size=batch_size, shuffle=False)):
    predicted = torch.vstack((predicted, model(test_batch[0])))
    y = torch.vstack((y, test_batch[1].unsqueeze(1)))

predicted = torch.argmax(predicted, 1)
accuracy = torch.sum(predicted == y.squeeze())
accuracy = accuracy / predicted.shape[0]
print(accuracy)


# save best model to gdisk
torch.save(best_model_state_dict, "/bebra/MyDrive/data_for_ANN_ML/kurs_train/classifier.pt")

  0%|          | 0/13 [00:00<?, ?it/s]

tensor(0.8850)


In [None]:
train_dataset.dataset.le.classes_

array(['blues', 'classical', 'country', 'disco', 'hiphop', 'jazz',
       'metal', 'pop', 'reggae', 'rock'], dtype='<U9')

In [None]:
with torch.no_grad():
  pop_1 = train_dataset.dataset.get_genre_sample("pop", 1)
  pop_2 = train_dataset.dataset.get_genre_sample("pop", 50)
  rock_1 = train_dataset.dataset.get_genre_sample("rock", 2)

  pop_1_embed = model.embed(pop_1)
  pop_2_embed = model.embed(pop_2)
  rock_1_embed = model.embed(rock_1)

  print(f"pop_1 pop_2: {torch.nn.functional.cosine_similarity(pop_1_embed, pop_2_embed, dim=0)}")
  print(f"pop_1 rock_1: {torch.nn.functional.cosine_similarity(pop_1_embed, rock_1_embed, dim=0)}")
  print(f"rock_1 pop_2: {torch.nn.functional.cosine_similarity(rock_1_embed, pop_2_embed, dim=0)}")

pop_1 pop_2: 0.9767765998840332
pop_1 rock_1: 0.4867061972618103
rock_1 pop_2: 0.46343111991882324


In [None]:
def test_random_embeddings(test_size=5):
  np.random.seed(0)
  genres = np.array(['blues', 'classical', 'country', 'disco', 'hiphop', 'jazz',
       'metal', 'pop', 'reggae', 'rock'])
  samples_idx = np.random.random_integers(0, 100, (test_size, 2))
  samples_genres = np.random.choice(genres, (test_size, 2))
  for idxs, genres in zip(samples_idx, samples_genres):
    print(f"\nCOMPARE {genres[0]} WITH {genres[1]}")
    mixs = [(genres[0], idxs[1]), (genres[0], idxs[0]), (genres[1], idxs[1]), (genres[1], idxs[0])]
    songs = map(lambda x: train_dataset.dataset.get_genre_sample(*x), mixs)
    embeds = list(map(model.embed, songs))

    for i in range(len(mixs) - 1):
      cos = torch.nn.functional.cosine_similarity(embeds[i], embeds[i + 1], dim=0)
      print(f"{mixs[i][0]}_{mixs[i][1]}, {mixs[i + 1][0]}_{mixs[i + 1][1]}: {cos}")

test_random_embeddings()


COMPARE metal WITH reggae
metal_47, metal_44: 0.8419066071510315
metal_44, reggae_47: -0.22881433367729187
reggae_47, reggae_44: 0.8500155210494995

COMPARE reggae WITH classical
reggae_67, reggae_64: 0.41360610723495483
reggae_64, classical_67: -0.03475770354270935
classical_67, classical_64: 0.8088746070861816

COMPARE metal WITH pop
metal_9, metal_67: 0.8450638055801392
metal_67, pop_9: 0.3266396224498749
pop_9, pop_67: 0.9376591444015503

COMPARE pop WITH reggae
pop_21, pop_83: 0.9677761793136597
pop_83, reggae_21: 0.29038333892822266
reggae_21, reggae_83: 0.7081164121627808

COMPARE classical WITH jazz
classical_87, classical_36: 0.9775892496109009
classical_36, jazz_87: 0.4397090673446655
jazz_87, jazz_36: 0.7670404314994812


  samples_idx = np.random.random_integers(0, 100, (test_size, 2))
