First, we import all the libraries we need.

In [None]:
import os

import glob2
import librosa
import numpy as np
from librosa.display import specshow
from librosa.feature import mfcc
from librosa.feature import melspectrogram
from matplotlib import pyplot as plt
from matplotlib.colors import Normalize
from joblib import Parallel, delayed
import numpy as np
from tqdm import tqdm_notebook as tqdm
from collections import defaultdict
from shutil import copyfile
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Dense, Activation, Conv1D, MaxPooling1D, Flatten, Dropout
from keras.callbacks import TensorBoard
from sklearn import preprocessing
from sklearn.metrics import confusion_matrix, accuracy_score
import itertools

`extract_features()` loads an audio file and returns the corresponding mel spectrogram (in dB).

In [None]:
def extract_features(file, mfcc_n=16):
    """Load an audio file and return its mel spectrogram in decibels.

    Args:
        file: Path to an audio file readable by ``librosa.load``.
        mfcc_n: Unused; kept only so existing callers that pass it keep
            working.

    Returns:
        2-D array with the 128 mel bands along the first axis and the time
        frames along the second, converted to dB with the maximum power of
        the spectrogram as reference.
    """
    soundfile, samplerate = librosa.load(file)
    # Pass the sample rate explicitly so the mel filter bank always matches
    # the rate the audio was actually loaded with, instead of relying on both
    # functions sharing the same default.
    spectrogram = melspectrogram(y=soundfile, sr=samplerate, n_fft=2048,
                                 n_mels=128, hop_length=1040)
    return librosa.power_to_db(spectrogram, ref=np.max)

# Collect every MP3 below the current directory. glob order depends on the
# filesystem, so sort to make the file order reproducible between runs.
all_files = sorted(glob2.glob('./**/*.mp3'))
# Sanity check on one file: show the shape of the returned spectrogram.
extract_features(all_files[1]).shape

In [None]:
def preprocess(file):
    """Compute the spectrogram of one MP3 file and save it under ``records/``.

    The output path mirrors the input path: ``./`` is replaced by
    ``records/`` and ``.mp3`` by ``.npy``.

    Args:
        file: Path of the source MP3 file (presumably one of the paths
            produced by the ``./**/*.mp3`` glob).
    """
    new_name = file.replace('.mp3', '.npy').replace('./','records/')
    target_dir = os.path.dirname(new_name)
    if target_dir:
        # makedirs(exist_ok=True) also creates ``records/`` itself when it is
        # missing and does not fail when another parallel worker creates the
        # same directory between a check and the creation.
        os.makedirs(target_dir, exist_ok=True)
    features = extract_features(file)
    np.save(new_name, features, allow_pickle=False)

We use joblib to parallelize preprocessing

In [None]:
# Run the per-file preprocessing on 5 joblib workers; tqdm shows how many
# files have been handed to the pool so far.
pending = (delayed(preprocess)(path) for path in tqdm(all_files))
v = Parallel(n_jobs=5)(pending)

Next, we check that the data was saved properly and do some data exploration.

In [None]:
# Re-load every saved spectrogram to verify the preprocessing output, while
# tracking the smallest second-axis length and the number of files per class
# (the class is the name of the directory containing the MP3).
classes = defaultdict(int)
min_shape = np.inf
for file in all_files:
    npy_file = file.replace('.mp3','.npy').replace('./', 'records/')
    data = np.load(npy_file)  # raises if the file is missing or unreadable
    last_shape = data.shape
    if last_shape[1] < min_shape:
        min_shape = last_shape[1]
    label = os.path.split(os.path.dirname(file))[1]
    classes[label] += 1

# Bar chart of the number of files per class, followed by the totals.
plt.bar(classes.keys(), classes.values())
plt.xticks(rotation=90)
print(min_shape)
plt.show()
print(len(all_files))

Now we are ready to split the files into training and test examples.

In [None]:
def _copy_to_split(files, split_name):
    """Copy each ``records/...`` file to the same relative path under ``split_name/``."""
    for file in files:
        # Replace only the first 'records' (the root directory), so a class
        # or file name that happens to contain the word is left untouched.
        new_name = file.replace('records', split_name, 1)
        # Creates the split root and the class directory when missing.
        os.makedirs(os.path.dirname(new_name), exist_ok=True)
        copyfile(file, new_name)


# Sorted input plus a fixed seed make the split reproducible: re-running this
# cell copies the same files again instead of mixing a new random split into
# the existing train/ and test/ directories.
pre_files = sorted(glob2.glob('records/**/*.npy'))
file_train, file_test = train_test_split(pre_files, test_size=0.33,
                                         shuffle=True, random_state=42)
_copy_to_split(file_test, 'test')
_copy_to_split(file_train, 'train')

In [None]:
# Number of spectrogram files in each split, shown as (train, test).
train_count = len(glob2.glob('train/**/*.npy'))
test_count = len(glob2.glob('test/**/*.npy'))
train_count, test_count

This function creates a CNN model with 3 convolution layers, 2 hidden dense layers and a softmax output layer.

In [None]:
def create_model(input_shape=(636, 128), n_classes=13):
    """Build and compile the 1-D convolutional classifier.

    The network stacks three ``Conv1D`` layers (the first two each followed
    by max pooling), then two 2048-unit dense layers with dropout and a
    softmax output layer.

    Args:
        input_shape: Shape of one example as ``(steps, channels)``
            (``channels_last``). Defaults to the shape used in this notebook.
        n_classes: Number of units in the softmax output layer.

    Returns:
        The model, compiled with the Adam optimizer, categorical
        cross-entropy loss and the accuracy metric.
    """
    model = Sequential()
    model.add(Conv1D(256, kernel_size=4, strides=2, activation='relu',
                     data_format='channels_last',
                     input_shape=input_shape))
    model.add(MaxPooling1D(pool_size=4))
    model.add(Conv1D(256, kernel_size=4, strides=2, activation='relu'))
    model.add(MaxPooling1D(pool_size=4))
    model.add(Conv1D(512, kernel_size=4, strides=2, activation='relu'))
    model.add(Flatten())
    model.add(Dropout(0.5))
    model.add(Dense(2048, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(2048, activation='relu'))
    model.add(Dense(n_classes, activation='softmax'))

    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model
model = create_model()

In [None]:
# Fit a label binarizer on the class names collected during data exploration;
# it converts class names to indicator rows and back.
le = preprocessing.LabelBinarizer().fit(list(classes))
le.classes_
# Example encoding of three class names.
sample_labels = ['Samba', 'Rumba', 'Jive']
le.transform(sample_labels)

In [None]:
def extract_class(file_name):
    """Return the name of the directory that directly contains ``file_name``."""
    parent_dir, _ = os.path.split(file_name)
    _, class_name = os.path.split(parent_dir)
    return class_name


# Quick demonstration on three example paths.
tuple(extract_class(path) for path in ('test/Samba/118806.npy',
                                       'test/Rumba/118806.npy',
                                       'test/Jive/118806.npy'))

Here we load all training data into one numpy array.

In [None]:
# Load every training spectrogram into one array of shape
# (n_examples, 636, 128) and the matching indicator-encoded labels.
train_data = glob2.glob('train/**/*.npy')
X_train = np.empty((len(train_data), 636, 128))
y_train = np.empty((len(train_data), 13))
# Wrap the list (not the enumerate object) so tqdm knows the total length.
for idx, file in enumerate(tqdm(train_data)):
    # Load the spectrogram of the current example. (Previously the same
    # file, pre_files[0], was loaded for every example, so all training
    # inputs were identical.) Keep the first 636 columns and transpose so
    # that the 636 axis comes first.
    data = np.transpose(np.load(file)[:,:636])
    # Standardize each example to zero mean and unit variance.
    data = (data - np.mean(data)) / np.std(data)
    X_train[idx, :, :] = data
    example_class = extract_class(file)
    y_train[idx,:] = le.transform([example_class])

In [None]:
# Value range of the whole training array, then of the second example only.
print(X_train.max(), X_train.min())
print(X_train[1].max(), X_train[1].min())

Now we start training.

In [None]:
# TensorBoard callback that writes its logs to ./logs once per epoch.
tensorboard_cb = TensorBoard(
    log_dir='./logs',
    histogram_freq=0,
    batch_size=32,
    write_graph=True,
    write_grads=False,
    write_images=False,
    embeddings_freq=0,
    embeddings_layer_names=None,
    embeddings_metadata=None,
    embeddings_data=None,
    update_freq='epoch',
)
# Train for 55 epochs with mini-batches of 32 examples.
model.fit(X_train, y_train, epochs=55, batch_size=32, callbacks=[tensorboard_cb])

In [None]:
# Save the trained model to an HDF5 file in the working directory.
model_path = 'dance_style_recognition.h5'
model.save(model_path)

We load the test data in the same way.

In [None]:
# Load every test spectrogram into one array of shape (n_examples, 636, 128)
# and the matching indicator-encoded labels.
test_data = glob2.glob('test/**/*.npy')
X_test = np.empty((len(test_data), 636, 128))
y_true = np.empty((len(test_data), 13))
# Wrap the list (not the enumerate object) so tqdm knows the total length.
for idx, file in enumerate(tqdm(test_data)):
    # Keep the first 636 columns and transpose so the 636 axis comes first.
    data = np.transpose(np.load(file)[:,:636])
    # Standardize each example to zero mean and unit variance.
    data = (data - np.mean(data)) / np.std(data)
    X_test[idx, :, :] = data
    example_class = extract_class(file)
    y_true[idx,:] = le.transform([example_class])

In [None]:
# Output of the softmax layer for every test example.
y_pred = model.predict(x=X_test)

In [None]:
# Convert the indicator rows (ground truth) and the model outputs
# (predictions) back to class names.
label_true, label_pred = (le.inverse_transform(rows) for rows in (y_true, y_pred))

# Show the first ten true and predicted labels for a quick visual comparison.
for labels in (label_true, label_pred):
    print(labels[:10])

Finally we calculate accuracy and plot confusion matrix

In [None]:
# Fraction of test examples whose predicted class equals the true class.
acc = accuracy_score(label_true, label_pred)
print('Acc: ', acc)
# Fix the row/column order to le.classes_. Without labels=, a class that
# occurs in neither the true nor the predicted labels is dropped, and the
# matrix no longer lines up with the le.classes_ tick labels used for plotting.
confmat = confusion_matrix(label_true, label_pred, labels=le.classes_)

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """Print and plot a confusion matrix.

    Args:
        cm: 2-D confusion matrix; rows are plotted as true labels and
            columns as predicted labels.
        classes: Tick labels, one per row/column of ``cm``.
        normalize: If True, divide every row by its sum before printing and
            plotting. Rows that sum to zero are left as zeros.
        title: Title of the plot.
        cmap: Matplotlib colormap used for the cells.
    """
    if normalize:
        row_sums = cm.sum(axis=1)[:, np.newaxis]
        # A class without any true example has an all-zero row; divide it by
        # 1 so the row stays 0 instead of turning into NaN (0 / 0).
        row_sums[row_sums == 0] = 1
        cm = cm.astype('float') / row_sums
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Write each cell's value on top of it; use white text on dark cells
    # (above half of the maximum) so the numbers stay readable.
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()

plot_confusion_matrix(confmat, le.classes_)

We can also check styles distribution in train and test sets

In [None]:
# Count the test examples per class and show the distribution as a bar chart.
test_data = glob2.glob('test/**/*.npy')
print(len(test_data))
test_c = defaultdict(int)
for label in map(extract_class, test_data):
    test_c[label] += 1
plt.bar(test_c.keys(), test_c.values())
plt.xticks(rotation=90)

In [None]:
# Count the training examples per class and show the distribution as a bar
# chart.
train_data = glob2.glob('train/**/*.npy')
print(len(train_data))
train_c = defaultdict(int)
for label in map(extract_class, train_data):
    train_c[label] += 1
plt.bar(train_c.keys(), train_c.values())
plt.xticks(rotation=90)