In [1]:
import pandas as pd
import numpy as np
import pickle

from sklearn.base import clone
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.base import BaseEstimator
from sklearn.model_selection import KFold

from typing import Tuple

from src.load_models import select_model
from src.load_dataset import load_dataset
from src.config import models_features_r2, models_features_per, model_name_conversion
from src.graph_visualization import visualize_highest_score_feature_selection, feature_selection_tabularize, visualization_testing_dataset

from src.utils import find_adj_score, calculate_y_LOD, per_error

In [2]:
class ModelSelection():
    def __init__(self, model_name:str, X_train:pd.DataFrame, y_train:pd.Series):
        self.X_train, self.y_train = X_train, y_train
        self.model = select_model(model_name)
        self.all_feature_scores = []
    
        self.y_LOD = calculate_y_LOD(self.X_train, self.y_train) 

    def save(self, path:str) -> None:
        with open(path, 'wb') as f:
            pickle.dump(self.model, path) 

    def find_score(self, kf:KFold, features:list) -> np.ndarray:
        return np.array(self.calculate_r2_score(self.model, self.X_train[features], self.y_train, kf))
    
    def find_per_diff(self, kf:KFold, features:list) -> np.ndarray:
        return np.array(self.calculate_per_diff(self.model, self.X_train[features], self.y_train, kf))
    
    def calculate_per_diff(self, model:BaseEstimator, X:pd.DataFrame, y:pd.Series, kf:KFold) -> np.ndarray:
        per_diff_all = []
        
        for train_index, test_index in kf.split(X):
            model_ = clone(model)
            
            # Split the data into training and testing sets
            X_train, X_test = X.iloc[train_index], X.iloc[test_index]
            y_train, y_test = y.to_numpy()[train_index], y.to_numpy()[test_index]
        
            model_.fit(X_train, y_train)
            
            mask           = (y_test != 0)    # Non Zero Concentration
            zero_mask      = ~(mask)          # Zero Concentration

            y_pred         = model_.predict(X_test)
            y_pred         = np.maximum(y_pred, 0.0)

            # Only for non zero concentration
            non_zero_per_error = np.abs(y_test[mask] - y_pred[mask])/(0.5*(y_test[mask] + y_pred[mask]))
           
            # zero concentration
            zero_per_error     = np.abs(y_test[zero_mask] - y_pred[zero_mask]) / self.y_LOD

            assert not(np.isnan(zero_per_error).any())
            assert not(np.isnan(non_zero_per_error).any())

            per_error         = np.concatenate((non_zero_per_error, zero_per_error))
            per_error         = np.mean(per_error) * 100

            assert not(np.isnan(per_error)) # To check if any output is invalid or nan
            per_diff_all.append(per_error)

        
        return np.array(per_diff_all).mean()
    

    def calculate_r2_score(self, model:BaseEstimator, X:pd.DataFrame, y:pd.Series, kf:KFold) -> np.ndarray:
        scores, adj_scores = [], []

        for train_index, test_index in kf.split(X):
            model_ = clone(model)
            
            # Split the data into training and testing sets
            X_train, X_test = X.iloc[train_index], X.iloc[test_index]
            y_train, y_test = y.to_numpy()[train_index], y.to_numpy()[test_index]
        
            model_.fit(X_train, y_train)
            
            y_pred         = model_.predict(X_test)
            y_pred         = np.maximum(y_pred, 0.0)

            score          = r2_score(y_test, y_pred)

            adj_score      = find_adj_score(len(y_pred), X_train.shape[1], score) # N, P, R2 score

            scores.append(score)
            adj_scores.append(adj_score)

        return np.array(scores).mean(), np.array(adj_scores).mean()
    
    def fit(self, features:list) -> None:
        self.model.fit(self.X_train[features], self.y_train)

    def find_best_features(self, kf:KFold, r2_score:float) -> list:
        model = clone(self.model)

        all_features            = self.X_train.columns.values
        self.selected_features  = all_features.copy().tolist()
        self.all_feature_scores = []

        best_score        = self.calculate_r2_score(model, self.X_train[all_features], self.y_train, kf) if r2_score else \
                            self.calculate_per_diff(model, self.X_train[all_features], self.y_train, kf)
        
        flag              = False

        while len(self.selected_features) != 0:
            one_line_score    = []
            one_line_features = []
            
            for feature in all_features:
                if feature in self.selected_features: 
                    testing_feature = [i for i in self.selected_features if i != feature] # Remove the feature from the set
                    
                    if r2_score:
                        score = self.calculate_r2_score(model, self.X_train[testing_feature], self.y_train, kf)
                    else:  
                        score = self.calculate_per_diff(model, self.X_train[testing_feature], self.y_train, kf)
                    
                    one_line_score.append(score)
                    one_line_features.append(feature)
           
            one_line_score = np.array(one_line_score) if r2_score else one_line_score
            
            if r2_score==True:
                best_socre_ind      = np.argmax(one_line_score[:,0])
                one_line_best_score = one_line_score[best_socre_ind]

            else:
                best_socre_ind, one_line_best_score = np.argmin(one_line_score), np.min(one_line_score)

            sel_one_line_feature    = one_line_features[best_socre_ind] 

            temp = {}
            for key, score in zip(one_line_features, one_line_score):
                # key = [i for i in self.selected_features if i != key]
                temp[str(key)] = score

            if r2_score:
                if one_line_best_score[0] > best_score[0]:
                    best_score = one_line_best_score
                    self.selected_features.remove(sel_one_line_feature)
                    self.all_feature_scores.append(temp)
                    flag = False

                else: flag = True
                        
            else:
                if one_line_best_score <= best_score:
                    best_score = one_line_best_score
                    self.selected_features.remove(sel_one_line_feature)
                    self.all_feature_scores.append(temp)
                    flag = False

                else: flag = True

            if flag: break
        
        self.best_score = best_score
        if self.all_feature_scores == []: self.all_feature_scores.append({str(self.selected_features): best_score[0]})
        return self.all_feature_scores
    
    def find_testing_score(self, X_test: pd.DataFrame, y_test: pd.DataFrame) -> Tuple[list, list]:

        # Fit the model with selected features
        self.model.fit(self.X_train[self.selected_features], self.y_train)

        # Return both training and testing r2 score
        return self.model.score(self.X_train[self.selected_features], self.y_train), \
               self.model.score(X_test[self.selected_features], y_test)

In [3]:
def find_performance_metric(model_names: list, r2_top:pd.DataFrame) -> Tuple[dict, dict]:
    """
        Calcualte R2 Score and Percent Error on testing dataset
    """

    r2_scores  = {'Models':[], 'Scores':[]}
    per_errors = {'Models':[], 'Scores':[]}

    model_names = model_names if not(only_one_multivariate) else r2_top['Models'].values.tolist()

    print(model_names)
    for model_name in model_names:
        
        model_name = 'Linear' if ((model_name == 'multivariate')) else model_name

        model_r2   = ModelSelection(model_name, X_train, y_train)
        model_per  = ModelSelection(model_name, X_train, y_train)
        
        model_r2.fit(models_features_r2[model_name])
        model_per.fit(models_features_per[model_name])

        y_pred_r2  = model_r2.model.predict(X_test[models_features_r2[model_name]])
        y_pred_per = model_per.model.predict(X_test[models_features_per[model_name]])

        model_per_error      = per_error(y_test, y_pred_per, model_per.y_LOD)
        model_r2_score       = r2_score(y_test, y_pred_r2)
        model_r2_adj         = find_adj_score(len(y_test), len(models_features_r2[model_name]), model_r2_score)  # Numer of testing dataset, number of features, R2

        model_name = 'multivariate' if ((model_name == 'Linear') and only_one_multivariate) else model_name

        r2_scores['Models'].append(model_name) 
        per_errors['Models'].append(model_name)
        
        
        r2_scores['Scores'].append((model_r2_score, model_r2_adj))
        per_errors['Scores'].append(model_per_error)

    return r2_scores, per_errors
    
def select_features(X_train: pd.DataFrame, y_train: pd.DataFrame, model_names: list) -> Tuple[dict, dict]:
    """
        This function selects the best feature combinations for both 
        R2 score and percent error metrics for the given lists of models
    """

    feature_selection_r2score     = {}
    feature_selection_per_diff    = {}

    kf        = KFold(n_splits=5, shuffle=True, random_state=42)

    for model_name in model_names:
        dataset_model_name = model_name
       
        model = ModelSelection(model_name, X_train, y_train)

        feature_selection_r2score[dataset_model_name]  = model.find_best_features(kf, r2_score=True)
        models_features_r2[model_name] = model.selected_features

        feature_selection_per_diff[dataset_model_name] = model.find_best_features(kf, r2_score=False)
        models_features_per[model_name] = model.selected_features

        print(f"{model_name} R2 Score Best Feature",      models_features_r2[model_name])
        print(f"{model_name} Percent Error Best Feature", models_features_per[model_name])

        print("****************************************************")

    return feature_selection_r2score, feature_selection_per_diff

In [4]:
model_names = ['Linear', 'KNN', 'SVM', 'RF', 'GP']
OUTPUT_PATH = 'Outputs'

only_one_multivariate = False
adj_score             = False 

comparision_model     = 'uni_multivariate' if only_one_multivariate else 'linear_nonlinear'

X_train, X_test, y_train, y_test = load_dataset()
feature_selection_r2score, feature_selection_per_diff = select_features(X_train, y_train, model_names)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X.rename(columns={"PH": 'univariate, max(S)', 'signal_std':'univariate, std(S)', 'signal_mean':'univariate, mean(S)', 'peak area':'univariate, area(S)', \


######Data Distribution:#########
Training {0: 23, 8: 25, 16: 21}
Testing {0: 15, 8: 16, 16: 15}
#################################


ZeroDivisionError: float division by zero

In [None]:
for model in model_names:
    df = feature_selection_tabularize(feature_selection_r2score[model])
    df.to_excel(f'{OUTPUT_PATH}/feature_selection_list/feature_selection_r2score_{model}.xlsx', index=False)

    df = feature_selection_tabularize(feature_selection_per_diff[model])
    df.to_excel(f'{OUTPUT_PATH}/feature_selection_list/feature_selection_per_error_{model}.xlsx', index=False)

In [None]:

r2_top = visualize_highest_score_feature_selection(feature_selection_r2score, f"{OUTPUT_PATH}/{comparision_model}_5_fold_r2score_backward.png",    model_name_conversion, only_one_multivariate=only_one_multivariate, legends=True, adj_score=adj_score)
visualize_highest_score_feature_selection(feature_selection_per_diff, f"{OUTPUT_PATH}/{comparision_model}_5_fold_per_error_backward.png", model_name_conversion, r2_score=False, only_one_multivariate=only_one_multivariate, legends=True)


In [None]:
# Calculate R2 Score and Percent Error on Testing Dataset
test_r2_scores, test_per_errors = find_performance_metric(model_names, r2_top)

extend_name = ''

# Plot the R2 score and Percent Error in the Bar chart
visualization_testing_dataset(test_r2_scores,  f"{OUTPUT_PATH}/{comparision_model}_testing_r2_score{extend_name}.png",  model_name_conversion, only_one_multivariate, r2_score=True,  adj_score=adj_score, legends=True)
visualization_testing_dataset(test_per_errors, f"{OUTPUT_PATH}/{comparision_model}_testing_per_error{extend_name}.png", model_name_conversion, only_one_multivariate, r2_score=False, legends=True)
