In [None]:
import sklearn
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np

## RemoveNullRowsTransformer

In [None]:
class RemoveNullRowsTransformer(BaseEstimator, TransformerMixin):
    """
    Pipeline step that discards rows in which every value is missing.

    Rows holding at least one non-missing value are passed through unchanged.

    ...

    Attributes
    ----------
    None

    Methods
    -------
    fit(self, X, y=None)
        No-op; returns the transformer itself.
    transform(self, X)
        Returns X without the rows that consist solely of missing values.
    """
    def __init__(self):
        # Stateless transformer: there is nothing to configure.
        pass

    def fit(self, X, y=None):
        """Learn nothing from the data; present for Pipeline compatibility."""
        return self

    def transform(self, X):
        """Return the result of dropping the all-missing rows of X."""
        non_empty_rows = X.dropna(axis=0, how='all')
        return non_empty_rows
## ReplaceValuesTransformer

In [None]:
class ReplaceValuesTransformer(BaseEstimator, TransformerMixin):
    """
    A custom Pipeline transformer class used to assign a universal NaN value to all missing/null/invalid values.

    ...

    Attributes
    ----------
    values_to_replace : list, default=[-99, '-99', -1, '-1', -999, '-999']
        a list of values (usually type str, int, float) to be replaced by np.nan

    Methods
    -------
    fit(self, X, y=None)
        No-op; returns the transformer itself.
    transform(self, X)
        Returns a new dataframe in which every value listed in values_to_replace is NaN.
        The dataframe passed in is left unmodified.
    """
    def __init__(self, values_to_replace=[-99, '-99', -1, '-1', -999, '-999']):
        # NOTE: the default list object is shared by every instance created
        # without an argument. This class only reads it, never modifies it.
        self.values_to_replace = values_to_replace

    def fit(self, X, y=None):
        """Nothing to learn; present for Pipeline compatibility."""
        return self

    def transform(self, X):
        """Return a copy of X with the configured sentinel values replaced by np.nan."""
        # Deliberately not in place: the caller's dataframe must stay intact,
        # like with the other copying transformers in this file.
        return X.replace(self.values_to_replace, np.nan)

## RemoveFirstDuplicateTransformer

In [None]:
class RemoveFirstDuplicateTransformer(BaseEstimator, TransformerMixin):
    """
    Pipeline step that removes duplicate responses, keeping one row per value of `column`.

    ...

    Attributes
    ----------
    column : str, default="ResponseId"
        name of the column checked for duplicate values
    sort_column : str, default="RecordedDate"
        name of the column the rows are sorted by (descending) before duplicates are dropped

    Methods
    -------
    fit(self, X, y=None)
        No-op; returns the transformer itself.
    transform(self, X)
        Returns the dataframe sorted by `sort_column` in descending order, where for every
        repeated value of `column` only the first row of that ordering (the one with the
        largest `sort_column` value) is kept.
    """
    def __init__(self, column='ResponseId', sort_column='RecordedDate'):
        self.column = column
        self.sort_column = sort_column

    def fit(self, X, y=None):
        """Nothing to learn; present for Pipeline compatibility."""
        return self

    def transform(self, X):
        """Sort descending by `sort_column`, then drop later repeats of `column`."""
        newest_first = X.sort_values(self.sort_column, ascending=False)
        # True for every row whose `column` value already appeared higher up.
        is_repeat = newest_first[self.column].duplicated(keep='first')
        return newest_first[~is_repeat]

## RemoveColumnsTransformer

In [None]:
class RemoveColumnsTransformer(BaseEstimator, TransformerMixin):
    """
    Pipeline step that drops a fixed set of columns from a dataframe.

    ...

    Attributes
    ----------
    columns_to_remove : list of str
        names of the columns to drop

    Methods
    -------
    fit(self, X, y=None)
        No-op; returns the transformer itself.
    transform(self, X)
        Returns the dataframe without the columns named in columns_to_remove.
    """
    def __init__(self, columns_to_remove):
        self.columns_to_remove = columns_to_remove

    def fit(self, X, y=None):
        """Nothing to learn; present for Pipeline compatibility."""
        return self

    def transform(self, X):
        """Return X with the configured columns dropped."""
        unwanted = self.columns_to_remove
        return X.drop(columns=unwanted)

## RenameColumnTransformer

In [None]:
class RenameColumnTransformer(BaseEstimator, TransformerMixin):
    """
    Pipeline step that renames a single column.

    ...

    Attributes
    ----------
    old_column_name : str
        current name of the column (eg. "UNGRADGRADCD")
    new_column_name : str
        name the column should carry afterwards (eg. "Undergrad Grad")

    Methods
    -------
    fit(self, X, y=None)
        No-op; returns the transformer itself.
    transform(self, X)
        Returns a renamed copy of the dataframe; the dataframe passed in is not modified.
    """
    def __init__(self, old_column_name, new_column_name):
        self.old_column_name = old_column_name
        self.new_column_name = new_column_name

    def fit(self, X, y=None):
        """Nothing to learn; present for Pipeline compatibility."""
        return self

    def transform(self, X):
        """Return a copy of X in which old_column_name is called new_column_name."""
        name_map = {self.old_column_name: self.new_column_name}
        # Without inplace=True, rename hands back a new dataframe.
        return X.rename(columns=name_map)

## RelabelColumnTransformer

In [None]:
class RelabelColumnTransformer(BaseEstimator, TransformerMixin):
    """
    A custom Pipeline transformer class used to rename values of a column in a dataframe.

    ...

    Attributes
    ----------
    column_to_relabel : str
        a string indicating the name of the column containing the values to be renamed
    new_label : str
        a string indicating the new name of the column value (eg. "First-year")
    old_label : str, default=None
        a string indicating the original name of the dataframe value that's to be renamed (eg. "Freshmen").
        When None, fit() tries to detect it from the data for a few known new_label values.

    Methods
    -------
    fit(self, X, y=None)
        Detects old_label from X when it was not given.
    transform(self, X)
        Returns a copy of the dataframe in which old_label is replaced by new_label in the
        specified column. The dataframe passed in is left unmodified.
    """
    # Maps a supported new_label to the regex fit() uses to find the existing
    # label in the column (case-insensitive via the inline (?i) flag).
    _AUTO_DETECT_PATTERNS = {
        'U': '(?i)^U',
        'G': '(?i)^G',
        'First-year': '(?i)FRESH',
    }

    def __init__(self, column_to_relabel, new_label, old_label=None):
        self.column_to_relabel = column_to_relabel
        self.old_label = old_label
        self.new_label = new_label

    def fit(self, X, y=None):
        """If no old_label is specified, automatically detect the old label from the dataframe based on the new_label passed in.
           Only works for select values (new_label = "U"/"G" for undergrad/grad, new_label = "First-year" for entry status).

           Raises IndexError when detection is attempted but no value in the column matches.
        """
        if self.old_label is None:
            pattern = self._AUTO_DETECT_PATTERNS.get(self.new_label)
            if pattern is not None:
                column = X[self.column_to_relabel]
                # na=False: missing values never count as a match.
                candidates = column[column.str.contains(pattern, regex=True, na=False)]
                if candidates.empty:
                    raise IndexError(
                        f"cannot detect old_label: no value in column "
                        f"{self.column_to_relabel!r} matches {pattern!r}"
                    )
                # NOTE(review): the detected value is written back to the
                # constructor parameter (kept for backward compatibility), so a
                # later fit() on other data will not re-detect it.
                self.old_label = candidates.iat[0]
        return self

    def transform(self, X):
        """Return a copy of X with old_label replaced by new_label in column_to_relabel."""
        # Work on a copy so the caller's dataframe is not modified.
        X = X.copy()
        X[self.column_to_relabel] = X[self.column_to_relabel].replace(self.old_label, self.new_label)
        return X

## ReplaceStringWithNaNTransformer

In [None]:
class ReplaceStringWithNaNTransformer(BaseEstimator, TransformerMixin):
    """
    A custom Pipeline transformer class used to replace grad student entry status of "ADVANCED STANDING" with NaN.

    ...

    Attributes
    ----------
    standing_col : str
        a string indicating the name of the column containing the values to be replaced
    string_to_replace : str, default="ADVANCED STANDING"
        a string indicating the value in standing_col that is to be replaced by NaN
    ug_grad : str, default=""
        a string indicating the student type whose rows are affected: "U" for undergrad and "G" for graduate students.
        When left as "", fit() picks the first value in ug_grad_col that starts with "g"/"G".
    ug_grad_col : str, default="Undergrad Grad"
        a string indicating the name of the column labeling responses as undergrad vs. grad students

    Methods
    -------
    fit(self, X, y=None)
        Detects ug_grad from X when it was not given.
    transform(self, X, y=None)
        Returns a copy of the dataframe in which rows of the selected student type whose
        standing_col equals string_to_replace have NaN in standing_col
    """
    def __init__(self, standing_col, string_to_replace='ADVANCED STANDING', ug_grad='', ug_grad_col='Undergrad Grad'):
        self.ug_grad_col = ug_grad_col
        self.standing_col = standing_col
        self.string_to_replace = string_to_replace
        self.ug_grad = ug_grad

    def fit(self, X, y=None):
        """Detect the grad-student label when ug_grad was left empty.

        Raises IndexError when no value in ug_grad_col starts with "g"/"G".
        """
        if self.ug_grad == '': # default to grad students
            # na=False keeps the mask strictly boolean; without it a single
            # missing value in ug_grad_col makes the .loc selection raise.
            is_grad = X[self.ug_grad_col].str.contains('(?i)^g', regex=True, na=False)
            grad_labels = X.loc[is_grad, self.ug_grad_col].unique()
            if len(grad_labels) == 0:
                raise IndexError(
                    f"cannot detect ug_grad: no value in column "
                    f"{self.ug_grad_col!r} starts with 'g'"
                )
            self.ug_grad = grad_labels[0]
        return self

    def transform(self, X, y=None):
        """Return a copy of X with the matching standing_col entries set to NaN."""
        X_transformed = X.copy()

        # Only rows of the selected student type carrying the target string are blanked.
        mask = (X_transformed[self.ug_grad_col] == self.ug_grad) & (X_transformed[self.standing_col] == self.string_to_replace)
        X_transformed.loc[mask, self.standing_col] = np.nan

        return X_transformed

## UniqueStringListTransformer

In [None]:
# EX: Create a column where each row holds the list of unique strings (groups) the student belongs to.
# columns_to_list is a list of the column names or positions that the transformer will parse through;
# it outputs a new column, named by unique_col_list, that will be exploded later in the pipeline.
class UniqueStringListTransformer(BaseEstimator, TransformerMixin):
    """
    Pipeline step that gathers, per row, the distinct values of several columns into one list column.

    The resulting column prepares double-counting demographics to be double counted.

    ...

    Attributes
    ----------
    columns_to_list : list of str
        names of the columns that describe the same demographic value and are combined into a list
    unique_col_list : str
        name of the new column that receives the per-row list

    Methods
    -------
    fit(self, X, y=None)
        No-op; returns the transformer itself.
    transform(self, X)
        Returns a copy of the dataframe with an additional column (eg. "Reporting College" or
        "Multiple Ethnicities") containing a list of row-wise unique values from columns_to_list.
        '-' entries and missing values are left out of the lists.
    """
    def __init__(self, columns_to_list, unique_col_list):
        self.columns_to_list = columns_to_list
        self.unique_col_list = unique_col_list

    def fit(self, X, y=None):
        """Nothing to learn; present for Pipeline compatibility."""
        return self

    def transform(self, X):
        """Return a copy of X extended by the list column named unique_col_list."""
        def distinct_values(row):
            # Missing entries are skipped; the remaining values are de-duplicated.
            present = row.dropna()
            return list(pd.unique(present))

        result = X.copy()
        # '-' is treated like a missing value before the lists are built.
        source = result[self.columns_to_list].replace('-', np.nan)
        result[self.unique_col_list] = source.apply(distinct_values, axis=1)
        return result

## AddColumnsTransformer

In [None]:
#### Maps 2 response columns to integer scores, adds them together into a new column AND creates a binary 'YES'/'NO' column ('YES' if score > 2)
# EX: create GAD2 and PHQ2 scores AND create DEPRESSION/ANXIETY binary
# MHLTH1 + MHLTH2 = DEPRESSION, MHLTH3 + MHLTH4 = ANXIETY (need to be in all caps)

# Same as AddIntegerColumnsTransformer, except it ALSO creates a binary column.
# The binary column should be named 'ANXIETY' or 'DEPRESSION' in all caps for consistency.
class AddColumnsTransformer(BaseEstimator, TransformerMixin):
    """
    A custom Pipeline transformer class used to add mental health numerical score and binary columns.

    ...

    Attributes
    ----------
    column_1 : str
        a string indicating one of the two mental health columns for calculating depression/anxiety (eg. "MHLTH1")
    column_2 : str
        a string indicating the other mental health column for calculating depression/anxiety (eg. "MHLTH2")
    new_column : str
        a string indicating the name of the newly created mental health score column ("GAD2" for anxiety or "PHQ2" for depression)
    binary_column : str
        a string indicating the name of the binary mental health column ("ANXIETY" for anxiety or "DEPRESSION" for depression)
    string_to_int_mapping : dict
        maps each answer string to its integer score (0-3); set in __init__, not a constructor parameter

    Methods
    -------
    fit(self, X, y=None)
        No-op; returns the transformer itself.
    transform(self, X)
        Returns a copy of the dataframe with two additional columns- one with anxiety/depression
        scores (0-6) & the other with 'YES' (score >= 3) / 'NO'. Both are NaN when either answer
        is missing or not present in string_to_int_mapping.
    """
    def __init__(self, column_1, column_2, new_column, binary_column):
        self.column_1 = column_1
        self.column_2 = column_2
        self.new_column = new_column
        self.binary_column = binary_column
        self.string_to_int_mapping = {
            'Nearly every day': 3,
            'Several days': 1,
            'Not at all': 0,
            'More than half the days': 2
        }

    def fit(self, X, y=None):
        """Nothing to learn; present for Pipeline compatibility."""
        return self

    def transform(self, X):
        """Return a copy of X extended by the score column and the 'YES'/'NO' column."""
        # Work on a copy so the caller's dataframe is not modified.
        X = X.copy()

        # Answers that are missing or absent from the mapping become NaN, and
        # NaN propagates through the sum.
        score = (X[self.column_1].map(self.string_to_int_mapping) +
                 X[self.column_2].map(self.string_to_int_mapping))
        X[self.new_column] = score

        # Object array so it can hold both the labels and NaN.
        labels = np.where(score >= 3, 'YES', 'NO').astype(object)
        # An unknown score must not be reported as 'NO'. Checking the score
        # (not the raw columns) also covers answers missing from the mapping.
        labels[score.isna().to_numpy()] = np.nan
        X[self.binary_column] = labels
        return X

## AddIntegerColumnsTransformer

In [None]:
##### DO NOT USE IF USING AddColumnsTransformer #####
# Add 2 int columns together into a new column (ie. create GAD2 and PHQ2 scores)
# MHLTH1 + MHLTH2 = DEPRESSION, MHLTH3 + MHLTH4 = ANXIETY (need to be in all caps)
class AddIntegerColumnsTransformer(BaseEstimator, TransformerMixin):
    """
    A custom Pipeline transformer class used to add mental health numerical score columns.

    ...

    Attributes
    ----------
    column_1 : str
        a string indicating one of the two mental health columns for calculating depression/anxiety (eg. "MHLTH1")
    column_2 : str
        a string indicating the other mental health column for calculating depression/anxiety (eg. "MHLTH2")
    new_column : str
        a string indicating the name of the newly created mental health score column ("GAD2" for anxiety or "PHQ2" for depression)

    Methods
    -------
    fit(self, X, y=None)
        No-op; returns the transformer itself.
    transform(self, X)
        Returns a copy of the dataframe with an additional column holding column_1 + column_2.
        The dataframe passed in is left unmodified.
    """
    def __init__(self, column_1, column_2, new_column):
        self.column_1 = column_1
        self.column_2 = column_2
        self.new_column = new_column

    def fit(self, X, y=None):
        """Nothing to learn; present for Pipeline compatibility."""
        return self

    def transform(self, X):
        """Return a copy of X extended by the element-wise sum of the two columns."""
        # Work on a copy so the caller's dataframe is not modified.
        X = X.copy()
        X[self.new_column] = X[self.column_1] + X[self.column_2]
        return X