In [29]:
import matplotlib.pyplot as plt
import pandas as pd
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
import seaborn as sns
from sklearn.model_selection import KFold
from sklearn.feature_selection import mutual_info_classif
import copy
import plotly.graph_objs as go
from sklearn.decomposition import PCA
from sklearn.metrics import classification_report
from sklearn.svm import OneClassSVM
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.covariance import EllipticEnvelope
from sklearn.cluster import Birch
import numpy as np

In [None]:
# Local Outlier Factor = KNN
# Isolation forest = random forest
# One-Class SVM = SVM
# Elliptic Envelope = Logistic Regression

In [61]:
class DataAnalyzer:
    def __init__(self, balance_weights = True, algo = "knn"):
        file_path = 'Taiwanese Bankruptcy Prediction.csv'

        self.dataset = pd.read_csv(file_path, header=None)
        self.dataset.columns = self.dataset.iloc[0]

        self.dataset = self.dataset.drop(0)
        self.original_dataset = self.dataset
        self.fill_nan_vals()
        self.convert_columns()
        self.balance_weights = balance_weights
        self.algo = algo
        self.knn_weight = "uniform"
        self.class_weight = None
        if balance_weights:
            self.knn_weight = "distance"
            self.class_weight = "balanced"
        print(self.class_weight)

    def fill_nan_vals(self):
        for column in self.dataset.columns:
            if self.dataset[column].dtype == 'object':  # Non-numerical column
                mode_val = self.dataset[column].mode()[0]
                self.dataset[column] = self.dataset[column].fillna(mode_val)
            else:  # Numerical column
                mean_val = self.dataset[column].mean()
                self.dataset[column] = self.dataset[column].fillna(mean_val)
    
    def convert_columns(self):
        # Convert all columns to float except 'Bankrupt?' and 'Net Income Flag'
        for column in self.dataset.columns:
            if column not in ['Bankrupt?', 'Net Income Flag']:
                self.dataset[column] = self.dataset[column].astype(float)
            else:
                self.dataset[column] = self.dataset[column].astype(int)

    def show_class_distribution(self):
        target_column = 'Bankrupt?'
        # Load your dataset into a pandas DataFrame

        # Step 2: Inspect the Target Variable
        class_distribution = self.dataset[target_column].value_counts()

        # Step 3: Visualize the Distribution
        class_distribution.plot(kind='bar', title='Target Variable Distribution')
        print(class_distribution)
        plt.xlabel('Class')
        plt.ylabel('Count')
        plt.show()

    def show_heat_map(self, dataset = None):
        if dataset is None:
            corr_matrix = self.dataset.corr()
        else:
            corr_matrix = dataset.corr()
        plt.figure(figsize=(20, 20))
        sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt=".2f")
        plt.title("Correlation Matrix")
        plt.show()

    def get_x_and_Y(self):
        self.X = self.dataset.drop('Bankrupt?', axis=1)  # Assuming 'p' is the target variable
        self.y = self.dataset['Bankrupt?']
        return (self.X, self.y)
    
    def perform_manual_splitting_without_cv(self):
        X_train, self.X_test_final, y_train, self.y_test_final = train_test_split(self.X, self.y, test_size=0.2, random_state=0)
        self.X_train, self.y_train = X_train, y_train
    
    def perform_manual_splitting_cv(self):
        # First split to get training set and first test set
        X_train, X_temp, y_train, y_temp = train_test_split(self.X, self.y, test_size=0.4, random_state=0)
        self.X_train, self.y_train = X_train, y_train
        self.X_cv, self.X_test_final, self.y_cv, self.y_test_final = train_test_split(X_temp, y_temp, test_size=0.5, random_state=0)
    
    def perform_kfold_cv(self, num_of_columns = None):
        X_train, X_test_final, y_train, y_test_final = train_test_split(self.X, self.y, test_size=0.2, random_state=0)
        if num_of_columns:
            X_train = copy.deepcopy(X_train.iloc[:, :num_of_columns])
            X_test_final = copy.deepcopy(X_test_final.iloc[:, :num_of_columns])
        self.X_train = X_train
        self.y_train = y_train.astype(int)
        self.X_test_final = X_test_final.reindex(X_train.columns, axis=1)
        self.y_test_final = y_test_final.astype(int)
        if self.algo == "1_svm":
            classifier = OneClassSVM(gamma='auto')
        elif self.algo == "iso_f":
            classifier = IsolationForest(random_state=42, contamination='auto')
        elif self.algo == "out_fa":
            classifier = LocalOutlierFactor(novelty=True)
        elif self.algo == "ee":
            classifier = EllipticEnvelope()
        elif self.algo == "1_b":
            classifier = Birch()

        # Perform k-fold cross-validation
        kf = KFold(n_splits=5, shuffle=True, random_state=0)

        scores = []
        best_model = None
        best_avg_score = 0.0 

        for train_index, val_index in kf.split(X_train):
            X_train_fold, X_val_fold = X_train.iloc[train_index], X_train.iloc[val_index]
            y_train_fold, y_val_fold = y_train.iloc[train_index].astype(int), y_train.iloc[val_index].astype(int)

            # Train the model on the training fold
            X_train_smote, y_train_smote = X_train_fold, y_train_fold
            classifier.fit(X_train_smote, y_train_smote)

            # Evaluate the model on the validation fold
            y_val_pred = classifier.predict(X_val_fold)
            score = accuracy_score(y_val_fold, y_val_pred)

            # Update the best model if the current model has a better average performance
            if score > best_avg_score:
                best_avg_score = score
                best_model = classifier
            scores.append(score)
        
        self.classifier = best_model
        return(best_avg_score, scores)
    
    def perform_filter_methods(self):
        data = copy.deepcopy(self.dataset)
        X = copy.deepcopy(data).drop(columns=['Bankrupt?'])  # Features
        print("total features = ", len(X.columns))
        y = copy.deepcopy(data['Bankrupt?']).astype(int) 
        # Calculate Mutual Information scores
        mi_scores = mutual_info_classif(X, y)
        # Select features with MI score > 0.5
        selected_features = X.columns[mi_scores > 0.01]
        print("selected features with high mutual information with the target = ", len(selected_features))
        # Calculate correlation matrix
        corr_matrix = X[selected_features].corr().abs()
        copy_ds = copy.deepcopy(data[selected_features])
        # Remove one of two highly correlated features
        to_drop = set()
        dropped_columns = {}
        for i in range(len(corr_matrix.columns)):
            if i not in dropped_columns:
                for j in range(len(corr_matrix.columns)):
                    if abs(corr_matrix.iloc[i, j]) > 0.8 and i!=j and j not in dropped_columns and i not in dropped_columns and i!=j:
                        
                        colname_i = corr_matrix.columns[i]
                        colname_j = corr_matrix.columns[j]
                        # Calculate the Mutual Information score of each feature with the target variable
                        mi_i = mutual_info_classif(X[colname_i].values.reshape(-1, 1), y)[0]
                        mi_j = mutual_info_classif(X[colname_j].values.reshape(-1, 1), y)[0]
                        # Keep the feature with higher Mutual Information score
                        if mi_i > mi_j:
                            to_drop.add(colname_j)
                            dropped_columns[j] = colname_j
                        else:
                            to_drop.add(colname_i)
                            dropped_columns[i] = colname_i
        final_selected_features = selected_features.drop(to_drop)
        print("final selected features after removing one of two correlated features = ", len(final_selected_features))
        self.dataset = self.dataset[final_selected_features]
        self.dataset["Bankrupt?"] = data["Bankrupt?"]
        self.show_heat_map()
        return self.dataset
    
    def perform_wrapper_method(self):
        feature_scores = []
        feature_scores_dict = {}
        total_num_of_columns = len(self.X.columns)
        for i in range(1,total_num_of_columns+1):
            newAnlyzr = DataAnalyzer(balance_weights=self.balance_weights)
            newAnlyzr.get_x_and_Y()
            (best_avg_score, scores) = newAnlyzr.perform_kfold_cv(num_of_columns=i)
            mean_score, y_tst, y_prd = newAnlyzr.get_accuracy()
            feature_scores.append((i, mean_score))
            feature_scores_dict[i] = mean_score
        self.feature_scores = feature_scores
        additional_feature_penalty = -0.05
        score_weight = 0.95
        adjusted_scores = []
        best_num_of_features = 1
        best_adjusted_score = 0
        for (j, mn_score) in feature_scores:
            adjusted_score = mn_score*score_weight + j*additional_feature_penalty/total_num_of_columns
            adjusted_scores.append((j, adjusted_score))
            if adjusted_score > best_adjusted_score:
                best_adjusted_score = adjusted_score
                best_num_of_features = j
        best_score = feature_scores_dict[best_num_of_features]

        return {
            "best_num_of_features": best_num_of_features,
            "best_score": best_score
        }
    
    def plot_wrapper_scores(self):
        x_values, y_values = zip(*self.feature_scores)
        # Create a Plotly trace
        trace = go.Scatter(x=x_values, y=y_values, mode='lines+markers')

        # Create a Plotly layout
        layout = go.Layout(
            title='Wrapper Method Feature Scores',
            xaxis=dict(title='Number of Features'),
            yaxis=dict(title='Accuracy')
        )

        # Create a Plotly figure
        fig = go.Figure(data=[trace], layout=layout)

        # Display the plot
        fig.show()

    def perform_pca(self, n_components=None):
        # Standardize the data
        scaler = StandardScaler()
        data = self.X
        standardized_data = scaler.fit_transform(data)

        # Create PCA object
        pca = PCA(n_components=n_components)

        # Fit and transform the data
        pca_data = pca.fit_transform(standardized_data)

        # Create a DataFrame for the PCA results
        pca_columns = [f"PC{i+1}" for i in range(pca_data.shape[1])]
        pca_df = pd.DataFrame(data=pca_data, columns=pca_columns)

        # Concatenate with original dataset
        self.X = pca_df

    
    def perform_without_cv(self):
        if self.algo == "1_svm":
            classifier = OneClassSVM(gamma='auto')
        elif self.algo == "iso_f":
            classifier = IsolationForest(random_state=42, contamination='auto')
        elif self.algo == "out_fa":
            classifier = LocalOutlierFactor(novelty=True)
        elif self.algo == "ee":
            classifier = EllipticEnvelope()
        elif self.algo == "1_b":
            classifier = Birch()
        self.classifier = classifier
        self.classifier.fit(self.X_train, self.y_train)
        y_pred1 = self.classifier.predict(self.X_test_final)
        self.ac_final = accuracy_score(self.y_test_final,y_pred1)
        return self.ac_final, self.y_test_final
    
    def one_svm(self):
        self.classifier = OneClassSVM(gamma='auto')
        self.classifier.fit(self.X_train)
        y_pred1 = self.classifier.predict(self.X_cv)
        y_pred1 = [0 if pred == 1 else 1 for pred in y_pred1]
        self.ac_cv = accuracy_score(self.y_cv,y_pred1)
        return self.ac_cv
    
    def isolation_forest(self):
        self.classifier = IsolationForest(random_state=42, contamination='auto')
        self.classifier.fit(self.X_train)
        y_pred = self.classifier.predict(self.X_cv)  # Perform prediction on the cross-validation set
        # distinct_values = np.unique(y_pred)
        # print(distinct_values)
        y_pred = [0 if pred == 1 else 1 for pred in y_pred]
        self.ac_cv = accuracy_score(self.y_cv, y_pred)  # Calculate accuracy
        return self.ac_cv
    
    def outlier_factor(self):
        self.classifier = LocalOutlierFactor(novelty=True)
        self.classifier.fit(self.X_train)
        y_pred = self.classifier.predict(self.X_cv)  # Perform prediction on the cross-validation set
        # distinct_values = np.unique(y_pred)
        # print(distinct_values)
        y_pred = [0 if pred == 1 else 1 for pred in y_pred]
        self.ac_cv = accuracy_score(self.y_cv, y_pred)  # Calculate accuracy
        return self.ac_cv
    
    def elliptic_envelope(self):
        self.classifier = EllipticEnvelope()
        self.classifier.fit(self.X_train)
        y_pred = self.classifier.predict(self.X_cv)  # Perform prediction on the cross-validation set
        # distinct_values = np.unique(y_pred)
        # print(distinct_values)
        y_pred = [0 if pred == 1 else 1 for pred in y_pred]
        self.ac_cv = accuracy_score(self.y_cv, y_pred)  # Calculate accuracy
        return self.ac_cv
    
    def one_birch(self):
        self.classifier = Birch()
        self.classifier.fit(self.X_train)
        y_pred = self.classifier.predict(self.X_cv)  # Perform prediction on the cross-validation set
        # distinct_values = np.unique(y_pred)
        # print(distinct_values)
        smallest_cluster_id = min(y_pred)
        # Treat data points in the smallest cluster as anomalies (label them as 1)
        y_pred = [1 if pred == smallest_cluster_id else 0 for pred in y_pred]
        # distinct_values = np.unique(y_pred)
        # print(distinct_values)
        self.ac_cv = accuracy_score(self.y_cv, y_pred)  # Calculate accuracy
        return self.ac_cv
    
    
    def get_accuracy(self):
        y_pred_final = self.classifier.predict(self.X_test_final)
        if self.algo == '1_b':
            smallest_cluster_id = min(y_pred_final)
            # Treat data points in the smallest cluster as anomalies (label them as 1)
            y_pred_final = [1 if pred == smallest_cluster_id else 0 for pred in y_pred_final]
        else:
            y_pred_final = [0 if pred == 1 else 1 for pred in y_pred_final]
        self.ac_final = accuracy_score(self.y_test_final,y_pred_final)
        return self.ac_final, self.y_test_final, y_pred_final

class Master:
    def __init__(self, balance_weights = True, algo = "knn"):
        self.results = {}
        self.balance_weights = balance_weights
        self.algo = algo

    def print_classification_report(self, y_true, y_pred):
        report = classification_report(y_true, y_pred, target_names=['Negative Class', 'Positive Class'], output_dict=True)
        print("{:<20} {:<15} {:<15} {:<15} {:<15}".format('', 'precision', 'recall', 'f1-score', 'support'))
        for class_name, metrics in report.items():
            if class_name in ['accuracy', 'macro avg', 'weighted avg']:
                continue
            print("{:<20} {:<15.2f} {:<15.2f} {:<15.2f} {:<15}".format(class_name,
                                                                       metrics['precision'],
                                                                       metrics['recall'],
                                                                       metrics['f1-score'],
                                                                       metrics['support']))
        
    def organize_results(self):
        items = []
        for k, v in self.results.items():
            print(k, v)
            if isinstance(v, dict):
                for xk, xv in v.items():
                    new_x_key = xk
                    items.append((new_x_key, xv))
            else:
                new_key = k
                items.append((new_key, v))
        r_items = dict(items)
        df = pd.DataFrame({'method': list(r_items.keys()), 'result': list(r_items.values())})
        return df

    def run_without_cv(self):
        anlyzr = DataAnalyzer(balance_weights=self.balance_weights, algo=self.algo)
        anlyzr.get_x_and_Y()
        anlyzr.perform_manual_splitting_without_cv()
        rslts, y_pred = anlyzr.perform_without_cv()
        self.results["run without cv"] = rslts
        self.print_classification_report(anlyzr.y_test_final, y_pred)

    def run_with_cv(self):
        anlyzr = DataAnalyzer(balance_weights=self.balance_weights, algo=self.algo)
        anlyzr.get_x_and_Y()
        
        anlyzr.perform_manual_splitting_cv()
        if self.algo == "1_svm":
            anlyzr.one_svm()
        elif self.algo == "iso_f":
            anlyzr.isolation_forest()
        elif self.algo == "out_fa":
            anlyzr.outlier_factor()
        elif self.algo == "ee":
            anlyzr.elliptic_envelope()
        elif self.algo == "1_b":
            anlyzr.one_birch()
        self.results["run with cv"], y_test_final, y_pred_final = anlyzr.get_accuracy()
        self.print_classification_report(y_test_final, y_pred_final)

    def run_with_kfold(self):
        anlyzr = DataAnalyzer(balance_weights=self.balance_weights, algo=self.algo)
        anlyzr.get_x_and_Y()
        (best_avg_score, scores) = anlyzr.perform_kfold_cv()
        rslt, y_test_final, y_pred_final =  anlyzr.get_accuracy()
        self.results["after_k_fold_run"] = {
            "best kfold cv score": best_avg_score,
            "scores": scores,
            "final kfold score on test dataset": rslt
        }
        self.print_classification_report(y_test_final, y_pred_final)

    def run_with_filter(self):
        # Implement feature selection using filter method (e.g., correlation)
        anlyzr = DataAnalyzer(balance_weights=self.balance_weights, algo=self.algo)
        anlyzr.perform_filter_methods()
        anlyzr.get_x_and_Y()
        anlyzr.perform_manual_splitting_cv()
        if self.algo == "1_svm":
            anlyzr.one_svm()
        elif self.algo == "iso_f":
            anlyzr.isolation_forest()
        elif self.algo == "out_fa":
            anlyzr.outlier_factor()
        elif self.algo == "ee":
            anlyzr.elliptic_envelope()
        elif self.algo == "1_b":
            anlyzr.one_birch()
        self.results["with_filter_method"], y_test_final, y_pred_final  = anlyzr.get_accuracy()
        self.print_classification_report(y_test_final, y_pred_final)

    def run_with_wrapper(self):
        anlyzr = DataAnalyzer(balance_weights=self.balance_weights, algo=self.algo)
        anlyzr.get_x_and_Y()
        anlyzr.perform_manual_splitting_cv()
        self.results = anlyzr.perform_wrapper_method()
        anlyzr.plot_wrapper_scores()

    def run_with_pca(self):
        # Implement feature selection using PCA
        anlyzr = DataAnalyzer(balance_weights=self.balance_weights, algo=self.algo)
        anlyzr.get_x_and_Y()
        anlyzr.perform_pca(n_components=2)
        anlyzr.perform_manual_splitting_cv()
        if self.algo == "1_svm":
            anlyzr.one_svm()
        elif self.algo == "iso_f":
            anlyzr.isolation_forest()
        elif self.algo == "out_fa":
            anlyzr.outlier_factor()
        elif self.algo == "ee":
            anlyzr.elliptic_envelope()
        elif self.algo == "1_b":
            anlyzr.one_birch()
        self.results["with_pca"], y_test_final, y_pred_final  = anlyzr.get_accuracy()
        self.print_classification_report(y_test_final, y_pred_final)
    
    def run_cv_problem(self):
        self.run_without_cv()
        self.run_with_cv()
        self.run_with_kfold()
        return self.results



In [25]:
filter_mstr = Master(balance_weights=True, algo="1_svm")
filter_mstr.run_with_pca()
filter_mstr.organize_results()

balanced
                     precision       recall          f1-score        support        
Negative Class       0.99            0.50            0.67            1315.0         
Positive Class       0.06            0.86            0.11            49.0           
with_pca 0.5146627565982405


Unnamed: 0,method,result
0,with_pca,0.514663


In [43]:
filter_mstr = Master(balance_weights=True, algo="iso_f")
filter_mstr.run_with_pca()
filter_mstr.organize_results()

balanced
                     precision       recall          f1-score        support        
Negative Class       0.98            0.89            0.93            1315.0         
Positive Class       0.13            0.45            0.20            49.0           
with_pca 0.8709677419354839


Unnamed: 0,method,result
0,with_pca,0.870968


In [47]:
filter_mstr = Master(balance_weights=True, algo="out_fa")
filter_mstr.run_with_pca()
filter_mstr.organize_results()

balanced
                     precision       recall          f1-score        support        
Negative Class       0.97            0.99            0.98            1315.0         
Positive Class       0.23            0.10            0.14            49.0           
with_pca 0.9552785923753666




Unnamed: 0,method,result
0,with_pca,0.955279


In [52]:
filter_mstr = Master(balance_weights=True, algo="ee")
filter_mstr.run_with_pca()
filter_mstr.organize_results()

balanced
                     precision       recall          f1-score        support        
Negative Class       0.98            0.91            0.94            1315.0         
Positive Class       0.15            0.43            0.22            49.0           
with_pca 0.8900293255131965


Unnamed: 0,method,result
0,with_pca,0.890029


In [62]:
filter_mstr = Master(balance_weights=True, algo="1_b")
filter_mstr.run_with_pca()
filter_mstr.organize_results()

balanced
                     precision       recall          f1-score        support        
Negative Class       0.84            0.17            0.28            1315.0         
Positive Class       0.01            0.12            0.01            49.0           
with_pca 0.16495601173020527


Unnamed: 0,method,result
0,with_pca,0.164956
