Autor: Matyáš Sládek <br>
Rok: 2020 <br>

Tento soubor slouží k selekci atributů pomocí metod dopředné selekce a zpětné eliminace. <br>

Tato buňka importuje potřebné knihovny.

In [None]:
%%capture
import os
import time
from datetime import timedelta
import json
import re

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
from sklearn.preprocessing import LabelEncoder, StandardScaler

# Classifiers
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier

from tqdm.notebook import tqdm   # Progress bars
tqdm().pandas()

Tato buňka obsahuje funkci pro tisk na obrazovku a zároveň do souboru <strong>feature_selection.out</strong>

In [None]:
def print_log(*args, **kwargs):
    """
    Prints to stdout and logs to file.

    Takes the same arguments as the built-in print(). The message is first
    printed with print(*args, **kwargs) (so a 'file' keyword such as
    sys.stderr is honoured) and then appended to the log file
    '../metadata/misc/feature_selection.out' with ANSI formatting sequences
    (e.g. bold text) removed, because they would not display correctly there.

    Parameters:

    args:   Values to print, converted to text with str() and joined with 'sep'
    kwargs: Keyword arguments of print(); 'sep' and 'end' are also applied to the logged text
    """
    print(*args, **kwargs)   # Print to stdout (or to the stream passed in 'file')

    # Rebuild the text the same way print() does, so any number of arguments of any type can be logged
    sep = kwargs.get('sep')
    end = kwargs.get('end')
    sep = ' ' if sep is None else sep
    end = '\n' if end is None else end
    message = sep.join(str(arg) for arg in args)

    # Remove ANSI formatting sequences such as '\033[1m' (bold) and '\033[0m' (reset)
    message = re.sub(r'\033\[[0-9;]*m', '', message)

    with open('../metadata/misc/feature_selection.out', 'a') as file:
        file.write(message + end)

Tato buňka obsahuje funkci pro načtení a zpracování dat a provedení selekce atributů pomocí metody dopředné selekce či zpětné eliminace s použitím křížové validace či validace na validační sadě. <br>

In [None]:
def _get_cross_validation_n_jobs(classifier_name, parameters):
    """
    Returns the number of processes to pass to cross-validation for the given classifier.

    Classifiers listed below get one process per cross-validation fold, all other
    classifiers get a single process. The result never exceeds the number of CPU cores.
    """
    num_cpus = os.cpu_count() or 1   # os.cpu_count() may return None
    parallel_cv_classifiers = ['LogisticRegression', 'DecisionTreeClassifier', 'SVC_linear', 'SVC_rbf']
    n_jobs = parameters['cross_validation_num_of_folds'] if classifier_name in parallel_cv_classifiers else 1
    return min(n_jobs, num_cpus)


def _score_feature_subset(features, y, feature_subset, classifier, scaler, parameters, n_jobs):
    """
    Returns the classification score achieved with the given subset of features.

    The samples are first split to train and test data (the test data is not used here).
    With parameters['use_cross_validation'] set, the mean cross-validation score on the
    training data is returned, otherwise the classifier is fitted on a part of the
    training data and scored on the remaining validation part.
    """
    X = features[feature_subset].values   # Values of all columns belonging to the selected features
    X_train, _, y_train, _ = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

    if parameters['use_cross_validation']:
        # NOTE(review): the scaler is fitted on the whole training data before it is split to folds,
        # so the folds share scaling statistics; kept as is so that the scores do not change.
        X_train = scaler.fit_transform(X_train)
        folds = StratifiedKFold(n_splits=parameters['cross_validation_num_of_folds'])
        return cross_val_score(classifier, X_train, y_train, cv=folds, n_jobs=n_jobs).mean()

    # Validation set: the scaler is fitted on the training part only and applied to the validation part
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=parameters['validation_set_size'], random_state=42, stratify=y_train)
    X_train = scaler.fit_transform(X_train)
    X_val = scaler.transform(X_val)
    classifier.fit(X_train, y_train)
    return classifier.score(X_val, y_val)


def get_feature_sets(optimised_feature_sets, dataset, library, classifiers, default_params, parameters):
    """
    Finds optimal feature subset using forward selection and/or backward elimination method.

    Parameters:

    optimised_feature_sets: Dictionary containing optimised feature sets, updated in place
    dataset:                Name of a dataset to indicate which extracted features to load (dataset from which the features were extracted)
    library:                Name of a library to indicate which extracted features to load (library with which the features were extracted)
    classifiers:            Dictionary containing classifiers for which the optimal feature subset should be found
    default_params:         Dictionary containing parameters for classifiers
    parameters:             Parameters about which selection to use etc.

    Returns:

    The updated optimised_feature_sets dictionary, or -1 if the features or the track-genre list could not be read.
    """
    import sys   # Needed for sys.stderr in the error messages below; the import cell of this file does not import it

    # Sets a character for formatting printing
    format_char = '\n' if parameters['verbose'] else ''

    # Load specified extracted features
    features_path = '../metadata/features/features_{}_{}.csv'.format(dataset, library)
    try:
        features = pd.read_csv(features_path, index_col=0, header=[0, 1, 2])
    except Exception as e:
        print_log('Failed to read file: "{}"!'.format(features_path), file=sys.stderr)
        print_log('Error: {}'.format(repr(e)), file=sys.stderr)
        return -1

    # Perform One-hot encoding on categorical features
    for column in features.select_dtypes(include='object'):   # For each categorical column
        dummy_columns = pd.get_dummies(features[column], drop_first=True)   # Encode the column values
        features = features.drop(columns=column)   # Drop the column from the dataframe
        dummy_columns.columns = pd.MultiIndex.from_product([[column[0]], [column[1]], ['{}'.format(c) for c in dummy_columns.columns]], names=features.columns.names)   # Reindex for consistency
        features = pd.concat([features, dummy_columns], axis=1).sort_index(axis=1)   # Append columns to features dataframe

    # Get names of all features; unused levels are removed first, so that a name left over from a dropped column cannot be selected
    feature_names = list(features.columns.remove_unused_levels().levels[0])

    # Load track-genre list
    genres_path = '../metadata/track_genre_lists/{}_track_genre_list.csv'.format(dataset)
    try:
        genres = pd.read_csv(genres_path, index_col=0, header=0)
    except Exception as e:
        print_log('Failed to read file: "{}"!'.format(genres_path), file=sys.stderr)
        print_log('Error: {}'.format(repr(e)), file=sys.stderr)
        return -1

    genres = genres.loc[features.index]   # Keep only the tracks which are present in the features

    # Encode genre labels
    encoder = LabelEncoder()
    y = encoder.fit_transform(np.ravel(genres))

    scaler = StandardScaler()

    ###################################################################################################################################################
    # FORWARD SELECTION

    if parameters['use_forward_selection']:   # If forward selection is selected
        print_log('Selecting features using forward selection method:')
        t1 = time.time()   # Store the start time of forward selection

        for classifier_name, classifier in tqdm(classifiers.items(), desc='classifiers', leave=True):   # For each of the selected classifiers
            print_log('\nSelecting features for classifier \033[1m{}\033[0m:\n'.format(classifier_name))
            t2 = time.time()   # Store the start time of forward selection for the classifier

            classifier = classifier(**default_params[classifier_name])   # Initialize the classifier
            n_jobs = _get_cross_validation_n_jobs(classifier_name, parameters)
            remaining_features = feature_names.copy()   # Stores the feature names from which to select
            best_features = []   # Stores the best selected features
            score_max_previous = 0   # Stores the classification score of the best feature set in last completed iteration

            while remaining_features:   # Each iteration either selects one more feature or stops the selection
                scores_tmp = pd.Series(index=remaining_features, dtype='float64')   # Stores scores for all feature sets in each iteration

                for feature_name in remaining_features:   # For each of the yet unselected features
                    # Score the best feature set extended with the feature
                    scores_tmp[feature_name] = _score_feature_subset(features, y, best_features + [feature_name], classifier, scaler, parameters, n_jobs)

                    if parameters['verbose']:   # If verbose is selected, print out score for each of the added features
                        print_log('Score with feature \033[1m{}\033[0m: \033[1m{}\033[0m'.format(feature_name, scores_tmp[feature_name]))

                score_max = scores_tmp.max()   # Store the score of the best performing feature set

                if score_max > score_max_previous:   # If any of the added features improved the score, select the one which improved it the most
                    added_feature = scores_tmp.idxmax()
                    score_max_previous = score_max
                    best_features.append(added_feature)
                    remaining_features.remove(added_feature)

                    print_log('{}Added feature: \033[1m{}\033[0m Score: \033[1m{}\033[0m{}'.format(format_char, added_feature, score_max, format_char))
                else:   # If none of the features added in the last iteration improved classification score, stop the selection
                    print_log('\nAdding any of the remaining features did not improve score, stopping.')
                    break

            # Name the best performing feature set according to the selection and validation methods used and store it
            # (the classifier name is added to the optimised feature sets dictionary if not there already)
            name = 'opt_feature_set_FS_CV' if parameters['use_cross_validation'] else 'opt_feature_set_FS_VS'
            optimised_feature_sets.setdefault(classifier_name, {})[name] = best_features

            print_log('\nSelecting features for classifier \033[1m{}\033[0m finished in ~\033[1m{}\033[0m.'.format(classifier_name, str(timedelta(seconds=(time.time() - t2))).split(".")[0]))
            print_log('Best feature subset FS: \033[1m{}\033[0m'.format(best_features))
            print_log('Score: \033[1m{}\033[0m'.format(score_max_previous))
            print_log('-'*100)

        print_log('\nSelecting features using forward selection method finished in ~\033[1m{}\033[0m.'.format(str(timedelta(seconds=(time.time() - t1))).split(".")[0]))
        print_log('*'*100)

    ###################################################################################################################################################
    # BACKWARD ELIMINATION

    if parameters['use_backward_elimination']:   # If backward elimination is selected
        print_log('Selecting features using backward elimination method:')
        t1 = time.time()   # Store the start time of backward elimination

        for classifier_name, classifier in tqdm(classifiers.items(), desc='classifiers', leave=True):   # For each of the selected classifiers
            print_log('\nSelecting features for classifier \033[1m{}\033[0m:\n'.format(classifier_name))
            t2 = time.time()   # Store the start time of backward elimination for the classifier

            classifier = classifier(**default_params[classifier_name])   # Initialize the classifier
            n_jobs = _get_cross_validation_n_jobs(classifier_name, parameters)
            best_features = feature_names.copy()   # Stores the best selected features

            # Perform classification with all features to get the score to compare results with
            score_max_previous = _score_feature_subset(features, y, best_features, classifier, scaler, parameters, n_jobs)

            print_log('Initial score: \033[1m{}\033[0m'.format(score_max_previous))

            while len(best_features) > 1:   # The last remaining feature is never removed, an empty feature set can not be classified
                scores_tmp = pd.Series(index=best_features, dtype='float64')   # Stores scores for all feature sets in each iteration

                for feature_name in best_features:   # For each of the remaining features
                    current_features = [kept_feature for kept_feature in best_features if kept_feature != feature_name]   # Remaining features without the feature

                    # Perform classification with the currently selected features
                    scores_tmp[feature_name] = _score_feature_subset(features, y, current_features, classifier, scaler, parameters, n_jobs)

                    if parameters['verbose']:   # If verbose is selected, print out score for each of the removed features
                        print_log('Score without feature \033[1m{}\033[0m: \033[1m{}\033[0m'.format(feature_name, scores_tmp[feature_name]))

                score_max = scores_tmp.max()   # Store the score of the best performing feature set

                if score_max >= score_max_previous:   # If removing any of the features did not worsen the score, remove the feature whose removal gives the best score
                    removed_feature = scores_tmp.idxmax()
                    score_max_previous = score_max
                    best_features.remove(removed_feature)

                    print_log('{}Removed feature: \033[1m{}\033[0m Score: \033[1m{}\033[0m{}'.format(format_char, removed_feature, score_max, format_char))
                else:   # If removing any of the features in the last iteration worsened classification score, stop the selection
                    print_log('\nRemoving any of the remaining features did worsen score, stopping.')
                    break

            # Name the best performing feature set according to the selection and validation methods used and store it
            # (the classifier name is added to the optimised feature sets dictionary if not there already)
            name = 'opt_feature_set_BE_CV' if parameters['use_cross_validation'] else 'opt_feature_set_BE_VS'
            optimised_feature_sets.setdefault(classifier_name, {})[name] = best_features

            print_log('\nSelecting features for classifier \033[1m{}\033[0m finished in ~\033[1m{}\033[0m.'.format(classifier_name, str(timedelta(seconds=(time.time() - t2))).split(".")[0]))
            print_log('Best feature subset BE: \033[1m{}\033[0m'.format(best_features))
            print_log('Score: \033[1m{}\033[0m'.format(score_max_previous))
            print_log('-'*100)

        print_log('\nSelecting features using backward elimination method finished in ~\033[1m{}\033[0m.'.format(str(timedelta(seconds=(time.time() - t1))).split(".")[0]))
        print_log('*'*100)

    ###################################################################################################################################################

    return optimised_feature_sets

Tato buňka slouží k nastavení potřebných parametrů a výběru datových sad, způsobů selekce a klasifikátorů, pro které má být selekce provedena. <br>
Pro jaké datové sady má být selekce provedena je možné zvolit v proměnné <code>datasets</code> odkomentováním/zakomentováním příslušných záznamů. <br>
Při použití vlastní datové sady je nutné do proměnné <code>datasets</code> přidat stejný název datové sady, jako byl použit pro extrakci atributů v souboru <strong>feature_extraction.ipynb</strong> <br>
Dále je nutné zvolit, jestli selekci provést na atributech extrahovaných pomocí knihovny Librosa či Essentia odkomentováním/zakomentováním příslušných záznamů v proměnné <code>feature_extraction_libraries</code>. <br>
Pro které klasifikátory má být selekce provedena je možné zvolit v proměnné <code>classifiers</code> odkomentováním/zakomentováním příslušných záznamů. <br>
Při použití jiného klasifikátoru je nutné jej přidat do proměnné <code>classifiers</code> ve formátu {zvolený_název_klasifikátoru}:{odkaz_na_objekt_klasifikátoru}. <br>
Parametry klasifikátorů je možné upravit v proměnné <code>default_params</code>, případně pro nový klasifikátor přidat záznam ve formátu {zvolený_název_klasifikátoru}:{slovník_parametrů}, kde zvolený název klasifikátoru musí odpovídat názvu v proměnné <code>classifiers</code>. <br>
V proměnné <code>parameters</code> je možné nastavit průběh selekce. <br>
<br>
Popis parametrů: <br>
<ul>
    <li><code>use_forward_selection</code> Hodnota True znamená použití dopředné selekce</li>
    <li><code>use_backward_elimination</code> Hodnota True znamená použití zpětné eliminace</li>
    <li><code>use_cross_validation</code> Hodnota True znamená použití křížové validace, hodnota False pak validace na validační sadě</li>
    <li><code>cross_validation_num_of_folds</code> Značí, kolikanásobná bude křížová validace, pokud je tato metoda zvolena</li>
    <li><code>validation_set_size</code> Značí, jaká část trénovacích dat bude vyhrazena jako validační, pokud je tato metoda zvolena</li>
    <li><code>verbose</code> Hodnota True znamená rozšířený výpis (vypsáno bude dosažené skóre pro každou vnitřní iteraci výběrů), hodnota False znamená výpis pouze finálně přidaných/odebraných atributů</li>
</ul>

Při nastavování parametrů je třeba brát v potaz, že selekce je velmi výpočetně náročný proces. Například při použití zpětné eliminace s pětinásobnou křížovou validací pro jeden klasifikátor na sadě 80 atributů extrahovaných pomocí knihovny Essentia bude při odstranění pouhých pěti atributů klasifikátor natrénován celkem (1+80+79+78+77+76)*5 = 1955 krát (úvodní vyhodnocení se všemi atributy je rovněž pětinásobné)! <br>

In [None]:
# Entry point: configures the selection, runs it for every selected dataset and extraction library
# and stores the resulting optimised feature sets to '../metadata/misc/optimised_feature_sets.json'.
if __name__ == "__main__":
        
    # List of datasets whose features should be used, unwanted can be commented out   
    datasets = [
#         'EBD',
#         'FMA',
        'GTZAN'
    ]
    
    # List of feature extraction libraries whose features should be used, unwanted can be commented out   
    feature_extraction_libraries = [
        'librosa',
#         'essentia'
    ]
    
    # Dictionary of classifier names and objects for which optimal feature subset should be found, unwanted can be commented out   
    classifiers = {
        # sklearn classifiers
#         'LogisticRegression':LogisticRegression,
        'KNeighborsClassifier':KNeighborsClassifier,
#         'MLPClassifier':MLPClassifier,
#         'DecisionTreeClassifier':DecisionTreeClassifier,
#         'SVC_linear':SVC,
#         'SVC_rbf':SVC,
        
        # sklearn ensemble classifiers
#         'RandomForestClassifier':RandomForestClassifier,
        
        # other classifiers
#         'XGBClassifier':XGBClassifier,
    }
    
    # Default classifiers parameters to use 
    default_params = {
        'LogisticRegression': {'max_iter':10000, 'class_weight':'balanced'},
        'KNeighborsClassifier': {'n_jobs':-1, 'algorithm':'brute'},
        'MLPClassifier': {'max_iter':10000, 'random_state':42},
        'DecisionTreeClassifier': {'class_weight':'balanced', 'random_state':42},
        'SVC_linear': {'kernel':'linear', 'class_weight':'balanced'},
        'SVC_rbf': {'kernel':'rbf', 'class_weight':'balanced'},
        'RandomForestClassifier': {'n_jobs':-1, 'class_weight':'balanced', 'random_state':42},
        'XGBClassifier': {'tree_method':'gpu_hist', 'n_jobs':1, 'random_state':42},
    }
    
    # Parameters for the feature selection
    parameters = {
        'use_forward_selection' : True,   # Set to true to use forward selection method
        'use_backward_elimination' : True,   # Set to true to use backward elimination method
        'use_cross_validation': False,   # Set to true to use cross-validation on the entire training data, else a portion of the train data will be reserved as validation set and cross-validation will not be used
        'cross_validation_num_of_folds' : 5,   # Determines the number of cross-validation folds, ignored if 'use_cross_validation' parameter is set to False
        'validation_set_size': 0.2,   # Determines the size of the portion of training data, which will be reserved as validation set, ignored if 'use_cross_validation' parameter is set to True
        'verbose' : False   # Set to True to print out each of the tried out combinations of features and its scores
    }
    
    # Load optimised feature sets file if available, so that results of previous runs are kept
    try:
        with open('../metadata/misc/optimised_feature_sets.json') as f:
            optimised_feature_sets = json.load(f)            
    except FileNotFoundError:    
        optimised_feature_sets = {}
        
    if parameters['use_cross_validation']:
        print_log('Using cross-validation.')
    else:
        print_log('Using validation set.')
    
    t_start = time.time()   # Store the start time of the feature selection 
    
    for dataset in datasets:   # For each of the selected datasets
        
        # Add dataset name to optimised feature sets dictionary if not there already
        if dataset not in optimised_feature_sets:
            optimised_feature_sets[dataset] = {}
        
        for library in feature_extraction_libraries:   # For each of the selected extraction libraries features
            
            # Add extraction library name to optimised feature sets dictionary if not there already
            if library not in optimised_feature_sets[dataset]:
                optimised_feature_sets[dataset][library] = {}
            
            print_log('#'*100)
            print_log('Selecting features from \033[1m{}\033[0m dataset features extracted with \033[1m{}\033[0m library:\n'.format(dataset, library))
            t = time.time()   # Store the start time of feature selection for selected dataset and extraction library features
            result = get_feature_sets(optimised_feature_sets[dataset][library], dataset, library, classifiers, default_params, parameters)

            if result == -1:   # get_feature_sets returns -1 when the input files could not be read, do not report such run as finished
                print_log("\nSelecting features from \033[1m{}\033[0m dataset features extracted with \033[1m{}\033[0m library failed, skipping.".format(dataset, library))
            else:
                print_log("\nSelecting features from \033[1m{}\033[0m dataset features extracted with \033[1m{}\033[0m library finished in ~\033[1m{}\033[0m.".format(dataset, library, str(timedelta(seconds=(time.time() - t))).split(".")[0]))
            print_log('#'*100)

    # Save optimised feature sets to .json file
    with open('../metadata/misc/optimised_feature_sets.json', 'w') as json_file:
        json.dump(optimised_feature_sets, json_file)
        
    print_log("Selecting features from all selected datasets and all selected features finished in ~\033[1m{}\033[0m.".format(str(timedelta(seconds=(time.time() - t_start))).split(".")[0]))