Declaração de funções custo

In [18]:
import numpy as np
import pandas as pd
from math import ceil
from typing import Literal
import matplotlib.pyplot as plt
from numpy.typing import NDArray
from IPython.display import display
from abc import ABC, abstractmethod
from sklearn.tree import DecisionTreeClassifier

In [19]:
def distance_euclidean(x1: NDArray, x2: NDArray):
    return np.linalg.norm(x1 - x2, axis=1)

def distance_mahalanobis(x1, x2, inv_cov):
    diff = x1 - x2
    return np.sqrt(np.diag(diff @ inv_cov @ diff.T))


Calcular métricas de classificação

In [20]:
def evaluate_metrics(y_test: NDArray, y_predict: NDArray):
    y_test = y_test[:, 0]
    n = y_test.shape[0]

    TP = np.sum(np.logical_and(y_predict == y_test, y_test == 1))
    TN = np.sum(np.logical_and(y_predict == y_test, y_test == 0))
    FP = np.sum(np.logical_and(y_predict == 1, y_test == 0))
    FN = np.sum(np.logical_and(y_predict == 0, y_test == 1))

    accuracy = (TP + TN) / n
    precision = TP / (TP + FP)
    recall = TP / (TP + FN)
    f1_score = 2 * (precision * recall) / (precision + recall)

    metrics = {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score,
    }
    return metrics


Declaração dos escaladores MinMax e ZScore

In [21]:
class Scaler(ABC):
    @abstractmethod
    def normalize(self, X: NDArray, update_params: bool = False) -> NDArray:
        pass

    @abstractmethod
    def denormalize(self, X: NDArray) -> NDArray:
        pass

class ZScore(Scaler):
    def __init__(self):
        self.mean: NDArray = None
        self.std: NDArray = None
        pass

    def normalize(self, X: NDArray, update_params: bool = False):
        mean = np.mean(X, axis=0) if self.mean is None or update_params else self.mean
        std = np.std(X, axis=0) if self.std is None or update_params else self.std

        self.mean = mean
        self.std = std

        return (X - mean) / std

    def denormalize(self, X: NDArray):
        return X * self.std + self.mean

class MinMax(Scaler):
    def __init__(self):
        self.min = None
        self.max = None
        pass

    def normalize(self, X: NDArray, update_params: bool = False):
        min = np.min(X, axis=0) if self.min is None or update_params else self.min
        max = np.max(X, axis=0) if self.max is None or update_params else self.max

        self.min = min
        self.max = max

        return (X - min) / (max - min)

    def denormalize(self, X: NDArray):
        return X * (self.max - self.min) + self.min


Implementação do KNN com a mesma interface do modelo do scikit learn para usar na validação cruzada

In [22]:
class Model(ABC):
    has_training_costs = False

    @abstractmethod
    def fit(self, X: NDArray, y: NDArray):
        pass

    @abstractmethod
    def predict(self, X: NDArray) -> NDArray:
        pass


class KNN(Model):
    def __init__(self, k: int, distance_function: Literal['euclidean', 'mahalanobis'] = 'euclidean'):
        self.k = k
        self.distance_function = distance_function
        self.in_scaler = ZScore()

    def fit(self, X: NDArray, y: NDArray):
        X = self._preprocess_input(X, True)
        self.X = X
        self.y = y

        if self.distance_function == 'mahalanobis':
            cov = np.cov(X.T, bias=True)
            self.inv_cov = np.linalg.pinv(cov)

    def predict(self, X: NDArray):
        X = self._preprocess_input(X, False)
        n, m = X.shape
        y_predict = np.zeros(n)
        for i, x in enumerate(X):
            distances = self._get_distances(x)
            k_neighbors = np.argpartition(distances, self.k)[:self.k]
            y_predict[i] = np.bincount(self.y[k_neighbors].ravel()).argmax()

        return y_predict

    def _preprocess_input(self, X: NDArray, update_params: bool = False):
        n, m = X.shape
        X_normalized = self.in_scaler.normalize(X, update_params)
        return X_normalized

    def _get_distances(self, x: NDArray):
        match self.distance_function:
            case 'euclidean':
                return distance_euclidean(self.X, x)
            case 'mahalanobis':
                return distance_mahalanobis(self.X, x, self.inv_cov)
            case _:
                raise ValueError(f'Unknown distance function: {self.distance_function}')


Classes para treinar vários modelos usando validação cruzada (K-Folds) e Grid Search


In [23]:
class KFoldTrainer:
    '''
    Treina um modelo (instanciado com hyper parâmetros), separando k partições de treino/validação
    Salva a média das métricas de cada validação feita
    '''
    def __init__(self, k: int, n_classes: int, dataset: NDArray, model: Model):
        self.k = k
        self.n_classes = n_classes
        self.dataset = dataset
        self.model = model
        self.metrics = None

    def fit(self):
        metrics = {
            'accuracy': np.zeros(self.k),
            'precision': np.zeros(self.k),
            'recall': np.zeros(self.k),
            'f1_score': np.zeros(self.k),
        }
        for i, dataset in enumerate(self._genereate_dataset_k_folds()):
            X_train, y_train, X_validation, y_validation = dataset
            self.model.fit(X_train, y_train)

            y_predict = self.model.predict(X_validation)
            metrics_i = evaluate_metrics(y_validation, y_predict)
            for key, value in metrics_i.items():
                metrics[key][i] = value

        self.metrics = {
            key: np.mean(value)
            for key, value in metrics.items()
        }

    def _genereate_dataset_k_folds(self):
        np.random.shuffle(self.dataset)
        X, y = self.dataset[:, :-1], self.dataset[:, -1].astype(int).reshape((-1, 1))
        validation_size_percent = 1 / self.k

        n, _ = X.shape
        validation_size = ceil(n * validation_size_percent)
        for i in range(self.k):
            validation_index_start = i * validation_size
            validation_index_final = (i + 1) * validation_size if i < self.k - 1 else n

            X_validation = X[validation_index_start:validation_index_final]
            y_validation = y[validation_index_start:validation_index_final]
            X_train = np.concatenate([X[:validation_index_start], X[validation_index_final:]])
            y_train = np.concatenate([y[:validation_index_start], y[validation_index_final:]])

            yield X_train, y_train, X_validation, y_validation

class GridSearchTrainer:
    '''
    Instancia um modelo a partir de sua classe com cada combinação possível dos hyper parâmetros recebidos
    Chama o KFoldTrainer para fazer a validação cruzada para cada combinação de hyper parâmetros
    Salva as métricas, modelo (treinado), e hyper parâmetros escolhidos da melhor combinação encontrada (avaliado pela acurácia)
    '''
    def __init__(self, k: int, n_classes: int, dataset: NDArray, model_class: type[Model], hyper_params: dict[str, list]):
        self.k = k
        self.n_classes = n_classes
        self.dataset = dataset
        self.model_class = model_class
        self.hyper_params = hyper_params
        self.best_model = None
        self.best_hyper_params = None
        self.best_metrics = None

    def fit(self):
        params_names = list(self.hyper_params.keys())
        grid = np.meshgrid(*self.hyper_params.values())
        grid = np.hstack([ np.atleast_2d(g.ravel()).T for g in grid ], dtype='object')

        best_metrics = None
        best_model = None
        best_hyper_params = None
        for params in grid:
            params = dict(zip(params_names, params))
            model = self.model_class(**params)

            k_fold_trainer = KFoldTrainer(self.k, self.n_classes, self.dataset, model)
            k_fold_trainer.fit()
            accuracy = k_fold_trainer.metrics['accuracy']

            if best_model is None or accuracy > best_metrics['accuracy']:
                best_metrics = k_fold_trainer.metrics
                best_model = model
                best_hyper_params = params

        self.best_model = best_model
        self.best_hyper_params = best_hyper_params
        self.best_metrics = best_metrics


class KFoldMultiModelsTrainer(KFoldTrainer):
    '''
    Treina vários modelos separando o dataset em k partições de treinamento/teste para fazer validações cruzadas
    Chama o GridSearchTrainer para encontrar a melhor combinação de hyper parâmetros de um modelo, para cada conjunto de treinamento
    Para cada modelo, salva a média e o desvio padrão das métricas com a melhor combinação de hyper parâmetros para cada conjunto de treinamento
    '''
    def __init__(self, k: int, n_classes: int, dataset: NDArray, models: dict[str, tuple[type[Model], dict[str, list]]]):
        self.k = k
        self.n_classes = n_classes
        self.dataset = dataset
        self.models = models

    def fit(self):
        trained_models = dict()
        for model_name, value in self.models.items():
            model_class, hyper_params = value
            model_attributes = {
                'metrics_train': {
                    metric: np.zeros(self.k)
                    for metric in ['accuracy', 'precision', 'recall', 'f1_score']
                },
                'metrics_test': {
                    metric: np.zeros(self.k)
                    for metric in ['accuracy', 'precision', 'recall', 'f1_score']
                },
                'hyper_params': {
                    param: np.zeros(self.k, dtype=np.object_)
                    for param in hyper_params.keys()
                },
                'model': np.zeros(self.k, dtype=np.object_)
            }
            for i, dataset in enumerate(self._genereate_dataset_k_folds()):
                X_train, y_train, X_test, y_test = dataset
                dataset_train = np.c_[X_train, y_train]
                trainer = GridSearchTrainer(self.k, self.n_classes, dataset_train, model_class, hyper_params)
                trainer.fit()
                y_predict = trainer.best_model.predict(X_test)
                test_metrics = evaluate_metrics(y_test, y_predict)

                for metric in model_attributes['metrics_train'].keys():
                    model_attributes['metrics_train'][metric][i] = trainer.best_metrics[metric]
                    model_attributes['metrics_test'][metric][i] = test_metrics[metric]

                for param in model_attributes['hyper_params'].keys():
                    model_attributes['hyper_params'][param][i] = trainer.best_hyper_params[param]

                model_attributes['model'][i] = trainer.best_model

            trained_models[model_name] = model_attributes

        return trained_models


Funções auxiliares

In [24]:
def plot_costs(costs: NDArray, title: str):
    plt.plot(costs)
    plt.xlabel('Épocas')
    plt.ylabel('Cross Entropia')
    plt.title(f'Função Custo {title}')
    plt.show()

def extract_from_text(text: str):
        return float(text.split('%')[0])

def plot_models_metrics(trained_models: dict[str, dict[str, list[float]]]):
    table = pd.DataFrame(trained_models).T.map(lambda x: f"{np.mean(x):.2%} +- {1.96*np.std(x)/np.sqrt(len(x)):.2%}")
    table.columns = ['Accuracy', 'Precision', 'Recall', 'F1 Score']
    table.index = trained_models.keys()

    styled_table = table.style.apply(lambda col: [ 'font-weight:bold; color:red' if extract_from_text(x)==col.apply(extract_from_text).max() else '' for x in col ])
    display(styled_table)


# Questão 01 -> Items a,b

1. Lendo o dataset 'kc2.csv'
1. Treinando o modelo KNN implementado e a Árvores de Decisão do scikit-learn com o dataset
1. Definindo os possíveis valores dos hyper parâmetros para o grid search
1. Treinando modelos usando 10 partições para testes cruzados, e 10 partições para validações cruzadas
1. Plot da média e desvio padrão das métricas de cada modelo com a melhor combinação de hiper parâmetros encontrados em treinamento

In [25]:
dataset = np.genfromtxt('kc2.csv', delimiter=',')

k = 10
n_classes = 2
np.random.seed(0)
models = {
    'KNN': (KNN, {'k': [1, 5], 'distance_function': ['euclidean', 'mahalanobis']}),
    'Árvode de Decisão': (DecisionTreeClassifier, {'criterion': ['gini', 'entropy']}),
}
trainer = KFoldMultiModelsTrainer(k, n_classes, dataset, models)

trained_models = trainer.fit()
trained_models_metrics = {
    name: results['metrics_test']
    for name, results in trained_models.items()
}
plot_models_metrics(trained_models_metrics)


Unnamed: 0,Accuracy,Precision,Recall,F1 Score
KNN,75.51% +- 2.52%,75.64% +- 7.12%,75.07% +- 4.60%,74.54% +- 3.94%
Árvode de Decisão,72.16% +- 5.77%,71.33% +- 9.22%,73.44% +- 9.53%,71.35% +- 7.37%
