In [132]:
import joblib
import math
import multiprocessing as mp
import os
from typing import List, Tuple
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import KFold

# One-tailed critical value of the standard normal distribution: a Z-score above
# this threshold corresponds to an upper-tail p-value below 0.05.
ZSCORE_CRITICAL_VALUE = 1.645 # equivalent to p-value = 0.05

In [2]:
# One-time setup: the first time this cell runs, move the working directory up
# to its parent (presumably the project root -- confirm against the repo layout).
# The `first_run` guard keeps the cell idempotent, so re-running it does not
# climb one more directory each time.
try:
    _ = first_run
except NameError:
    first_run = True
    # os.path.dirname is separator-agnostic, so this also works on Windows paths,
    # and it maps a directory directly under the filesystem root to the root
    # itself instead of producing an empty string that os.chdir would reject.
    os.chdir(os.path.dirname(os.getcwd()))

# Load Data

In [3]:
# Each archive is unpacked into a (features, labels) pair. The relative paths are
# resolved against the current working directory, which the setup cell above
# changes on its first run.
# NOTE(review): joblib.load unpickles arbitrary objects -- only load archives
# that come from a trusted source.
X_train, y_train = joblib.load("../data/train/preprocessed/train_features_labels.joblib.gz")

# NOTE(review): the validation archive is read from under data/train/ -- confirm
# that this location is intended.
X_validation, y_validation = joblib.load("../data/train/preprocessed/validation_features_labels.joblib.gz")

# Hold your SMOTE for a moment

SMOTE has become a ubiquitous way to handle imbalanced classes by oversampling the minority class. However, because many of our features have low variance due to a large number of zero values, generating artificial samples from them can actually be quite counterproductive. Thus, we will experiment with both SMOTE and a custom undersampler that tries to capture most of the variance of the majority class. Whichever strategy yields better results for our baseline model will be the one we move forward with.

### 1. Custom undersampler

In [133]:
class BinaryUndersampler:
    """Pick a high-variance subsample of the majority class of a labelled dataset.

    The majority-class rows are shuffled and cut into disjoint chunks; the chunk
    whose features have the highest mean variance is kept. The shuffle/cut is
    repeated `n_iterations` times (seeded 0..n_iterations-1) and the overall best
    chunk is returned together with the Z-score of its mean variance among the
    per-iteration winners.
    """

    def __init__(self, n_iterations: int=5, n_jobs: int=-1):
        """
        Parameters
        ----------
        n_iterations, int: Number of shuffle-and-split rounds to run.
        n_jobs, int: Number of worker processes. Values below 1 mean "use every
            available core"; larger values are capped at the core count.
        """
        self.n_iterations = n_iterations
        # (X_subsample, y_subsample) of the last successful `fit`, else (None, None).
        self.highest_variance_sample = (None, None)
        available_cpus = mp.cpu_count()
        # `>= 1` (not `> 1`) so that an explicit n_jobs=1 really means one worker.
        self.n_jobs = min(available_cpus, n_jobs) if n_jobs >= 1 else available_cpus

    def fit(self, X: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray, float, float, bool]:
        """Function to find a subsample of majority class (label) and its significance

        Parameters
        ----------
        X, numpy.ndarray: 2-D feature matrix, one row per observation
        y, numpy.ndarray: 1-D labels aligned with the rows of `X`

        Return
        ------
        Tuple[
            X_subsample, numpy.ndarray: The selected subsample of the majority class
            y_subsample, numpy.ndarray: The respective labels of the subsample
            sample_variance, float: The subsample mean variance (mean of variance of each feature)
            sample_variance_zscore, float: The Z-score of `sample_variance` among the `n_iterations`
                per-iteration winners (NaN when those winners all share the same variance),
            is_sample_var_significantly_higher, bool: Whether `sample_variance_zscore` exceeds
                `ZSCORE_CRITICAL_VALUE`
        ]

        Raises
        ------
        ValueError: If `n_iterations` is below 1 or `y` contains a single class only.
        """
        if self.n_iterations < 1:
            raise ValueError("n_iterations must be at least 1.")

        X, y = np.asarray(X), np.asarray(y)
        # Drop any previous result so it is not pickled along with `self` below.
        self.highest_variance_sample = (None, None)

        self.majority_class = pd.Series(y).value_counts().index[0]
        is_majority = y == self.majority_class
        n_minority = int(np.sum(~is_majority))
        if n_minority == 0:
            raise ValueError("y contains a single class; there is no minority class to balance against.")

        # NOTE(review): the split count is derived from the *total* row count, so each
        # chunk ends up somewhat smaller than the minority class -- confirm this is intended.
        self.n_splits = math.floor(X.shape[0] / n_minority)

        # Select the majority rows once instead of once per iteration.
        X_majority, y_majority = X[is_majority], y[is_majority]
        tasks = [(X_majority, y_majority, self.n_splits, seed) for seed in range(self.n_iterations)]

        # split and select the sample with highest variance (decide how significant via bootstraping)
        n_workers = min(self.n_jobs, len(tasks))
        if n_workers > 1:
            # NOTE(review): worker processes must be able to unpickle this class; under the
            # "spawn" start method that may fail for classes defined interactively.
            with mp.Pool(n_workers) as pool:
                bootstrap_samples = pool.starmap(self.get_highest_variance_sample, tasks)
        else:
            # A single worker gains nothing from a process pool; run in-process.
            bootstrap_samples = [self.get_highest_variance_sample(*task) for task in tasks]

        mean_variances = np.array([sample[-1] for sample in bootstrap_samples])
        best_idx = int(np.argmax(mean_variances))
        X_subsample, y_subsample, sample_variance = bootstrap_samples[best_idx]
        # The winner has the largest variance, hence also the largest Z-score.
        highest_zscore = float(stats.zscore(mean_variances)[best_idx])
        is_sample_var_significantly_higher = bool(highest_zscore > ZSCORE_CRITICAL_VALUE)

        self.highest_variance_sample = (X_subsample, y_subsample)
        return (X_subsample, y_subsample, sample_variance, highest_zscore, is_sample_var_significantly_higher)

    def get_highest_variance_sample(self, X: np.ndarray, y_label: np.ndarray, n_splits: int, random_state: int=42):
        """Shuffle the observations, cut them into `n_splits` chunks and return the best one.

        Returns the `(X_chunk, y_chunk, mean_variance)` tuple of the chunk with the
        highest mean feature variance. Raises ValueError when there are no rows.
        """
        disjoint_samples = self._shuffle_split_observations(X, y_label, n_splits, random_state)
        # With more splits than rows np.array_split yields empty chunks, whose variance
        # is undefined (NaN) and would corrupt the comparison -- skip them.
        scored_samples = [
            self._get_mean_feature_variance(sample)
            for sample in disjoint_samples
            if sample.shape[0] > 0
        ]
        if not scored_samples:
            raise ValueError("Cannot select a subsample from an empty set of observations.")
        return max(scored_samples, key=lambda tup: tup[-1])

    def _shuffle_split_observations(self, X: np.ndarray, y_label: np.ndarray, n_splits: int, random_state: int=42) -> List:
        """Return a list of disjoint samples with the `y_label` concatenated at axis=1 (column)"""
        # A private RandomState yields the same permutation as seeding the global
        # numpy RNG with `random_state`, without clobbering the caller's RNG state.
        rng = np.random.RandomState(random_state)
        observations = np.concatenate((X, np.asarray(y_label).reshape(-1, 1)), axis=1)
        return np.array_split(rng.permutation(observations), n_splits)

    def _get_mean_feature_variance(self, sample: np.ndarray) -> Tuple[np.ndarray, np.ndarray, float]:
        """Split a chunk back into features/labels and score it.

        The last column of `sample` is the label; every other column is a feature.
        The score is the mean over features of the per-feature variance.
        """
        X, y_label = sample[:, :-1], sample[:, -1]
        mean_variance = np.mean(np.ma.var(X, axis=0))
        return (X, y_label, mean_variance)

In [134]:
# Keep the result bound to names so later cells can reuse it, and display a compact
# summary instead of echoing the full arrays into the cell output.
(
    X_majority_subsample,
    y_majority_subsample,
    subsample_variance,
    subsample_variance_zscore,
    subsample_is_significant,
) = BinaryUndersampler().fit(X_train, y_train)

{
    "subsample_shape": X_majority_subsample.shape,
    "mean_feature_variance": subsample_variance,
    "variance_zscore": subsample_variance_zscore,
    "significantly_higher": subsample_is_significant,
}