In [None]:
# o código a seguir tem o objetivo de carregar o arquivo contendo os dados após filtragem de linhas e colunas e após feature engeneering
import numpy as np
import pandas as pd

# carrega arquivo com dados filtrados
obj_raras_filtered = pd.read_csv(filepath_or_buffer='raras_retrospectivo_filtered.csv', sep=';', converters={"hpos": lambda x: x.strip("[]").split(", ")})
obj_raras_filtered = obj_raras_filtered.drop('Unnamed: 0', axis=1) # isso ou salvar com index=False

print(obj_raras_filtered.shape)
obj_raras_filtered.head(5)

In [None]:
# IMPORTAÇÃO DE BIBLIOTECAS E FUNÇÕES
# ferramentas básicas para manipular os dados
import numpy as np
import pandas as pd
import sklearn
# ferramenta para divisão de conjuntos de teste e treinamento
from sklearn.model_selection import train_test_split
# ferramentas para composição do fluxo dos dados
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
# ferramentas para processamento dos dados
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn import preprocessing
# ferramentas para imputação de dados
from sklearn import impute
# classificadores
from sklearn import neighbors
from sklearn import tree
from sklearn import ensemble
from sklearn import neural_network
from sklearn import linear_model
# grid para busca dos parâmetros ótimos
from sklearn.experimental import enable_halving_search_cv
from sklearn.model_selection import GridSearchCV, HalvingGridSearchCV, RandomizedSearchCV, HalvingRandomSearchCV
# save the trained model
import joblib
# configurações de output e warning
import warnings
warnings.filterwarnings("ignore")
from sklearn import set_config
set_config(transform_output="pandas")
# manutenção de memória
import gc

# definido funções custom
class NanReplacer(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    def transform(self, X):
        if isinstance(X, pd.DataFrame):
            return X.replace(np.nan, -1)
        else:
            assert isinstance(X, np.ndarray)
            where_are_NaNs = np.isnan(X)
            X[where_are_NaNs] = -1
            return X

class ColunmCleaner(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    def transform(self, X, y=None):
        if isinstance(X, pd.DataFrame):
            X = X.loc[:,~X.columns.str.contains('_nan', case=False)]
            return X
    def fit_transform(self, X, y=None):
        return self.transform(X)

class EmptyHPOSCleaner(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    def transform(self, X, y=None):
        if isinstance(X, pd.DataFrame):
            if 'multilabel_bin__' in X.columns:
                X = X.drop(['multilabel_bin__'], axis=1)
        return X
    def fit_transform(self, X, y=None):
        return self.transform(X)

class MultiHotEncoder(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.mlb = None
        self.categories_ = self.classes_ = list()
    def fit(self, X:pd.DataFrame, y=None):
        self.mlb = preprocessing.MultiLabelBinarizer().fit(X)
        self.classes_ = self.mlb.classes_
        return self
    def transform(self, X:pd.DataFrame, y=None):
        result = self.mlb.transform(X)
        result = pd.DataFrame(
            data=result, 
            columns=self.classes_, 
            index=X.index
        )
        return result
    def fit_transform(self, X, y=None):
        return self.fit(X).transform(X)
    def get_feature_names_out(self):
        return self.classes_

# PREPROCESSAMENTO DOS DADOS, TRANSFORMAÇÃO E ESCALA DE COLUNAS
# extrai coluna de classe e aplica label encoder
label_encoder = preprocessing.LabelEncoder()
y = obj_raras_filtered.diagnostico
obj_raras_filtered_nodiag = obj_raras_filtered.drop(columns=['diagnostico'])
y_encoded = label_encoder.fit_transform(y)

# separa os dados em conjunto de treinamento e teste
X_train, X_test, y_train, y_test = train_test_split(obj_raras_filtered_nodiag, y_encoded, test_size=0.25, stratify=y_encoded)

# separar as colunas com valores categóricos
categorical_features = [
    'tp_diag',
    'tp_diag_etiologico',
    'momento_diag',
    'recorrencia_familiar',
    'consang_relatada',
    'raca_cor',
    'sexo',
    'regiao_nascimento',
    'regiao_residencia',
    'internacao_previa'
]

# separar as colunas com valores numéricos
numeric_features = [
    'idade_materna_days',
    'idade_paterna_days',
    'qtd_internacoes',
    'idade_inicio_sintomas_days',
    'age_diag_days',
    'age_consulta_revisada_days',
    'age_1a_consulta_centro_days',
    'age_1a_consulta_espec_days'
]

#fazer onehot com e sem imputer
categorical_pipe_imputer = Pipeline(
    steps=[
        ('one_hot', preprocessing.OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
        ('clean_columns', ColunmCleaner()),
        ('imputer', impute.SimpleImputer())
    ]
)

categorical_pipe_no_imputer = Pipeline(
    steps=[
        ('one_hot', preprocessing.OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
        ('clean_columns', ColunmCleaner())
    ]
)

numeric_pipe_replacer = Pipeline(
    steps=[
        ('scaler', preprocessing.MinMaxScaler()),
        ('replacer', NanReplacer())
    ]
)

numeric_pipe_imputer = Pipeline(
    steps=[
        ('imputer', impute.SimpleImputer()),
        ('scaler', preprocessing.MinMaxScaler())
    ]
)

# aplica todos os transformadores nas colunas indicadas
proprocessing_ct = ColumnTransformer(
    transformers=[
        ('multilabel_bin', MultiHotEncoder(), 'hpos'),
        ('categorical_pipe', categorical_pipe_no_imputer, categorical_features),
        ('numeric_pipe', numeric_pipe_imputer, numeric_features)
    ],
    remainder='passthrough'
)

# pipeline de processamento, limpeza e classificação
classification_pipe = Pipeline(
    steps=[
        ('preprocessing_ct', proprocessing_ct),
        ('remove_hpo', EmptyHPOSCleaner()),
        ('classifier', neighbors.KNeighborsClassifier())
    ]
)

# parâmetros do classificador
param_all = [
    {
      "preprocessing_ct__numeric_pipe__scaler": [preprocessing.StandardScaler()],
      "preprocessing_ct__numeric_pipe__imputer__strategy": ["mean"],
      "preprocessing_ct__numeric_pipe__imputer": [impute.SimpleImputer()],
      "preprocessing_ct__numeric_pipe": [Pipeline(steps=[
          ('imputer', impute.SimpleImputer()), 
          ('scaler', preprocessing.StandardScaler())
        ])],
      "preprocessing_ct__multilabel_bin": [MultiHotEncoder()],
      "preprocessing_ct__categorical_pipe__imputer__strategy": ["mean"],
      "preprocessing_ct__categorical_pipe__imputer": [impute.SimpleImputer()],
      "preprocessing_ct__categorical_pipe": [Pipeline(steps=[('one_hot',
        preprocessing.OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
        ('clean_columns', ColunmCleaner()),
        ('imputer', impute.SimpleImputer())
        ])],
      "classifier__weights": ["distance"],
      "classifier__p": [1],
      "classifier__n_neighbors": [9],
      "classifier__algorithm": ["kd_tree"],
      "classifier": [neighbors.KNeighborsClassifier()]
    },
    {
      "preprocessing_ct__numeric_pipe__scaler": [preprocessing.MinMaxScaler()],
      "preprocessing_ct__numeric_pipe__imputer": [impute.SimpleImputer()],
      "preprocessing_ct__numeric_pipe": [Pipeline(steps=[
          ('imputer', impute.SimpleImputer(strategy='most_frequent')),
          ('scaler', preprocessing.MinMaxScaler())
      ])],
      "preprocessing_ct__multilabel_bin": [MultiHotEncoder()],
      "preprocessing_ct__categorical_pipe__imputer__weights": ["distance"],
      "preprocessing_ct__categorical_pipe__imputer__n_neighbors": [3],
      "preprocessing_ct__categorical_pipe__imputer": [impute.KNNImputer()],
      "preprocessing_ct__categorical_pipe": [Pipeline(steps=[
          ('one_hot', preprocessing.OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
          ('clean_columns', ColunmCleaner()),
          ('imputer', impute.KNNImputer(n_neighbors=3, weights='distance'))
        ])],
      "classifier__solver": ["saga"],
      "classifier__penalty": ["l2"],
      "classifier__l1_ratio": [0.5],
      "classifier__class_weight": [None],
      "classifier__C": [1],
      "classifier": [linear_model.LogisticRegression()]
    },
    {
      "preprocessing_ct__numeric_pipe__scaler": [preprocessing.StandardScaler()],
      "preprocessing_ct__numeric_pipe__imputer__strategy": ["mean"],
      "preprocessing_ct__numeric_pipe__imputer": [impute.SimpleImputer()],
      "preprocessing_ct__numeric_pipe": [Pipeline(steps=[
          ('imputer', impute.SimpleImputer()),
          ('scaler', preprocessing.StandardScaler())
      ])],
      "preprocessing_ct__multilabel_bin": [MultiHotEncoder()],
      "preprocessing_ct__categorical_pipe": [Pipeline(steps=[
          ('one_hot', preprocessing.OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
          ('clean_columns', ColunmCleaner())
      ])],
      "classifier__solver": ["adam"],
      "classifier__learning_rate": ["adaptive"],
      "classifier__hidden_layer_sizes": [100],
      "classifier__early_stopping": [True],
      "classifier__activation": ["logistic"],
      "classifier": [neural_network.MLPClassifier()]
    },
    {
      "preprocessing_ct__numeric_pipe__scaler": [preprocessing.MinMaxScaler()],
      "preprocessing_ct__numeric_pipe__replacer": [NanReplacer()],
      "preprocessing_ct__numeric_pipe": [Pipeline(steps=[
          ('scaler', preprocessing.MinMaxScaler()),
          ('replacer', NanReplacer())])
        ],
      "preprocessing_ct__multilabel_bin": [MultiHotEncoder()],
      "preprocessing_ct__categorical_pipe__imputer__weights": ["uniform"],
      "preprocessing_ct__categorical_pipe__imputer__n_neighbors": [9],
      "preprocessing_ct__categorical_pipe__imputer": [impute.KNNImputer(n_neighbors=9)],
      "preprocessing_ct__categorical_pipe": [Pipeline(steps=[
          ('one_hot', preprocessing.OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
          ('clean_columns', ColunmCleaner()),
          ('imputer', impute.KNNImputer(n_neighbors=9))
      ])],
      "classifier__n_estimators": [200],
      "classifier__max_depth": [None],
      "classifier__criterion": ["gini"],
      "classifier": [ensemble.RandomForestClassifier()]
    }
]

print('ok')

In [None]:
# cria grid com os parâmetros do objeto acima e os parametros de cross validation
grid = RandomizedSearchCV(
    classification_pipe,
    #param_grid=param_list,
    param_distributions=param_all,
    cv=5,
    refit=True,
    scoring='roc_auc_ovr_weighted',
    error_score='raise',
    #error_score=np.NINF, # ignora erros e continua grid
    n_iter=1000,
    verbose=3
)

from sklearn.utils import estimator_html_repr
html_pipe = open('models\\RandomizedSearchCV_Pipeline.html', 'w', encoding="utf-8") 
html_pipe.write(estimator_html_repr(grid))
html_pipe.close()

# limpa variáveis que não serão mais utilizadas da memória
if 'obj_raras_identificacao' in globals():
    del obj_raras_identificacao
if 'obj_raras_diagnostico' in globals():
    del obj_raras_diagnostico
if 'obj_raras_merged' in globals():
    del obj_raras_merged
if 'diagnosticos_mais_comuns' in globals():
    del diagnosticos_mais_comuns
if 'obj_raras_unused' in globals():
    del obj_raras_unused
if 'obj_raras_filtered' in globals():
    del obj_raras_filtered
if 'obj_raras_filtered' in globals():
    del obj_raras_filtered
if 'obj_raras_filtered_nodiag' in globals():
    del obj_raras_filtered_nodiag
if 'param_list' in globals():
    del param_list
gc.collect()

# cria modelos utilizando os parâmetros, retém o modelo com melhor desempenho
grid.fit(X_train, y_train)
print('------- FIT FINISHED -------')

# cria arquivo e armazena o melhor modelo
joblib.dump(grid.best_estimator_, 'raras_grid_best_estimator_all.sav')

# cria arquivo com os melhores parâmetros encontrados na otimização
joblib.dump(grid.best_params_, 'raras_grid_best_params_all.pkl', compress = 1)

# salva arquivo csv dos resultados
grid_results = pd.DataFrame(grid.cv_results_)
grid_results.to_csv(path_or_buf='grid_results_all.csv', sep=';')

print('------- SAVING FILES ENDED -------')

In [None]:
# classification metrics
from sklearn import metrics
# Plot data
import matplotlib.pyplot as plt
# pretty print
import json
# open the trained model
import joblib
# transform test for ROC curve
from sklearn.preprocessing import LabelBinarizer

def print_model_metrics(model, X_test, y_test):
    
    # testa modelo
    y_pred = model.predict(X_test)
    y_score = model.predict_proba(X_test)
    
    # imprimi relatorio de métricas por classe
    print(
        metrics.classification_report(
            label_encoder.inverse_transform(y_test),
            label_encoder.inverse_transform(y_pred),
            target_names=label_encoder.inverse_transform(model.classes_),
            labels=label_encoder.inverse_transform(model.classes_),
            digits=3
        )
    )
    
    # imprime principais métricas
    print('Accuracy: %.3f' % metrics.accuracy_score(y_test, y_pred))
    print('Balanced Accuracy: %.3f' % metrics.balanced_accuracy_score(y_test, y_pred))
    print('Top K=2 Accuracy: %.3f' % metrics.top_k_accuracy_score(y_test, y_score, k=2, labels=model.classes_))
    
    print('Precision (micro): %.3f' % metrics.precision_score(y_test, y_pred, average='micro'))
    print('Precision (macro): %.3f' % metrics.precision_score(y_test, y_pred, average='macro'))
    print('Precision (weighted): %.3f' % metrics.precision_score(y_test, y_pred, average='weighted'))
    
    print('Recall (micro): %.3f' % metrics.recall_score(y_test, y_pred, average='micro'))
    print('Recall (macro): %.3f' % metrics.recall_score(y_test, y_pred, average='macro'))
    print('Recall (weighted): %.3f' % metrics.recall_score(y_test, y_pred, average='weighted'))
    
    print('F1 (micro): %.3f' % metrics.f1_score(y_test, y_pred, average='micro'))
    print('F1 (macro): %.3f' % metrics.f1_score(y_test, y_pred, average='macro'))
    print('F1 (weighted): %.3f' % metrics.f1_score(y_test, y_pred, average='weighted'))
    
    print('F-beta (micro): %.3f' % metrics.fbeta_score(y_test, y_pred, average='micro', beta=0.5))
    print('F-beta (macro): %.3f' % metrics.fbeta_score(y_test, y_pred, average='macro', beta=0.5))
    print('F-beta (weighted): %.3f' % metrics.fbeta_score(y_test, y_pred, average='weighted', beta=0.5))
    
    print('Hamming Loss %.3f' % metrics.hamming_loss(y_test, y_pred))
    print('Zero One Loss %.3f' % metrics.zero_one_loss(y_test, y_pred))
    print('Log Loss %.3f' % metrics.log_loss(y_test, y_score, labels=model.classes_))
    
    print('Matthews Correlation Coefficient (MCC): %.3f' % metrics.matthews_corrcoef(y_test, y_pred))
    
    print('Jaccard similarity coefficient score (micro) %3f' % metrics.jaccard_score(y_test, y_pred, average='micro'))
    print('Jaccard similarity coefficient score (macro) %3f' % metrics.jaccard_score(y_test, y_pred, average='macro'))
    
    print('ROC AUC (ovr micro): %.3f' % metrics.roc_auc_score(y_test, y_score, multi_class='ovr', average='micro', labels=model.classes_))
    
    print('ROC AUC (ovo macro): %.3f' % metrics.roc_auc_score(y_test, y_score, multi_class='ovo', average='macro', labels=model.classes_))
    print('ROC AUC (ovo weighted): %.3f' % metrics.roc_auc_score(y_test, y_score, multi_class='ovo', average='weighted', labels=model.classes_))
    
    # imprime matriz de confusão
    print('Confusion Matrix:')
    cm = metrics.confusion_matrix(y_test, y_pred, labels=model.classes_)
    disp = metrics.ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=label_encoder.inverse_transform(model.classes_))
    disp.plot(xticks_rotation='vertical')
    
    # ROC curve for all casses
    label_binarizer = LabelBinarizer().fit(label_encoder.inverse_transform(y_train))
    y_onehot_test = label_binarizer.transform(label_encoder.inverse_transform(y_test))
    
    for class_of_interest in label_binarizer.classes_:
        class_id = np.flatnonzero(label_binarizer.classes_ == class_of_interest)[0]
    
        metrics.RocCurveDisplay.from_predictions(
            y_onehot_test[:, class_id],
            y_score[:, class_id],
            name=f"{class_of_interest} vs the rest",
            color="darkorange",
        )
        plt.plot([0, 1], [0, 1], "k--", label="chance level (AUC = 0.5)")
        plt.axis("square")
        plt.xlabel("False Positive Rate")
        plt.ylabel("True Positive Rate")
        plt.title(f"One-vs-Rest ROC curves:\n{class_of_interest} vs the rest")
        plt.legend(loc='lower center', bbox_to_anchor=(0.5, -0.3, 0, 0))
        plt.show()
        
        metrics.PrecisionRecallDisplay.from_predictions(
            y_onehot_test[:, class_id],
            y_score[:, class_id],
            name=f"{class_of_interest} vs the rest",
            color="darkblue",
        )
        plt.legend(loc='lower center', bbox_to_anchor=(0.5, -0.3, 0, 0))
        plt.show()

print('ok')

In [None]:
# TESTA DADOS DO PROSPECTIVO SOMENTE

# load the model from disk
loaded_model = joblib.load('raras_grid_best_estimator_all.sav')

# carrega arquivo com dados filtrados do prospectivo
obj_raras_som_prosp_filtered = pd.read_csv(filepath_or_buffer='raras_somente_prospectivo_filtered.csv', sep=';', converters={"hpos": lambda x: x.strip("[]").split(", ")})
obj_raras_som_prosp_filtered = obj_raras_som_prosp_filtered.drop('Unnamed: 0', axis=1) # isso ou salvar com index=False

# marcando como "outros" diagnósticos que não estavam no conjunto de treinamento
obj_raras_som_prosp_filtered.loc[obj_raras_som_prosp_filtered.diagnostico.isin(label_encoder.classes_) == False, ('diagnostico')] = 'Outros ORPHAs'

# extrai coluna de classe e aplica label encoder
y_som_prosp = obj_raras_som_prosp_filtered.diagnostico
y_som_prosp = label_encoder.transform(y_som_prosp)
X_som_prosp = obj_raras_som_prosp_filtered.drop(columns=['diagnostico'])
X_som_prosp['internacao_previa'] = X_som_prosp['internacao_previa'].astype(object)

print(obj_raras_som_prosp_filtered.diagnostico.value_counts())

# resultados utilizando dados do prospectivo
print_model_metrics(loaded_model, X_som_prosp, y_som_prosp)