In [1]:
import numpy as np
import pandas as pd
import pickle
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.feature_selection import SelectorMixin
from sklearn.base import BaseEstimator
from sklearn.pipeline import make_pipeline
from sklearn.feature_selection import SelectorMixin, VarianceThreshold
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.utils import resample

In [2]:
# Name of the column that is later read as the target.
target_col = "label"
# Load the raw CSV and discard its 'class' column in one step.
data_set = pd.read_csv("../data/full_data_set.csv").drop(columns=['class'])

### 1. Drop highly correlated features

In [3]:
class CorrelationThreshold(SelectorMixin, BaseEstimator):
    """
    Feature selector that removes correlated features.

    A feature is dropped when its absolute pairwise correlation with *any*
    feature that comes earlier in the column order is strictly greater than
    ``threshold``. Earlier features are considered regardless of whether they
    are themselves dropped.

    This feature selection algorithm looks only at the features (X), not the
    desired outputs (y), and can thus be used for unsupervised learning.

    Parameters
    ----------
    threshold : float, default=None
        Pairwise (absolute) correlation threshold. ``None`` is replaced by
        1.0 at construction time.

    Attributes
    ----------
    mask_ : ndarray of bool, shape (n_features,)
        ``True`` for the features that are kept. Set by :meth:`fit`.
    n_features_in_ : int
        Number of features seen during :meth:`fit`.
    """

    def __init__(self, threshold: float = None) -> None:
        self.threshold = threshold if threshold is not None else 1.0

    def fit(self, X, y=None):
        """
        Learn empirical correlations from X.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Data from which to compute correlations, where `n_samples` is the
            number of samples and `n_features` is the number of features.
        y : any, default=None
            Ignored. This parameter exists only for compatibility with
            sklearn.pipeline.Pipeline.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        # With a single feature np.corrcoef returns a scalar; promote it to a
        # 1x1 matrix so the triangular masking below still works.
        corr = np.atleast_2d(np.abs(np.corrcoef(X, rowvar=False)))
        self.n_features_in_ = corr.shape[0]
        # Strict lower triangle: row i only holds correlations with features
        # j < i, so the first feature of every correlated group is kept.
        # The trailing underscore marks the attribute as learned state, which
        # is how scikit-learn's `check_is_fitted` recognises a fitted estimator.
        self.mask_ = ~(np.tril(corr, k=-1) > self.threshold).any(axis=1)
        return self

    @property
    def mask(self):
        """Read-only alias of ``mask_``, kept for backward compatibility."""
        return self.mask_

    def _get_support_mask(self):
        # Local import: keeps this class self-contained without touching the
        # notebook's import cell.
        from sklearn.utils.validation import check_is_fitted

        # NotFittedError subclasses AttributeError, so callers that relied on
        # the AttributeError of an unfitted selector keep working.
        check_is_fitted(self, "mask_")
        return self.mask_

In [4]:
# Preprocessing pipeline: standardize the features, apply a default
# VarianceThreshold, then drop features whose absolute pairwise correlation
# exceeds 0.90.
preprocessing_steps = [StandardScaler(), VarianceThreshold(), CorrelationThreshold(0.90)]
scaler = make_pipeline(*preprocessing_steps)

In [5]:
# Fit the pipeline on the feature columns only (the 'label' column is excluded)
feature_frame = data_set.drop(columns=['label'])
preprocessing_pipeline = scaler.fit(feature_frame)

In [6]:
# Run the fitted pipeline on the feature columns: the result is standardized
# and contains only the features kept by the selection steps
features_only = data_set.drop(columns=['label'])
preprocessed_data_set = preprocessing_pipeline.transform(features_only)

In [7]:
# Recover which input columns the pipeline removed by replaying the support
# mask of every selector step (VarianceThreshold, CorrelationThreshold) in order.
input_columns = data_set.drop(columns=['label']).columns
remaining_columns = input_columns
for step in preprocessing_pipeline.named_steps.values():
    if hasattr(step, 'get_support'):
        # Each mask is relative to the columns that survived the previous steps
        remaining_columns = remaining_columns[step.get_support()]

# List the removed columns in their original order: a plain set difference
# has no stable ordering, so the displayed list could change between runs.
kept_columns = set(remaining_columns)
[column for column in input_columns if column not in kept_columns]

['Nntc', 'Tot2DU', 'TotMaxDU', 'MaxTotDF', 'TotMaxDF']

In [8]:
# Display the names of the columns that survived the selector steps
remaining_columns

Index(['CBO', 'WMC', 'DIT', 'NOC', 'RFC', 'LCOM', 'NOM', 'NOPM', 'NOSM', 'NOF',
       'NOPF', 'NOSF', 'NOSI', 'LOC', 'Tot2Op', 'Max2Op', 'TotMaxOp',
       'MaxTotOp', 'Tot2Lev', 'Max2Lev', 'TotMaxLev', 'MaxTotLev', 'Tot2DF',
       'Max2DF', 'Max2DU', 'MaxTotDU', 'TotInMetCall', 'MaxInMetCall',
       'InOutDeg', 'PubMembers', 'Ncf', 'Nuf', 'Ncs', 'Ns', 'Nf', 'Ntsc',
       'Ndsc', 'Runtime', 'PassTestRatio', 'FailTestRatio', 'TotPassTestRatio',
       'TotFailTestRatio', 'NTestRunPerRT', 'Um', 'Md', 'Nmd', 'Gs', 'DDU'],
      dtype='object')

### 2. Create a balanced set via undersampling

In [9]:
# Encode the string labels as integer class ids
labels = data_set['label']
label_to_int = {
    'Non-Faulty': 0,
    'Weakly-Faulty': 1,
    'Fairly-Faulty': 2,
    'Faulty': 3,
    'Strongly-Faulty': 4
}
class_labels = labels.map(label_to_int)

# `Series.map` turns every value missing from the mapping into NaN without
# complaint; fail loudly instead so such rows cannot silently distort the
# class-wise balancing that follows.
unmapped_labels = labels[class_labels.isna()].unique()
if len(unmapped_labels) > 0:
    raise ValueError(f"Labels without an integer encoding: {unmapped_labels.tolist()}")

In [10]:
# Build a balanced set by undersampling every class down to the size of the
# smallest class (noted by the author to be Strongly-Faulty).
unique_classes = np.unique(class_labels)
X_resampled_list = []
y_resampled_list = []
# Find size of the smallest class
min_class_size = min(int(np.sum(class_labels == label)) for label in unique_classes)
# Undersample each class
for label in unique_classes:
    in_class = (class_labels == label).to_numpy()

    # replace=False is essential: `resample` bootstraps (draws WITH
    # replacement) by default, which would duplicate rows and let identical
    # samples end up on both sides of the later dev/test split.
    X_class_undersampled, y_class_undersampled = resample(
        preprocessed_data_set[in_class],
        class_labels[in_class],
        replace=False,
        n_samples=min_class_size,
        random_state=42,
    )

    X_resampled_list.append(X_class_undersampled)
    y_resampled_list.append(y_class_undersampled)

# Concatenate all the resampled data
X_balanced = np.vstack(X_resampled_list)
y_balanced = np.hstack(y_resampled_list)


### 3. Split into development / test sets (80:20)

In [11]:
# Stratified 80:20 split of the balanced data, seeded for reproducibility
split_parts = train_test_split(
    X_balanced,
    y_balanced,
    test_size=0.2,
    stratify=y_balanced,
    random_state=42,
)
X_dev_set, X_test, y_dev_set, y_test = split_parts

In [12]:
# Development set dimensions as (n_samples, n_features)
X_dev_set.shape

(1640, 48)

In [13]:
# Test set dimensions as (n_samples, n_features)
X_test.shape

(410, 48)

In [None]:
# Persist the development set, the test set and the list of remaining column
# names as pickle files in the current working directory.
# The same files can be found in ../data/ (development_set.pkl, test_set.pkl,
# metrics_concidered.pkl).
artifacts = (
    ('development_set.pkl', (X_dev_set, y_dev_set)),
    ('test_set.pkl', (X_test, y_test)),
    ('metrics_concidered.pkl', remaining_columns.tolist()),
)
for file_name, payload in artifacts:
    with open(file_name, 'wb') as handle:
        pickle.dump(payload, handle)