# Preparing the dataset and splitting it into 5 folds

In [None]:
from sklearn.model_selection import StratifiedKFold
import pandas as pd
import numpy as np

# Stratified 5-fold split. Every fold is written to disk as a train/test CSV
# pair so that later cells can load one fold at a time.
n_splits = 5
stratified_kfold = StratifiedKFold(
    n_splits=n_splits,
    shuffle=True,
    random_state=42,
)

# Feature names followed by the label column; shared by every exported frame.
csv_columns = np.append(X.columns.values, 'target')

fold_count = 1
for train_index, test_index in stratified_kfold.split(X, y_filtered):
    train_index = np.array(train_index)
    test_index = np.array(test_index)

    X_train = X.iloc[train_index]
    y_train = y_filtered.iloc[train_index]
    X_test = X.iloc[test_index]
    y_test = y_filtered.iloc[test_index]

    # File names are keyed by the 1-based fold number. Note that the train
    # and test names follow different patterns.
    train_csv_filename = f'_fold_{fold_count}_train.csv'
    test_csv_filename = f'_{fold_count}_test.csv'

    # Append the label as the last column of each exported table.
    train_df = pd.DataFrame(
        data=np.column_stack((X_train, y_train)),
        columns=csv_columns,
    )
    test_df = pd.DataFrame(
        data=np.column_stack((X_test, y_test)),
        columns=csv_columns,
    )

    train_df.to_csv(train_csv_filename, index=False)
    test_df.to_csv(test_csv_filename, index=False)

    fold_count += 1


In [None]:
import pandas as pd

# Load one fold's train/test CSV pair and separate the features from the
# 'target' label column. Re-run this cell with the file names of the next
# fold for each iteration.

merged_df_train = pd.read_csv('_fold_1_train.csv')
merged_df_test = pd.read_csv('_1_test.csv')

new_tr_Y_train = merged_df_train['target']
new_tr_X_train = merged_df_train.drop(columns=['target'])

new_tr_y_test = merged_df_test['target']
new_tr_X_test = merged_df_test.drop(columns=['target'])

# DeepInsight Transformation Code


The code in the following cell was obtained from the pyDeepInsight repository: https://github.com/alok-ai-lab/pyDeepInsight

In [None]:
import numpy as np

"""
A Numpy implementation of the Asymmetric Greedy Search (AGS) algorithm
described in 'A heuristic for the time constrained asymmetric linear sum
assignment problem'

DOI:10.1007/s10878-015-9979-2
"""


def _initial(benefit_matrix, shuffle: bool = False):
    """Initialize the assignment solution array. Sequentially assign each
    row to an unassigned column with the maximum benefit

    Args:
        benefit_matrix: a 2d array of benefit values
        shuffle: if True, randomly shuffle the order of the rows prior to the
            initial assignment. The default value is False to match the
            description in the paper

    Returns:
        a 1d array of row assignments

    """
    bm = benefit_matrix.copy()
    assignment = np.empty((bm.shape[0]), dtype=np.int64)
    rows = np.arange(bm.shape[0])
    if shuffle:
        np.random.shuffle(rows)
    for n in rows:
        max_idx = np.argmax(bm[n, :])
        assignment[n] = max_idx
        bm[:, max_idx] = np.NINF

    return assignment


def _row_swap_cost(benefit_matrix, assignment, row_idx):
    """Calculate the costs of swapping a column assignments for a given row
    with all other rows and return the swap with the greatest benefit

    Args:
        benefit_matrix: a 2d array of benefit values
        assignment: a 1d array of column assignments
        row_idx: the row index on which to calculate swap costs

    Returns:
        a tuple of the best swap row and the associated benefit

    """
    swap_cost = benefit_matrix[row_idx, assignment] + \
        benefit_matrix[:, assignment[row_idx]]
    curr_cost = benefit_matrix[row_idx, assignment[row_idx]] + \
        benefit_matrix[np.arange(benefit_matrix.shape[0]), assignment]
    cost = swap_cost - curr_cost
    cost[row_idx] = np.NINF
    best_row = np.argmax(cost)
    best_row_benefit = cost[best_row]
    return best_row, best_row_benefit


def _best_row_swap(benefit_matrix, assignment):
    """Find, for every row, the partner row whose swap yields the largest gain

    Args:
        benefit_matrix: a 2d array of benefit values
        assignment: a 1d array of column assignments

    Returns:
        a tuple of two 1d arrays: the best swap partner of each row (int64)
        and the benefit associated with that swap
    """
    per_row = []
    for r in np.arange(assignment.shape[0]):
        per_row.append(_row_swap_cost(benefit_matrix, assignment, r))
    # one (partner, benefit) pair per row -> array of shape (n_rows, 2)
    table = np.stack(per_row)
    best_row = table[:, 0].astype(np.int64)
    best_row_benefit = table[:, 1]
    return best_row, best_row_benefit


def _col_swap_cost(benefit_matrix, assignment, row_idx):
    """Calculate the cost of swapping column assignments for a given row to
    unassigned columns and return the best column and the associated benefit

    benefit_matrix: a 2d array of benefit values
        assignment: a 1d array of column assignments
        row_idx: the row index on which to calculate swap costs

    Returns:
        a tuple of the best swap column and the associated benefit
    """
    valid_idx = np.delete(np.arange(benefit_matrix.shape[1]), assignment)
    best_col = valid_idx[benefit_matrix[row_idx, valid_idx].argmax()]
    best_col_benefit = benefit_matrix[row_idx, best_col]
    return best_col, best_col_benefit


def _best_col_swap(benefit_matrix, assignment):
    """Determine the best unassigned column swap for all rows

    Args:
        benefit_matrix: a 2d array of benefit values
        assignment: a 1d array of column assignments

    Returns:
        a tuple of arrays for best unassigned columns and the associated
         benefits
    """
    row_idx = np.arange(assignment.shape[0])
    bm_unused = benefit_matrix.copy()
    bm_unused[:, assignment] = np.NINF
    best_col = np.argmax(bm_unused, axis=1)
    best_col_benefit = bm_unused[row_idx, best_col]
    return best_col, best_col_benefit


def _row_swap(benefit_matrix, assignment, best_row, br_benefit,
              best_col, bc_benefit, r_idx):
    """Exchange the column of a row with that of its best swap partner and
    refresh the cached swap information of the two rows involved.

    All array arguments are updated in place and also returned.

    Args:
        benefit_matrix: a 2d array of benefit values
        assignment: a 1d array of column assignments
        best_row: a 1d array of the best row swap for each row
        br_benefit: a 1d array of the benefit associated with the best row swap
        best_col: a 1d array of the best unassigned column for each row
        bc_benefit: a 1d array of the benefit associated with the best column
        r_idx: row to swap

    Return:
        A tuple of updated assignment and benefit/swap matrices
    """
    # read the partner before the cache entry for r_idx is overwritten below
    partner = best_row[r_idx]
    # exchange the columns held by the two rows
    col_of_row, col_of_partner = assignment[r_idx], assignment[partner]
    assignment[r_idx] = col_of_partner
    assignment[partner] = col_of_row

    touched = (r_idx, partner)
    # refresh the row-swap cache for both rows
    for row in touched:
        best_row[row], br_benefit[row] = \
            _row_swap_cost(benefit_matrix, assignment, row)
    # refresh the unassigned-column cache for both rows
    for row in touched:
        best_col[row], bc_benefit[row] = \
            _col_swap_cost(benefit_matrix, assignment, row)
    return assignment, best_row, br_benefit, best_col, bc_benefit


def _col_swap(benefit_matrix, assignment, best_row, br_benefit,
              best_col, bc_benefit, r_idx):
    """Move a row onto its best unassigned column and refresh the cached
        swap information of that row.

        All array arguments are updated in place and also returned.

        Args:
            benefit_matrix: a 2d array of benefit values
            assignment: a 1d array of column assignments
            best_row: a 1d array of the best row swap for each row
            br_benefit: a 1d array of the benefit associated with the best
                row swap
            best_col: a 1d array of the best unassigned column for each row
            bc_benefit: a 1d array of the benefit associated with the best
                column
            r_idx: row to move

        Return:
            A tuple of updated assignment and benefit/swap matrices
    """
    # take the cached best free column for this row
    assignment[r_idx] = best_col[r_idx]
    # refresh the row-swap cache entry of the moved row
    best_row[r_idx], br_benefit[r_idx] = \
        _row_swap_cost(benefit_matrix, assignment, r_idx)
    # refresh the unassigned-column cache entry of the moved row
    best_col[r_idx], bc_benefit[r_idx] = \
        _col_swap_cost(benefit_matrix, assignment, r_idx)
    return assignment, best_row, br_benefit, best_col, bc_benefit


def asymmetric_greedy_search(benefit_matrix, shuffle=False, minimize=False):
    """Asymmetric Greedy Search heuristic for the linear sum assignment
        problem, following 'A heuristic for the time constrained asymmetric
        linear sum assignment problem'

        Args:
            benefit_matrix: a 2d array of benefit or cost values
            shuffle: set to True to randomize order of row initialization
            minimize: set to True if a cost matrix rather than a benefit
                matrix is provided

        Returns:
            a tuple of row indices and assigned column indices
    """
    # a cost matrix is handled by maximizing its negation
    bm = -benefit_matrix.copy() if minimize else benefit_matrix

    assignment = _initial(bm, shuffle=shuffle)
    brs, brb = _best_row_swap(bm, assignment)
    bcs, bcb = _best_col_swap(bm, assignment)

    while True:
        brb_max, bcb_max = np.amax(brb), np.amax(bcb)
        if not (brb_max > 0 or bcb_max > 0):
            break
        # Greedy phase: apply the single best move while the cached tables
        # still report a positive benefit. Only the rows touched by a move
        # have their cache entries refreshed here.
        while brb_max > 0 or bcb_max > 0:
            if brb_max > bcb_max:
                move, r = _row_swap, np.argmax(brb)
            else:
                move, r = _col_swap, np.argmax(bcb)
            assignment, brs, brb, bcs, bcb = \
                move(bm, assignment, brs, brb, bcs, bcb, r)
            brb_max, bcb_max = np.amax(brb), np.amax(bcb)
        # Rebuild both tables from scratch; the outer loop repeats if the
        # fresh tables reveal a move the incremental updates did not show.
        brs, brb = _best_row_swap(bm, assignment)
        bcs, bcb = _best_col_swap(bm, assignment)

    return np.arange(bm.shape[0]), assignment


from typing import Optional
import numpy as np

class Norm2Scaler:
    """Log normalize and scale data

    Implements the procedure referred to as norm-2 in the DeepInsight paper
    supplementary information: every feature is shifted by the absolute
    value of its training minimum plus one, log-transformed, and the result
    is divided by the single largest log value seen during fitting.
    """

    def __init__(self) -> None:
        pass

    def _shifted_log(self, X: np.ndarray) -> np.ndarray:
        """Apply the per-feature shift and the log transform to X"""
        return np.log(X + np.abs(self._min0) + 1)

    def fit(self, X: np.ndarray, y: Optional[np.ndarray] = None):
        """Learn the per-feature minimum and the global log maximum of X

        Args:
            X: array of shape (n_samples, n_features)
            y: Ignored.

        Returns:
            self
        """
        self._min0 = X.min(axis=0)
        self._max = self._shifted_log(X).max()
        return self

    def fit_transform(self, X: np.ndarray, y: Optional[np.ndarray] = None
                      ) -> np.ndarray:
        """Fit to X and return the scaled X (no clipping is applied here)

        Args:
            X: array of shape (n_samples, n_features)
            y: Ignored.

        Returns:
            the log-normalized data divided by its global maximum
        """
        self._min0 = X.min(axis=0)
        logged = self._shifted_log(X)
        self._max = logged.max()
        return logged / self._max

    def transform(self, X: np.ndarray, y: Optional[np.ndarray] = None
                  ) -> np.ndarray:
        """Scale X with the fitted statistics, clipping the result to [0, 1]

        Args:
            X: array with the same features as the data used for fitting
            y: Ignored.

        Returns:
            the scaled data; values outside the fitted range are clipped
        """
        # negative log values (inputs below the fitted minimum) become 0
        logged = self._shifted_log(X).clip(0, None)
        scaled = logged / self._max
        return scaled.clip(0, 1)

    
from typing import Sequence
import math
import torch
from torch.utils.data.sampler import Sampler, RandomSampler


class StratifiedEventBatchSampler(Sampler):
    """Samples elements from a set with binary labelling so that the event
    label (1) is evenly distributed across batches.

    This sampler is useful when the loss function requires at least one event,
    such as in the case of a Cox Proportional Hazard based loss.
    """

    events: torch.Tensor
    batch_size: int

    def __init__(self, events: Sequence[int], batch_size: int) -> None:
        """Generate a StratifiedEventBatchSampler instance

        Args:
            events: int sequence of binary event labels (0, 1)
            batch_size: int that defines size of mini-batch.

        Raises:
            ValueError: if batch_size is not a positive int, or if there are
                fewer events than batches.
        """
        if not isinstance(batch_size, int) or batch_size <= 0:
            raise ValueError("batch_size should be a positive integer value, "
                             "but got batch_size={}".format(batch_size))
        self.events = torch.as_tensor(events, dtype=torch.int64)
        self.batch_size = batch_size

        # positions of the non-event and event samples
        self.events0_idx = torch.where(self.events == 0)[0]
        self.events1_idx = torch.where(self.events == 1)[0]

        n_total = len(self.events)
        n_events = self.events1_idx.shape[0]
        self._len = math.ceil(n_total / self.batch_size)

        if n_events < self._len:
            raise ValueError("the number of events ({}) must be equal or "
                             "larger than the number of batches ({})"
                             .format(n_events, self._len))

        # Share of every mini-batch given to each label, proportional to the
        # label frequencies (rounded up for non-events, down for events).
        self.batch0_size = math.ceil(
            self.events0_idx.shape[0] / self.events.shape[0] * batch_size)
        self.batch1_size = math.floor(
            self.events1_idx.shape[0] / self.events.shape[0] * batch_size)

        self.sampler0 = RandomSampler(self.events0_idx, replacement=False)
        self.sampler1 = RandomSampler(self.events1_idx, replacement=False)

    @staticmethod
    def _grouped(pool, sampler, group_size):
        """Yield the entries of `pool`, visited in `sampler` order, in groups
        of `group_size`; a shorter final group is yielded if any remain"""
        pending = torch.tensor([], dtype=torch.int64)
        for position in sampler:
            pending = torch.cat((pending, pool[position, None]), 0)
            if pending.shape[0] == group_size:
                yield pending
                pending = torch.tensor([], dtype=torch.int64)
        if pending.shape[0] > 0:
            yield pending

    def __iter0__(self):
        """Iterate the non-event (0) label sampler"""
        yield from self._grouped(
            self.events0_idx, self.sampler0, self.batch0_size)

    def __iter1__(self):
        """Iterate the event (1) label sampler"""
        yield from self._grouped(
            self.events1_idx, self.sampler1, self.batch1_size)

    def __iter__(self):
        """Generate the indices for the next batch of elements"""
        # iteration stops as soon as either label runs out of groups
        for zeros, ones in zip(self.__iter0__(), self.__iter1__()):
            ordered, _ = torch.cat((zeros, ones), 0).sort()
            yield ordered

    # Removed - https://github.com/Lightning-AI/lightning/issues/2429
    # def __len__(self):
    #     """Return the number of batches"""
    #     return self._len

    
from typing import Union, Any, Optional, Tuple
from typing_extensions import Protocol
from numpy.typing import ArrayLike

import numpy as np
import pandas as pd
from sklearn.decomposition import PCA, KernelPCA
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans
from scipy.spatial import ConvexHull
from scipy.spatial.distance import cdist
from scipy.optimize import linear_sum_assignment
from matplotlib import pyplot as plt
import inspect

# from .utils import asymmetric_greedy_search

class ManifoldLearner(Protocol):
    """Structural type for any object exposing a `fit_transform` method that
    maps an array to an array (the interface ImageTransformer relies on)."""

    def fit_transform(self: 'ManifoldLearner',
                      X: np.ndarray) -> np.ndarray:
        ...


class ImageTransformer:
    """Transform features to an image matrix using dimensionality reduction

    This class takes in data normalized between 0 and 1 and converts it to a
    CNN compatible 'image' matrix

    """

    def __init__(self, feature_extractor: Union[str, ManifoldLearner] = 'tsne',
                 discretization: str = 'bin',
                 pixels: Union[int, Tuple[int, int]] = (224, 224)) -> None:
        """Generate an ImageTransformer instance

        Args:
            feature_extractor: string of value ('tsne', 'pca', 'kpca') or a
                class instance with method `fit_transform` that returns a
                2-dimensional array of extracted features.
            discretization: string of values ('bin', 'assignment'). Defines
                the method for discretizing dimensionally reduced data to pixel
                coordinates.
            pixels: int (square matrix) or tuple of ints (height, width) that
                defines the size of the image matrix.
        """
        self._fe = self._parse_feature_extractor(feature_extractor)
        self._dm = self._parse_discretization(discretization)
        self._pixels = self._parse_pixels(pixels)
        self._xrot = np.empty(0)
        self._coords = np.empty(0)

    @staticmethod
    def _parse_pixels(pixels: Union[int, Tuple[int, int]]) -> Tuple[int, int]:
        """Check and correct pixel parameter

        Args:
            pixels: int (square matrix) or tuple of ints (height, width) that
                defines the size of the image matrix.
        """
        if isinstance(pixels, int):
            pixels = (pixels, pixels)
        return pixels

    @staticmethod
    def _parse_feature_extractor(
            feature_extractor: Union[str, ManifoldLearner]) -> ManifoldLearner:
        """Validate the feature extractor value passed to the
        constructor method and return correct method

        Args:
            feature_extractor: string of value ('tsne', 'pca', 'kpca') or a
                class instance with method `fit_transform` that returns a
                2-dimensional array of extracted features.

        Returns:
            function
        """
        if isinstance(feature_extractor, str):
            fe = feature_extractor.casefold()
            if fe == 'tsne'.casefold():
                fe_func = TSNE(n_components=2, metric='cosine')
            elif fe == 'pca'.casefold():
                fe_func = PCA(n_components=2)
            elif fe == 'kpca'.casefold():
                fe_func = KernelPCA(n_components=2, kernel='rbf')
            else:
                raise ValueError(
                    f"feature_extractor '{feature_extractor}' not valid")
        elif hasattr(feature_extractor, 'fit_transform') and \
                inspect.ismethod(feature_extractor.fit_transform):
            fe_func = feature_extractor
        else:
            raise TypeError('Parameter feature_extractor is not a '
                            'string nor has method "fit_transform"')
        return fe_func

    @classmethod
    def _parse_discretization(cls, method: str):
        """Validate the discretization value passed to the
        constructor method and return correct function

        Args:
            method: string of value ('bin', 'assignment')

        Returns:
            function
        """
        if method == 'bin':
            func = cls.coordinate_binning
        elif method == 'assignment' or method == 'lsa':
            func = cls.coordinate_optimal_assignment
        elif method == 'ags':
            func = cls.coordinate_heuristic_assignment
        else:
            raise ValueError(f"discretization method '{method}' not valid")
        return func

    @classmethod
    def coordinate_binning(cls, position: np.ndarray,
                           px_size: Tuple[int, int]) -> np.ndarray:
        """Determine the pixel locations of each feature based on the overlap of
        feature position and pixel locations.

        Args:
            position: a 2d array of feature coordinates
            px_size: tuple with image dimensions

        Returns:
            a 2d array of feature to pixel mappings
        """
        scaled = cls.scale_coordinates(position, px_size)
        px_binned = np.floor(scaled).astype(int)
        # Need to move maximum values into the lower bin
        px_binned[:, 0][px_binned[:, 0] == px_size[0]] = px_size[0] - 1
        px_binned[:, 1][px_binned[:, 1] == px_size[1]] = px_size[1] - 1
        return px_binned

    @staticmethod
    def lsap_optimal_solution(cost_matrix):
        return linear_sum_assignment(cost_matrix)

    @staticmethod
    def lsap_heuristic_solution(cost_matrix):
        """Solve the linear sum assignment problem approximately with the
        asymmetric greedy search heuristic (randomized row order)

        Args:
            cost_matrix: a 2d array of assignment costs

        Returns:
            a tuple of row indices and the column index assigned to each
        """
        solution = asymmetric_greedy_search(cost_matrix,
                                            minimize=True,
                                            shuffle=True)
        return solution

    @classmethod
    def coordinate_optimal_assignment(cls, position: np.ndarray,
                                      px_size: Tuple[int, int]) -> np.ndarray:
        """Determine the pixel location of each feature using a linear sum
        assignment problem solution on the exponential on the euclidean
        distances between the features and the pixels

        Args:
            position: a 2d array of feature coordinates
            px_size: tuple with image dimensions

        Returns:
            a 2d array of feature to pixel mappings
        """
        scaled = cls.scale_coordinates(position, px_size)
        px_centers = cls.calculate_pixel_centroids(px_size)

        # calculate distances
        k = np.prod(px_size)
        clustered = scaled.shape[0] > k
        if clustered:
            kmeans = KMeans(n_clusters=k).fit(scaled)
            cl_labels = kmeans.labels_
            cl_centers = kmeans.cluster_centers_
            dist = cdist(cl_centers, px_centers, metric='euclidean')
        else:
            dist = cdist(scaled, px_centers, metric='euclidean')
        # assignment of features/clusters to pixels
        lsa = cls.lsap_optimal_solution(dist)
        px_assigned = np.empty(scaled.shape, dtype=int)
        for i in range(scaled.shape[0]):
            if clustered:
                # The feature at i
                # Is mapped to the cluster j=clabl[i]
                # Which is mapped to the pixel center clust_cntr[j]
                # Which is mapped to the pixel k = lsa[1][j]
                # For pixel k, x = k % px_size[0] and y = k // px_size[0]
                j = cl_labels[i]
            else:
                j = i
            ki = lsa[1][j]
            xi = ki % px_size[0]
            yi = ki // px_size[0]
            px_assigned[i] = [yi, xi]
        return px_assigned

    @classmethod
    def coordinate_heuristic_assignment(cls, position: np.ndarray,
                                        px_size: Tuple[int, int]) -> np.ndarray:
        """Determine the pixel location of each feature using the asymmetric
        greedy search heuristic on the euclidean distances between the
        features and the pixel centroids

        When there are too many features, they are first grouped into
        k-means clusters (one fewer than the number of pixels) and the
        cluster centers are assigned instead; every feature then inherits
        the pixel of its cluster.

        Args:
            position: a 2d array of feature coordinates
            px_size: tuple with image dimensions (height, width)

        Returns:
            a 2d integer array of feature to pixel mappings, one
            (row, column) pair per feature
        """
        scaled = cls.scale_coordinates(position, px_size)
        px_centers = cls.calculate_pixel_centroids(px_size)

        # calculate distances
        # AGS requires asymmetric assignment so k must be less than pixels
        k = np.prod(px_size) - 1
        clustered = scaled.shape[0] > k
        if clustered:
            kmeans = KMeans(n_clusters=k).fit(scaled)
            sources = kmeans.cluster_centers_
            # feature i is represented by cluster source_of_feature[i]
            source_of_feature = kmeans.labels_
        else:
            sources = scaled
            source_of_feature = np.arange(scaled.shape[0])
        dist = cdist(sources, px_centers, metric='euclidean')
        # assignment of features/clusters to pixels
        _, px_of_source = cls.lsap_heuristic_solution(dist)
        px_flat = np.asarray(px_of_source)[source_of_feature]
        # Decode the row-major flat pixel index k = row * width + col. The
        # divisor must be the image width (px_size[1]); using the height is
        # only correct for square images.
        rows, cols = np.divmod(px_flat, px_size[1])
        return np.column_stack((rows, cols)).astype(int)

    @staticmethod
    def calculate_pixel_centroids(px_size: Tuple[int, int]) -> np.ndarray:
        """Generate a 2d array of the centroid of each pixel

        Args:
            px_size: tuple with image dimensions

        Returns:
            a 2d array of pixel centroid locations
        """
        px_map = np.empty((np.prod(px_size), 2))
        for i in range(0, px_size[0]):
            for j in range(0, px_size[1]):
                px_map[i * px_size[0] + j] = [i, j]
        px_centroid = px_map + 0.5
        return px_centroid

    def fit(self, X: np.ndarray, y: Optional[ArrayLike] = None,
            plot: bool = False):
        """Train the image transformer from the training set (X)

        Args:
            X: {array-like, sparse matrix} of shape (n_samples, n_features)
            y: Ignored. Present for continuity with scikit-learn
            plot: boolean of whether to produce a scatter plot showing the
                feature reduction, hull points, and minimum bounding rectangle

        Returns:
            self: object
        """
        # perform dimensionality reduction
        x_new = self._fe.fit_transform(X.T)
        # get the convex hull for the points
        chvertices = ConvexHull(x_new).vertices
        hull_points = x_new[chvertices]
        # determine the minimum bounding rectangle
        mbr, mbr_rot = self._minimum_bounding_rectangle(hull_points)
        # rotate the matrix
        # save the rotated matrix in case user wants to change the pixel size
        self._xrot = np.dot(mbr_rot, x_new.T).T
        # determine feature coordinates based on pixel dimension
        self._calculate_coords()
        # plot rotation diagram if requested
        if plot is True:
            plt.scatter(x_new[:, 0], x_new[:, 1], s=1,
                        cmap=plt.cm.get_cmap("jet", 10), alpha=0.2)
            plt.fill(x_new[chvertices, 0], x_new[chvertices, 1],
                     edgecolor='r', fill=False)
            plt.fill(mbr[:, 0], mbr[:, 1], edgecolor='g', fill=False)
            plt.gca().set_aspect('equal', adjustable='box')
            plt.show()
        return self

    @property
    def pixels(self) -> Tuple[int, int]:
        """The image matrix dimensions

        Returns:
            tuple: the image matrix dimensions (height, width)

        """
        return self._pixels

    @pixels.setter
    def pixels(self, pixels: Union[int, Tuple[int, int]]) -> None:
        """Set the image matrix dimension

        Args:
            pixels: int or tuple with the dimensions (height, width)
            of the image matrix

        """
        if isinstance(pixels, int):
            pixels = (pixels, pixels)
        self._pixels = pixels
        # recalculate coordinates if already fit
        if hasattr(self, '_coords'):
            self._calculate_coords()

    @staticmethod
    def scale_coordinates(coords: np.ndarray, dim_max: ArrayLike) -> np.ndarray:
        """Transforms a list of n-dimensional coordinates by scaling them
        between zero and the given dimensional maximum

        Args:
            coords: a 2d ndarray of coordinates
            dim_max: a list of maximum ranges for each dimension of coords

        Returns:
            a 2d ndarray of scaled coordinates
        """
        data_min = coords.min(axis=0)
        data_max = coords.max(axis=0)
        std = (coords - data_min) / (data_max - data_min)
        scaled = np.multiply(std, dim_max)
        return scaled

    def _calculate_coords(self) -> None:
        """Calculate the matrix coordinates of each feature based on the
        pixel dimensions.
        """
        scaled = self.scale_coordinates(self._xrot, self._pixels)
        px_coords = self._dm(scaled, self._pixels)
        self._coords = px_coords

    def transform(self, X: np.ndarray, img_format: str = 'rgb',
                  empty_value: int = 0) -> np.ndarray:
        """Transform the input matrix into image matrices

        Args:
            X: {array-like, sparse matrix} of shape (n_samples, n_features)
                where n_features matches the training set.
            img_format: The format of the image matrix to return.
                'scalar' returns an array of shape (M, N). 'rgb' returns
                a numpy.ndarray of shape (M, N, 3) that is compatible with PIL.
            empty_value: numeric value to fill elements where no features are
                mapped. Default = 0.

        Returns:
            A list of n_samples numpy matrices of dimensions set by
            the pixel parameter
        """
        img_coords = pd.DataFrame(np.vstack((
            self._coords.T,
            X
        )).T).groupby([0, 1], as_index=False).mean()

        img_list = []
        blank_mat = np.zeros(self._pixels)
        if empty_value != 0:
            blank_mat[:] = empty_value
        for z in range(2, img_coords.shape[1]):
            img_matrix = blank_mat.copy()
            img_matrix[img_coords[0].astype(int),
                       img_coords[1].astype(int)] = img_coords[z]
            img_list.append(img_matrix)

        # img_matrices = np.empty(0) ---- REMOVE?
        if img_format == 'rgb':
            img_matrices = np.array([self._mat_to_rgb(m) for m in img_list])
        elif img_format == 'scalar':
            img_matrices = np.stack(img_list)
        else:
            raise ValueError(f"'{img_format}' not accepted for img_format")

        return img_matrices

    def fit_transform(self, X: np.ndarray, **kwargs: Any) -> np.ndarray:
        """Train the image transformer from the training set (X) and return
        the transformed data.

        Args:
            X: {array-like, sparse matrix} of shape (n_samples, n_features)

        Returns:
            A list of n_samples numpy matrices of dimensions set by
            the pixel parameter
        """
        self.fit(X)
        return self.transform(X, **kwargs)

    def inverse_transform(self, img: np.ndarray) -> np.ndarray:
        """Transform an image layer back to its original space.
            Args:
                img:

            Returns:
                A list of n_samples numpy matrices of dimensions set by
                the pixel parameter
        """
        if img.ndim == 2 and img.shape == self._pixels:
            X = img[self._coords[:, 0], self._coords[:, 1]]
        elif img.ndim == 3 and img.shape[-2:] == self._pixels:
            X = img[:, self._coords[:, 0], self._coords[:, 1]]
        elif img.ndim == 3 and img.shape[0:2] == self._pixels:
            X = img[self._coords[:, 0], self._coords[:, 1], :]
        elif img.ndim == 4 and img.shape[1:3] == self._pixels:
            X = img[:, self._coords[:, 0], self._coords[:, 1], :]
        else:
            raise ValueError((f"Expected dimensions of (B, {self._pixels[0]}, "
                              f"{self._pixels[1]}, C) where B and C are "
                              f"optional, but got {img.shape}"))
        return X

    def feature_density_matrix(self) -> np.ndarray:
        """Generate image matrix with feature counts per pixel

        Returns:
            img_matrix (ndarray): matrix with feature counts per pixel
        """
        fdmat = np.zeros(self._pixels)
        np.add.at(fdmat, tuple(self._coords.T), 1)
        return fdmat

    def coords(self) -> np.ndarray:
        """Get feature coordinates

        Returns:
            ndarray: the pixel coordinates for features
        """
        return self._coords.copy()

    @staticmethod
    def _minimum_bounding_rectangle(hull_points: np.ndarray
                                    ) -> Tuple[np.ndarray, np.ndarray]:
        """Find the smallest bounding rectangle for a set of points.

        Modified from JesseBuesking at https://stackoverflow.com/a/33619018
        Returns a set of points representing the corners of the bounding box.

        Args:
            hull_points : an nx2 matrix of hull coordinates

        Returns:
            (tuple): tuple containing
                coords (ndarray): coordinates of the corners of the rectangle
                rotmat (ndarray): rotation matrix to align edges of rectangle
                    to x and y
        """

        pi2 = np.pi / 2
        # calculate edge angles
        edges = hull_points[1:] - hull_points[:-1]
        angles = np.arctan2(edges[:, 1], edges[:, 0])
        angles = np.abs(np.mod(angles, pi2))
        angles = np.unique(angles)
        # find rotation matrices
        rotations = np.vstack([
            np.cos(angles),
            -np.sin(angles),
            np.sin(angles),
            np.cos(angles)]).T
        rotations = rotations.reshape((-1, 2, 2))
        # apply rotations to the hull
        rot_points = np.dot(rotations, hull_points.T)
        # find the bounding points
        min_x = np.nanmin(rot_points[:, 0], axis=1)
        max_x = np.nanmax(rot_points[:, 0], axis=1)
        min_y = np.nanmin(rot_points[:, 1], axis=1)
        max_y = np.nanmax(rot_points[:, 1], axis=1)
        # find the box with the best area
        areas = (max_x - min_x) * (max_y - min_y)
        best_idx = np.argmin(areas)
        # return the best box
        x1 = max_x[best_idx]
        x2 = min_x[best_idx]
        y1 = max_y[best_idx]
        y2 = min_y[best_idx]
        rotmat = rotations[best_idx]
        # generate coordinates
        coords = np.zeros((4, 2))
        coords[0] = np.dot([x1, y2], rotmat)
        coords[1] = np.dot([x2, y2], rotmat)
        coords[2] = np.dot([x2, y1], rotmat)
        coords[3] = np.dot([x1, y1], rotmat)

        return coords, rotmat

    @staticmethod
    def _mat_to_rgb(mat: np.ndarray) -> np.ndarray:
        """Convert image matrix to numpy rgb format

        Args:
            mat: {array-like} (M, N)

        Returns:
            An numpy.ndarray (M, N, 3) with original values repeated across
            RGB channels.
        """
        return np.repeat(mat[:, :, np.newaxis], 3, axis=2)

# Normalizing the data and transforming it to images

In [None]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# Fit the min-max scaler on the training features and scale them.
scaler = MinMaxScaler()
normalized_data = scaler.fit_transform(new_tr_X_train)

# Keep a column-labelled DataFrame copy next to the plain array used downstream.
normalized_df = pd.DataFrame(normalized_data, columns=new_tr_X_train.columns)
normalized_X_train = normalized_df.to_numpy()

In [None]:
# Image transformer configured with the 'tsne' feature extractor, the
# 'assignment' discretization and a 28x28 pixel grid.
IMGTR = ImageTransformer(
    feature_extractor='tsne',
    discretization='assignment',
    pixels=(28, 28),
)

In [None]:
# Fit the image transformer on the scaled training features and convert
# them to images in one step.
transformed_dataset = IMGTR.fit_transform(normalized_X_train)

In [None]:
# Scale the held-out features with the scaler fitted on the training data,
# then map them to images with the already-fitted transformer.
normalized_X_test = normalized_data_test = scaler.transform(new_tr_X_test)
transformed_test_dataset = IMGTR.transform(normalized_X_test)

In [None]:
# Keep only the first channel while preserving the trailing channel axis,
# so the last dimension has size 1.
new_dataset_train = transformed_dataset[:, :, :, :1]
new_dataset_test = transformed_test_dataset[:, :, :, :1]

# cGAN Architectures

In [None]:
# two hidden layers

from keras.models import Model
from keras.layers import Input, Dense, Reshape, Flatten, Concatenate
from keras.layers import BatchNormalization, Activation
from keras.layers import LeakyReLU
from keras.optimizers import Adam

def build_generator():
    """Build the conditional generator with two hidden layers.

    A 100-dimensional noise vector and a 1-dimensional condition are
    concatenated, passed through two Dense(512) + LeakyReLU(0.2) stages and
    mapped by a sigmoid Dense(784) layer that is reshaped to (28, 28, 1).
    The model summary is printed as a side effect.

    Returns:
        The uncompiled Model taking [noise, condition] and producing an image.
    """
    noise_in = Input(shape=(100,))
    condition_in = Input(shape=(1,))

    features = Concatenate()([noise_in, condition_in])
    for _ in range(2):
        features = Dense(512)(features)
        features = LeakyReLU(alpha=0.2)(features)

    flat_image = Dense(784, activation='sigmoid')(features)
    image = Reshape((28, 28, 1))(flat_image)

    model = Model(inputs=[noise_in, condition_in], outputs=image)
    model.summary()
    return model

def build_discriminator():
    """Build the conditional discriminator with two hidden layers.

    A (28, 28, 1) image is flattened, concatenated with a 1-dimensional
    condition, passed through two Dense(512) + LeakyReLU(0.2) stages and
    reduced to a single sigmoid output. The model summary is printed as a
    side effect.

    Returns:
        The uncompiled Model taking [image, condition] and producing a
        validity score.
    """
    image_in = Input(shape=(28, 28, 1,))
    condition_in = Input(shape=(1,))

    flat_image = Flatten()(image_in)
    features = Concatenate()([flat_image, condition_in])
    for _ in range(2):
        features = Dense(512)(features)
        features = LeakyReLU(alpha=0.2)(features)

    score = Dense(1, activation='sigmoid')(features)

    model = Model(inputs=[image_in, condition_in], outputs=score)
    model.summary()
    return model

# Build and compile the discriminator on its own so it can be trained directly.
discriminator = build_discriminator()
discriminator.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5), metrics=['accuracy'])

generator = build_generator()

# Symbolic inputs for the stacked generator -> discriminator model.
noise = Input(shape=(100,))
condition = Input(shape=(1,))
generated_image = generator([noise, condition])

# The discriminator is flagged non-trainable before the combined model is compiled.
# NOTE(review): presumably so that training combined_model only updates the
# generator weights - confirm for the Keras version in use.
discriminator.trainable = False
validity = discriminator([generated_image, condition])

# Combined model: noise and condition in, discriminator validity score out.
combined_model = Model(inputs=[noise, condition], outputs=validity)
combined_model.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5))


In [None]:
# three hidden layers

from keras.models import Model
from keras.layers import Input, Dense, Reshape, Flatten, Concatenate
from keras.layers import BatchNormalization, Activation
from keras.layers import LeakyReLU
from keras.optimizers import Adam

def build_generator():
    """Build the conditional generator with three hidden layers.

    A 100-dimensional noise vector and a 1-dimensional condition are
    concatenated, passed through three Dense(512) + LeakyReLU(0.2) stages and
    mapped by a sigmoid Dense(784) layer that is reshaped to (28, 28, 1).
    The model summary is printed as a side effect.

    Returns:
        The uncompiled Model taking [noise, condition] and producing an image.
    """
    noise_in = Input(shape=(100,))
    condition_in = Input(shape=(1,))

    features = Concatenate()([noise_in, condition_in])
    for _ in range(3):
        features = Dense(512)(features)
        features = LeakyReLU(alpha=0.2)(features)

    flat_image = Dense(784, activation='sigmoid')(features)
    image = Reshape((28, 28, 1))(flat_image)

    model = Model(inputs=[noise_in, condition_in], outputs=image)
    model.summary()
    return model

def build_discriminator():
    """Build the conditional discriminator with three hidden layers.

    A (28, 28, 1) image is flattened, concatenated with a 1-dimensional
    condition, passed through three Dense(512) + LeakyReLU(0.2) stages and
    reduced to a single sigmoid output. The model summary is printed as a
    side effect.

    Returns:
        The uncompiled Model taking [image, condition] and producing a
        validity score.
    """
    image_in = Input(shape=(28, 28, 1,))
    condition_in = Input(shape=(1,))

    flat_image = Flatten()(image_in)
    features = Concatenate()([flat_image, condition_in])
    for _ in range(3):
        features = Dense(512)(features)
        features = LeakyReLU(alpha=0.2)(features)

    score = Dense(1, activation='sigmoid')(features)

    model = Model(inputs=[image_in, condition_in], outputs=score)
    model.summary()
    return model

# Build and compile the discriminator on its own so it can be trained directly.
discriminator = build_discriminator()
discriminator.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5), metrics=['accuracy'])

generator = build_generator()

# Symbolic inputs for the stacked generator -> discriminator model.
noise = Input(shape=(100,))
condition = Input(shape=(1,))
generated_image = generator([noise, condition])

# The discriminator is flagged non-trainable before the combined model is compiled.
# NOTE(review): presumably so that training combined_model only updates the
# generator weights - confirm for the Keras version in use.
discriminator.trainable = False
validity = discriminator([generated_image, condition])

# Combined model: noise and condition in, discriminator validity score out.
combined_model = Model(inputs=[noise, condition], outputs=validity)
combined_model.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5))


In [None]:
# Four hidden layers

from keras.models import Model
from keras.layers import Input, Dense, Reshape, Flatten, Concatenate
from keras.layers import BatchNormalization, Activation
from keras.layers import LeakyReLU
from keras.optimizers import Adam

def build_generator():
    """Build the conditional generator with four hidden layers.

    A 100-dimensional noise vector and a 1-dimensional condition are
    concatenated, passed through four Dense(512) + LeakyReLU(0.2) stages and
    mapped by a sigmoid Dense(784) layer that is reshaped to (28, 28, 1).
    The model summary is printed as a side effect.

    Returns:
        The uncompiled Model taking [noise, condition] and producing an image.
    """
    noise_in = Input(shape=(100,))
    condition_in = Input(shape=(1,))

    features = Concatenate()([noise_in, condition_in])
    for _ in range(4):
        features = Dense(512)(features)
        features = LeakyReLU(alpha=0.2)(features)

    flat_image = Dense(784, activation='sigmoid')(features)
    image = Reshape((28, 28, 1))(flat_image)

    model = Model(inputs=[noise_in, condition_in], outputs=image)
    model.summary()
    return model

def build_discriminator():
    """Build the conditional discriminator with four hidden layers.

    A (28, 28, 1) image is flattened, concatenated with a 1-dimensional
    condition, passed through four Dense(512) + LeakyReLU(0.2) stages and
    reduced to a single sigmoid output. The model summary is printed as a
    side effect.

    Returns:
        The uncompiled Model taking [image, condition] and producing a
        validity score.
    """
    image_in = Input(shape=(28, 28, 1,))
    condition_in = Input(shape=(1,))

    flat_image = Flatten()(image_in)
    features = Concatenate()([flat_image, condition_in])
    for _ in range(4):
        features = Dense(512)(features)
        features = LeakyReLU(alpha=0.2)(features)

    score = Dense(1, activation='sigmoid')(features)

    model = Model(inputs=[image_in, condition_in], outputs=score)
    model.summary()
    return model

# Build and compile the discriminator on its own so it can be trained directly.
discriminator = build_discriminator()
discriminator.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5), metrics=['accuracy'])

generator = build_generator()

# Symbolic inputs for the stacked generator -> discriminator model.
noise = Input(shape=(100,))
condition = Input(shape=(1,))
generated_image = generator([noise, condition])

# The discriminator is flagged non-trainable before the combined model is compiled.
# NOTE(review): presumably so that training combined_model only updates the
# generator weights - confirm for the Keras version in use.
discriminator.trainable = False
validity = discriminator([generated_image, condition])

# Combined model: noise and condition in, discriminator validity score out.
combined_model = Model(inputs=[noise, condition], outputs=validity)
combined_model.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5))


In [None]:
# Training cGAN for 2 class generation


epochs = 20000  # number of training iterations (one random batch each)
batch_size = 2  # samples drawn per iteration

num_classes = 2  # number of distinct condition labels

import matplotlib.pyplot as plt

# Location used for the periodic sample images; replace with a real path.
save_dir = 'YOUR_DIRECTORY_PATH'

def save_images(epoch):
    """Generate a small grid of sample images and save it as a PNG.

    Draws ``num_classes`` random condition labels and noise vectors, runs
    the generator on them and writes the figure to
    ``<save_dir>/generated_images_epoch_<epoch>.png``.

    Args:
        epoch: Training iteration number, used in the output file name.
    """
    import os

    random_conditions = generate_random_conditions(num_classes)
    noise = np.random.normal(0, 1, (num_classes, 100))
    generated_images = generator.predict([noise, random_conditions])

    # squeeze=False keeps `axs` two-dimensional even for a single subplot,
    # so the indexing below also works when num_classes == 1.
    fig, axs = plt.subplots(num_classes, 1, figsize=(8, 8), squeeze=False)
    for ax, image in zip(axs[:, 0], generated_images):
        ax.imshow(image)
        ax.axis('off')

    # Join with the OS separator so save_dir does not need a trailing slash,
    # and make sure the target directory exists before writing.
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    filename = os.path.join(save_dir, f'generated_images_epoch_{epoch}.png')
    fig.savefig(filename)
    plt.close(fig)

def generate_random_conditions(batch_size):
    """Sample a column of random integer class labels.

    Args:
        batch_size: Number of labels to draw.

    Returns:
        Integer ndarray of shape (batch_size, 1) with values drawn from
        0 up to (but not including) the module-level ``num_classes``.
    """
    label_shape = (batch_size, 1)
    return np.random.randint(low=0, high=num_classes, size=label_shape)




# Convert the labels once to a (n_samples, 1) array so the random positions
# below index by position rather than by pandas index label.
train_labels = np.asarray(new_tr_Y_train).reshape(-1, 1)

# Constant discriminator targets: 1 for real, 0 for generated.
real_targets = np.ones((batch_size, 1))
fake_targets = np.zeros((batch_size, 1))

for epoch in range(epochs):

    # Sample a random batch of real images together with their labels.
    idx = np.random.randint(0, new_dataset_train.shape[0], batch_size)
    real_images = new_dataset_train[idx]
    real_labels = train_labels[idx]

    # Generate fake images conditioned on the same labels; verbose=0 avoids
    # printing a progress bar on every one of the training iterations.
    noise = np.random.normal(0, 1, (batch_size, 100))
    generated_images = generator.predict([noise, real_labels], verbose=0)

    # Discriminator step on the real batch, then on the generated batch.
    discriminator.trainable = True
    d_loss_real = discriminator.train_on_batch([real_images, real_labels], real_targets)
    d_loss_fake = discriminator.train_on_batch([generated_images, real_labels], fake_targets)
    d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
    discriminator.trainable = False

    # Generator step: train the combined model to have generated images
    # scored as real, using freshly sampled random conditions.
    random_conditions = generate_random_conditions(batch_size)
    g_loss = combined_model.train_on_batch([noise, random_conditions], real_targets)

    if epoch % 1000 == 0:
        print(f"Epoch {epoch}/{epochs}")
        print(f"Discriminator Loss: {d_loss[0]}, Accuracy: {100 * d_loss[1]}%")
        print(f"Generator Loss: {g_loss}")

    if epoch % 100 == 0:
        save_images(epoch)


# Persist both trained networks.
generator.save("generator_model.h5")
discriminator.save("discriminator_model.h5")


In [None]:
# Generating the minority class

# Number of synthetic samples needed to bring the smallest class up to the
# size of the largest one; override manually if a different amount is wanted.
class_counts = new_tr_Y_train.value_counts()
num_images = int(class_counts.max() - class_counts.min())
noise = np.random.normal(0, 1, (num_images, 100))
# NOTE(review): assumes the minority class is labelled 1 - confirm per dataset.
# The condition is shaped (num_images, 1) to match the generator's condition input.
new_images_1 = generator.predict([noise, np.ones((num_images, 1))])

In [None]:
# Use the fitted image transformer's inverse transformation to map the
# generated images back to tabular data.
inv_tran1 = IMGTR.inverse_transform(new_images_1)

# CNN using Augmentation and CNN w/o Augmentation

In [None]:


import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow as tf
from sklearn.metrics import roc_auc_score
from keras.layers import Dense, LeakyReLU


# Label every generated image as class 1 and append the synthetic samples
# to the real training images and labels.
new_labels = np.ones(new_images_1.shape[0])
new_y_train = np.concatenate((new_tr_Y_train, new_labels), axis=0)
new_X_train = np.concatenate((new_dataset_train, new_images_1), axis=0)

# The network ends in a sigmoid, so its outputs are already probabilities;
# the AUC metric must therefore not be told to expect logits.
auc_metric = tf.keras.metrics.AUC()

model = keras.Sequential()

# Convolutional feature extractor.
model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))

model.add(layers.Flatten())

# Add fully connected layers
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dense(1, activation='sigmoid'))

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[auc_metric])

# Train the model on exactly one of the two training sets.
# Set to False for the CNN w/o augmentation.
use_augmentation = True
if use_augmentation:
    train_images, train_targets = new_X_train, new_y_train
else:
    train_images, train_targets = new_dataset_train, new_tr_Y_train

model.fit(train_images, train_targets, epochs=20, batch_size=4, validation_data=(new_dataset_test, new_tr_y_test))

# Evaluate the model on the test set
loss, auc = model.evaluate(new_dataset_test, new_tr_y_test)
print("Test auc:", auc)


# XGBoost using Augmentation and XGBoost w/o Augmentation

In [None]:
# XGBoost w/o Augmentation

import xgboost as xgb
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_auc_score

# XGBoost classifier trained on the scaled, non-augmented training data.
xgb_params = {
    'learning_rate': 0.1,
    'n_estimators': 100,
    'max_depth': 3,
    'objective': 'binary:logistic',
}
model = xgb.XGBClassifier(**xgb_params)
model.fit(normalized_X_train, new_tr_Y_train)

# Second column of predict_proba, i.e. the score for class 1.
y_prob1 = model.predict_proba(normalized_X_test)[:, 1]

roc_auc = roc_auc_score(new_tr_y_test, y_prob1)
print(f"ROC AUC Score: {roc_auc:.4f}")


In [None]:
#XGBoost using Augmentation

# Label every generated sample as class 1 and append the inverse-transformed
# synthetic rows to the scaled training features.
new_labels = np.ones(new_images_1.shape[0])

new_y_train = np.concatenate((new_tr_Y_train, new_labels), axis=0)

new_X_train = np.concatenate((normalized_X_train, inv_tran1), axis=0)



import xgboost as xgb
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_auc_score


model = xgb.XGBClassifier(
    learning_rate=0.1,
    n_estimators=100,
    max_depth=3,
    objective='binary:logistic'
)

model.fit(new_X_train, new_y_train)


# Second column of predict_proba, i.e. the score for class 1.
y_prob2 = model.predict_proba(normalized_X_test)[:, 1]

# Calculate the ROC AUC score
roc_auc = roc_auc_score(new_tr_y_test, y_prob2)
print(f"ROC AUC Score: {roc_auc:.4f}")

# Three class generation using cGAN

In [None]:
# In this context, the architecture will remain consistent; however, the only variation will be in the num_classes.


epochs = 20000  # number of training iterations (one random batch each)
batch_size = 2  # samples drawn per iteration

num_classes = 3  # number of distinct condition labels

import matplotlib.pyplot as plt

# Location used for the periodic sample images; replace with a real path.
save_dir = 'YOUR_DIRECTORY_PATH'

def save_images(epoch):
    """Generate a small grid of sample images and save it as a PNG.

    Draws ``num_classes`` random condition labels and noise vectors, runs
    the generator on them and writes the figure to
    ``<save_dir>/generated_images_epoch_<epoch>.png``.

    Args:
        epoch: Training iteration number, used in the output file name.
    """
    import os

    random_conditions = generate_random_conditions(num_classes)
    noise = np.random.normal(0, 1, (num_classes, 100))
    generated_images = generator.predict([noise, random_conditions])

    # squeeze=False keeps `axs` two-dimensional even for a single subplot,
    # so the indexing below also works when num_classes == 1.
    fig, axs = plt.subplots(num_classes, 1, figsize=(8, 8), squeeze=False)
    for ax, image in zip(axs[:, 0], generated_images):
        ax.imshow(image)
        ax.axis('off')

    # Join with the OS separator so save_dir does not need a trailing slash,
    # and make sure the target directory exists before writing.
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    filename = os.path.join(save_dir, f'generated_images_epoch_{epoch}.png')
    fig.savefig(filename)
    plt.close(fig)

def generate_random_conditions(batch_size):
    """Sample a column of random integer class labels.

    Args:
        batch_size: Number of labels to draw.

    Returns:
        Integer ndarray of shape (batch_size, 1) with values drawn from
        0 up to (but not including) the module-level ``num_classes``.
    """
    label_shape = (batch_size, 1)
    return np.random.randint(low=0, high=num_classes, size=label_shape)




# Convert the labels once to a (n_samples, 1) array so the random positions
# below index by position rather than by pandas index label.
train_labels = np.asarray(new_tr_Y_train).reshape(-1, 1)

# Constant discriminator targets: 1 for real, 0 for generated.
real_targets = np.ones((batch_size, 1))
fake_targets = np.zeros((batch_size, 1))

for epoch in range(epochs):

    # Sample a random batch of real images together with their labels.
    idx = np.random.randint(0, new_dataset_train.shape[0], batch_size)
    real_images = new_dataset_train[idx]
    real_labels = train_labels[idx]

    # Generate fake images conditioned on the same labels; verbose=0 avoids
    # printing a progress bar on every one of the training iterations.
    noise = np.random.normal(0, 1, (batch_size, 100))
    generated_images = generator.predict([noise, real_labels], verbose=0)

    # Discriminator step on the real batch, then on the generated batch.
    discriminator.trainable = True
    d_loss_real = discriminator.train_on_batch([real_images, real_labels], real_targets)
    d_loss_fake = discriminator.train_on_batch([generated_images, real_labels], fake_targets)
    d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
    discriminator.trainable = False

    # Generator step: train the combined model to have generated images
    # scored as real, using freshly sampled random conditions.
    random_conditions = generate_random_conditions(batch_size)
    g_loss = combined_model.train_on_batch([noise, random_conditions], real_targets)

    if epoch % 1000 == 0:
        print(f"Epoch {epoch}/{epochs}")
        print(f"Discriminator Loss: {d_loss[0]}, Accuracy: {100 * d_loss[1]}%")
        print(f"Generator Loss: {g_loss}")

    if epoch % 100 == 0:
        save_images(epoch)


# Persist both trained networks.
generator.save("generator_model.h5")
discriminator.save("discriminator_model.h5")


In [None]:
# Number of synthetic images generated per class; adjust as needed for each
# class to balance the dataset.
num_imgs = 2500

# Images conditioned on label 1.
random_conditions_class_2 = np.full((num_imgs, 1), 1)
noise = np.random.normal(0, 1, (num_imgs, 100))
generated_images_class_2 = generator.predict([noise, random_conditions_class_2])


# Images conditioned on label 2.
random_conditions_class_3 = np.full((num_imgs, 1), 2)
noise = np.random.normal(0, 1, (num_imgs, 100))
generated_images_class_3 = generator.predict([noise, random_conditions_class_3])

# ADASYN, SMOTE, and CTGAN

In [None]:
# Smote

from imblearn.over_sampling import SMOTE


# Seeded like the ADASYN sampler and the fold split so that the oversampled
# training set is reproducible between runs.
smote = SMOTE(random_state=42)
X_resampled, y_resampled = smote.fit_resample(normalized_X_train, new_tr_Y_train)

In [None]:

# Boosting using augmentation



import xgboost as xgb
from sklearn.metrics import accuracy_score

from sklearn.metrics import roc_auc_score


# XGBoost classifier trained on the resampled training data.
xgb_params = {
    'learning_rate': 0.1,
    'n_estimators': 100,
    'max_depth': 3,
    'objective': 'binary:logistic',
}
model = xgb.XGBClassifier(**xgb_params)
model.fit(X_resampled, y_resampled)

# Second column of predict_proba, i.e. the score for class 1.
y_prob2 = model.predict_proba(normalized_X_test)[:, 1]

# Calculate the ROC AUC score
roc_auc = roc_auc_score(new_tr_y_test, y_prob2)
print(f"ROC AUC Score: {roc_auc:.4f}")


In [None]:
# ADASYN

from imblearn.over_sampling import ADASYN
# Oversample the scaled training data with a seeded ADASYN sampler.
adasyn = ADASYN(
    sampling_strategy='auto',
    random_state=42,
)
X_resampled, y_resampled = adasyn.fit_resample(
    normalized_X_train,
    new_tr_Y_train,
)

In [None]:

# Boosting using augmentation



import xgboost as xgb
from sklearn.metrics import accuracy_score

from sklearn.metrics import roc_auc_score


# XGBoost classifier trained on the resampled training data.
xgb_params = {
    'learning_rate': 0.1,
    'n_estimators': 100,
    'max_depth': 3,
    'objective': 'binary:logistic',
}
model = xgb.XGBClassifier(**xgb_params)
model.fit(X_resampled, y_resampled)

# Second column of predict_proba, i.e. the score for class 1.
y_prob2 = model.predict_proba(normalized_X_test)[:, 1]

# Calculate the ROC AUC score
roc_auc = roc_auc_score(new_tr_y_test, y_prob2)
print(f"ROC AUC Score: {roc_auc:.4f}")


In [None]:
from ctgan import CTGAN

# Number of CTGAN training epochs (300 is the library default); tune per dataset.
num_epochs = 300

# list of conditions (here the target variable)
discrete_columns = ['target']

# NOTE(review): `df` is not defined in the visible notebook cells; the
# training fold including its 'target' column is assumed here - confirm.
df = merged_df_train

ctgan = CTGAN(epochs=num_epochs)
ctgan.fit(df, discrete_columns)

In [None]:
# Column the synthetic rows are conditioned on.
condition_column = 'target'

# Derive the minority class and the number of rows needed to balance the
# data from the training labels; taking the value from the data keeps its
# dtype identical to what the model was fitted on.
class_counts = new_tr_Y_train.value_counts()
condition_value = class_counts.idxmin()  # the minority class
num_samples = int(class_counts.max() - class_counts.min())  # number of samples needed to balance data

synthetic_data = ctgan.sample(num_samples, condition_column=condition_column, condition_value=condition_value)

In [None]:
# Keep only the synthetic rows that carry the requested condition value,
# then split them into features and labels.
matches_condition = synthetic_data[condition_column] == condition_value
filtered_data = synthetic_data[matches_condition]
new_tr_Xxx_train1 = filtered_data.drop(columns=[condition_column])
new_tr_Yxx_train1 = filtered_data[condition_column]


In [None]:

# Boosting using augmentation



# Append the synthetic rows and labels to the original training data.
new_y_train = np.concatenate((new_tr_Y_train, new_tr_Yxx_train1), axis=0)
new_X_train = np.concatenate((new_tr_X_train, new_tr_Xxx_train1), axis=0)

import xgboost as xgb
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_auc_score

# XGBoost classifier trained on the augmented training data.
xgb_params = {
    'learning_rate': 0.1,
    'n_estimators': 100,
    'max_depth': 3,
    'objective': 'binary:logistic',
}
model = xgb.XGBClassifier(**xgb_params)
model.fit(new_X_train, new_y_train)

# Second column of predict_proba, i.e. the score for class 1.
y_prob2 = model.predict_proba(new_tr_X_test)[:, 1]

# Calculate the ROC AUC score
roc_auc = roc_auc_score(new_tr_y_test, y_prob2)
print(f"ROC AUC Score: {roc_auc:.4f}")
