In [1]:
from IPython import get_ipython
from IPython.display import display

In [2]:
# Mount Google Drive under /content/drive so the dataset and output paths used below are reachable.
# This import only exists inside Google Colab.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Drive folder that holds the input CSV and receives the sampled CSV and the split files.
DATASET_PATH = '/content/drive/MyDrive/Doutorado/Pesquisa/datasets/'
# Project root on Drive; the meta-feature CSV is written here.
PROJECT_PATH = '/content/drive/MyDrive/Doutorado/Pesquisa/'
# Base name (without the ".csv" extension) of the source dataset inside DATASET_PATH.
DATASET_NAME = "hosp_dengue_23_sample"
# Name of the label column of the dataset.
TARGET = "Hospitalization"
# Prefix used to name the sampled dataset file and the split files derived from it.
SAMPLE = "amostra"

In [4]:
!pip install gower



In [5]:
!pip install imbalanced-learn xgboost scikit-learn



In [6]:
!pip install pyhard



In [7]:
import pandas as pd
import itertools
import logging
import gower
import collections
import numpy as np
import sklearn.datasets
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn.datasets
import math
import os
import xgboost as xgb

from sklearn.model_selection import train_test_split
from scipy.sparse.csgraph import minimum_spanning_tree
from scipy.stats import iqr
from sklearn import tree
from sklearn.calibration import CalibratedClassifierCV
from sklearn.linear_model import LinearRegression
from sklearn.metrics import DistanceMetric
from sklearn.model_selection import GridSearchCV
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler
from abc import ABC, abstractmethod
from sklearn.datasets import make_blobs
from imblearn.under_sampling import RandomUnderSampler
from imblearn.over_sampling import RandomOverSampler   # For oversampling
from xgboost import XGBClassifier
from sklearn.metrics import classification_report
from sklearn.impute import KNNImputer
from pyhard.classification import ClassifiersPool
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

In [8]:
# Hyper-parameters of the baseline XGBoost classifier, collected in one mapping so they can be
# read (and tuned) at a glance.
_base_bst_params = {
    'learning_rate': 0.01,
    'n_estimators': 1000,
    'max_depth': 8,
    'min_child_weight': 1,
    'gamma': 0,
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    'objective': 'binary:logistic',
    'eval_metric': 'logloss',
    'scale_pos_weight': 1,
    'seed': 27,
}
base_bst = XGBClassifier(**_base_bst_params)

# Seaborn "dark" palette; the colour at position 6 is kept under the name `rosa`.
colors = sns.color_palette("dark")
rosa = colors[6]

# **Importação da base treino/validação**

In [9]:
# Load the full dataset and persist a fixed-seed random sample; the later cells work on that sample.
SAMPLE_SIZE = 1500  # number of rows drawn from the full dataset
SAMPLE_SEED = 51    # fixed so that the same rows are drawn on every run

df = pd.read_csv(DATASET_PATH + DATASET_NAME + '.csv', sep=',', encoding='utf-8')
# DataFrame.sample raises ValueError when n exceeds the number of rows (sampling without
# replacement), so cap the request at the size of the dataset.
amostra = df.sample(n=min(SAMPLE_SIZE, len(df)), random_state=SAMPLE_SEED)
amostra.to_csv(DATASET_PATH + SAMPLE + '1.csv', index=False)

## Divisão das bases de treino e validação

In [10]:
class SamplesBuilder:
    """Load a CSV dataset, create seeded train/validation splits, keep them in memory and save them."""

    # Seeds used by `split_samples` when the caller does not provide any.
    DEFAULT_SEEDS = (42, 43, 44, 45, 46)

    def __init__(self, csv_path, name, splits_dir="output_splits"):
        """
        Initialize the SamplesBuilder with a CSV path and output directory.

        Parameters:
        - csv_path (str): Path to the input CSV file.
        - name (str): Label embedded in the file names of the saved splits.
        - splits_dir (str): Directory to save the generated splits (created if missing).
        """
        self.csv_path = csv_path
        self.splits_dir = splits_dir
        self.data = pd.read_csv(csv_path)
        self.splits = {}  # seed -> {'train': DataFrame, 'validation': DataFrame}
        self.name = name

        # Ensure the output directory exists
        os.makedirs(self.splits_dir, exist_ok=True)

    @staticmethod
    def impute_missing(train, n_neighbors=3):
        """
        Impute missing values using the K-nearest neighbors algorithm.

        Parameters:
        - train (pd.DataFrame): Frame to impute; it is passed as-is to `KNNImputer.fit_transform`.
        - n_neighbors (int): Number of neighbors used by the imputer.

        Returns:
        - pd.DataFrame: Imputed values with the columns and index of `train`.
        """
        imputer = KNNImputer(n_neighbors=n_neighbors)
        imputed_data = imputer.fit_transform(train)
        imputed_df = pd.DataFrame(imputed_data, columns=train.columns, index=train.index)
        return imputed_df

    def split_samples(self, seeds=None):
        """
        Split the data into train and validation sets using multiple seeds and save them to CSV.

        Each split is a 70/30 `train_test_split` of the loaded data. The splits are stored in
        `self.splits[seed]` and written to `self.splits_dir` as
        `train_<name>_seed_<seed>.csv` / `validation_<name>_seed_<seed>.csv`.

        Parameters:
        - seeds (list): List of random seeds to generate splits. Defaults to
          [42, 43, 44, 45, 46] when omitted.
        """
        # A fresh list per call instead of a shared mutable default argument.
        if seeds is None:
            seeds = list(self.DEFAULT_SEEDS)

        for seed in seeds:
            train, val = train_test_split(self.data, test_size=0.3, random_state=seed)

            self.splits[seed] = {'train': train, 'validation': val}

            # Save splits as CSV
            train_path = os.path.join(self.splits_dir, f"train_{self.name}_seed_{seed}.csv")
            val_path = os.path.join(self.splits_dir, f"validation_{self.name}_seed_{seed}.csv")

            train.to_csv(train_path, index=False)
            val.to_csv(val_path, index=False)

        print(f"Splits created and saved in '{self.splits_dir}' for seeds: {seeds}")

In [11]:
# Instantiate the class
# The splitter reads the sampled CSV saved in DATASET_PATH and writes into "<DATASET_PATH>splits".
sample_name = SAMPLE + '1'
builder = SamplesBuilder(
    DATASET_PATH + sample_name + '.csv',
    sample_name,
    splits_dir=DATASET_PATH + "splits",
)

# Create the per-seed train/validation splits (default seeds) and write them to disk.
builder.split_samples()

Splits created and saved in '/content/drive/MyDrive/Doutorado/Pesquisa/datasets/splits' for seeds: [42, 43, 44, 45, 46]


# **Criação das IHMs**

## Criação das classes que calculam as IHM unlabeled

In [12]:
class Measures(ABC):
    """
    Common parent for families of measures (aka meta-features); every measure lives in its own method.

    Subclasses provide ``_measures_dict`` (short measure name -> name of the method computing it)
    and a ``logger``.
    """

    _measures_dict: dict

    @property
    def logger(self):
        raise NotImplementedError

    def _call_method(self, name, **kwargs):
        # Resolve the method by name on the instance, then invoke it.
        method = getattr(self, name)
        return method(**kwargs)

    def calculate_all(self, measures_list=None):
        """
        Compute the requested measures and return them as columns of one DataFrame.

        Args:
            measures_list (list): short names of the measures to compute. ``None`` computes every
                measure in ``_measures_dict`` order; a list is intersected with the known names
                and processed in sorted order.

        Returns:
            pd.DataFrame: one column per measure, each named ``feature_<short name>``.

        Raises:
            TypeError: if ``measures_list`` is neither ``None`` nor a list.
        """
        if measures_list is not None and not isinstance(measures_list, list):
            raise TypeError(f"Expected type list for parameter 'measures_list', not '{type(measures_list)}'")

        registry = self._measures_dict
        if measures_list is None:
            selected = registry.keys()
        else:
            selected = sorted(set(measures_list).intersection(registry.keys()))

        results = collections.OrderedDict()
        for measure in selected:
            self.logger.info(f"Calculating measure {repr(measure)}")
            results[measure] = self._call_method(registry[measure])

        return pd.DataFrame(results).add_prefix('feature_')

In [13]:
def minmax(f: np.ndarray, y: np.ndarray) -> float:
    r"""
    Smallest of the two per-class maxima of a feature, for a binary problem:
    :math:`\min \max (f_i) = \min ( \max (f^{c_1}_i), \max (f^{c_2}_i) )`, with :math:`f^{c_j}_i` the values of
    the i-th feature restricted to the members of class :math:`c_j`.

    Args:
        f (array-like): i-th feature vector
        y (array-like): corresponding class vector

    Returns:
        float: minmax value

    Raises:
        AssertionError: If classes are not binary

    """
    labels = np.unique(y)
    assert len(labels) == 2
    first, second = labels[0], labels[1]
    per_class_max = (np.max(f[y == first]), np.max(f[y == second]))
    return min(per_class_max)

def maxmin(f: np.ndarray, y: np.ndarray):
    r"""
    Largest of the two per-class minima of a feature, for a binary problem:
    :math:`\max \min (f_i) = \max ( \min (f^{c_1}_i), \min (f^{c_2}_i) )`, with :math:`f^{c_j}_i` the values of
    the i-th feature restricted to the members of class :math:`c_j`.

    Args:
        f (array-like): i-th feature vector
        y (array-like): corresponding class vector

    Returns:
        float: maxmin value

    Raises:
        AssertionError: If classes are not binary

    """
    labels = np.unique(y)
    assert len(labels) == 2
    first, second = labels[0], labels[1]
    per_class_min = (np.min(f[y == first]), np.min(f[y == second]))
    return max(per_class_min)

class ClassificationMeasures(Measures):
    """
    Hardness measures for classification. It provides separate methods to compute each measure.

    Args:
        data (pd.DataFrame): a dataframe where each line is an instance and columns are features. One column should
            contain the labels. The name of the column with labels can be set with parameter `target_col`
        target_col (str): name of the column that contains the labels of the instances (default None - uses the
            last column)
        ccp_alpha (float): pruning parameter for pruned tree measures. If none is passed, then it attempts to tune
            it automatically
    """

    # Short measure name -> name of the method that computes it (consumed by `calculate_all`).
    _measures_dict = {
        'kDNadj': 'k_disagreeing_neighbors_adjusted',
        'CLDadj': 'class_likeliood_diff_adjusted',
        'DCPadj': 'disjunct_class_percentage_adjusted',
        'DSadj': 'disjunct_size_adjusted',
        'TD_Padj': 'tree_depth_pruned_adjusted',
        'TD_Uadj': 'tree_depth_unpruned_adjusted',
#        'F1adj': 'f1_adjusted',
        'N2adj': 'intra_extra_ratio_adjusted'
    }

    # Class-level logger used for the per-measure progress messages.
    logger = logging.getLogger(__name__)

    def __init__(self, data: pd.DataFrame, target_col=None, ccp_alpha=None):
        """
        Prepare everything the individual measures share: features/labels, the Gower distance
        matrix with its neighbour ordering, and a calibrated Naive Bayes model.

        Args:
            data (pd.DataFrame): instances in rows; one column holds the labels.
            target_col (str): name of the label column (default None - uses the last column).
            ccp_alpha (float): pruning parameter for the pruned-tree measures; stored on the
                instance as ``self.ccp_alpha``.
        """
        # Use one 0..N-1 index for data, X and y: the neighbour arrays built below are positional
        # and the measures look labels up with them.
        data = data.reset_index(drop=True)
        if target_col is None:
            self.target_col = data.columns[-1]
            self.y = data.iloc[:, -1]
        else:
            self.target_col = target_col
            self.y = data[target_col]
        self.data = data
        self.X = data.drop(columns=self.target_col)
        self.N = len(data)
        # Previously accepted but dropped; kept so the pruned-tree measures can read it.
        self.ccp_alpha = ccp_alpha

        # np.random.seed returns None, so there is no value to keep; the call only seeds the
        # global NumPy generator.
        np.random.seed(55)

        # Gower distance matrix
        self.dist_matrix_gower = gower.gower_matrix(self.X.values.copy())
        # Put -1 on the diagonal before sorting so that every instance is ranked first in its own
        # neighbour list, even when other instances are at distance 0.
        delta = np.diag(-np.ones(self.dist_matrix_gower.shape[0]))
        self.indices_gower = np.argsort(self.dist_matrix_gower + delta, axis=1)
        self.distances_gower = np.sort(self.dist_matrix_gower, axis=1)

        self.dot = None

        # Naive Bayes classifier (default priors), calibrated with a sigmoid on 3 folds
        nb = GaussianNB()
        self.calibrated_nb = CalibratedClassifierCV(
            estimator=nb,
            method='sigmoid',
            cv=3,
            ensemble=False,
            n_jobs=-1
        )
        self.calibrated_nb.fit(self.X, self.y)

###################################################################################################################################

    def k_disagreeing_neighbors_adjusted(self, k: int = 10, distance: str = 'gower') -> list:
        r"""
        k-Disagreeing Neighbors Adjusted (kDNadj): Shannon entropy (base 2) of the class distribution among the
        :math:`k` nearest neighbors of :math:`\mathbf x_i`. The label of :math:`\mathbf x_i` itself is not used.

        .. math::

            kDNadj(\mathbf{x_i}) = - \sum_{c} p_c(\mathbf x_i) \log_2 p_c(\mathbf x_i), \qquad
            p_c(\mathbf x_i) = \frac{ \sharp \{\mathbf x_j | \mathbf x_j \in kNN(\mathbf x_i) \wedge y_j = c\}}{k}

        Args:
            k (int): number of neighbors
            distance (str): distance metric (default 'gower'); any other value is passed to
                ``NearestNeighbors`` as its ``metric``

        Returns:
            list of float: :math:`kDNadj(\mathbf x_i)`, one value per instance
        """
        if distance == 'gower':
            indices = self.indices_gower[:, :k + 1]
        else:
            # The requested metric was previously ignored here; pass it on as the sibling N2 measure does.
            nbrs = NearestNeighbors(n_neighbors=k + 1, algorithm='auto', metric=distance).fit(self.X)
            _, indices = nbrs.kneighbors(self.X)

        labels = self.data[self.target_col].to_numpy()
        # Column 0 of the neighbour table is the instance itself; the remaining columns are its neighbours.
        neighbour_labels = labels[np.asarray(indices)[:, 1:]]

        # Accumulate -p*log2(p) class by class. Iterating over the label VALUES (instead of using them as
        # list positions) keeps this correct for labels that are not 0..n-1, e.g. {1, 2} or strings.
        entropy = np.zeros(len(labels))
        for cls in sorted(pd.unique(labels).tolist()):
            # Same denominator as before: k, even if fewer than k neighbours are available.
            share = (neighbour_labels == cls).sum(axis=1) / k
            present = share > 0  # p == 0 contributes nothing (and log2(0) is undefined)
            entropy[present] -= share[present] * np.log2(share[present])

        return entropy.tolist()

###################################################################################################################################

    def intra_extra_ratio_adjusted(self, distance='gower') -> np.ndarray:
        r"""
        Adjusted variant of the ratio of intra-class and extra-class distances (N2).

        The reference definition of the measure first takes the ratio of the distance of :math:`\mathbf x_i`
        to the nearest example from its class to the distance it has to the nearest instance from a different class
        (aka nearest enemy):

        .. math::

            IntraInter(\mathbf x_i) = \frac{d(\mathbf x_i,NN(\mathbf x_i) \in y_i)}{d(\mathbf x_i, ne(\mathbf x_i))}

        where :math:`NN(\mathbf x_i)` represents a nearest neighbor of :math:`\mathbf x_i` and :math:`ne(\mathbf x_i)`
        is the nearest enemy of :math:`\mathbf x_i`:

        .. math::

            ne(\mathbf x_i) = NN(\mathbf x_i) \in y_j \neq y_i

        and then maps it to :math:`N_2(\mathbf x_i) = 1 - \frac{1}{IntraInter(\mathbf x_i) + 1}`.

        What this implementation actually computes differs from that definition:

        * the returned value is the plain ratio; the :math:`1 - 1/(\cdot + 1)` mapping is not applied;
        * the numerator is the second entry of the sorted distance row of :math:`\mathbf x_i`;
        * the denominator is the second sorted distance among the instances whose label differs from the label
          of the first neighbor of :math:`\mathbf x_i` (floored at 1e-15);
        * the label of :math:`\mathbf x_i` itself is never read.

        Args:
            distance (str): the distance metric to use (default `'gower'`). See `this link
                <https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.DistanceMetric.html
                #sklearn.neighbors.DistanceMetric>`_ for a list of available metrics.

        Returns:
            array-like: the ratio described above, one value per instance
        """
        y = self.y.copy()

        if distance == 'gower':
            # Precomputed in __init__: neighbour order and row-wise sorted Gower distances.
            indices = self.indices_gower
            distances = self.distances_gower
        else:
            # Rank ALL instances for every instance (n_neighbors equals the dataset size).
            nbrs = NearestNeighbors(n_neighbors=len(self.y), algorithm='auto', metric=distance).fit(self.X)
            distances, indices = nbrs.kneighbors(self.X)

        N2 = np.zeros(y.values.shape)
        # `i` is the index LABEL of y and is also used as a row position below, so this assumes y has a
        # 0..N-1 index. `label` (the instance's own class) is not used.
        for i, label in y.items():
            # Labels of all instances, ordered from nearest to farthest from instance i.
            nn = y.loc[indices[i, :]]
            # Label at position 1 of that ordering, i.e. of the first neighbour after position 0.
            nn1 = nn.iloc[1]
            # NOTE(review): compares the series with itself, so the mask is True for every non-NaN label
            # and selects the whole row. Presumably meant to compare against a single label - confirm intent.
            intra = nn.eq(nn)
            # Instances whose label differs from the first neighbour's label.
            extra = nn.ne(nn1)
            # Sanity checks: the selected distances must be in non-decreasing order.
            assert np.all(np.diff(distances[i, intra]) >= 0)
            assert np.all(np.diff(distances[i, extra]) >= 0)
            # NOTE(review): both operands take entry [1] (the second selected distance); for the extra mask
            # this skips the closest differently-labelled instance - confirm this is intended.
            # The 1e-15 floor avoids a division by zero.
            N2[i] = distances[i, intra][1] / max(distances[i, extra][1], 1e-15)
        return N2

###################################################################################################################################

    def class_likeliood_diff_adjusted(self) -> np.ndarray:
        r"""
        Adjusted Class Likelihood Difference (CLDadj): complement of the margin between the two largest class
        probabilities that the calibrated Naive Bayes model assigns to :math:`\mathbf x_i`. The label of
        :math:`\mathbf x_i` is not used; the most probable class takes its place.

        .. math::

            CLDadj(\mathbf x_i) = \frac{1 - \left ( p_{(1)}(\mathbf x_i) - p_{(2)}(\mathbf x_i) \right )}{2}

        where :math:`p_{(1)}` and :math:`p_{(2)}` are the largest and second largest predicted probabilities.
        A wide margin means a confident prediction, so easier instances get smaller values.

        Returns:
            array-like: :math:`CLDadj(\mathbf x_i)`
        """
        proba = np.array(self.calibrated_nb.predict_proba(self.X))

        # Sort every row in ascending order and keep its last two entries: [runner-up, best].
        top_two = np.sort(proba, axis=1)[:, -2:]
        margin = top_two[:, 1] - top_two[:, 0]

        return (1 - margin) / 2


###################################################################################################################################
###################################################################################################################################

    def disjunct_class_percentage_adjusted(self, target_col=None, ccp_alpha=None) -> list:
        r"""
        Adjusted Disjunct Class Percentage (DCPadj): builds a pruned decision tree using :math:`\mathcal{D}` and
        takes the Shannon entropy (base 2) of the class distribution inside the disjunct of :math:`\mathbf x_i`.
        The disjunct of an example corresponds to the leaf node where it is classified by the decision tree.

        .. math::

            DCPadj(\mathbf x_i) = - \sum_{c} p_c \log_2 p_c, \qquad
            p_c = \frac{\sharp\{\mathbf x_j | \mathbf x_j \in Disjunct(\mathbf x_i) \wedge y_j = c\}}
            {\sharp\{\mathbf x_j|\mathbf x_j \in Disjunct(\mathbf x_i)\}}

        Args:
            target_col (str): name of the label column (default None - uses the label column of the instance)
            ccp_alpha (float): cost-complexity pruning parameter. If None, the value given to the constructor is
                used when available; otherwise it is tuned with a grid search over 100 values in [0.001, 0.1]

        Returns:
            list of float: :math:`DCPadj(\mathbf x_i)`, one value per instance
        """
        # Explicit integer seed: np.random.seed() returns None, and estimators created with
        # random_state=None are not reproducible inside the parallel grid-search workers.
        seed = 55
        np.random.seed(seed)  # keep the global seeding the previous implementation performed

        if target_col is None:
            target_col = self.target_col
        if ccp_alpha is None:
            ccp_alpha = getattr(self, 'ccp_alpha', None)

        df_dcp = self.data.reset_index(drop=True)
        y = df_dcp[target_col]
        X = df_dcp.drop(columns=target_col)

        if ccp_alpha is None:
            parameters = {'ccp_alpha': np.linspace(0.001, 0.1, num=100)}
            dtc = tree.DecisionTreeClassifier(criterion='gini', random_state=seed)
            clf = GridSearchCV(dtc, parameters, n_jobs=-1)
            clf.fit(X.values, y.values)
            ccp_alpha = clf.best_params_['ccp_alpha']

        dtc_pruned = tree.DecisionTreeClassifier(criterion='gini', ccp_alpha=ccp_alpha, random_state=seed)
        dtc_pruned = dtc_pruned.fit(X.values, y.values)

        leaf_ids = dtc_pruned.apply(X.values)

        # One row per leaf, one column per class, holding the share of that class inside the leaf.
        # This replaces the per-row filtering, which was quadratic, used class labels as list
        # positions and read a hard-coded label column.
        leaf_class_share = pd.crosstab(leaf_ids, y.to_numpy(), normalize='index')
        shares = leaf_class_share.loc[leaf_ids].to_numpy()

        # Entropy of the leaf distribution; classes that are absent from a leaf (p == 0) contribute nothing.
        terms = np.zeros(shares.shape)
        present = shares > 0
        terms[present] = shares[present] * np.log2(shares[present])
        return (0.0 - terms.sum(axis=1)).tolist()


###################################################################################################################################

    def disjunct_size_adjusted(self, target_col=None) -> np.ndarray:
        r"""
        Adjusted Disjunct Size (DSadj): fits an unpruned decision tree on :math:`\mathcal{D}` and relates the size
        of the disjunct (leaf) of :math:`\mathbf x_i` to the size of the largest disjunct:

        .. math::

            DSadj(\mathbf x_i) = 1 - \frac{|Disjunct(\mathbf x_i)| - 1}{\max_j |Disjunct(\mathbf x_j)| - 1}

        Instances that fall in small leaves get values close to 1.

        Args:
            target_col (str): name of the label column (default None - uses the label column of the instance)

        Returns:
            array-like: :math:`DSadj(\mathbf x_i)`. Every value is NaN when all leaves hold a single instance
            (0/0), as in the previous implementation.
        """
        # Explicit integer seed: np.random.seed() returns None, so the estimator used to receive random_state=None.
        seed = 55
        np.random.seed(seed)  # keep the global seeding the previous implementation performed

        if target_col is None:
            target_col = self.target_col

        df_ds = self.data.reset_index(drop=True)
        y = df_ds[target_col]
        X = df_ds.drop(columns=target_col)

        dtc = tree.DecisionTreeClassifier(min_samples_split=2, criterion='gini', random_state=seed)
        dtc = dtc.fit(X.values, y.values)

        # Size of each instance's leaf, counted on the leaf ids themselves rather than on the
        # non-null entries of whichever column happens to come first.
        leaf_ids = pd.Series(dtc.apply(X.values))
        leaf_sizes = leaf_ids.map(leaf_ids.value_counts())

        # Number of OTHER instances sharing the leaf, scaled by the largest such count.
        companions = leaf_sizes.subtract(1)
        DSadj = companions.divide(companions.max())

        return 1 - DSadj.values

###################################################################################################################################


    def tree_depth_unpruned_adjusted(self, target_col=None) -> np.ndarray:
        r"""
        Tree Depth (TD) returns the depth of the leaf node that classifies :math:`\mathbf x_i` in a  decision tree,
        normalized by the maximum depth of the tree built from :math:`D`:

        .. math::

            TD(\mathbf x_i) = \frac{depth(\mathbf x_i)}{\max(depth(D))}

        This is the version that uses an unpruned tree (:math:`TD_U(\mathbf x_i)`). Instances harder to classify
        tend to be placed at deeper levels of the tree and present higher :math:`TD` values.

        Args:
            target_col (str): name of the label column (default None - uses the label column of the instance)

        Returns:
            array-like: :math:`TD_U(\mathbf x_i)`; all zeros when the fitted tree is a single node
        """
        # Explicit integer seed: np.random.seed() returns None, so the estimator used to receive random_state=None.
        seed = 55
        np.random.seed(seed)  # keep the global seeding the previous implementation performed

        if target_col is None:
            target_col = self.target_col

        df_ds = self.data.reset_index(drop=True)
        y = df_ds[target_col]
        X = df_ds.drop(columns=target_col)

        dtc = tree.DecisionTreeClassifier(min_samples_split=2, criterion='gini', random_state=seed)
        dtc = dtc.fit(X.values, y.values)

        # decision_path marks every node an instance goes through; one call handles all rows.
        # Nodes on the path minus the root gives the depth of the leaf.
        nodes_on_path = np.asarray(dtc.decision_path(X.values).sum(axis=1)).ravel()
        leaf_depth = nodes_on_path - 1

        max_depth = dtc.get_depth()
        if max_depth == 0:
            # Single-node tree: every instance sits at depth 0, so avoid the 0/0 division.
            return np.zeros(len(leaf_depth))

        TDUadj = leaf_depth / max_depth

        return TDUadj

###################################################################################################################################

    def tree_depth_pruned_adjusted(self, target_col=None, ccp_alpha=None) -> np.ndarray:
        r"""
        Tree Depth (TD) returns the depth of the leaf node that classifies :math:`\mathbf x_i` in a  decision tree,
        normalized by the maximum depth of the tree built from :math:`D`:

        .. math::

            TD(\mathbf x_i) = \frac{depth(\mathbf x_i)}{\max(depth(D))}

        This is the version that uses a cost-complexity pruned tree (:math:`TD_P(\mathbf x_i)`). Instances harder
        to classify tend to be placed at deeper levels of the tree and present higher :math:`TD` values.

        Args:
            target_col (str): name of the label column (default None - uses the label column of the instance)
            ccp_alpha (float): cost-complexity pruning parameter. If None, the value given to the constructor is
                used when available; otherwise it is tuned with a grid search over 100 values in [0.001, 0.1]

        Returns:
            array-like: :math:`TD_P(\mathbf x_i)`; all zeros when pruning leaves a single-node tree
        """
        # Explicit integer seed: np.random.seed() returns None, and estimators created with
        # random_state=None are not reproducible inside the parallel grid-search workers.
        seed = 55
        np.random.seed(seed)  # keep the global seeding the previous implementation performed

        if target_col is None:
            target_col = self.target_col
        if ccp_alpha is None:
            ccp_alpha = getattr(self, 'ccp_alpha', None)

        df_dcp = self.data.reset_index(drop=True)
        y = df_dcp[target_col]
        X = df_dcp.drop(columns=target_col)

        if ccp_alpha is None:
            parameters = {'ccp_alpha': np.linspace(0.001, 0.1, num=100)}
            dtc = tree.DecisionTreeClassifier(criterion='gini', random_state=seed)
            clf = GridSearchCV(dtc, parameters, n_jobs=-1)
            clf.fit(X.values, y.values)
            ccp_alpha = clf.best_params_['ccp_alpha']

        dtc_pruned = tree.DecisionTreeClassifier(criterion='gini', ccp_alpha=ccp_alpha, random_state=seed)
        dtc_pruned = dtc_pruned.fit(X.values, y.values)

        # decision_path marks every node an instance goes through; one call handles all rows.
        # Nodes on the path minus the root gives the depth of the leaf.
        nodes_on_path = np.asarray(dtc_pruned.decision_path(X.values).sum(axis=1)).ravel()
        leaf_depth = nodes_on_path - 1

        max_depth = dtc_pruned.get_depth()
        if max_depth == 0:
            # Pruning can collapse the tree to its root; avoid the 0/0 division in that case.
            return np.zeros(len(leaf_depth))

        TDPadj = leaf_depth / max_depth

        return TDPadj


## Aplicação nos conjuntos de treino

In [14]:
# For every seed and every validation instance: append that single instance to the training split,
# compute the hardness measures on the combined frame and keep only the measures of the appended
# (last) row. The result has one row per (seed, validation instance).
HM_COLUMNS = ['feature_kDNadj', 'feature_N2adj', 'feature_CLDadj', 'feature_DCPadj',
              'feature_DSadj', 'feature_TD_Uadj', 'feature_TD_Padj']

df_hm_seed = []
for seed, splits in builder.splits.items():
    train_data = splits['train']
    validation_data = splits['validation']

    for index, row in validation_data.iterrows():
        # pd.concat builds a new frame, so the stored training split is never modified.
        current_train = pd.concat([train_data, pd.DataFrame([row])], ignore_index=True)

        # Separate numerical features for KNN imputation
        # NOTE(review): if the label column is numeric it is selected here too and therefore takes
        # part in the imputation - confirm this is intended.
        numerical_features = current_train.select_dtypes(include=np.number).columns.tolist()
        modified_train_set_numerical = current_train[numerical_features]

        # Apply KNN imputation to the numerical features
        imputed_modified_train_set = SamplesBuilder.impute_missing(modified_train_set_numerical)

        # Replace the original numerical columns with the imputed ones
        current_train[numerical_features] = imputed_modified_train_set
        # Use the configured label name instead of repeating the literal.
        current_train[TARGET] = current_train[TARGET].astype(int)

        # No target_col is passed, so the measures take the LAST column as the label.
        m = ClassificationMeasures(current_train)
        df_meta_feat = m.calculate_all()
        df_hm = pd.DataFrame(df_meta_feat, columns=HM_COLUMNS)

        # Measures of the appended validation instance (last row), tagged with the seed
        last_row = df_hm.tail(1).copy()
        last_row['seed'] = seed
        df_hm_seed.append(last_row)

# Stack the per-instance rows of all seeds into a single DataFrame
df_hm_final = pd.concat(df_hm_seed, ignore_index=True)

# `df_hm_final` now holds one row of meta-features per validation instance, with its seed.
print(df_hm_final.head())


[1;30;43mA saída de streaming foi truncada nas últimas 5000 linhas.[0m
INFO:__main__:Calculating measure 'TD_Uadj'
INFO:__main__:Calculating measure 'N2adj'
INFO:__main__:Calculating measure 'kDNadj'
INFO:__main__:Calculating measure 'CLDadj'
INFO:__main__:Calculating measure 'DCPadj'
INFO:__main__:Calculating measure 'DSadj'
INFO:__main__:Calculating measure 'TD_Padj'
INFO:__main__:Calculating measure 'TD_Uadj'
INFO:__main__:Calculating measure 'N2adj'
INFO:__main__:Calculating measure 'kDNadj'
INFO:__main__:Calculating measure 'CLDadj'
INFO:__main__:Calculating measure 'DCPadj'
INFO:__main__:Calculating measure 'DSadj'
INFO:__main__:Calculating measure 'TD_Padj'
INFO:__main__:Calculating measure 'TD_Uadj'
INFO:__main__:Calculating measure 'N2adj'
INFO:__main__:Calculating measure 'kDNadj'
INFO:__main__:Calculating measure 'CLDadj'
INFO:__main__:Calculating measure 'DCPadj'
INFO:__main__:Calculating measure 'DSadj'
INFO:__main__:Calculating measure 'TD_Padj'
INFO:__main__:Calculatin

   feature_kDNadj  feature_N2adj  feature_CLDadj  feature_DCPadj  \
0        0.881291       0.840417        0.165807        0.778768   
1        0.881291       0.929869        0.240814        0.982474   
2        0.970951       0.807577        0.166037        0.778768   
3        0.970951       0.910549        0.194344        0.786256   
4        0.468996       0.685558        0.165649        0.123950   

   feature_DSadj  feature_TD_Uadj  feature_TD_Padj  seed  
0       0.949275         0.368421         0.250000    42  
1       0.978261         0.684211         0.666667    42  
2       0.942029         0.578947         0.250000    42  
3       1.000000         0.526316         0.250000    42  
4       0.601449         0.210526         0.250000    42  


IP 161.24.238.38 servidor da professora

## Base com as metafeatures

In [15]:
# Persist the meta-feature table in the project folder. The row index is written as the first
# column, because `index=False` is not passed.
hm_csv_path = PROJECT_PATH + 'df_hm_' + DATASET_NAME + SAMPLE + '1.csv'
df_hm_final.to_csv(hm_csv_path, sep=',', encoding='utf-8')

## Estima classes para base de metafeatures via XGBoost

In [None]:
class SamplesBuilder:
    """Create seeded train/validation splits of a CSV dataset and save them to disk."""

    # Seeds used by `split_samples` when the caller does not pass any.
    DEFAULT_SEEDS = (42, 43, 44, 45, 46)

    def __init__(self, csv_path, name, splits_dir="output_splits"):
        """
        Load the dataset and make sure the split directory exists.

        Parameters:
        - csv_path (str): Path to the input CSV file.
        - name (str): Experiment name embedded in the split file names.
        - splits_dir (str): Directory to save the generated splits.
        """
        self.csv_path = csv_path
        self.splits_dir = splits_dir
        self.data = pd.read_csv(csv_path)
        self.splits = {}  # seed -> {'train': DataFrame, 'validation': DataFrame}
        self.name = name

        # Ensure the output directory exists
        os.makedirs(self.splits_dir, exist_ok=True)

    @staticmethod
    def impute_missing(train, n_neighbors=3):
        """
        Impute missing values using the K-nearest neighbors algorithm.

        Parameters:
        - train (pd.DataFrame): Frame whose missing values are to be filled.
        - n_neighbors (int): Number of neighbours passed to `KNNImputer`.

        Returns:
        - pd.DataFrame: Imputed values with the columns and index of `train`.
        """
        imputer = KNNImputer(n_neighbors=n_neighbors)
        imputed_data = imputer.fit_transform(train)
        # NOTE(review): this assumes the imputer returns one column per input
        # column; a column that is entirely missing may be dropped by the
        # imputer and break the mapping below — confirm for your data.
        return pd.DataFrame(imputed_data, columns=train.columns, index=train.index)

    def split_samples(self, seeds=None, test_size=0.3):
        """
        Split the data into train and validation sets, once per seed, and save them to CSV.

        Each split is stored in `self.splits[seed]` and written to
        `train_<name>_seed_<seed>.csv` / `validation_<name>_seed_<seed>.csv`
        inside `splits_dir` (without the index column).

        Parameters:
        - seeds (iterable of int, optional): Random seeds to generate splits
          for. Defaults to `DEFAULT_SEEDS` (42 to 46).
        - test_size (float): Fraction of rows held out for validation.
        """
        if seeds is None:
            # A fresh list per call avoids sharing a mutable default argument.
            seeds = list(self.DEFAULT_SEEDS)

        for seed in seeds:
            train, val = train_test_split(self.data, test_size=test_size, random_state=seed)

            self.splits[seed] = {'train': train, 'validation': val}

            # Save splits as CSV
            train_path = os.path.join(self.splits_dir, f"train_{self.name}_seed_{seed}.csv")
            val_path = os.path.join(self.splits_dir, f"validation_{self.name}_seed_{seed}.csv")

            train.to_csv(train_path, index=False)
            val.to_csv(val_path, index=False)

        print(f"Splits created and saved in '{self.splits_dir}' for seeds: {seeds}")

In [None]:
class ResultsGenerator:
    """Train a calibrated classifier on each saved split and export its validation predictions."""

    # Seeds whose split files are processed when `get_results` receives none.
    DEFAULT_SEEDS = (42, 43, 44, 45, 46)

    def __init__(self, name, splits_dir= DATASET_PATH + "output_splits", results_dir= DATASET_PATH + "output_results"):
        """
        Initialize the ResultsGenerator with the experiment name and its directories.

        Parameters:
        - name (str): Name of the experiment; part of every split and result file name.
        - splits_dir (str): Directory where the train/validation splits are saved.
        - results_dir (str): Directory where the prediction CSVs are written.
        """
        self.splits_dir = splits_dir
        self.results_dir = results_dir
        self.name = name

        # Ensure the output directory exists
        os.makedirs(self.results_dir, exist_ok=True)

    @staticmethod
    def data_sample(X, y, oversample_ratio=0.2, balanced_ratio=0.6, random_state=1):
        """
        Rebalance a binary training set according to its minority/majority ratio.

        The ratio is computed from the counts of labels 0 and 1 in `y`:

        - ratio above `balanced_ratio`: the data is returned untouched;
        - minority smaller than `oversample_ratio` times the majority: the
          minority is randomly over-sampled up to that ratio, then the
          majority is randomly under-sampled;
        - otherwise: only the majority is randomly under-sampled.

        Parameters:
        - X: Feature matrix.
        - y: Labels; only the values 0 and 1 are counted.
        - oversample_ratio (float): Minority/majority ratio targeted by the over-sampler.
        - balanced_ratio (float): Ratio above which no resampling is applied.
        - random_state (int): Seed shared by both samplers.

        Returns:
        - tuple: `(X, y)`, resampled when a sampler was applied.
        """
        count_1 = (y == 1).sum()
        count_0 = (y == 0).sum()
        count_min = min(count_0, count_1)
        count_max = max(count_0, count_1)

        # Minority is large enough relative to the majority: nothing to do.
        if count_max > 0 and count_min / count_max > balanced_ratio:
            return X, y

        undersample = RandomUnderSampler(sampling_strategy='majority', random_state=random_state)

        # Over-sample only when at least one minority sample would be added.
        # With a float strategy the over-sampler rejects requests that add
        # zero or fewer samples, which previously crashed for ratios at or
        # just below `oversample_ratio`.
        if int(count_max * oversample_ratio - count_min) > 0:
            oversample = RandomOverSampler(sampling_strategy=oversample_ratio, random_state=random_state)
            X, y = oversample.fit_resample(X, y)

        return undersample.fit_resample(X, y)

    def get_results(self, seeds=None, target_feature=None):
        """
        Fit a calibrated classifier per seed and save its validation predictions.

        For every seed the train/validation CSVs named
        `train_<name>_seed_<seed>.csv` and `validation_<name>_seed_<seed>.csv`
        are read from `splits_dir`, the training part is rebalanced with
        `data_sample`, and the module-level `base_bst` estimator is wrapped in
        a sigmoid `CalibratedClassifierCV` and fitted. One CSV with the columns
        `True_Labels`, `Predicted_Class` and `Confidence_Scores` is written to
        `results_dir` per seed.

        Parameters:
        - seeds (iterable of int, optional): Seeds to process. Defaults to
          `DEFAULT_SEEDS` (42 to 46).
        - target_feature (str, optional): Name of the label column. Defaults
          to the last column of the training split.
        """
        if seeds is None:
            seeds = ResultsGenerator.DEFAULT_SEEDS

        for seed in seeds:
            data_train = pd.read_csv(f'{self.splits_dir}/train_{self.name}_seed_{seed}.csv')
            data_test = pd.read_csv(f'{self.splits_dir}/validation_{self.name}_seed_{seed}.csv')

            # NOTE(review): by default the label is taken to be the last
            # column of the split file — verify this against the saved CSVs.
            if target_feature is None:
                label_column = data_train.columns[-1]
            else:
                label_column = target_feature

            # NOTE: a per-class "easiest instances" filter (threshold t) was
            # sketched at this point and is currently disabled.

            # Split X and y
            X_train = data_train.drop(columns=[label_column])
            y_train = data_train[label_column]
            X_test = data_test.drop(columns=[label_column])
            y_test = data_test[label_column]

            # Resample the training part only; the validation part is left as is.
            X_resampled, y_resampled = ResultsGenerator.data_sample(X_train, y_train)

            bst = CalibratedClassifierCV(estimator=base_bst, method='sigmoid', cv=5, ensemble=False, n_jobs=None )

            # Fit model
            bst.fit(X_resampled, y_resampled)

            y_pred_proba = bst.predict_proba(X_test)

            # Most probable column per row, mapped back to the class label
            # (identical to the column index when the labels are 0 and 1).
            predicted_class = bst.classes_[np.argmax(y_pred_proba, axis=1)]

            # Confidence is the probability assigned to the predicted class.
            confidence_scores = np.max(y_pred_proba, axis=1)

            # Create DataFrame with true labels, predicted classes, and confidence scores
            results = pd.DataFrame({'True_Labels': y_test,
                                    'Predicted_Class': predicted_class,
                                    'Confidence_Scores': confidence_scores})

            results.to_csv(f'{self.results_dir}/results_seed_{seed}_{self.name}.csv')

In [None]:
# Load the dataset and prepare the directory that will hold the splits.
# NOTE(review): `DATASET` is not among the configuration constants visible at
# the top of the notebook — confirm it is defined before this cell runs.
builder = SamplesBuilder(
    csv_path=DATASET_PATH + DATASET,
    name=DATASET_NAME,
    splits_dir=DATASET_PATH + "splits",
)

# Create the seeded train/validation splits and write them to disk.
builder.split_samples()

Splits created and saved in '/content/drive/MyDrive/Doutorado/Pesquisa/splits' for seeds: [42, 43, 44, 45, 46]


In [None]:
# Point the generator at the saved splits and at the folder for its predictions.
results = ResultsGenerator(
    name=DATASET_NAME,
    splits_dir=DATASET_PATH + "splits",
    results_dir=DATASET_PATH + "results_dengue",
)

# Train on each split and write one predictions CSV per seed.
results.get_results()

In [None]:
# Leave-last-row-out prediction: for every seed and every frame in
# `modified_train_sets`, train an XGBoost classifier on all rows but the last
# and predict the last row. All predictions are stacked into `df_y_pred_final`.
#
# NOTE(review): `modified_train_sets` and `last_row` are not defined in this
# cell — confirm that an earlier cell creates them (Restart & Run All).
# NOTE(review): `splits` is never used and the model's random_state is fixed
# at 42, so each seed iteration appears to repeat the same fits — confirm
# whether the per-seed data was meant to be used here.
df_y_pred_seed = []
df_accuracy_seed = []  # only referenced by the disabled accuracy code below

for seed, splits in builder.splits.items():
    for i in range(len(modified_train_sets)):
        X = modified_train_sets[i].drop('severity', axis=1)  # Assuming 'severity' is your target column
        y = modified_train_sets[i]['severity']

        # Split data into training and testing sets.  n-1 for training, last for test
        X_train = X.iloc[:-1]
        X_test = X.iloc[-1:]
        y_train = y.iloc[:-1]
        y_test = y.iloc[-1:]  # only needed by the disabled accuracy evaluation

        # Initialize the XGBoost classifier
        # (multi-class objective; the class count is the number of distinct labels in y)
        xgb_model = xgb.XGBClassifier(objective='multi:softmax', num_class=len(y.unique()), random_state=42)

        # Train the model
        xgb_model.fit(X_train, y_train)

        # Add a column to identify the seed
        # NOTE(review): this writes to `last_row`, which is not the test row
        # built above and is not read again in this cell — verify the intent.
        last_row['seed'] = seed

        # Make predictions
        y_pred = xgb_model.predict(X_test)
        df_y_pred_seed.append(pd.Series(y_pred, name='y_pred'))

        # Evaluate the model
#        accuracy = accuracy_score(y_test, y_pred)
#        df_accuracy_seed.append(pd.Series([accuracy], name='accuracy'))

# One stacked Series holding every prediction, re-indexed from 0.
df_y_pred_final = pd.concat(df_y_pred_seed, ignore_index=True)
#df_accuracy_final = pd.concat(df_accuracy_seed, ignore_index=True)

In [None]:
# Export the stacked predictions to the current working directory.
# The accuracy export below stays disabled together with the accuracy computation.
#df_accuracy_final.to_csv('df_accuracy_final.csv', sep=',', encoding='utf-8')
df_y_pred_final.to_csv(
    path_or_buf='df_y_pred_final.csv',
    sep=',',
    encoding='utf-8',
)

Valores reais de y

In [None]:
# Collect the true 'severity' values of every seed's validation split and
# save them as a single-column CSV.
#
# NOTE(review): `output_dir` is created but never used below; 'y_real.csv' is
# written to the current working directory — confirm which location is intended.
output_dir = "validation_csv"
os.makedirs(output_dir, exist_ok=True)

y_real = []

# Validation targets are appended seed by seed, in the order the splits were stored.
for seed, splits in builder.splits.items():
    validation_data = splits['validation']
    y_real.extend(validation_data['severity'].tolist())

# Create a DataFrame from the stacked 'severity' column
severity_df = pd.DataFrame({'severity': y_real})

# Save the DataFrame to a CSV file
severity_df.to_csv('y_real.csv', index=False)