In [None]:
import numpy as np
import pandas as pd
import csv
from data_loader import DataLoader
from pandas import DataFrame
from sklearn.model_selection import train_test_split
from sklearn.metrics import balanced_accuracy_score
from sklearn.pipeline import Pipeline
from sklearn.model_selection import RandomizedSearchCV
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier, ExtraTreesClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import LinearSVC
from sklearn.linear_model import LassoCV
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import chi2
from sklearn.feature_selection import f_classif
from sklearn.feature_selection import VarianceThreshold
from sklearn.feature_selection import SelectFromModel
import seaborn as sns
import matplotlib.pyplot as plt

Przestrzenie poszukiwań hiperparametrów dla wybranych algorytmów

In [None]:
# Hyperparameter search spaces, keyed by algorithm name.
# Each inner key is "<pipeline step>__<parameter>": 'estimator__*' targets the
# pipeline step named 'estimator', 'feature_selector__*' the step named
# 'feature_selector'. Each value is the list of candidates to sample from.
search_spaces = {
    'random_forest' : {
        "estimator__n_estimators" : np.arange(10, 200),
        "estimator__max_depth" : np.arange(1, 50),
        "estimator__max_samples" : np.linspace(0.1, 1, num=100),
        "estimator__max_features" : ['sqrt', 'log2'],
    },
    'k_neighbours' : {
        "estimator__n_neighbors" : np.arange(1, 30),
    },
    'gradient_boosting' : {
        # step=50 yields exactly four candidates: 1, 51, 101, 151
        "estimator__n_estimators" : np.arange(1, 200, step=50),
        # 10 values spaced evenly on a log2 scale between 2**-10 and 2**0
        "estimator__learning_rate" : np.logspace(-10, 0, base=2.0, num=10),
        "estimator__subsample" : np.linspace(0.1, 1, num=10),
        "estimator__loss" : ['log_loss', 'exponential'],
        "estimator__max_depth" : np.arange(1, 16, step=2),
        # Fractions 0.05, 0.10, ..., 1.00. linspace (like the other float grids
        # in this dict) pins the endpoint to exactly 1.0, whereas arange with a
        # float step leaves the length and last value to rounding.
        "estimator__max_features" : np.linspace(0.05, 1.0, num=20),
    },
    'extra_trees' : {
        "estimator__n_estimators" : np.arange(80, 120),
        "estimator__max_depth" : np.arange(16, 20),
        # Tunes the selector step as well, so this space presumably requires a
        # selector that accepts `max_features` - verify against the caller.
        'feature_selector__max_features' : np.arange(17, 23)
    }
}

Klasa do optymalizacji hiperparametrów wraz z selekcją cech

In [None]:
class HPO():
    """Feature selector + estimator pipeline with evaluation and tuning helpers.

    Call ``load_dataset()`` first; ``run_without_optimization()`` and
    ``run_random_search()`` both read the train/test split it stores.
    """

    def __init__(self, estimator, feature_selector, search_space, random_state=0, test_size=0.3, n_iter=10, n_jobs=1):
        """Build the two-step pipeline and remember the search settings.

        Args:
            estimator: final step of the pipeline (step name 'estimator').
            feature_selector: first step of the pipeline (step name
                'feature_selector').
            search_space: parameter distributions handed to RandomizedSearchCV.
            random_state: seed used for the train/test split and the search.
            test_size: share of the data held out by ``load_dataset()``.
            n_iter: number of configurations sampled by the random search.
            n_jobs: parallelism passed to RandomizedSearchCV.
        """
        self.pipeline = Pipeline(steps=[
            ('feature_selector', feature_selector),
            ('estimator', estimator)
        ])

        self.n_iter = n_iter
        self.search_space = search_space
        self.random_state = random_state
        self.test_size = test_size
        self.n_jobs = n_jobs
        self.estimator_name = estimator.__class__.__name__
        # Despite the name this holds the selector object itself; its repr is
        # what appears in printed messages and (sanitized) in file names.
        self.feature_selector_name = feature_selector

    @staticmethod
    def _filename_token(value):
        """Turn ``str(value)`` into a token that is safe inside a file name."""
        allowed_punctuation = '-_.='
        token = ''.join(
            ch if (ch.isalnum() or ch in allowed_punctuation) else '_'
            for ch in str(value)
        )
        # Collapse the runs of '_' left behind by "(", ", ", line breaks, etc.
        while '__' in token:
            token = token.replace('__', '_')
        return token.strip('_')

    def load_dataset(self):
        """Read the training data and store a train/test split on the instance.

        Targets are flattened with ``np.ravel`` after the split.
        """
        X, y = DataLoader.read_train_data()

        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=self.test_size, random_state=self.random_state)
        self.y_train = np.ravel(self.y_train)
        self.y_test = np.ravel(self.y_test)

    def save_to_file(self, type, results):
        """Write search results to a ';'-separated CSV in the working directory.

        Args:
            type: prefix of the file name (e.g. 'random_search').
            results: mapping with a 'params' list of dicts and a parallel
                'mean_test_score' sequence.

        The file has one row per entry of ``results['params']`` with the
        columns 'Iteration' (1-based), every parameter name seen in any entry,
        and 'Mean test score'. Parameters missing from an entry are left empty.
        """
        # The selector's repr may contain parentheses, commas, spaces or line
        # breaks, so it is sanitized before it becomes part of the file name.
        csv_file_path = '{0}-{1}-{2}-{3}.csv'.format(
            type,
            self.estimator_name,
            self.random_state,
            self._filename_token(self.feature_selector_name),
        )

        param_rows = results['params']
        # Sorted so the column order is the same on every run (a bare set is not).
        param_names = sorted({name for params in param_rows for name in params})
        fieldnames = ['Iteration'] + param_names + ['Mean test score']

        with open(csv_file_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames, delimiter=';')
            writer.writeheader()

            scored_rows = zip(param_rows, results['mean_test_score'])
            for iteration, (params, mean_score) in enumerate(scored_rows, start=1):
                row_data = {'Iteration': iteration}
                row_data.update(params)
                row_data['Mean test score'] = mean_score
                writer.writerow(row_data)

        print(f"Wyniki zostały zapisane do pliku CSV: {csv_file_path}")

    def run_without_optimization(self):
        """Fit the pipeline as configured and score it on the held-out part.

        Returns:
            Balanced accuracy of the predictions on ``X_test``.
        """
        self.pipeline.fit(self.X_train, self.y_train)
        predictions = self.pipeline.predict(self.X_test)
        balanced_accuracy = balanced_accuracy_score(self.y_test, predictions)
        print(f"Balanced Accuracy for {self.feature_selector_name} and {self.estimator_name}: {balanced_accuracy}")
        return balanced_accuracy

    def run_random_search(self):
        """Tune the pipeline with RandomizedSearchCV (scoring: balanced accuracy).

        Prints the score of the fitted search on the held-out part and saves
        ``cv_results_`` through ``save_to_file('random_search', ...)``.

        Returns:
            The fitted RandomizedSearchCV object.
        """
        rs = RandomizedSearchCV(self.pipeline, self.search_space, n_iter=self.n_iter,
            random_state=self.random_state, n_jobs=self.n_jobs, scoring='balanced_accuracy')
        rs.fit(self.X_train, self.y_train)
        score = rs.score(self.X_test, self.y_test)
        print(score)
        self.save_to_file('random_search', rs.cv_results_)

        return rs

Zbieranie danych do heatmapy przedstawiającej wyniki balanced accuracy dla poszczególnych algorytmów z użyciem danych metod feature selection

In [None]:
def run_all():
    """Score every feature-selector / classifier pair without tuning.

    For each combination an HPO pipeline is built, the dataset is loaded and
    split, and the balanced accuracy on the held-out part is recorded. The
    results (columns 'selection_method', 'classifier', 'score') are written to
    'feature_selector_comparison.csv'.
    """
    algorithms = [
        {'estimator': RandomForestClassifier(random_state=0), 'name': 'random_forest'},
        {'estimator': GradientBoostingClassifier(random_state=0), 'name': 'gradient_boosting'},
        {'estimator': KNeighborsClassifier(), 'name': 'k_neighbours'},
        {'estimator': ExtraTreesClassifier(random_state=0), 'name': 'extra_trees'},
    ]

    # Shared feature budget for every selector that accepts one
    max_features = 20

    feature_selectors = [
        SelectFromModel(LinearSVC(penalty="l1", dual=False), max_features=max_features),
        SelectFromModel(ExtraTreesClassifier(random_state=0), max_features=max_features),
        SelectFromModel(GradientBoostingClassifier(random_state=0), max_features=max_features),
        SelectFromModel(RandomForestClassifier(random_state=0), max_features=max_features),
        SelectFromModel(LassoCV(cv=5, random_state=42)),
        VarianceThreshold(threshold=0.1),
        SelectKBest(chi2, k=max_features),
        SelectKBest(f_classif, k=max_features),
    ]

    # Collect plain records and build the frame once instead of growing it
    # row by row.
    records = []
    for feature_selector in feature_selectors:
        for algorithm in algorithms:
            hpo = HPO(algorithm['estimator'], feature_selector, search_spaces[algorithm['name']], n_iter=10, n_jobs=-1)
            hpo.load_dataset()
            # Named `score` so the imported balanced_accuracy_score function is
            # not shadowed inside this function.
            score = hpo.run_without_optimization()
            records.append({
                'selection_method': feature_selector,
                'classifier': algorithm['name'],
                'score': score,
            })

    results = DataFrame(records, columns=['selection_method', 'classifier', 'score'])
    results.to_csv('feature_selector_comparison.csv', index=False, sep=',')

In [None]:
# Evaluates all 8 selectors x 4 classifiers and writes the scores to
# 'feature_selector_comparison.csv', which the heatmap cell reads back.
run_all()

Wykreślenie heatmapy przedstawiającej wyniki balanced accuracy dla poszczególnych algorytmów z użyciem danych metod feature selection

In [None]:
# Scores produced by run_all(): one row per (selection method, classifier) pair
df = pd.read_csv('feature_selector_comparison.csv', sep=',')
print(df.head())

# Mean score of each classifier across all selection methods
mean_by_classifier = df.groupby('classifier')['score'].mean()
average_score = mean_by_classifier.reset_index()
print(average_score)

# Wide layout for the heatmap: rows = selection methods, columns = classifiers
heatmap_data = df.pivot(index='selection_method', columns='classifier', values='score')

fig, ax = plt.subplots(figsize=(12, 6))
heatmap = sns.heatmap(
    heatmap_data,
    annot=True,
    fmt=".4f",
    cmap="YlGnBu",
    cbar_kws={"shrink": 0.8},
    ax=ax,
)
heatmap.set_xlabel('Algorytm', labelpad=15)
heatmap.set_ylabel('Metoda selekcji', labelpad=10)

# The colour bar hangs off the first collection drawn on the axes
cbar = heatmap.collections[0].colorbar
cbar.set_label('Balanced accuracy', labelpad=15)

fig.savefig('feature-selectors.svg', bbox_inches='tight')
plt.show()

Zebranie danych do wykresów balanced accuracy w zależności od liczby wybranych kolumn

In [None]:
import os

# Classifiers to evaluate; 'name' only ends up in the output file name
algorithms = [
    {'estimator': RandomForestClassifier(random_state=0), 'name': 'RandomForest'},
    {'estimator': GradientBoostingClassifier(random_state=0), 'name': 'GradientBoosting'},
    {'estimator': KNeighborsClassifier(), 'name': 'KNeighbors'},
    {'estimator': ExtraTreesClassifier(random_state=0), 'name': 'ExtraTrees'},
]

directory = 'scores_for_columns'
os.makedirs(directory, exist_ok=True)


def _score_with_feature_cap(estimator, number_of_features):
    """Balanced accuracy of `estimator` behind a random-forest selector capped at `number_of_features`."""
    # NOTE(review): max_features is only an upper bound here; SelectFromModel's
    # default threshold still applies, so fewer features may be kept.
    # If a strict top-k selection is intended, confirm whether
    # threshold=-np.inf should be passed as well.
    feature_selector = SelectFromModel(RandomForestClassifier(random_state=0), max_features=number_of_features)
    hpo = HPO(estimator, feature_selector, {}, n_iter=10, n_jobs=-1)
    hpo.load_dataset()
    return hpo.run_without_optimization()


# One CSV per classifier: row i holds the score for a cap of i + 1 features
for algorithm in algorithms:
    scores = [
        _score_with_feature_cap(algorithm["estimator"], feature_cap)
        for feature_cap in range(1, 100)
    ]

    df = pd.DataFrame({'balanced_accuracy': scores})
    df.to_csv(f'{directory}/RF_scores_for_columns_{algorithm["name"]}.csv', index=False)

Wykreślenie wykresów balanced accuracy w zależności od liczby wybranych kolumn

In [None]:
import os

directory = 'scores_for_columns'

file_list = os.listdir(directory)
# os.listdir returns entries in arbitrary order; sort so the plots always come
# out in the same sequence.
csv_files = sorted(file for file in file_list if file.endswith('.csv'))

plot_directory = 'plots_for_columns'
os.makedirs(plot_directory, exist_ok=True)

# One figure per score file: balanced accuracy against the number of selected
# features, with the best score and its position marked in red.
for file in csv_files:
    file_path = os.path.join(directory, file)
    # The classifier name is the last '_'-separated part of the file name,
    # without the extension.
    classifier_name = file.split('_')[-1].split('.')[0]
    df = pd.read_csv(file_path)

    scores = df['balanced_accuracy'].tolist()

    best_score = max(scores)
    # First position at which the best score occurs (0-based)
    best_index = scores.index(best_score)

    # Row i of the file is plotted at x = i + 1
    x_values = list(range(1, len(scores) + 1))

    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(x_values, scores, marker='o', linestyle='-', color='b')
    ax.text(len(scores) - 2, best_score - 0.015, f'{best_score:.4f}', color='r')
    ax.axhline(y=best_score, color='r', linestyle='--', label='Najlepszy wynik')
    ax.axvline(x=best_index + 1, color='r', linestyle='--')
    ax.text(best_index + 2, min(scores), str(best_index + 1), color='r')
    ax.set_xlabel('Liczba wybranych cech')
    ax.set_ylabel('Balanced accuracy')
    ax.grid(True)
    ax.legend()
    ax.set_title(classifier_name, pad=15)
    fig.savefig(f'{plot_directory}/{classifier_name}-features-scores.svg', bbox_inches='tight')
    plt.show()
    # Release the figure so figures do not pile up across iterations
    plt.close(fig)

Próba optymalizacji hiperparametrów

In [None]:
# Randomized search with ExtraTrees both inside the feature selector and as the
# final estimator. The 'extra_trees' space also samples
# feature_selector__max_features, so the 18 below is only the initial value.
feature_selector = SelectFromModel(ExtraTreesClassifier(random_state=0), max_features=18)
# 30 sampled configurations; n_jobs=-1 is forwarded to RandomizedSearchCV
hpo = HPO(ExtraTreesClassifier(random_state=0), feature_selector, search_spaces['extra_trees'], n_iter=30, n_jobs=-1)
hpo.load_dataset()
# Prints the held-out score, saves the CV results to CSV and returns the fitted search
predictor = hpo.run_random_search()

Ostateczny pipeline z wynikiem

In [None]:
# Final model: ExtraTrees-based feature selection (at most 18 features) followed
# by an ExtraTrees classifier with default hyperparameters.
# This rebinds `predictor`, replacing whatever an earlier cell stored under that name.
predictor = Pipeline(steps=[
            ('feature_selector', SelectFromModel(ExtraTreesClassifier(random_state=0), max_features=18)),
            ('estimator', ExtraTreesClassifier(random_state=0))
])

X, y = DataLoader.read_train_data()
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)
# Flatten the targets exactly as HPO.load_dataset does, so the estimator gets a
# 1-D target regardless of the shape DataLoader returns.
y_train = np.ravel(y_train)
y_test = np.ravel(y_test)
# NOTE(review): the model is fitted on the 70% training part only, and
# X_test / y_test are not used in this cell.
predictor.fit(X_train, y_train)

In [None]:
# Score the final test set with the fitted pipeline and hand the result to DataLoader
x_final_test = DataLoader.read_test_data()

# Wrap the predict_proba output in a frame so a column can be picked by position
y_pred = DataFrame(predictor.predict_proba(x_final_test))

# Column 1 is saved - presumably the positive class of a binary problem;
# confirm against DataLoader.save_results.
second_column = y_pred.reset_index(drop=True)[1]
DataLoader.save_results(second_column)