# Imports

In [3]:
from abc import ABC, abstractmethod
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.linear_model import LinearRegression
from imblearn.under_sampling import RandomUnderSampler
from sklearn.svm import SVR
from sklearn.ensemble import RandomForestRegressor
from sklearn.neural_network import MLPRegressor
from sklearn.naive_bayes import GaussianNB, MultinomialNB
from sklearn.neighbors import KNeighborsRegressor
from sklearn.metrics import *
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold, StratifiedKFold, train_test_split, cross_validate, cross_val_score

# PpcEvaluator

In [5]:
class PpcEvaluator:
    """Cross-validate a regressor that predicts ``PrimePathCoverage``.

    The regressor is evaluated with k-fold cross-validation (k = 10) on the
    raw feature values and on the feature values transformed by a scaler
    (``MinMaxScaler`` unless replaced through ``set_scaler``). For each
    variant, the scores of the fold with the best (highest) negated RMSE are
    stored in a summary table.
    """

    def __init__(self, regressor, dataset):
        """Store the regressor and the dataframe obtained from ``dataset``.

        Args:
            regressor: estimator handed to ``cross_validate``.
            dataset: object exposing ``get_dataframe()``; the returned frame
                must contain a ``PrimePathCoverage`` column.
        """
        # Kept behind a zero-argument callable; every call returns the same
        # estimator object.
        self.__regressor = (lambda: regressor)
        self.__dataset = dataset.get_dataframe()
        self.__scaler = MinMaxScaler()
        self.__k = 10
        self.__error_metrics = [
            'r2', 'max_error', 'neg_mean_absolute_error',
            'neg_mean_squared_error', 'neg_root_mean_squared_error',
            'neg_mean_squared_log_error', 'neg_median_absolute_error'
        ]
        self.__error_metrics_table = None

    def set_scaler(self, scaler):
        """Replace the scaler used for the 'scaled' evaluation."""
        self.__scaler = scaler

    def __scale(self, dados):
        """Fit the scaler on ``dados`` and return the transformed values."""
        return self.__scaler.fit_transform(dados)

    def evaluate(self, metrics, only_scaled=False, display_prediction=False, display_feature_importance=False):
        """Run the cross-validation and return the error summary table.

        Args:
            metrics: list of dataframe column names used as features.
            only_scaled: when True, skip the evaluation on unscaled features;
                the ``no_scaled`` column of the result then stays empty (NaN).
            display_prediction: accepted for interface compatibility; not
                used by this method.
            display_feature_importance: when True, display a table and a bar
                plot of the per-feature importance averaged over the folds.

        Returns:
            pandas.DataFrame indexed by error-metric name with the columns
            ``no_scaled`` and ``scaled``.
        """
        ppc = self.__dataset['PrimePathCoverage'].values
        features = self.__dataset[metrics].values

        self.__error_metrics_table = pd.DataFrame(
            index=['Mean Abs Error', 'Mean Sqr Error', 'Mean Sqr Log Error', 'Mean Median Error', 'R2 Score'],
            columns=['no_scaled', 'scaled']
        )

        if not only_scaled:
            raw_results = cross_validate(
                self.__regressor(),
                features,
                ppc,
                cv=self.__k,
                scoring=self.__error_metrics,
                return_estimator=True
            )
            self.__build_error_metrics_table(features, ppc, raw_results, 'no_scaled')

        # Scale once and reuse the result everywhere below.
        # NOTE(review): the scaler is fitted on the full feature matrix before
        # the folds are split, so held-out rows influence the scaling.
        scaled_features = self.__scale(features)
        scaled_results = cross_validate(
            self.__regressor(),
            scaled_features,
            ppc,
            cv=self.__k,
            scoring=self.__error_metrics,
            return_estimator=True
        )
        self.__build_error_metrics_table(scaled_features, ppc, scaled_results, 'scaled')

        if display_feature_importance:
            self.__display_feature_importance(scaled_results['estimator'], scaled_features, ppc, metrics)

        return self.__error_metrics_table

    def __display_feature_importance(self, estimators, scaled_features, ppc, metrics):
        """Display the mean feature importance of the per-fold estimators."""
        feature_names = self.__dataset[metrics].columns
        columns = list(range(self.__k))
        columns.append('Mean')
        fi_scaled_table = pd.DataFrame(columns=columns, index=feature_names)

        for fold, estimator in enumerate(estimators):
            # The importance is computed on the same scaled matrix the
            # estimators were trained on.
            fi_scaled_table[fold] = feature_importance_of(
                estimator, scaled_features, ppc, feature_names
            )

        # Row-wise mean over the fold columns (everything except 'Mean').
        fi_scaled_table['Mean'] = fi_scaled_table.iloc[:, :-1].astype(float).mean(axis=1)

        fi_final = pd.DataFrame(columns=['Metrics', 'Importance'])
        fi_final['Metrics'] = fi_scaled_table.index.values
        fi_final['Importance'] = fi_scaled_table['Mean'].values

        fi_final = fi_final.sort_values(ascending=False, by='Importance')
        display(fi_final)
        plt.figure(figsize=(12,8))
        plt.title("Feature importance - K = " + str(self.__k) + " - Mean")
        plt.axis([0, fi_final['Importance'].values.max(), 0, len(fi_final.index.values)])
        sns.barplot(y=fi_final['Metrics'].values, x=fi_final['Importance'].values, orient='h')
        plt.show()

    def __build_error_metrics_table(self, previsores_content, ppc, estimator, label):
        """Copy the scores of the best fold into column ``label``.

        Args:
            previsores_content: unused; kept for interface compatibility.
            ppc: unused; kept for interface compatibility.
            estimator: result dictionary of ``cross_validate``.
            label: target column, ``'no_scaled'`` or ``'scaled'``.
        """
        # Scores are negated errors, so the largest value is the best fold.
        idx_best_estimator = estimator['test_neg_root_mean_squared_error'].argmax()
        table = self.__error_metrics_table
        negated_scores = (
            ('Mean Abs Error', 'test_neg_mean_absolute_error'),
            ('Mean Sqr Error', 'test_neg_mean_squared_error'),
            ('Mean Sqr Log Error', 'test_neg_mean_squared_log_error'),
            ('Mean Median Error', 'test_neg_median_absolute_error'),
        )
        for row, score_key in negated_scores:
            # Single-step .loc assignment: a chained ``table[label][row] = ...``
            # writes to a temporary under pandas Copy-on-Write.
            table.loc[row, label] = abs(estimator[score_key][idx_best_estimator])
        # R2 is not a negated error; its sign is kept so that a negative
        # score is not reported as a positive one.
        table.loc['R2 Score', label] = estimator['test_r2'][idx_best_estimator]

# MlPpcEvaluator

In [4]:
class MlPpcEvaluator(ABC):
    """Base class for evaluators that predict ``PrimePathCoverage``.

    Subclasses provide the regressor (``get_regressor``) and fill the
    per-seed error tables (``evaluate_metrics``). This class prepares those
    tables, builds a per-sample prediction table from the best fold of a
    k-fold split (k = 10), and delegates display to ``MlPpcViewer``.
    """

    # Positional columns of the dataset frame read for the prediction table.
    # NOTE(review): presumably the cyclomatic-complexity and EdgeCoverage
    # columns, judging by the output labels; confirm against the dataset.
    _CYCLOMATIC_COLUMN = 8
    _EDGE_COVERAGE_COLUMN = 12

    def __init__(self, dataset, tot_seeds=0, auto_display=True):
        """Store the configuration.

        Args:
            dataset: object exposing ``get_dataframe()``.
            tot_seeds: highest seed; seeds ``0..tot_seeds`` get a table column.
            auto_display: when True, ``evaluate`` displays the results.
        """
        self.error_noscaled_metrics_table = None
        self.error_scaled_metrics_table = None
        self.__full_error_metrics_table_scaled = None
        self.__full_error_metrics_table_noscaled = None
        self.__dataset = dataset
        self.__tot_seeds = tot_seeds
        self.__viewer = MlPpcViewer(self)
        self.__auto_display = auto_display
        self.__scaler = MinMaxScaler()
        self.__k = 10
        self.__precision = 3

    def evaluate(self, metrics):
        """Evaluate the regressor on the feature columns ``metrics``.

        Args:
            metrics: list of dataframe column names used as features.
        """
        self.__build_noscaled_dataframe()
        self.__build_scaled_dataframe()
        self.evaluate_metrics(metrics)
        self.__build_predict_error_table(metrics, 'scaled')
        self.__build_predict_error_table(metrics, 'no_scaled')

        if self.__auto_display:
            self.display_results()

    def __scale(self, data):
        """Fit the scaler on ``data`` and return the transformed values."""
        return self.__scaler.fit_transform(data)

    def __build_noscaled_dataframe(self):
        self.error_noscaled_metrics_table = self.__build_dataframe('Without scaling')

    def __build_scaled_dataframe(self):
        self.error_scaled_metrics_table = self.__build_dataframe('With scaling')

    def __build_dataframe(self, caption):
        """Return an empty error table with one column per seed.

        A ``'Mean'`` column is appended only when ``tot_seeds`` is greater
        than 1. ``caption`` is accepted but not used.
        """
        seeds = list(range(self.__tot_seeds + 1))

        if self.__tot_seeds > 1:
            seeds.append('Mean')

        dataframe = pd.DataFrame(
            index=['Mean Abs Error', 'Mean Sqr Error', 'Mean Sqr Log Error', 'Mean Median Error', 'R2 Score'],
            columns=seeds
        )
        dataframe.columns.name = 'Seed'
        dataframe.index.name = 'Error Metrics'

        return dataframe

    def __build_predict_error_table(self, metrics, label):
        """Build the per-sample prediction table of the best fold.

        The regressor is fitted on every fold of an unshuffled k-fold split;
        the fold with the lowest mean absolute error supplies the rows of the
        table, which is sorted by descending absolute error.

        Args:
            metrics: list of dataframe column names used as features.
            label: ``'scaled'`` to scale the features, ``'no_scaled'`` to use
                them as they are; also selects where the table is stored.
        """
        has_ec = 'EdgeCoverage' in metrics
        use_scaling = label == 'scaled'
        df = self.get_dataframe()
        features = df[metrics]
        target = df['PrimePathCoverage']
        kf = KFold(n_splits=self.__k, shuffle=False)
        model = self.get_regressor()

        # Start at infinity so the first fold is always accepted, whatever
        # the magnitude of its error.
        best_mae = float('inf')
        best_test_index = None
        ppc_predict = None
        ppc_correct = None

        for train_index, test_index in kf.split(df):
            X_train = features.iloc[train_index]
            X_test = features.iloc[test_index]
            y_train = target.iloc[train_index]
            y_test = target.iloc[test_index]

            if use_scaling:
                # Fit the scaler on the training fold only and apply the same
                # transformation to the test fold.
                X_train = self.__scale(X_train)
                X_test = self.__scaler.transform(X_test)

            model.fit(X_train, y_train)

            y_predict = model.predict(X_test)
            mae = mean_absolute_error(y_test, y_predict)

            if mae < best_mae:
                best_mae = mae
                best_test_index = test_index
                ppc_predict = y_predict
                ppc_correct = y_test.values

        # Rows are selected positionally, so the dataset's index is left
        # untouched and scaled (ndarray) features need no index of their own.
        best_rows = df.iloc[best_test_index]
        cyclomatic = best_rows.iloc[:, self._CYCLOMATIC_COLUMN].values

        ppc_truncated = [MathUtils.truncate(value, self.__precision) for value in ppc_predict]
        columns = ['Cyclomatic', 'PPC correct', 'PPC predict', 'Error']
        data = {
            'Cyclomatic': cyclomatic,
            'PPC correct': ppc_correct,
            'PPC predict': ppc_truncated,
            # The error is measured against the truncated prediction.
            'Error': [abs(correct - predicted) for correct, predicted in zip(ppc_correct, ppc_truncated)],
        }

        if has_ec:
            columns.insert(1, 'EC correct')
            data['EC correct'] = best_rows.iloc[:, self._EDGE_COVERAGE_COLUMN].values

        # Object dtype matches a table that is filled cell by cell.
        predict_table = pd.DataFrame(data, columns=columns, dtype=object)
        predict_table.sort_values(by='Error', ascending=False, inplace=True)

        if label == 'scaled':
            self.__full_error_metrics_table_scaled = predict_table.copy()
        elif label == 'no_scaled':
            self.__full_error_metrics_table_noscaled = predict_table.copy()

    @abstractmethod
    def evaluate_metrics(self, metrics):
        """Fill the scaled and unscaled error tables for ``metrics``."""
        pass

    @abstractmethod
    def get_regressor(self):
        """Return the regressor used to build the prediction tables."""
        pass

    def display_results(self):
        """Display the scaled evaluation, then the unscaled one."""
        self.__viewer.display_scaled_evaluation()
        self.__viewer.display_noscaled_evaluation()

    def get_noscaled_metrics_table(self):
        return self.error_noscaled_metrics_table

    def get_scaled_metrics_table(self):
        return self.error_scaled_metrics_table

    def get_total_seeds(self):
        return self.__tot_seeds

    def get_dataset(self):
        return self.__dataset

    def get_dataframe(self):
        return self.__dataset.get_dataframe()

    def get_predict_table_scaled(self):
        return self.__full_error_metrics_table_scaled

    def get_predict_table_noscaled(self):
        return self.__full_error_metrics_table_noscaled

## Linear Regression

In [12]:
class LinearRegressionMlPpcEvaluator(MlPpcEvaluator):
    """Evaluate linear regression, one feature column at a time.

    Automatic display is switched off in the base class; results are
    displayed after each individual metric instead.
    """

    def __init__(self, dataset):
        super().__init__(dataset, auto_display=False)

    def evaluate_metrics(self, metrics):
        """Evaluate and display every metric of ``metrics`` separately.

        Each metric writes into column 0 of the error tables, so after the
        loop the tables hold the scores of the last metric.
        """
        only_ec = self.__is_only_ec(metrics)

        for metric in metrics:
            self.__evaluate_metric(metric, only_ec)
            self.display_results(metric)

    def __is_only_ec(self, metrics):
        """Return True when ``metrics`` is exactly ``['EdgeCoverage']``."""
        return (len(metrics) == 1) and (metrics[0] == 'EdgeCoverage')

    def __evaluate_metric(self, metric, only_ec=False):
        """Cross-validate a single feature column and store its scores."""
        evaluator = PpcEvaluator(self.__get_regressor(only_ec), self.get_dataset())
        error_metrics = evaluator.evaluate([metric])
        self.error_noscaled_metrics_table[0] = error_metrics['no_scaled']
        self.error_scaled_metrics_table[0] = error_metrics['scaled']

    def __get_regressor(self, only_ec):
        """Return the regressor: no intercept for EdgeCoverage alone,
        non-negative coefficients otherwise."""
        return LinearRegression(fit_intercept=False) if only_ec else LinearRegression(positive=True)

    def get_regressor(self):
        """Return the regressor used for the general (not EdgeCoverage-only) case."""
        # ``False`` is the Python boolean; a lowercase ``false`` is an
        # undefined name and raises NameError.
        return self.__get_regressor(False)

    def display_results(self, metric=None):
        """Display the results, preceded by the metric name when given.

        ``metric`` is optional so that the method can also be called without
        arguments, as the base class does.
        """
        if metric is not None:
            print('Metric:', metric, end='')
        super().display_results()

## SVR

In [9]:
class SvrMlPpcEvaluator(MlPpcEvaluator):
    """Evaluate a support vector regressor (``SVR`` with ``epsilon=0.3``)."""

    def __init__(self, dataset):
        super().__init__(dataset)

    def evaluate_metrics(self, metrics):
        """Cross-validate the regressor and store the scores in column 0."""
        scores = PpcEvaluator(self.get_regressor(), self.get_dataset()).evaluate(metrics)
        self.error_noscaled_metrics_table[0] = scores['no_scaled']
        self.error_scaled_metrics_table[0] = scores['scaled']

    def get_regressor(self):
        """Return a new ``SVR`` instance."""
        return SVR(epsilon=0.3)

## Random Forest

In [10]:
class RandomForestMlPpcEvaluator(MlPpcEvaluator):
    """Evaluate a random forest regressor over the seeds ``0..tot_seeds``."""

    def __init__(self, dataset, tot_seeds=0, auto_display=True):
        super().__init__(dataset, tot_seeds, auto_display=auto_display)

    def evaluate_metrics(self, metrics):
        """Cross-validate one regressor per seed and store its scores.

        Seed ``i`` fills column ``i`` of both error tables. The ``'Mean'``
        column is computed only when the total number of seeds is greater
        than 1.
        """
        total_seeds = self.get_total_seeds()

        for seed in range(total_seeds + 1):
            evaluator = PpcEvaluator(self.__get_regressor_using_seed(seed), self.get_dataset())
            error_metrics = evaluator.evaluate(metrics)
            self.error_noscaled_metrics_table[seed] = error_metrics['no_scaled']
            self.error_scaled_metrics_table[seed] = error_metrics['scaled']

        if total_seeds > 1:
            self.__compute_mean_error()

    def __get_regressor_using_seed(self, seed):
        """Return a ``RandomForestRegressor`` with ``random_state=seed``."""
        return RandomForestRegressor(random_state=seed)

    def get_regressor(self):
        """Return the regressor for seed 0."""
        return self.__get_regressor_using_seed(0)

    def __compute_mean_error(self):
        """Fill the ``'Mean'`` column of both error tables."""
        self.__compute_mean_error_of_dataframe(self.error_noscaled_metrics_table)
        self.__compute_mean_error_of_dataframe(self.error_scaled_metrics_table)

    @staticmethod
    def __compute_mean_error_of_dataframe(dataframe):
        """Set ``'Mean'`` to the row-wise mean of all columns but the last.

        The last column is expected to be ``'Mean'`` itself. The column is
        assigned in one step because a chained
        ``dataframe['Mean'][row] = ...`` writes to a temporary under pandas
        Copy-on-Write and leaves the table unchanged.
        """
        dataframe['Mean'] = dataframe.iloc[:, :-1].astype(float).mean(axis=1)

## K-Neighbors

In [15]:
class KNeighborsMlPpcEvaluator(MlPpcEvaluator):
    """Evaluate a k-nearest-neighbours regressor with default parameters."""

    def __init__(self, dataset, auto_display=True):
        # A single seed (0) is used.
        super().__init__(dataset, 0, auto_display=auto_display)

    def evaluate_metrics(self, metrics):
        """Cross-validate the regressor and store the scores in column 0."""
        scores = PpcEvaluator(self.get_regressor(), self.get_dataset()).evaluate(metrics)
        self.error_noscaled_metrics_table[0] = scores['no_scaled']
        self.error_scaled_metrics_table[0] = scores['scaled']

    def get_regressor(self):
        """Return a new ``KNeighborsRegressor`` instance."""
        return KNeighborsRegressor()

## Neural network

In [None]:
class NeuralNetworkMlPpcEvaluator(MlPpcEvaluator):
    """Evaluate an ``MLPRegressor`` over the seeds ``0..tot_seeds``."""

    def __init__(self, dataset, tot_seeds=0, epsilon=0.5):
        """Store the configuration.

        Args:
            dataset: object passed on to the base class.
            tot_seeds: highest seed to evaluate.
            epsilon: value forwarded as ``epsilon`` to ``MLPRegressor``.
        """
        super().__init__(dataset, tot_seeds)
        self.__epsilon = epsilon

    def evaluate_metrics(self, metrics):
        """Cross-validate one regressor per seed and store its scores.

        Seed ``i`` fills column ``i`` of both error tables. The ``'Mean'``
        column is computed only when the total number of seeds is greater
        than 1.
        """
        total_seeds = self.get_total_seeds()

        for seed in range(total_seeds + 1):
            evaluator = PpcEvaluator(self.__get_regressor_using_seed(seed), self.get_dataset())
            error_metrics = evaluator.evaluate(metrics)
            self.error_noscaled_metrics_table[seed] = error_metrics['no_scaled']
            self.error_scaled_metrics_table[seed] = error_metrics['scaled']

        if total_seeds > 1:
            self.__compute_mean_error()

    def __get_regressor_using_seed(self, seed):
        """Return a logistic-activation ``MLPRegressor`` with ``random_state=seed``."""
        return MLPRegressor(epsilon=self.__epsilon, activation='logistic', random_state=seed)

    def get_regressor(self):
        """Return the regressor for seed 0."""
        return self.__get_regressor_using_seed(0)

    def __compute_mean_error(self):
        """Fill the ``'Mean'`` column of both error tables."""
        self.__compute_mean_error_of_dataframe(self.error_noscaled_metrics_table)
        self.__compute_mean_error_of_dataframe(self.error_scaled_metrics_table)

    @staticmethod
    def __compute_mean_error_of_dataframe(dataframe):
        """Set ``'Mean'`` to the row-wise mean of all columns but the last.

        The last column is expected to be ``'Mean'`` itself. The column is
        assigned in one step because a chained
        ``dataframe['Mean'][row] = ...`` writes to a temporary under pandas
        Copy-on-Write and leaves the table unchanged.
        """
        dataframe['Mean'] = dataframe.iloc[:, :-1].astype(float).mean(axis=1)

## Custom

In [None]:
class CustomPpcEvaluator(MlPpcEvaluator):
    """Evaluate a regressor supplied by the caller."""

    def __init__(self, dataset, regressor):
        super().__init__(dataset)
        self.__regressor = regressor

    def evaluate_metrics(self, metrics):
        """Cross-validate the supplied regressor and store the scores in column 0."""
        scores = PpcEvaluator(self.__regressor, self.get_dataset()).evaluate(metrics)
        self.error_noscaled_metrics_table[0] = scores['no_scaled']
        self.error_scaled_metrics_table[0] = scores['scaled']

    def get_regressor(self):
        """Return the regressor given at construction time."""
        return self.__regressor