# Oversampling con SMOTE

Provo ad applicare l'algoritmo SMOTE per fare oversampling e tentare di indurre nuovamente alberi di decisione, sempre di pronfondità massima 4.

Ho utilizzato come riferimento una classe messa a disposizione da Stefano dell'Oca, riadattandola al mio task di classificazione multi-classe

La classe da me realizzata incapsula l'applicazione di oversampling con SMOTE, SMOTENC (una variante di SMOTE applicabile in presenza di feature sia numeriche che categoriche), il training del modello selezionato e la cross validazione

La classe permette inoltre di realizzare oversampling duplicando i campioni delle classi minoritarie nel dataset e applicando ai nuovi samples "artificiali" del rumore casuale. Nella classe sono settati i parametri che stabiliscono la probabilità con cui ad un sample sintetico è applicato del rumore e con cui una feature di un sample sintatico selezionato sarà oggetto di aggiunta di rumore o meno. Il rumore random che si applica alle feature appartiene sempre al range di variazione che caratterizza la feature nel dataset

Ho implementato due versioni distinte della classe in questione: una in cui l'oversampling è applicato PRIMA di fare cross validazione e una in cui, dopo aver diviso i samples in folder, si applica l'oversampling alle folder da usare per il training

In [24]:
import numpy as np
from imblearn.over_sampling import RandomOverSampler
from imblearn.over_sampling import SMOTE
from imblearn.over_sampling import SMOTENC
from sklearn.model_selection import cross_val_score
from collections import Counter
import random
from sklearn import tree
import graphviz

#questa versione prima fa augmentation, poi fa cross val
class SEC_aug_pre_split(object):
    def __init__(self, X, Y, aug_method, learning_alg, n_fold, n_it, seed):
        self.X = X
        self.Y = Y
        self.aug_method = aug_method
        self.learning_alg = learning_alg
        self.n_fold = n_fold
        self.n_it = n_it
        self.seed = seed
        
        self.accuracy_median_scores = np.zeros(self.n_it)
        
    #oversamplo tutte le classi
    def random_oversample(self, X_train, Y_train):
        ros = RandomOverSampler(random_state = self.seed)
        X_train_aug, Y_train_aug = ros.fit_resample(X_train, Y_train)
        return X_train_aug, Y_train_aug
    
    #roversamplo tutte le classi
    def SMOTE_augmentation(self, X_train, Y_train):
        smt = SMOTE(random_state = self.seed , k_neighbors = 2)
        X_train_aug, Y_train_aug = smt.fit_resample(X_train, Y_train)
        return X_train_aug, Y_train_aug
    
    #anche qua oresamplo tutte le classi
    #questo va usato in caso di feature categoriche
    def SMOTENC_augmentation(self, X_train, Y_train, cat_indexes):#assumo di non aver fatto one-hot-encoding prima
        smt = SMOTENC(k_neighbors = 2, random_state=self.seed, categorical_features=cat_indexes)
        X_train_aug, Y_train_aug = smt.fit_resample(X_train, Y_train)
        return X_train_aug, Y_train_aug
    
    #oversampling con aggiunta di rumore random
    def oversample_augmentation(self, X_train, Y_train, err_perc, prob_modifica):#qua assumo di non aver fatto one hot enconding
        
        #modifico il parametro in maniera opportuna
        if err_perc > 1:
            err_perc /= 100
            
        #ricavo range di ciascuna feature numerica
        min_features = X_train.drop(['TIC type'], axis = 1).min()
        max_features = X_train.drop(['TIC type'], axis = 1).max()
        
        #metà del range di variazione delle features numeriche
        scales = (max_features-min_features)/2
        
        #numero di samples iniziali per ogni classe
        n_samples_iniziali = Counter(Y_train)
        
        
        #indice dal quale sono presenti i samples artificiali aggiunti
        index_partenza_art = len(Y_train)
        
        #pareggio le  classi introducendo nuovi samples artificiali ottenuti duplicando quelli presenti
        X_train_aug, Y_train_aug = self.random_oversample(X_train, Y_train)
        
        #salvo numero di samples aggiunti per ogni classe
        n_samples_post_aug = Counter(Y_train_aug)
        added_samples = n_samples_post_aug-n_samples_iniziali
        
        
        #calcolo numero di samples artificiali a cui applicherò del rumore, in funzione di err_perc
        n_apply_noise = dict()
        for k in added_samples.keys():
             n_apply_noise[k] = round(added_samples[k] * err_perc)
                
        
        #ricavo gli indici dei samples artificiali delle varie classi a cui devo applicare rumore                                                                                               
        indexes_1 = random.sample([index for index in Y_train_aug.index if Y_train_aug[index] == 1 and index >= index_partenza_art], n_apply_noise[1])
        indexes_2 = random.sample([index for index in Y_train_aug.index if Y_train_aug[index] == 2 and index >= index_partenza_art], n_apply_noise[2])
        #sample di classe 3 non vengono aggiunti, sono la classe maggioritaria
        indexes_4 = random.sample([index for index in Y_train_aug.index if Y_train_aug[index] == 4 and index >= index_partenza_art], n_apply_noise[4])
        indexes_5 = random.sample([index for index in Y_train_aug.index if Y_train_aug[index] == 5 and index >= index_partenza_art], n_apply_noise[5])
       
        indici_da_modificare = pd.Series([indexes_1, indexes_2, indexes_4,indexes_5])
        indici_da_modificare.reindex([1,2,4,5])
     
        mu,sigma=0, 0.1
        
        for classe, indici in enumerate(indici_da_modificare):
            for indice in indici:
                for feature in X_train_aug.columns:
                    n = random.uniform(0,1)
                    if n < prob_modifica:
                        if feature.startswith('TIC'):#la feature è categorica
                            X_train_aug.at[indice, feature] = random.choice(X_train_aug['TIC type'].unique())
                        else:#la feature è numerica
                           
                            X_train_aug.at[indice, feature] = round(X_train_aug.at[indice, feature]+scales[feature]*np.random.normal(mu, sigma))
                            #verifico se é uscito dall'intervallo, in caso gli imposto il valore massimo                                        
                            X_train_aug.at[indice, feature]=min(X_train_aug.at[indice, feature], max_features[feature])
                            #verifico se è uscito dall'intervallo, in caso gli imposto il valore minimo
                            X_train_aug.at[indice, feature]=max(X_train_aug.at[indice, feature], min_features[feature])

        return X_train_aug, Y_train_aug
    
    #fa augmentation dei samples, fa cross validation e calcola l'accuratezza media per le varie iterazioni della cross-val
    def fit_score(self):
        for niter in range(self.n_it):
            
            #Qua prima augmento e poi faccio cross val classica  
            
            #Augmentation
            if self.aug_method == 'smote':#da applicare solo se ho tutte feature numeriche, altrimenti uso smotec o oversample
                X_train_aug, Y_train_aug = self.SMOTE_augmentation(self.X, self.Y)
            elif self.aug_method == 'smotenc':
                X_train_aug, Y_train_aug = self.SMOTENC_augmentation(self.X, self.Y, [3])
                X_train_aug = pd.get_dummies(X_train_aug, columns = ['TIC type'] )
            elif self.aug_method == 'oversample':
                X_train_aug, Y_train_aug = self.oversample_augmentation(self.X, self.Y,err_perc=10, prob_modifica=0.7)
                X_train_aug = pd.get_dummies(X_train_aug, columns = ['TIC type'] )
            else:
                X_train_aug, Y_train_aug = self.X.copy(), self.Y.copy()

          
            #Predictions
            scores = cross_val_score(self.learning_alg, X_train_aug, Y_train_aug)#di default è 5 fold cross val

            self.accuracy_median_scores[niter] = scores.mean()
               
    def print_result(self):
        print("Accuratezza: %f\n" %(np.mean(self.accuracy_median_scores)))
        
    def get_tree_smotenc(self):
        X_train_aug, Y_train_aug = self.SMOTENC_augmentation(self.X, self.Y, [3])
        X_train_aug = pd.get_dummies(X_train_aug, columns = ['TIC type'] )
        albero = tree.DecisionTreeClassifier(criterion = 'entropy', max_depth = 4, min_samples_leaf = 2)
        albero = albero.fit(X_train_aug, Y_train_aug)
        data = tree.export_graphviz(albero, out_file=None, 
                      feature_names=X_train_aug.columns,  
                      class_names=pd.Series(Y_train_aug.unique()).astype(str).sort_values().tolist(),  
                      filled=True, rounded=True,  
                      special_characters=True) 
        graph = graphviz.Source(data) 
        graph.render("alberoSMOTENC")
        
    def get_tree_oversample(self):
        X_train_aug, Y_train_aug = self.oversample_augmentation(self.X, self.Y, err_perc=10, prob_modifica=0.7)
        X_train_aug = pd.get_dummies(X_train_aug, columns = ['TIC type'] )
        albero = tree.DecisionTreeClassifier(criterion = 'entropy', max_depth = 4, min_samples_leaf = 2)
        albero = albero.fit(X_train_aug, Y_train_aug)
        data = tree.export_graphviz(albero, out_file=None, 
                      feature_names=X_train_aug.columns,  
                      class_names=pd.Series(Y_train_aug.unique()).astype(str).sort_values().tolist(),  
                      filled=True, rounded=True,  
                      special_characters=True) 
        graph = graphviz.Source(data) 
        graph.render("alberoOversample")

        

In [4]:
#versione che fa cross val prima dell'augmentazione

#usa KFold e non cross_val_score
import numpy as np
from imblearn.over_sampling import RandomOverSampler
from imblearn.over_sampling import SMOTE
from imblearn.over_sampling import SMOTENC
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score
from collections import Counter
import random
import pandas as pd

class SEC_aug_post_split(object):
    def __init__(self, X, Y, aug_method, learning_alg, n_fold, n_it, seed):
        self.X = X
        self.Y = Y
        self.aug_method = aug_method
        self.learning_alg = learning_alg
        self.n_fold = n_fold
        self.n_it = n_it
        self.seed = seed
        
        self.accuracy_median_scores = np.zeros(self.n_it)
        
    #oversamplo tutte le classi
    def random_oversample(self, X_train, Y_train):
        ros = RandomOverSampler(random_state = self.seed)
        X_train_aug, Y_train_aug = ros.fit_resample(X_train, Y_train)
        return X_train_aug, Y_train_aug
    
    #oversamplo tutte le classi
    def SMOTE_augmentation(self, X_train, Y_train):
        smt = SMOTE(random_state = self.seed , k_neighbors = 2)
        X_train_aug, Y_train_aug = smt.fit_resample(X_train, Y_train)
        return X_train_aug, Y_train_aug
    
    #anche qua resamplo tutte le classi
    #questo va usato in caso di feature categoriche
    def SMOTENC_augmentation(self, X_train, Y_train, cat_indexes):#assumo di non aver ancora fatto one-hot-encoding
        smt = SMOTENC(k_neighbors = 2, random_state=self.seed, categorical_features=cat_indexes)
        X_train_aug, Y_train_aug = smt.fit_resample(X_train, Y_train)
        return X_train_aug, Y_train_aug
    
    #oversampling con agiunta rumore random
    def oversample_augmentation(self, X_train, Y_train, err_perc, prob_modifica):
        
        if err_perc > 1:
            err_perc /= 100
        
        #ricavo range di ciascuna feature
        min_features = X_train.drop(['TIC type'], axis = 1).min()
        max_features = X_train.drop(['TIC type'], axis = 1).max()
        
         #metà del range
        scales = (max_features-min_features)/2
        
        #numero di samples iniziali per ogni classe
        n_samples_iniziali = Counter(Y_train)
        
        #indice dal quale sono presenti i samples artificiali aggiunti
        index_partenza_art = len(Y_train)
        
        #pareggio le  classi introducendo nuovi samples artificiali
        X_train_aug, Y_train_aug = self.random_oversample(X_train, Y_train)
        
        #salvo numero di samples aggiunti per ogni classe
        n_samples_post_aug = Counter(Y_train_aug)
        added_samples = n_samples_post_aug-n_samples_iniziali
        
        #calcolo numero di samples artificiali a cui applicherò del rumore, in funzione di err_perc
        n_apply_noise = dict()
        for k in added_samples.keys():
             n_apply_noise[k] = round(added_samples[k] * err_perc)
                
        #ricavo gli indici dei samples artificiali delle varie classi a cui devo applicare rumore                                                     
        indexes_1 = random.sample([index for index in Y_train_aug.index if Y_train_aug[index] == 1 and index >= index_partenza_art], n_apply_noise[1])
        indexes_2 = random.sample([index for index in Y_train_aug.index if Y_train_aug[index] == 2 and index >= index_partenza_art], n_apply_noise[2])
        indexes_4 = random.sample([index for index in Y_train_aug.index if Y_train_aug[index] == 4 and index >= index_partenza_art], n_apply_noise[4])
        indexes_5 = random.sample([index for index in Y_train_aug.index if Y_train_aug[index] == 5 and index >= index_partenza_art], n_apply_noise[5])
       
        indici_da_modificare = pd.Series([indexes_1, indexes_2, indexes_4,indexes_5])
        indici_da_modificare.reindex([1,2,4,5])
     
        mu,sigma=0, 0.1
        
        
        for classe, indici in enumerate(indici_da_modificare):
            for indice in indici:
                for feature in X_train_aug.columns:
                    n = random.uniform(0,1)
                    if n < prob_modifica:
                        if feature.startswith('TIC'):#la feature è categorica
                            X_train_aug.at[indice, feature] = random.choice(X_train_aug['TIC type'].unique())
                        else:#la feature è numerica
                            
                            X_train_aug.at[indice, feature] = round(X_train_aug.at[indice, feature]+scales[feature]*np.random.normal(mu, sigma))
                            #verifico che se fosse uscito dall'intervallo gli imposto il valore massimo                                        
                            X_train_aug.at[indice, feature]=min(X_train_aug.at[indice, feature], max_features[feature])
                            #verifico che se fosse uscito dall'intervallo gli imposto il valore minimo
                            X_train_aug.at[indice, feature]=max(X_train_aug.at[indice, feature], min_features[feature])

        return X_train_aug, Y_train_aug
    #prima crea le folder per cross-val, poi augmenta i sample del training set, poi fitta il modello e poi calcola accuratezza
    def fit_score(self):
        for niter in range(self.n_it):
            
            #e setto che fa shuffling
            sf = KFold(n_splits = self.n_fold, shuffle = True, random_state = self.seed)
            
            accuracy_scores = []
           
            for train_index, test_index in sf.split(self.X, self.Y):
                X_train, X_test = self.X.iloc[train_index], self.X.iloc[test_index]
                Y_train, Y_test = self.Y.iloc[train_index], self.Y.iloc[test_index]
              
                #Augmentation
                if self.aug_method == 'smote':
                    X_train_aug, Y_train_aug = self.SMOTE_augmentation(X_train, Y_train)
                elif self.aug_method == 'smotenc':
                    X_train_aug, Y_train_aug = self.SMOTENC_augmentation(X_train, Y_train, [3])
                    X_train_aug = pd.get_dummies(X_train_aug, columns = ['TIC type'])
                elif self.aug_method == 'oversample':
                    X_train_aug, Y_train_aug = self.oversample_augmentation(X_train, Y_train,err_perc=10, prob_modifica=0.7)
                    X_train_aug = pd.get_dummies(X_train_aug, columns = ['TIC type'])
                else:
                    X_train_aug, Y_train_aug = X_train, Y_train
                    X_train_aug = pd.get_dummies(X_train_aug, columns = ['TIC type'])
                
                #Training
                clf = self.learning_alg.fit(X_train_aug, Y_train_aug)
                
                #Predictions
                X_test = pd.get_dummies(X_test, columns = ['TIC type'])
                columns = X_test.columns
                if('TIC type_A' not in columns):
                    X_test['TIC type_A'] = 0
                if('TIC type_B'not in columns):
                    X_test['TIC type_B']= 0
                if('TIC type_C'not in columns):
                    X_test['TIC type_C'] = 0
                if('TIC type_D'not in columns):
                    X_test['TIC type_D'] =0
               
                predictions = clf.predict(X_test)
                
                #calcolo accuracy
                accuracy_scores.append(accuracy_score(predictions, Y_test))
                
                
            self.accuracy_median_scores[niter] = np.mean(accuracy_scores)
        #sarebbe carino salvare file pdf del'albero
            

    def print_result(self):
        print("Accuratezza: %f\n" %(np.mean(self.accuracy_median_scores)))


In [5]:
#importo il ds
import pandas as pd

df = pd.read_excel('Db_parotide_Def_REV_Samuel.xlsx')
df['ADC'] = df['ADC'].apply(lambda s: str(s).replace(',', '.')).astype(float)
df['Output Algoritmo'].iloc[0] = 3
df['Output Algoritmo'].iloc[1] = 3
df['Output Algoritmo'].iloc[20] = 3
dfAlbero = df.iloc[:, 1:8]
dfAlbero = dfAlbero.drop(['Output Algoritmo'], axis = 1)
dfAlbero


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._setitem_single_block(indexer, value, name)


Unnamed: 0,eta,Segni macro malignità,ADC,TIC type,T2,COD ISTO
0,33,0,2.60,D,0,1
1,50,0,2.44,D,0,1
2,81,1,0.70,C,1,1
3,28,0,1.60,A,1,1
4,51,0,1.40,A,0,1
...,...,...,...,...,...,...
102,27,0,1.30,C,1,1
103,50,0,1.90,A,0,5
104,62,0,2.10,A,0,5
105,37,0,1.80,A,0,5


In [11]:
X = dfAlbero.iloc[:,:-1]
Y = dfAlbero['COD ISTO']

from sklearn import tree
albero = tree.DecisionTreeClassifier(criterion = 'entropy', max_depth = 4 , min_samples_leaf = 2)

seed = 42
prova = SEC_aug_pre_split(X,Y, 'smotenc', albero, 5, 50, seed)
prova.fit_score()
prova.print_result()

Accuratezza: 0.458222



In [12]:
prova = SEC_aug_pre_split(X,Y, 'oversample', albero, 5, 50, seed)
prova.fit_score()
prova.print_result()

Accuratezza: 0.496593



In [13]:
prova = SEC_aug_post_split(X,Y, 'smotenc', albero, 5, 50, seed)
prova.fit_score()
prova.print_result()

Accuratezza: 0.253680



In [14]:
prova = SEC_aug_post_split(X,Y, 'oversample', albero, 5, 50, seed)
prova.fit_score()
prova.print_result()

Accuratezza: 0.179766



In [27]:
#fare oversampling prima del training e fittare il modello sui dati augmentati sembra la strategia che conduce alla
#maggiore accuratezza
#tuttavia non sono sicuro che questo sia l'approccio giusto
#ora provo a visualizzare l'albero che si ottiene facendo training su tutto il dataset augmentato con smote e 
#con oversample augmentation

prova = SEC_aug_pre_split(X,Y, 'oversample', albero, 5, 50, seed)
prova.get_tree_oversample()
prova.get_tree_smotenc()