In [1]:
from random import seed, random, randrange
from csv import reader
from math import exp
import os 
import numpy as np
import pickle
from sklearn.metrics import classification_report, accuracy_score
from sklearn.model_selection import train_test_split
from imblearn.under_sampling import RandomUnderSampler

In [2]:
data_path = 'C:/Users/tianyi/Northeastern University/Machine Learning Final Project - Music Classification - Documents'
myspace_mp3s_path = '%s/myspace_mp3s' % data_path
metadata_path = '%s/metadata.json.gz' % myspace_mp3s_path
genre_map_path = '%s/genre_map.pkl' % myspace_mp3s_path
mfcc_path = '%s/audio_features/mfcc' % data_path

In [3]:
def initialize_network(n_inputs, n_hidden, n_outputs): 
    '''
    Initialize a neural network
    organize layers as arrays of dictionaries
    '''
    layer_input = []
    for i in range(n_inputs): 
        layer_input.append({})
    
    # n hidden neurons for the hidden layer, and each neuron has n inputs + 1 weights, 
    # one for each input column in the dataset and another for the bias. 
    layer_hidden = []
    for i in range(n_hidden): 
        w = []
        b = random()
        for j in range(n_inputs): 
            w.append(random())
        layer_hidden.append({'w': w, 'b': b, 'o': 0, 'e': 0})
        
    #  n outputs neurons in the output layer that links to the hidden layer, 
    # each with n hidden + 1 weights. 
    layer_output = []
    for i in range(n_outputs): 
        w = []
        b = random()
        for j in range(n_hidden): 
            w.append(random())
        layer_output.append({'w': w, 'b': b, 'o': 0, 'e': 0})
    
    # treat the whole network as an array of layers
    network = [layer_input, layer_hidden, layer_output]
    
    return network


def activate(weights, inputs): 
    '''
    Calculate neuron activation for an input
    weights shape: n_neurons_hidden_layer x  n_neurons_inputs_layer
    inputs shape: n_neurons_inputs_layer
    result shape:  n_neurons_hidden_layer * 1
    '''

    res = []
    for hid in range(len(weights)): 
        sum = 0.0
        for inp in range(len(inputs)): 
            sum += inputs[inp] * weights[hid]['w'][inp]
        
        sum += weights[hid]['b']
        
        z = transfer(sum)
        
        res.append(z)
        
    return res


def transfer(activation): 
    '''
    Transfer neuron activation
    '''
    return (1.0 / (1.0 + exp(-activation)));


def forward_propagate(network, row): 
    '''
    Forward propagate input to a network output
    '''
    
    # input layer -> hidden layer
    hidden_output = activate(network[1], row)
    for i in range(len(hidden_output)): 
        network[1][i]['o'] = hidden_output[i]

    
    # hidden layer -> output layer
    output = activate(network[2], hidden_output)
    for i in range(len(output)): 
        network[2][i]['o'] = output[i]


def transfer_derivative(output):
    '''
    Calculate the derivative of an neuron output
    '''
    return output * (1.0 - output)


def backward_propagate_error(network, expected):
    '''
    Backpropagate error and store in neurons
    '''
    error_output = []
    for out in range(len(network[2])): 
        error = expected[out] - network[2][out]['o']
        error_output.append(error)
        network[2][out]['e'] = error
        
    for hid in range(len(network[1])): 
        error = 0.0
        for out in range(len(network[2])): 
            error += error_output[out] * network[2][out]['w'][hid]
            
        error *= transfer_derivative(network[1][hid]['o'])
        
        network[1][hid]['e'] = error
    

def back_propagation(train, test, l_rate, n_epoch, n_hidden):
    '''
    Backpropagation Algorithm With Stochastic Gradient Descent
    '''
    (X_train, y_train) = train
    (X_test, y_test) = test
    n_inputs = len(X_train[0])
    n_outputs = len(y_train[0])
    network = initialize_network(n_inputs, n_hidden, n_outputs)
    
    train_network(network, train, l_rate, n_epoch, n_outputs, test)
    print('end of training')

    # evaluate 
    pred_y = []
    for X in X_test: 
        y = predict(network, X)
        pred_y.append(y)
        
    
    acc = accuracy_metric(y_test, pred_y)
    print('final testing acc:', acc)

    return acc


def update_weights(network, row, l_rate):
    '''
    Update network weights with error
    '''
    # Update the weights for the output layer
    for out in range(len(network[2])): 
        for hid in range(len(network[1])): 
            w = network[2][out]['w'][hid]
            w += (l_rate * network[2][out]['e'] * network[1][hid]['o'])
            network[2][out]['w'][hid] = w
            
        b = network[2][out]['b']
        b += (l_rate * network[2][out]['e'])
        network[2][out]['b'] = b
        
    # Update the weights for the hidden layer
    for hid in range(len(network[1])): 
        for inp in range(len(network[0])): 
            w = network[1][hid]['w'][inp]
            w += (l_rate * network[1][hid]['e'] * row[inp])
            network[1][hid]['w'][inp] = w
        
        b = network[1][hid]['b']
        b += (l_rate * network[1][hid]['e'])
        network[1][hid]['b'] = b

def train_network(network, train, l_rate, n_epoch, n_outputs, test): 
    '''
    Train a network for a fixed number of epochs
    '''
    (X_train, y_train) = train
    (X_test, y_test) = test
    
    # updating the network for each row in the training dataset inside each epoch.
    for epoch in range(n_epoch): 
        # evaluate for every 10 epoch: 
        if epoch % 10 == 0: 
            print('epoch:', epoch)

            pred_y = []
            for X in X_train: 
                y = predict(network, X)
                pred_y.append(y)

            acc = accuracy_metric(y_train, pred_y)
            print('train acc:', acc)

            pred_y = []
            for X in X_test: 
                y = predict(network, X)
                pred_y.append(y)

            acc = accuracy_metric(y_test, pred_y)
            print('test acc:', acc)

        for i in range(len(X_train)): 
            X = X_train[i]
            y = y_train[i]
            forward_propagate(network, X)
            backward_propagate_error(network, y)
            update_weights(network, X, l_rate)


def predict(network, row):
    '''
    Make a prediction with a network
    '''
    forward_propagate(network, row)
    y = []
    for out in range(len(network[2])): 
        output = network[2][out]['o']
        y.append(output)
        
    return y


def arg_max(output): 
    '''
    find max element's index of array output
    '''
    max_index = 0
    max = output[0]
    for i in range(len(output)): 
        if output[i] > max: 
            max_o = output[i]
            max_index = i

    return max_index
        

def accuracy_metric(actual, predicted):
    '''
    Calculate accuracy percentage
    '''
    total = len(actual)
    correct = 0
    for i in range(total): 
        
        if arg_max(actual[i]) == arg_max(predicted[i]): 
            correct += 1
         
    acc = correct / total
    return acc

In [4]:
def shuffle_data(x, y):
    '''
    Shuffles x and y data. 
    '''
    idx = np.arange(x.shape[0])
    np.random.shuffle(idx)
    x = x[idx]
    y = y[idx]
    
    return x, y


def normalize_data(x):
    '''
    Normalizes x data. 
    '''
    return (x - x.mean()) / x.std()


def get_mfccs(mfcc_path, genres, lab_idx, max_recs):
    '''
    Open and process mfcc and return as x and y arrays.
    '''
    
    # save all x and y values
    mfccs = []
    y=[]
    i = 0
    
    # loop over each genre we are working with
    for genre in genres:
        print("genre: ", genre)
        
        # extract path to all song mfccs in current genre
        genre_path = '%s/%s' % (mfcc_path, genre)
        ct = 0
        
        # loop over each mfcc in current genre
        for fn in os.listdir(genre_path): 
            
            i += 1
            # get path to mfcc
            fp = '%s/%s' % (genre_path, fn)
            
            # load mfcc and transpose/take mean
            mfcc = np.load(fp) 
            mfcc = np.mean(mfcc.T,axis=0)
            if np.isnan(np.sum(mfcc)):
                print('Error with mfcc file')
                continue
            else:
                mfccs.append(mfcc)
            
            # append target label to list
            y.append(lab_idx[genre])
           
    # normalize data and create arrays
    x = np.array(mfccs)
    x = normalize_data(x) # error with normalizing, will scale later
    y = np.array(y)
    
    return shuffle_data(x, y)


SEED = 20211130

# genre counts (# samples per genre)
genre_cts = {}
for genre in os.listdir(mfcc_path):
    genre_path = '%s/%s' % (mfcc_path, genre)
    genre_cts[genre] = len(os.listdir(genre_path))
    
# print counts for each genre
for g in sorted(genre_cts, key=genre_cts.get, reverse=True):
    print(g, genre_cts[g])


rock 13158
metal 8782
alternative 8778
rap 5906
dance 5624
pop 4684
jazz 4552
hip_hop 4526
experimental 3686
other 3544
world 2225
electronic 2127
folk 1804
punk 1729
blues 1472
ambient 1299
reggae 1114
goth 722
acoustic 678
country 533
house 512
classical 486
spiritual 369
oldies 248
progressive 221
funk 142
easy_listening 131
spoken_word 130
bluegrass 47
industrial 44
showtunes 38
disco 23


In [5]:
genres = list(genre_cts.keys())
all_genres = ['jazz', 'reggae','rock','rap']

#all_genres =  list(genre_cts.keys())
print(len(genres))
# all_genres = ['jazz', 'reggae']
print(all_genres)


lab_idx = {g:i for i,g in enumerate(all_genres)}
print('lab_idx', lab_idx)


min_recs = min([genre_cts[g] for g in all_genres]) 
print('min_recs', min_recs)


x, y = get_mfccs(mfcc_path, all_genres, lab_idx,min_recs)
print('shape:', x.shape, y.shape)

32
['jazz', 'reggae', 'rock', 'rap']
lab_idx {'jazz': 0, 'reggae': 1, 'rock': 2, 'rap': 3}
min_recs 1114
genre:  jazz
genre:  reggae
genre:  rock
genre:  rap
shape: (24730, 20) (24730,)


In [6]:
rus = RandomUnderSampler()
x_resampled, y_resampled = rus.fit_resample(x, y)
for i in [0, 1, 2, 3]: 
    print(i, np.count_nonzero(y == i))

0 4552
1 1114
2 13158
3 5906


In [None]:
seed(SEED)

l_rate = 0.01
n_epoch = 500
n_hidden = 128

x = x.tolist()

# convert y to one-hot

y = y.tolist()
for i in range(len(y)): 
    if y[i] == 0: 
        y[i] = [1, 0, 0, 0]
        continue
        
    if y[i] == 1: 
        y[i] = [0, 1, 0, 0]
        continue
        
    if y[i] == 2: 
        y[i] = [0, 0, 1, 0]
        continue

    if y[i] == 3: 
        y[i] = [0, 0, 0, 1]
        continue

X_train, X_test, y_train, y_test = train_test_split(x,y, test_size=0.2, random_state=SEED)
train = (X_train, y_train)
test = (X_test, y_test)
# train and evaluate
back_propagation(train, test, l_rate, n_epoch, n_hidden)

epoch: 0
train acc: 0.14046704407602104
test acc: 0.13404771532551557
epoch: 10
train acc: 0.5997775980590376
test acc: 0.6091791346542661
epoch: 20
train acc: 0.6049332794177112
test acc: 0.6103922361504246
epoch: 30
train acc: 0.6063991103922362
test acc: 0.6130206227254347
epoch: 40
train acc: 0.6091285887585928
test acc: 0.6130206227254347
epoch: 50
train acc: 0.6126668014557218
test acc: 0.613829357056207
epoch: 60
train acc: 0.6146380913869793
test acc: 0.6152446421350586
epoch: 70
train acc: 0.6152446421350586
test acc: 0.6140315406389001
epoch: 80
train acc: 0.6153457339264051
test acc: 0.6140315406389001
epoch: 90
train acc: 0.6159522846744845
test acc: 0.6150424585523655
epoch: 100
train acc: 0.6163061059441973
test acc: 0.6156490093004447
epoch: 110
train acc: 0.6158006469874646
test acc: 0.6168621107966034
epoch: 120
train acc: 0.6164071977355439
test acc: 0.6166599272139103
epoch: 130
train acc: 0.6164577436312172
test acc: 0.6162555600485241
epoch: 140
train acc: 0.617468