# Жанровая классификация

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import keras
import matplotlib.pyplot as plt
import librosa
import tensorflow as tf

In [None]:
tf.config.list_physical_devices()

## Фильтрация метаданных

In [None]:
import glob


DATA_DIR = './data/fma_small'
METADATA_DIR = './data/fma_metadata/'

mp3_files = glob.glob(DATA_DIR + '/*/*.mp3')
mp3_names = list(map(lambda f: np.int64(f.split('/')[-1].split('.')[0]), mp3_files))

raw_tracks = pd.read_csv(METADATA_DIR + 'raw_tracks.csv')
tracks = raw_tracks[raw_tracks['track_id'].isin(mp3_names)]

tracks

## Сбор признаков, полученных с помощью `librosa`

In [None]:
features_df = pd.read_csv(METADATA_DIR + 'features.csv', index_col=0, header=[0, 1, 2])
features_df = features_df[features_df.index.isin(mp3_names)]

features = np.unique(list(map(lambda x: x[0], list(features_df.columns))))

print(f"Features available: {features}")
print(f"Total: {len(features)}")

features_df

## Отбор признаков

Рассмотрим всю имеющуюся информацию о треках

In [None]:
tracks.columns

Оценим число непустых значений тегов

In [None]:
tracks['tags'].map(lambda x: None if x == '[]' else x).notnull().value_counts()

Подсчитаем число уникальных тегов

In [None]:
from functools import reduce


unique_tags = reduce(lambda tags, l: tags.union(eval(l)), tracks['tags'], set())
print(len(unique_tags))

Оставим предположительно полезную информацию из набора данных. Убедимся
в её необходимости позже.

In [None]:
to_keep = [
  'track_id', "album_id", "artist_id", "track_duration", 
  "track_genres", "track_instrumental", "track_interest", "track_listens",
]

filtered_tracks = tracks[to_keep]
filtered_tracks

Преобразуем время в секунды

In [None]:
def duration_to_int(t):
  splitted = t.split(":")
  
  return int(splitted[0]) * 60 + int(splitted[1])

filtered_tracks.loc[:,'track_duration'] = filtered_tracks.track_duration.apply(duration_to_int)
filtered_tracks

Узнаем количество жанров для треков

In [None]:
import json


genres = filtered_tracks['track_genres'].map(lambda x: json.loads(x.replace("'", "\"")))
genre_ids = genres.map(lambda x: list(map(lambda y: y['genre_id'], x)))
genre_ids.map(lambda x: len(x)).value_counts()

Определим базовые жанры для каждого трека

In [None]:
all_genres = pd.read_csv(METADATA_DIR + 'genres.csv')

base_genres = genre_ids.map(lambda x: all_genres[all_genres.genre_id == int(x[0])].iloc[0].top_level)

filtered_tracks['track_genres'] = base_genres
filtered_tracks

In [None]:
base_genres.value_counts()

Получили 8 сбалансированных классов

In [None]:
import seaborn as sns

def display_corr(df):
  corr = df.corr()
  cmap = sns.diverging_palette(230, 20, as_cmap=True)
  mask = np.triu(np.ones_like(corr, dtype=bool))
  sns.heatmap(corr, mask=mask, cmap=cmap)
  
display_corr(filtered_tracks)

Жанр трека очень плохо коррелирует с его длительностью, поэтому исключим
этот признак из рассмотрения

In [None]:
filtered_tracks = filtered_tracks.drop('track_duration', axis=1)

Теперь добавим значения, предпосчитанные с помощью `librosa`

In [None]:
merged = features_df.merge(filtered_tracks, how='inner', on='track_id')

display_corr(merged)

Конечно, признаков слишком много. Из всех возьмем признаки с наибольшей по
модулю корреляцией.

Для этого отсортируем признаки по степени корреляции

In [None]:
correlation = merged.corr()

genres_corr = correlation['track_genres'].sort_values(key=lambda x: np.abs(x), ascending=False)
genres_corr

Изобразим распределение значений корреляции

In [None]:
sns.histplot(genres_corr)

Видно, что наибольшее число признаков имеют почти нулевую корреляцию.
В связи с этим выберем наиболее информативные из них

In [None]:
boundary = 0.2

selected = merged[genres_corr[abs(genres_corr) > boundary].reset_index()['index']]
selected.set_index('track_id', inplace=True)

Кроме того, удалим сильно коррелирующие друг с другом нецелевые признаки,
оставив среди таких пар те, что больше коррелируют с целевым

In [None]:
c = selected.corr()

to_be_excluded = set()

boundary = 0.9

for i in c:
  for j in c:
    if abs(c[i][j]) > boundary and i != j and i != 'track_genres' and j != 'track_genres':
      least_informative = i if c['track_genres'][i] < c['track_genres'][j] else j
      to_be_excluded.add(least_informative)
      
to_be_excluded

In [None]:
for feature in to_be_excluded:
  selected.drop(feature, axis=1, inplace=True)

Перекодируем метки классов

In [None]:
from sklearn.preprocessing import LabelEncoder


genre_le = LabelEncoder()

selected.track_genres = genre_le.fit_transform(selected.track_genres)
selected

In [None]:
selected.columns = selected.columns.map(str)

In [None]:
from sklearn.preprocessing import StandardScaler

for column in selected.columns:
  if column == 'track_genres':
    continue
  selected[column] = StandardScaler().fit_transform(selected[column].to_numpy().reshape(-1, 1))

Убедимся, что `StandardScaler` отработал корректно

In [None]:
selected.describe()

Разделим данные по принципу `train/test/split`

In [None]:
from sklearn.model_selection import train_test_split

x = selected.drop('track_genres', axis=1).to_numpy()
y = selected['track_genres'].to_numpy()

test_size = 0.2
valid_size = 0.1

X_train, X_test, y_train, y_test = \
    train_test_split(x, y, test_size=0.2, random_state=69)
    
X_train, X_valid, y_train, y_valid = \
    train_test_split(X_train, y_train, test_size=valid_size / (1 - test_size),
                     random_state=69)
    
n_classes = np.max(y) + 1

In [None]:
def label2vec(n_classes):
  def inner(label):  
    v = np.zeros(n_classes)
    v[label - 1] = 1
    return v
  return inner

## Сравнение моделей

In [None]:
models = []

### K-Nearest Neighbours

In [None]:
n_classes = np.max(y) + 1
list_of_neighbours = list(map(int, range(1, 300, 5)))

Опишем функцию, которая будет отображать результаты экспериментов

In [None]:
from tqdm import tqdm

def plot_score(n, scores, names):
    d = {names: n, 'score': scores}
    df = pd.DataFrame(d)

    sns.set(style='darkgrid')
    sns.lineplot(x=names, y='score', data=df)

Вычислим значения `accuracy` для моделей с разным числом соседей

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from tqdm import tqdm

best_n = -1
best_score = -1
scores = []
for n in tqdm(list_of_neighbours):
    knn = KNeighborsClassifier(p=1, n_neighbors=n)
    knn.fit(X_train, y_train)
    y_pred = knn.predict(X_test)
    score = knn.score(X_test, y_test)
    
    if score > best_score:
        best_score = score
        best_n = n
    scores.append(score)

plot_score(list_of_neighbours, scores, 'neighbors')
print(f'Лучшая модель: {best_n} соседей, точность: {best_score}')

Конечно, такой метод оценки качества модели не является надежным, лучше воспользоваться
оценкой методом кросс-валидации — `cross_val_score`.

В дальнейшем будем использовать именно этот метод.

In [None]:
from sklearn.model_selection import cross_val_score

best_n = -1
best_score = -1
scores = []
for n in tqdm(list_of_neighbours):
    knn = KNeighborsClassifier(p=2, n_neighbors=n)
    
    score = cross_val_score(knn, x, y, cv=5).mean()
    
    if score > best_score:
        best_score = score
        best_n = n
    scores.append(score)

plot_score(list_of_neighbours, scores, 'neighbors')
print(f'Лучшая модель: {best_n} соседей, точность: {best_score}')

models.append((KNeighborsClassifier(p=2, n_neighbors=best_n), 'sklearn'))

In [None]:
from sklearn.metrics import log_loss


scores = []
for n in tqdm(list_of_neighbours):
    knn = KNeighborsClassifier(p=2, n_neighbors=n)
    knn.fit(X_train, y_train)
    probs = knn.predict_proba(X_test)
    
    loss = log_loss(y_test, probs)
    scores.append(loss)

plot_score(list_of_neighbours, scores, 'neighbors')

### SVC

#### C-svc

#### форма функции решения: one vs one

In [None]:
from sklearn import svm


scores = []
ps = list(map(int, range(1, 10, 1)))
for p in tqdm(ps):
    svc = svm.SVC(degree=p, decision_function_shape='ovo')
    svc.fit(X_train, y_train)
    scores.append(svc.score(X_test, y_test))

plot_score(ps, scores, 'degree')

In [None]:
scores = []
Cs = list(map(int, range(1, 100, 1)))

for c in tqdm(Cs):
    svc = svm.SVC(C=c, decision_function_shape='ovo')
    svc.fit(X_train, y_train)
    scores.append(svc.score(X_test, y_test))

plot_score(Cs, scores, 'C-argument')

In [None]:
scores =[]
for c in tqdm(Cs):
    svc = svm.SVC(C=c, decision_function_shape='ovo')
    cvs = cross_val_score(svc, x, y, cv=5)
    scores.append(cvs.mean())

plot_score(Cs, scores, 'C-argument')

In [None]:
from sklearn.multiclass import OneVsOneClassifier


scores = []
Cs = list(map(int, range(1, 100, 1)))

for c in tqdm(Cs):
    clf = OneVsOneClassifier(svm.SVC(C=c))
    clf.fit(X_train, y_train)
    scores.append(clf.score(X_test, y_test))

plot_score(Cs, scores, 'C-argument')

In [None]:
scores = []

for c in tqdm(Cs):
    clf = OneVsOneClassifier(svm.SVC(C=c))
    cvs = cross_val_score(clf, x, y, cv=5)
    scores.append(cvs.mean())
plot_score(Cs, scores, 'C-argument')

##### форма функции решения: one vs rest

In [None]:
scores = []

for c in tqdm(Cs):
    svc = svm.SVC(C=c, break_ties=True)
    svc.fit(X_train, y_train)
    scores.append(svc.score(X_test, y_test))

plot_score(Cs, scores, 'C-argument')

In [None]:
from sklearn.multiclass import OneVsRestClassifier


scores = []
for c in tqdm(Cs):
    clf = OneVsRestClassifier(svm.SVC(C=c))
    clf.fit(X_train, y_train)
    scores.append(clf.score(X_test, y_test))

plot_score(Cs, scores, 'C-argument')

In [None]:
scores = []
Cs = list(map(int, range(1, 50, 1)))
for c in tqdm(Cs):
    clf = OneVsRestClassifier(svm.SVC(C=c))
    cvs = cross_val_score(clf, x, y, cv=5)
    scores.append(cvs.mean())

plot_score(Cs, scores, 'C-argument')

#### $\nu$-svc

In [None]:
scores = []
Nus = np.arange(0.01, 0.5, 0.01)
for nu in tqdm(Nus):
    nu_svc = svm.NuSVC(nu=nu, decision_function_shape='ovo')
    nu_svc.fit(X_train, y_train)
    scores.append(nu_svc.score(X_test, y_test))

plot_score(Nus, scores, 'Nu-argument')

### Нейронные сети

#### Базовая модель

Возьмем в качестве модели многослойный перцептрон, для борьбы с переобучением
воспользуемся слоями `Dropout`. Кроме того, добавим между слоями
нормализацию по подвыборке для того, чтобы сгладить процесс обучения.

В качестве функции активации выберем `leaky_relu` как функцию, которая в отличие
от `relu` не приводит к т.н. Dying RELU problem.

In [None]:
from keras.layers import Input, Dropout, Dense, BatchNormalization
from keras.backend import clear_session

clear_session()

lr = 0.001

model = keras.Sequential()
model.add(Input(X_train.shape[1]))
model.add(Dense(128, activation='relu'))
model.add(BatchNormalization())
model.add(Dropout(.3))
model.add(Dense(128, activation='leaky_relu'))
model.add(Dropout(.2))
model.add(BatchNormalization())
model.add(Dense(32, activation='leaky_relu'))
model.add(Dense(8, activation='softmax'))


model.compile(optimizer=keras.optimizers.Adam(learning_rate=lr),
              loss='categorical_crossentropy',
              metrics=['categorical_accuracy'])

model.summary()

In [None]:
nn_y_train = np.array(list(map(label2vec(n_classes), y_train)))
nn_y_test = np.array(list(map(label2vec(n_classes), y_test)))
nn_y_valid = np.array(list(map(label2vec(n_classes), y_valid)))

In [None]:
from keras.callbacks import TensorBoard, ModelCheckpoint, CSVLogger

run = input()

callbacks = [
  TensorBoard(),
  ModelCheckpoint(f'{run}/checkpoint/', save_best_only=True, save_weights_only=True, monitor='categorical_accuracy', verbose=1),
  CSVLogger("logs.csv")
]

In [None]:
model.fit(X_train, nn_y_train, validation_data=(X_valid, nn_y_valid), epochs=200, callbacks=callbacks, batch_size=128)

In [None]:
model.load_weights(f'{run}/checkpoint/')
model.evaluate(X_test, nn_y_test)

In [None]:
models.append((model, 'probs'))

### Сравнение моделей, обученных на мета-данных

In [None]:
for entry in models:
  model_type = entry[1]
  model = entry[0]
  if model_type == 'sklearn':
    model.fit(X_train, y_train)
    results = model.score(X_test, y_test)
  else:
    results = model.evaluate(X_test, nn_y_test)
  print(f'Результат для {model.__class__.__name__}: {results}')

### Сверточная нейронная сеть

In [None]:
def get_audio_by_id(id, sr=22050):
  id = f'{id:06}'
  subfolder = id[:3]
  filename = f'{DATA_DIR}/{subfolder}/{id}.mp3'
  return librosa.load(filename, sr=sr)[0]

In [None]:
from dataclasses import dataclass
from keras.utils import Sequence

@dataclass
class AudioLoader(Sequence):
  data: pd.DataFrame
  sr: int = 22050
  batch_size: int = 64
  x = None
  y = None
  
  def __post_init__(self):
    self.x = []
    self.y = []
    for index, row in self.data.iterrows():
      try:
        audio = get_audio_by_id(index)
      except Exception as e:
        print(f"Skipping at {index}")
        
      audio = np.pad(audio, (0, max(self.sr * 32 - audio.shape[0], 0)))
      
      spec = librosa.feature.melspectrogram(y=audio)
      self.x.append(spec.reshape(spec.shape[0], spec.shape[1], 1))
      self.y.append(label2vec(8)(int(row['track_genres'])))
    
    self.x = np.array(self.x)
    self.y = np.array(self.y)
    
    
  def __len__(self):
    return self.data.shape[0] // self.batch_size
  
  def __getitem__(self, index):
    
    x_cur =  x[self.batch_size * index:(self.batch_size + 1) * index]
    y_cur =  y[self.batch_size * index:(self.batch_size + 1) * index]
    
    return x_cur, y_cur

In [None]:
train, val = train_test_split(selected, test_size=0.2, random_state=69)

In [None]:
train_loader = AudioLoader(train)
val_loader = AudioLoader(val)

In [None]:
first_batch = train_loader[0][0]

In [None]:
first_batch.shape

In [None]:
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dropout, Dense
import keras.backend as K

K.clear_session()

model = Sequential()
model.add(Conv2D(32, (32, 32), strides=(32, 32), activation='relu', input_shape=first_batch.shape[1:]))
model.add(MaxPooling2D((2, 2)))
model.add(Conv2D(32, (8, 8), strides=(8, 8), activation='relu'))
model.add(MaxPooling2D((2, 2)))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(8, activation='softmax'))

model.compile(loss='categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])

model.summary()

In [None]:
callbacks = [
  ModelCheckpoint(monitor='val_accuracy', save_best_only=True, save_weights_only=True),
  CSVLogger(),
  
]

In [None]:
model.fit(train_loader, validation_data=val_loader, batch_size=128, epochs=5)