In [183]:
import os
import itertools
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from sklearn import metrics
from sklearn.metrics import make_scorer

import scipy
import librosa
import sed_eval

import matplotlib.pyplot as plt
import matplotlib.cm

import csv
import dcase_util

In [184]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()

In [185]:
def DatasetLoader(TrainPath,ValidatePath,TestPath):
    #train    
    train_data = np.load(TrainPath,allow_pickle=True)
    #validation    
    validate_data = np.load(ValidatePath,allow_pickle=True)
    #test    
    test_data = np.load(TestPath,allow_pickle=True)

    #Processing all the loaded dataset
    Train = {}
    print(train_data.shape)
    Train_Data = np.c_[train_data[:,0].tolist()]
    Train_Label = np.c_[train_data[:,1].tolist()]
    Train_Data = np.hstack(Train_Data).transpose() #sklearn requires data in [n_samples,n_features]
    Train_Label = np.hstack(Train_Label)
    Train['Data'] = Train_Data
    Train['Label'] = Train_Label

    Validation = {}
    Validate_Data = np.c_[validate_data[:,0].tolist()]
    Validate_Label = np.c_[validate_data[:,1].tolist()]
    Validate_Data = np.hstack(Validate_Data).transpose() #sklearn requires data in [n_samples,n_features]
    Validate_Label = np.hstack(Validate_Label)
    Validation['Data'] = Validate_Data
    Validation['Label'] = Validate_Label

    Test = {}
    Test_Data = np.c_[test_data[:,0].tolist()]
    Test_Label = np.c_[test_data[:,1].tolist()]
    Test_Data = np.hstack(Test_Data).transpose() #sklearn requires data in [n_samples,n_features]
    Test_Label = np.hstack(Test_Label)
    Test['Data'] = Test_Data
    Test['Label'] = Test_Label
    
    return Train,Validation,Test

def Evaluation(y_true,y_pred,y_pred_prob):
    print('=============precision_recall_fscore_support======================')
    print(metrics.precision_recall_fscore_support(y_true, y_pred, average="binary"))
    print('===================Report=======================')
    print(metrics.classification_report(y_true, y_pred))
    print('===================Accuracy=======================')
    print(metrics.accuracy_score(y_true, y_pred,normalize=True,sample_weight=None))
    print('===================ConfusionMatrix=======================')
    c_matrix = metrics.confusion_matrix(y_true, y_pred)
    plt.figure(1)
    plot_confusion_matrix(c_matrix, classes=[0, 1], normalize = True, title='Confusion matrix with normalization')
    plt.show()

    print('===================ROC curve======================')
    #Roc Auc
    fpr, tpr, thresholds = metrics.roc_curve(y_true,y_pred)
    plt.figure(2)
    plt.plot(fpr, tpr, label='RF')
    plt.xlabel('False positive rate')
    plt.ylabel('True positive rate')
    plt.title('ROC curve')
    plt.legend(loc='best')
    plt.show()
    print('===================AUC score=======================')
    print(metrics.roc_auc_score(y_true,y_pred))

In [186]:
def Write_Estimated_Annotation(Predicted_Label_Vector,AnAnnotation,AllAnnotations,Parameters):
    '''
    Predicted_Label_Vector:The predicted label vector generated by classifier
    AnAnnotation: a list for only storing a pair of onset&offset for single event
    AllAnnotaions: the list for storing multiple AnAnnotation-lists for one processing file.
    '''
    for i in range(len(Predicted_Label_Vector)-1):
                    #If next frame is different with current frame
                    if Predicted_Label_Vector[i] != Predicted_Label_Vector[i+1]:
                        #Next Frame:Speech; 
                        if Predicted_Label_Vector[i+1] - Predicted_Label_Vector[i] > 0:
                            #Append this Onset
                            AnAnnotation.append((i+1)*Parameters['hop_size']/Parameters['sampling_rate'])
                        #Next Frame:Non speech
                        elif Predicted_Label_Vector[i+1] - Predicted_Label_Vector[i] < 0:
                            #Append this Offset
                            AnAnnotation.append((i+1)*Parameters['hop_size']/Parameters['sampling_rate'])
                            AnAnnotation.append('speech')
                            if len(AnAnnotation) == 2:
                                AnAnnotation.insert(0,0)
                            AllAnnotations.append(AnAnnotation)
                            AnAnnotation = []
                    #Insert the ending frame and class label manually if the speech last til the end
                    if len(Predicted_Label_Vector) - 2 == i and Predicted_Label_Vector[i] == 1:
                        AnAnnotation.append(np.floor(len(Predicted_Label_Vector)*Parameters['hop_size']/Parameters['sampling_rate']))
                        AnAnnotation.append('speech')
                        if len(AnAnnotation) == 2:
                            AnAnnotation.insert(0,0)
                        AllAnnotations.append(AnAnnotation)
                        AnAnnotation = []
    return AllAnnotations

def Annotations_Evaluation(File_List,Annotated_Data):
    #File_List: the list of dictionaries that save all reference and estimation txt pair. see detail in doc of sed_eval
    #Annotated_Data: empty list for saving all 
    
    # Get used event labels
    all_data = dcase_util.containers.MetaDataContainer()
    #load both ref and est event lists
    for file_pair in File_List:
        reference_event_list = sed_eval.io.load_event_list(
                filename=file_pair['reference_file']
        )
        ref_speech_list = sed_eval.util.event_list.filter_event_list(reference_event_list, scene_label=None, event_label='speech', filename=None)
        
        estimated_event_list = sed_eval.io.load_event_list(
                filename=file_pair['estimated_file']
        )
        est_speech_list = sed_eval.util.event_list.filter_event_list(estimated_event_list, scene_label=None, event_label='speech', filename=None)

        Annotated_Data.append({'ref_speech_list': ref_speech_list,
                        'est_speech_list': est_speech_list})
        all_data += ref_speech_list
    # Start evaluating
    # Create metrics classes, define parameters    
    event_labels = all_data.unique_event_labels
    segment_based_metrics = sed_eval.sound_event.SegmentBasedMetrics(
        event_label_list=event_labels,
        time_resolution=0.1
        )
    event_based_metrics = sed_eval.sound_event.EventBasedMetrics(
    event_label_list=event_labels,
    t_collar=0.250
    )
    # Go through files
    for file_pair in Annotated_Data:
        segment_based_metrics.evaluate(
                reference_event_list=file_pair['ref_speech_list'],
                estimated_event_list=file_pair['est_speech_list']
        )

        event_based_metrics.evaluate(
                reference_event_list=file_pair['ref_speech_list'],
                estimated_event_list=file_pair['est_speech_list']
        )
    # Get only certain metrics
    overall_segment_based_metrics = segment_based_metrics.results_overall_metrics()
    print('-------------------------------------')
    print("Accuracy:", overall_segment_based_metrics['accuracy']['accuracy'])
    # print all metrics as reports
    print(segment_based_metrics)
    
def Prediction_Evaluation(Dir,Estimator,Parameters,TransitionMatrix):
    #Dir: the path of the fold that contains of the test data
    #Estimator: Pre trained testimator
    
    file_list = []#list for storing  annotated_data over all files
    annotated_data = [] #list for storing all annotation pairs

    for root, dirs,files in os.walk(Dir):
        for file in os.listdir(root):   
            if file.endswith('.txt'):  
                # a dictionary contained ref and est pair is required by sed_eval
                ref_est_pair = {}
                AnAnnotation = [] #a list of single onset&offset pair
                AllAnnotations = []#a list for storing all onset&offset of speech events for one file
                
                #Get the processing filename without extension
                file_name = os.path.splitext(file)[0]
                
                #corresponding .wav file path
                audio_file_path = root + '/' + file_name + '.wav'                
                #This is the txt path for storing estimated annotation
                estimated_txt_path = os.getcwd() + '/estimated_txt/'  + file_name + '_estimated.txt'                
                #corresponding reference txt path
                reference_txt_path = root + file #the .txt file would be used as reference
                
                print(reference_txt_path)
                ref_est_pair['reference_file'] = reference_txt_path
                ref_est_pair['estimated_file'] = estimated_txt_path
                
                #Load Annotated Information and read speech event only                
                annotated_event = sed_eval.io.load_event_list(reference_txt_path)
                speech_event = sed_eval.util.event_list.filter_event_list(annotated_event, scene_label=None, event_label='speech', filename=None)               
                
                #process all audio data                     
                feature_vector,audio_data, audio_sr = FeatureExtraction(audio_file_path,Parameters)
                #Get Feature vector
                feature_vector = feature_vector.transpose()
                #Get true label vector
                y_label = CreateLabelVector(Data=audio_data,
                                                EventList=speech_event,
                                                Parameters=Parameters,
                                                LabelIndex=1) 
                #Predict the probability of speech for each frame
                y_pred_speech_prob = Estimator.predict_proba(feature_vector)[:,1] #[0][0] is non-speech; [0][1] is speech
                #viterbi smoothing
                viterbi_sequence = librosa.sequence.viterbi_binary(y_pred_speech_prob, TransitionMatrix, p_state=0.5, p_init=None).ravel()
                
                #create estimated annotation txt file based on predicted label vector, write all onset&offset pairs of speech events in a list
                estimated_speech_annotations = Write_Estimated_Annotation(Predicted_Label_Vector = viterbi_sequence,
                                                                          AnAnnotation = AnAnnotation,
                                                                          AllAnnotations = AllAnnotations,
                                                                          Parameters = Parameters)               
#=====================================================================   
                #write the estimated annotations
                with open(estimated_txt_path, 'w') as csv_file:
                    writer = csv.writer(csv_file, delimiter='\t')
                    writer.writerows(estimated_speech_annotations)
                file_list.append(ref_est_pair)
#=============================use sed_eval to evaluate the prediction=========  
    #file_list
    #annotated_data
    Annotations_Evaluation(File_List = file_list,
                           Annotated_Data = annotated_data)
    

In [187]:
#Load the dataset
TrainSetPath = os.getcwd()+'/JPNotebookExported/Train_Dataset.npy'
ValidateSetPath = os.getcwd()+'/JPNotebookExported/Validate_Dataset.npy'
TestSetPath = os.getcwd()+'/JPNotebookExported/Test_Dataset.npy'

#Load Transition Matrix
train_transition_matrix = np.load(os.getcwd()+'/JPNotebookExported/Train_TransitionMatrix.npy',allow_pickle=True)
validate_transition_matrix = np.load(os.getcwd()+'/JPNotebookExported/Validate_TransitionMatrix.npy',allow_pickle=True)
test_transition_matrix = np.load(os.getcwd()+'/JPNotebookExported/Test_TransitionMatrix.npy',allow_pickle=True)

#Load all the data and label
Train,Validation,Test = DatasetLoader(TrainSetPath,ValidateSetPath,TestSetPath)
Train_Data,Train_Label,Validate_Data,Validate_Label,Test_Data,Test_Label = Train['Data'], Train['Label'],Validation['Data'],Validation['Label'],Test['Data'],Test['Label']

ValueError: could not broadcast input array from shape (40,431) into shape (40)

In [63]:
#Make own scoring functions

def report_wrapper(y_true, y_pred): 
    #wrapper of metrics.classification_report
    return metrics.classification_report(y_true,y_pred)

def confusion_matrix_wrapper(y_true, y_pred): 
    return metrics.confusion_matrix(y_true, y_pred)

def recall_score_wrapper(y_true, y_pred): 
    return metrics.recall_score(y_true, y_pred)

def roc_auc_wrapper(y_true, y_pred): 
    return metrics.roc_auc_score(y_true, y_pred)

def f1_score_wrapper(y_true, y_pred): 
    return metrics.f1_score(y_true, y_pred)

def accuracy_wrapper(y_true, y_pred): 
    return metrics.accuracy_score(y_true, y_pred)

def precision_wrapper(y_true, y_pred): 
    return metrics.precision_score(y_true, y_pred)

In [65]:
Params = {
        'sampling_rate':22050,
        'win_size': 1024,
        'hop_size': 512,
        'min_freq': 80,
        'max_freq': 8000,
        'num_mel_filters': 128,
        'n_dct': 20}
#Grid search with cross validation

#Create the parameters grid
params_grid = {
    'bootstrap': [True],
    'max_depth': [5,15],
    'max_features': [40],
    'min_samples_leaf': [1,5],
    'min_samples_split': [4,8],
    'n_estimators': [5,15,35]
}

#Define your scoring strategy
#we can list a bunch of scoring functions here that predined by ssklearn
#Think about your problem first, then pick your effective ones
#scoring = ['recall','roc_auc']
scoring = {#'report_wrapper': make_scorer(report_wrapper),
           #'confusion_matrix_wrapper': make_scorer(confusion_matrix_wrapper),
           'recall_score_wrapper': make_scorer(recall_score_wrapper),
           'roc_auc_wrapper': make_scorer(roc_auc_wrapper),
           'f1_score_wrapper': make_scorer(f1_score_wrapper),
           'accuracy_wrapper': make_scorer(accuracy_wrapper),
           'precision_wrapper': make_scorer(precision_wrapper)}

# Instantiate a RF classfier
RFclf = RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
            max_depth=30, max_features='auto', max_leaf_nodes=None,
            min_impurity_decrease=0.0, min_impurity_split=None,
            min_samples_leaf=5, min_samples_split=4,
            min_weight_fraction_leaf=0.0, n_estimators=35, n_jobs=1,
            oob_score=False, random_state=None, verbose=0,
            warm_start=False)

# Instantiate grid search
RF_grid_search = GridSearchCV(estimator=RFclf, param_grid=params_grid, cv=5, 
                           n_jobs=-1, verbose=0, scoring=scoring,refit = 'recall_score_wrapper')
# Fit grid search to the data
RF_grid_search.fit(Train_Data, Train_Label)

# Train using best parameters
bestRF = RF_grid_search.best_estimator_

# Predictions
pred_Train = bestRF.predict(Train_Data)
pred_Validate = bestRF.predict(Validate_Data)
pred_Test = bestRF.predict(Test_Data)

# Predicted probability
pred_Train_prob = bestRF.predict_proba(Train_Data)[:,1]
pred_Validate_prob = bestRF.predict_proba(Validate_Data)[:,1]
pred_Test_prob = bestRF.predict_proba(Test_Data)[:,1]

# Evaluation
Validate_Data_path =  os.getcwd() + '/1_Dataset_Generate/audio/soundbanks/Validate/generated/bimodal/'
Prediction_Evaluation(Validate_Data_path,bestRF,Params,train_transition_matrix)


'\n# Train using best parameters\nbest_forest = grid_search.best_estimator_\n# Predictions\n#pred_train = best_forest.predict(train_data)\npred_validate = best_forest.predict(Validate_Data)\n# Accuracy\n#train_acc = accuracy_score(train_label, pred_train)\ntest_acc = accuracy_score(Validate_Label, pred_validate)\n#print ("train acc: {0:.2f}, test acc: {1:.2f}".format(train_acc, test_acc))\n# Calculate other evaluation metrics \n#precision, recall, F1, _ = precision_recall_fscore_support(Validate_Label, pred_validate, average="binary")\n#print ("precision: {0:.2f}. recall: {1:.2f}, F1: {2:.2f}".format(precision, recall, F1))\n'

In [182]:
bestRF_sample

RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
            max_depth=55, max_features=40, max_leaf_nodes=None,
            min_impurity_decrease=0.0, min_impurity_split=None,
            min_samples_leaf=1, min_samples_split=5,
            min_weight_fraction_leaf=0.0, n_estimators=15, n_jobs=1,
            oob_score=False, random_state=None, verbose=0,
            warm_start=False)