# Classification - Baseline (HMM)

Author: Tiago F. Tavares - 2016

This document contains a demonstration of an audio classification process. It is based on work by George Tzanetakis [1], with adaptations to comprise good practices in MIR and machine learning research [2]. The discussion begins on how to characterize datasets, proceeds to feature calculation, then to classification processes and, last, to the statistical analysis that allows differentiating elements.

## Dataset characterization

Audio datasets for classification will be specified using the MIREX format. This format involves .wav files (16 bits/sample, unsigned int format) and an ascii index file that relates each audio file to its label, in the format:

    file1 [tab] label1
    file2 [tab] label2

Articles should report the number of samples contained in each class.

In [None]:
%matplotlib inline
from matplotlib import pyplot as plt
import numpy as np
import re

def dataset_characterization(dataset_file, dataset_dir=""):
    dataset = {} # Dictionary index = filename, content = class
    with open(dataset_dir + dataset_file) as f:
        for line in f:
            p = re.split(" |,|\t", line.rstrip('\n').rstrip('\r'))
            dataset[p[0]] = p[1]
    return dataset

def dataset_class_histogram(dataset):
    histogram = {}
    for data in dataset:
        if dataset[data] not in histogram:
            histogram[dataset[data]] = 1
        else:
            histogram[dataset[data]] += 1
    return histogram

dataset_dir = "./fernando_falas/"
dataset_file = "labels.txt"

#dataset_dir = "./dataset_noise_t/noise_50/"
#dataset_file = "label_noise_50.txt"

#dataset_dir = "./datasets/gtzan/"
#dataset_file = "labels.txt"
dataset = dataset_characterization(dataset_file, dataset_dir)
print dataset_class_histogram(dataset)

## Feature extraction

In the feature extraction step audio samples are mapped into a vector space in which dimensions correspond to different features that describe the audio. The main hypothesis behind this system is that perceptually-related sounds are related through spectral low-level features, such that the distance among the vectors that represent the same class is smaller than the distance of vectors representing instances from different classes. Thus, the feature extraction process for each instance is as follows.

<!--Thus, audio samples are mapped into a vector space in which the distance between two vectors is small when the corresponding samples sound similar. The acoustic similarity is assumed to be roughly related to low-level features that can be calculated from the sample's spectrogram. Therefore, the feature extraction process for each sample is as follows.-->

Initialy, the sample is normalized to zero mean an unit variance, avoiding effects of gain in the processing chain. After that, it is divided into frames of 46.3ms, with an overlap of 50% between subsequent frames. Each frame is multiplied by a Hanning window and has its magnitude spectrum estimated. The magnitude spectra are then used as basis for the calculation of selected features (spectral centroid, spectral roll-off, spectral flatness, spectral flux and 30 mel-frequency cepstral coefficients). The first and second derivatives of each feature are calculated, because they contain information on the audio content variation over time. A sliding window with duration of 40 frames (approximately 1s) is used to estimate the mean and variance of each feature over time. Last, the mean and variance of each mean and variance is calculated. This process estimates a $n$-dimensional vector representation for the audio sample that can be yielded to classification algorithms. The algorithm also calculates the proportion of energy in 

In [None]:
import mir3.modules.features as feat
import mir3.modules.tool.wav2spectrogram as spec
import mir3.modules.features.centroid as cent
import mir3.modules.features.rolloff as roll
import mir3.modules.features.flatness as flat
import mir3.modules.features.flux as flux
import mir3.modules.features.mfcc as mfcc
import mir3.modules.features.diff as diff
import mir3.modules.features.stats as stats
reload(stats)
import mir3.modules.features.join as join
import mir3.modules.tool.to_texture_window as tex



def features_gtzan(filename, directory=""):
    # Calculate spectrogram (normalizes wavfile)
    converter = spec.Wav2Spectrogram()
    s = converter.convert(open(directory + filename), window_length=512, dft_length=512,
                window_step=256, spectrum_type='magnitude', save_metadata=True, wav_rate=16000)
    
    # Extract low-level features, derivatives, and run texture windows    
    
    d = diff.Diff()
    #features = (cent.Centroid(), roll.Rolloff(), flat.Flatness(), flux.Flux(), mfcc.Mfcc())
    features = (mfcc.Mfcc(),)
    all_feats = None
    for f in features:
        track = f.calc_track(s) # Feature track
        all_feats = join.Join().join([all_feats, track])
        dtrack = d.calc_track(track) # Differentiate
        all_feats = join.Join().join([all_feats, dtrack])
        ddtrack = d.calc_track(dtrack) # Differentiate again
        all_feats = join.Join().join([all_feats, ddtrack])    

        # Texture window
    #t = tex.ToTextureWindow().to_texture(all_feats, 40)
        
    # Statistics
    #s = stats.Stats()
    #d = s.stats([t], mean=True, variance=True)    
    return all_feats

In [None]:
from ipywidgets import FloatProgress
from IPython.display import display


def low_level_features(dataset_file, dataset_dir=""): # Estimate low-level features. 
                                                      # Returns sklearn-compatible structures.
    d = dataset_characterization(dataset_file, dataset_dir)
    labels = []
    features = []
    progress = FloatProgress(min=0, max=len(d.keys()))
    display(progress)
    progress.value = 0
    for f in d:
        feat = features_gtzan(filename=f, directory=dataset_dir)
        if not np.any(np.isnan(feat.data)):
            features.append(feat.data)
            labels.append(d[f])
        progress.value += 1
    return features, labels
    

In [None]:
#dataset_dir = "./datasets/"
#dataset_file = "labels.txt"

dataset = dataset_characterization(dataset_file, dataset_dir)
features, labels = low_level_features(dataset_file, dataset_dir)

# Classification

The classification process begins by normalizing the feature set so that all features present zero mean and unit variance across all samples. Then, it proceeds to a 10-fold cross validation schema. Each training fold is further split in a 80%/20% ratio into two sets: $t_a$ and $t_b$. These sets are used to optimize the classifier's hyper-parameters in a grid-search that aims to optimize the f1-score in $t_b$. After this, the classifier is tested into the test set for the fold, which allows calculating recall, precision, and f1-score. The function returns the evaluation measures for all folds, averaged over all classes.

In [None]:
def label_to_numbers(labels, d):
    return [d.keys().index(i) for i in labels]

def numbers_to_labels(numbers, d):
    return [d.keys[i] for i in numbers]


In [None]:
def f1_from_confusion(c):
    ret = []
    for i in xrange(c.shape[0]):
        n_i = np.sum(c[i,:])
        est_i = np.sum(c[:,i])
        if n_i > 0:
            R = c[i,i] / float(n_i)
        else:
            R = 0.0
        if est_i > 0:
            P = c[i,i] / float(est_i)
        else:
            P = 0.0
            
        if (R+P) > 0:
            F = 2*R*P/(R+P)
        else:
            F = 0.
        ret.append([R, P, F])
    return ret

In [None]:
import copy 

class MultipleHMM():
    base_model = None
    models = {}
    
    def __init__(self, base_model=None):
        self.base_model = base_model
    
    def fit(self, X, y, train_lengths):
        """Fits all internal models"""
        labels = set(y)
        for l in labels:
            print "Training label:", l
            l_index = [i for i in xrange(len(y)) if y[i] == l]
            if X[0].ndim == 1:
                this_x = [X[i].reshape(-1, 1) for i in l_index]
            else:
                this_x = [X[i] for i in l_index]
                
            this_lengths = [train_lengths[i] for i in l_index]
            
            my_x = this_x[0]
            for i in xrange(1, len(this_x)):
                my_x = np.vstack ((my_x, np.array(this_x[i])))
            
            new_model = copy.deepcopy(self.base_model)
            new_model.fit(my_x, this_lengths)
            self.models[l] = new_model
    
    def predict(self, X):
        """Predicts a label for input X"""
        return_label = None
        best_prob = None
        for label in self.models.keys():
            
            #print "Computing probabilities on shape", X.shape
            if X.ndim == 1:
                this_prob = self.models[label].score(X.reshape(-1,1))
            else:
                this_prob = self.models[label].score(X)
            
            #print "Prob in label", label, "=", this_prob
            if (best_prob is None) or (this_prob > best_prob):
                best_prob = this_prob
                return_label = label
        return return_label


# Statistical analysis

The results yielded by different classifiers are compared using a t-test (if there are two classifiers) or the ANOVA test (if there are more than two classifiers). This process assumes that each fold of the cross-validation process is an independent measure. P-values lower than 5% indicate a significant difference.

In [None]:
from scipy.stats import friedmanchisquare as friedman
from scipy.stats import wilcoxon as wilcoxon
from scipy.stats import ttest_ind as ttest
from scipy.stats import f_oneway as f_oneway
from sklearn.preprocessing import normalize
from sklearn import preprocessing
from sklearn.cross_validation import ShuffleSplit
from sklearn.grid_search import GridSearchCV
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score
from sklearn.cross_validation import StratifiedKFold
import hmmlearn
from hmmlearn.hmm import GaussianHMM, MultinomialHMM

import numpy as np
import time

training_size = 0.8
n_folds = 10

def model_comparison(features, labels, models):
    #norm_features = normalize(features)
    features = np.array(features)
    skf = StratifiedKFold(labels, n_folds=n_folds)

    f1 = np.zeros((n_folds,len(models)))
    r = np.zeros((n_folds,len(models)))
    p = np.zeros((n_folds,len(models)))
    progress = FloatProgress(min=0, max=n_folds*len(models))
    display(progress)
    for m in xrange(len(models)):
        n = 0
        for train_index, test_index in skf:
            X_train, X_test = [features[i] for i in train_index], [features[i] for i in test_index]
            Y_train, Y_test = [labels[i] for i in train_index], [labels[i] for i in test_index]

            all_trains = np.array(X_train[0])
            train_lengths = [X_train[0].shape[0]]
            for i in xrange(1, len(X_train)):
                all_trains = np.vstack ((all_trains, np.array(X_train[i])))
                train_lengths.append(X_train[i].shape[0])
            all_trains = all_trains
            #print train_lengths
            
            scaler = preprocessing.StandardScaler().fit(all_trains)
            X_train = [scaler.transform(X_train[i]) for i in xrange(len(X_train))]
            X_test = [scaler.transform(X_test[i]) for i in xrange(len(X_test))]
            
            #cv = ShuffleSplit(len(X_train).shape[0], n_iter=10, test_size=0.2, random_state=0) # 80% train / 20% validation
            #classifier = GridSearchCV(estimator=copy.deepcopy(models[m]), cv=cv, param_grid=parameters_to_optimize[m], scoring='f1_weighted')
            classifier = models[m]
            
            classifier.fit(X_train, Y_train, train_lengths)
            Y_pred = [classifier.predict(X_test[i]) for i in xrange(len(X_test))]
            #print Y_pred
            confusion = confusion_matrix(Y_test, Y_pred)
            print "confusion matrix:\n", confusion
            conf = f1_from_confusion(confusion)
            conf_all = np.average(conf, axis=0)
            print conf_all

            r[n,m] = conf_all[0]
            p[n,m] = conf_all[1]
            f1[n,m] = conf_all[2]
            n += 1
            
            progress.value += 1
    return r, p, f1






In [None]:
from scipy.stats import friedmanchisquare as friedman
from scipy.stats import wilcoxon as wilcoxon
from scipy.stats import ttest_ind as ttest
from scipy.stats import f_oneway as f_oneway
import warnings
warnings.filterwarnings('ignore')

models = [MultipleHMM(GaussianHMM(n_components=5, covariance_type='diag', n_iter=50))]

dataset = dataset_characterization(dataset_file, dataset_dir)

recall, precision, f1_score = model_comparison(features, label_to_numbers(labels, dataset_class_histogram(dataset)), models)
#print recall, precision, f1_score
#print np.average(f1_score, axis=0), np.average(recall, axis=0), np.average(precision, axis=0)
print "recal:\n",recall, "\nprecision:\n", precision, "\nf1_score:\n",f1_score 
print "media_f1_score:",np.average(f1_score, axis=0), "media_recall:", np.average(recall, axis=0), "media_precision:",np.average(precision, axis=0)

sigma = np.std(f1_score)
print "sigma", sigma

if len(models) > 2:
#    print [f1_score[:,i].T for i in range(len(models))]
    print "Anova: ", f_oneway( *f1_score.T  )[1]
      
elif len(models) == 2:
    print "T-test:", ttest( f1_score[:,0].T,  f1_score[:,1].T)[1]


In [None]:
g = GaussianHMM(n_iter=10, covariance_type='diag', n_components=3)
train = np.array([[1, 2, 1, 2, 1, 2, 1, 2, 3, 4, 3, 2, 1], [2, 3, 1, 4, 1, 7, 1, 2, 3, 10, 11, 2, 1]]).T
test = train[0:4,:]
test0 = np.array([[4, 3], [7, 6], [5, 4], [5, 4]])
lengths = [4, 4, 5]
g.fit(train, lengths)
print g.score(test)
print g.score(test0)
print train.ndim


## Base Gtzan
n_components = 4
* MFCC / texture : 69,4
* MFCC + deltaMFCC + deltadeltaMFCC / texture: 74,9
* Todas as features / texture: 75,9
* Todas as features sem texture: 76,7

n_components = 8
* Todas as features / texture: 71,6

## Base Jamendo-Gtzan

* Todas as features sem texture, n_components=4, full_covariance: 39,4% 
* Todas as features sem texture, n_components=4, diag_covariance: 34,2% 
* Todas as features sem texture, n_components=9, diag_covariance: 33,5% 
* Todas as features sem texture, n_components=6, diag_covariance: 33,3% 
* Todas as features sem texture, n_components=6, full_covariance: 42,7%
* Todas as features sem texture, n_components=10, full_covariance: 36,9%

In [None]:
import mir3.modules.features as feat
import mir3.modules.tool.wav2spectrogram as spec
import mir3.modules.features.centroid as cent
import mir3.modules.features.rolloff as roll
import mir3.modules.features.flatness as flat
import mir3.modules.features.flux as flux
import mir3.modules.features.mfcc as mfcc
import mir3.modules.features.diff as diff
import mir3.modules.features.stats as stats
reload(stats)
import mir3.modules.features.join as join
import mir3.modules.tool.to_texture_window as tex