In [None]:
import os
import gc
import csv
import multiprocessing as mp

import numpy as np
import scipy as sc

import soundfile as sf

import librosa

from matplotlib import pyplot as plt

from keras.models import Model, Sequential
from keras.layers import Dense, SimpleRNN, LSTM, Conv2D, MaxPooling2D, Dropout, Activation, Flatten, Bidirectional
from keras.callbacks import TensorBoard, ModelCheckpoint
from keras import regularizers

import datetime

%run math_utils.py
%run plot_utils.py
%run load_utils.py

In [None]:
# Notebook-wide matplotlib defaults: 16 pt font and 20x15 inch figures.
plt.rcParams.update({
    'font.size': 16,
    'figure.figsize': (20.0, 15.0),
})

In [None]:
# Load the dataset index and attach a label built from the integer class ids.
metadata = load_metadata('metadata/UrbanSound8K.csv', base_path='./UrbanSound8K')
metadata['class_label'] = int_to_one_hot(metadata['class_id'], len(metadata['class_dict']))

folds_file = "folds.txt"
n_fold = 5

# The alternative fold assignment is generated only when the file is missing
# and is persisted, so later runs of the notebook reuse the same split.
if not os.path.exists(folds_file):
    data_len = len(metadata['fold'])
    per_fold, leftover = divmod(data_len, n_fold)
    # Equal share for every fold id 1..n_fold ...
    balanced = np.repeat(np.arange(1, n_fold + 1), per_fold)
    # ... and the items that do not divide evenly get randomly drawn fold ids.
    extras = np.random.randint(1, n_fold + 1, leftover)
    folds = np.concatenate([balanced, extras])
    np.random.shuffle(folds)
    assert len(folds) == len(metadata['fold'])
    with open(folds_file, "w") as f:
        f.writelines(str(fold_id) + "\n" for fold_id in folds)

# Read the assignment back: one integer fold id per line.
metadata['alt_fold'] = []
with open(folds_file, "r") as f:
    metadata['alt_fold'].extend(int(line.strip()) for line in f)

# Show which fold ids exist and how many items each one holds.
u, c = np.unique(metadata['alt_fold'], return_counts=True)
print("Folds:", u, c)

In [None]:
def mel_stft_callback(s):
    """Compute a 128-band mel spectrogram of the signal ``s``.

    The signal is treated as sampled at 44100 Hz and analysed with a
    1024-sample FFT and a hop of 1024 samples. ``power=1`` requests the
    magnitude spectrum rather than the squared magnitude.

    Args:
        s: audio time series as accepted by librosa.

    Returns:
        Whatever ``librosa.feature.melspectrogram`` returns for these settings.
    """
    # The signal and sample rate are passed by keyword: recent librosa
    # releases reject them as positional arguments, while older releases
    # accept both spellings.
    return librosa.feature.melspectrogram(
        y=s, sr=44100,
        n_fft=1024,
        hop_length=1024,
        n_mels=128,
        power=1
    )

# Tag naming the feature configuration; it is passed to the loader as
# `cache_tag` and reused later in the log/result file names.
data_tag = 'stft_1024_1024_1024_44100_mel_128_pow_1'

# Keyword options forwarded to load_large_data (defined in load_utils.py).
# NOTE(review): the positional 16 is presumably a worker/chunk count - confirm
# against load_utils.py.
load_options = dict(
    callback=mel_stft_callback,
    args=(),
    sample_rate=44100,
    cache_tag=data_tag,
    base_path='./UrbanSound8K',
    verbose=True,
)
data = load_large_data(metadata, 16, **load_options)

# Targets as an array, plus the class lookup taken from the metadata.
labels = np.array(metadata['class_label'])
class_dict = metadata['class_dict']

In [None]:
# normalize the data
def preprocess(item):
    """Standardize ``item`` to zero mean and unit standard deviation.

    The mean and standard deviation are computed over the whole array
    (all axes at once), not per row or per column.

    Args:
        item: numeric array-like.

    Returns:
        ``(item - mean) / std`` as a numpy array. For a constant input
        (``std == 0``) the centred array, i.e. all zeros, is returned
        instead of dividing by zero.
    """
    item = np.asarray(item)
    centered = item - np.mean(item)
    std = np.std(item)
    # A constant item would otherwise become all-NaN (0 / 0), and a single
    # NaN input is enough to turn the training loss into NaN.
    if std == 0:
        return centered
    return centered / std

# Standardize every item independently, keeping the original order.
data = list(map(preprocess, data))

In [None]:
# Sanity check on one normalized item: shape, sum of squares, max and min,
# followed by an image of the item itself.
sample = data[1]

for describe in (np.shape, lambda a: np.sum(a ** 2), np.max, np.min):
    print(describe(sample))

plt.imshow(sample, interpolation='nearest', aspect='auto')
plt.show()

In [None]:
# take frame from sequence
def random_clip(item, length=128):
    """Cut or zero-pad ``item`` along axis 1 to exactly ``length`` columns.

    Args:
        item: 2-D numpy array; axis 1 is the axis being clipped or padded.
        length: target number of columns (defaults to 128, the size that
            was previously hard-coded).

    Returns:
        * wider inputs: a random contiguous window of ``length`` columns;
        * narrower inputs: the input zero-padded on both sides, with the
          total padding split at a random position;
        * inputs that already have ``length`` columns: ``item`` itself.
    """
    # shape[1] also works for single-row input, unlike len(item[1]).
    difference = item.shape[1] - length
    if difference > 0:
        # randint's upper bound is exclusive; +1 makes the last valid
        # window (the one ending at the final column) reachable.
        shift = np.random.randint(0, difference + 1)
        return item[:, shift:shift + length]
    if difference < 0:
        missing = -difference
        # Same exclusive-bound reasoning: allow all padding on the left.
        pad_left = np.random.randint(0, missing + 1)
        pad_right = missing - pad_left
        return np.pad(item, ((0, 0), (pad_left, pad_right)), 'constant', constant_values=0)
    return item

# CNN input: every item passed through random_clip (128x128)
cnn_data = [random_clip(item) for item in data]
cnn_labels = labels

# LSTM input: the items unchanged (128xT)
lstm_data, lstm_labels = data, labels

# Items and labels must pair up one-to-one.
assert len(data) == len(labels)
print(len(data))

# Tag appended to the run names built further down.
ctag = 'full'

In [None]:
# split data into train and test 
def split_data(metadata, test_fold, alt=False):
    """Split item indices into shuffled train and test sets by fold id.

    Args:
        metadata: mapping with per-item fold ids under ``'fold'`` and
            ``'alt_fold'``.
        test_fold: container of fold ids whose items form the test set.
        alt: when true use ``metadata['alt_fold']``, otherwise
            ``metadata['fold']``.

    Returns:
        ``(train, test)``: two integer index arrays, each in random order.
    """
    folds = metadata['alt_fold'] if alt else metadata['fold']
    # The item count comes from the fold list itself rather than from a
    # notebook-level `data` variable, so the function depends only on its
    # own arguments.
    order = np.random.permutation(len(folds))
    in_test = np.array([folds[i] in test_fold for i in order], dtype=bool)
    # Boolean masking keeps the integer dtype even when one side is empty,
    # so both results can always be used for indexing.
    return order[~in_test], order[in_test]

In [None]:
# save confusion matrix in csv
def save_confusion_matrix(metadata, predict, data, labels, test_indices, train_indices, path):
    """Write test, train and overall confusion matrices as CSV files.

    Args:
        metadata: mapping holding ``'class_dict'``; its length is used as
            the number of classes.
        predict: per-item class scores, one row per item.
        data: the predicted items (only their count is used).
        labels: ground-truth labels, one per item, in the format expected
            by ``confusion_matrix`` (presumably one-hot - confirm in
            math_utils.py).
        test_indices: indices of the test items.
        train_indices: indices of the training items.
        path: file name prefix; ``_test_cm.csv``, ``_train_cm.csv`` and
            ``_all_cm.csv`` are appended to it.
    """
    assert len(data) == len(predict)
    assert len(data) == len(labels)
    assert len(train_indices) + len(test_indices) == len(data)

    scores = np.asarray(predict)
    # One-hot of the argmax: exactly one predicted class per item, even when
    # several scores tie for the maximum (a plain `== max` comparison would
    # mark every tied class).
    hard = np.zeros(scores.shape, dtype=int)
    hard[np.arange(len(scores)), np.argmax(scores, axis=1)] = 1

    n_classes = len(metadata['class_dict'])
    labels_array = np.array(labels)

    # All matrices are computed before any file is touched.
    outputs = (
        ('_test_cm.csv', confusion_matrix(labels_array[test_indices], hard[test_indices], n_classes)),
        ('_train_cm.csv', confusion_matrix(labels_array[train_indices], hard[train_indices], n_classes)),
        ('_all_cm.csv', confusion_matrix(labels_array, hard, n_classes)),
    )

    for suffix, matrix in outputs:
        # newline='' hands line-ending control to the csv module; without it
        # every row is followed by a blank line on Windows.
        with open(path + suffix, 'w', newline='') as file:
            csv.writer(file).writerows(matrix.tolist())

In [None]:
def train_model(
    model,
    metadata,
    data,
    labels,
    test_fold,
    tag,
    save_path,
    generator_callback=(lambda a, b: (a, b)),
    epochs=64
):
    """Train ``model`` with one set of folds held out and save its results.

    The items are split with ``split_data`` on the alternative fold
    assignment. The model is fitted one generator item per step, the weights
    with the best ``val_acc`` on the held-out folds are restored, and
    confusion matrices for all items are written next to the logs.

    Args:
        model: compiled Keras model.
        metadata: dataset metadata, forwarded to ``split_data`` and
            ``save_confusion_matrix``.
        data: indexable collection of input items.
        labels: indexable collection of targets, aligned with ``data``.
        test_fold: container of fold ids that are held out.
        tag: run name used as the prefix of every output file.
        save_path: path prefix under which logs, checkpoint and matrices
            are written.
        generator_callback: maps one ``(item, label)`` pair to what the
            model consumes in a single step.
        epochs: number of training epochs (defaults to 64, the value that
            was previously hard-coded).

    Returns:
        The history object returned by ``model.fit_generator``.
    """
    train_indices, test_indices = split_data(metadata, test_fold, alt=True)

    # The split must be a true partition of all items.
    assert len(set(train_indices) & set(test_indices)) == 0
    assert (set(train_indices) | set(test_indices)) == set(range(len(data)))
    assert len(data) == len(labels)
    assert len(train_indices) + len(test_indices) == len(data)

    def cycle(indices):
        """Yield one prepared (input, target) pair per index, forever."""
        while True:
            for i in indices:
                yield generator_callback(data[i], labels[i])

    # Run name: tag + held-out fold ids + start time (minute resolution).
    btag = tag + ('_tf%s_' % "_".join(str(i) for i in test_fold)) + \
        datetime.datetime.now().strftime("%Y-%m-%d_%H:%M")

    checkpoint_format = 'model.best-acc.hdf5'
    checkpoint_file = save_path + btag + '_' + checkpoint_format

    # Keep only the weights of the epoch with the highest validation accuracy.
    checkpoint = ModelCheckpoint(checkpoint_file, verbose=0, monitor='val_acc', save_best_only=True, mode='max')
    board = TensorBoard(save_path + btag, write_graph=True, write_grads=True, write_images=True)

    # One step per item: an epoch walks through every index exactly once.
    history = model.fit_generator(
        cycle(train_indices), steps_per_epoch=len(train_indices), epochs=epochs,
        validation_data=cycle(test_indices), validation_steps=len(test_indices),
        callbacks=[board, checkpoint], verbose=1
    )

    # Evaluate with the best checkpoint rather than the final-epoch weights.
    model.load_weights(checkpoint_file)

    # Predict every item in its original order so rows line up with `labels`.
    predict = model.predict_generator(cycle(range(len(data))), len(data))
    save_confusion_matrix(metadata, predict, data, labels, test_indices, train_indices, save_path + btag)

    return history

In [None]:
def cnn_model():
    """Build and compile the convolutional classifier.

    Input shape is ``(128, 128, 1)``. Three 5x5 convolution stages with 24,
    48 and 48 filters, the first two each followed by (4, 2) max-pooling,
    feed a 64-unit dense layer and a 10-way softmax. Both dense layers use
    L2 weight regularization (0.001) and are preceded by dropout of 0.5.

    Returns:
        A ``Sequential`` model compiled with the adam optimizer and
        categorical cross-entropy loss, reporting accuracy.
    """
    layers = [
        # convolution stage 1
        Conv2D(24, (5, 5), input_shape=(128, 128, 1)),
        MaxPooling2D((4, 2), (4, 2)),
        Activation('relu'),
        # convolution stage 2
        Conv2D(48, (5, 5)),
        MaxPooling2D((4, 2), (4, 2)),
        Activation('relu'),
        # convolution stage 3 (no pooling)
        Conv2D(48, (5, 5)),
        Activation('relu'),
        Flatten(),
        # classifier head
        Dropout(0.5),
        Dense(64, kernel_regularizer=regularizers.l2(0.001)),
        Activation('relu'),
        Dropout(0.5),
        Dense(10, kernel_regularizer=regularizers.l2(0.001)),
        Activation('softmax'),
    ]

    network = Sequential(layers)
    network.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return network
    
tag = 'cnn_adam_' + data_tag + '_' + ctag
log_path = './logs_cv_cnn_mel_p1_norm/'

# (item, label) -> single-item batch: the item is transposed and given a
# leading batch axis and a trailing channel axis; the label gets a batch axis.
shape_callback = lambda d, l: (d.T[np.newaxis, :, :, np.newaxis], l[np.newaxis, :])

best_test_acc = []

# Cross-validation: a fresh model per held-out fold. The fold ids run from
# 1 to n_fold, matching the fold file generated earlier in the notebook.
for i in range(1, n_fold + 1):

    model = cnn_model()

    h = train_model(
        model, metadata,
        cnn_data, cnn_labels,
        test_fold=[i], tag=tag,
        save_path=log_path,
        generator_callback=shape_callback
    ).history

    # Best validation accuracy reached in any epoch of this fold.
    best_acc = np.max(h['val_acc'])
    best_test_acc.append(best_acc)
    print(best_acc)

# newline='' hands line-ending control to the csv module (otherwise rows end
# in \r\r\n on Windows).
with open(log_path + tag + '_result.csv', 'w', newline='') as file:
    csv.writer(file).writerows([best_test_acc])

print(best_test_acc)

In [None]:
def lstm_model():
    """Build and compile the recurrent classifier.

    Input is a variable-length sequence of 128-dimensional frames
    (``input_shape=(None, 128)``). A 128-unit LSTM that returns the full
    sequence feeds a 64-unit LSTM that returns only its last output; each is
    followed by dropout of 0.25, and a 10-way softmax layer ends the stack.

    Returns:
        A ``Sequential`` model compiled with the adam optimizer and
        categorical cross-entropy loss, reporting accuracy.
    """
    network = Sequential([
        LSTM(128, return_sequences=True, input_shape=(None, 128)),
        Dropout(0.25),
        LSTM(64, return_sequences=False),
        Dropout(0.25),
        Dense(10, activation='softmax'),
    ])
    network.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return network
    
tag = 'lstm_adam_' + data_tag + '_' + ctag
log_path = './logs_cv_lstm_mel_p1_norm/'

# (item, label) -> single-item batch: the item is transposed and given a
# leading batch axis; the label gets a batch axis.
shape_callback = lambda d, l: (d.T[np.newaxis, :, :], l[np.newaxis, :])

best_test_acc = []

# Cross-validation over every fold id from 1 to n_fold. The loop used to
# start at fold 2, so a fresh run of the notebook never evaluated fold 1.
for i in range(1, n_fold + 1):

    model = lstm_model()

    h = train_model(
        model, metadata,
        lstm_data, lstm_labels,
        test_fold=[i], tag=tag,
        save_path=log_path,
        generator_callback=shape_callback
    ).history

    # Best validation accuracy reached in any epoch of this fold.
    best_acc = np.max(h['val_acc'])
    best_test_acc.append(best_acc)
    print(best_acc)

print(best_test_acc)

# newline='' hands line-ending control to the csv module (otherwise rows end
# in \r\r\n on Windows).
with open(log_path + tag + '_result.csv', 'w', newline='') as file:
    csv.writer(file).writerows([best_test_acc])
    