<a href="https://colab.research.google.com/github/ipeirotis/autoencoders_census/blob/main/pandas2vector.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Code that transforms a dataframe to vector format and vice versa

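This notebook defines a `Table2Vector` class that transforms a dataframe into a numeric vector representation and reverses that transformation, so it can serve as the preprocessing and postprocessing step of a pipeline. The class handles missing values, one-hot encodes categorical variables, and min-max scales numeric variables.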

In [None]:
import pandas as pd
import numpy as np

from sklearn.preprocessing import OneHotEncoder, MinMaxScaler
from pandas.api.types import is_numeric_dtype

In [None]:
class Table2Vector:
    """
    Convert a pandas DataFrame to a numeric vector representation and back.

    Categorical columns are one-hot encoded, numeric columns are min-max
    scaled, and missing values can be flagged with extra indicator columns.
    The encoders and scalers fitted by `vectorize_table` are stored on the
    instance so that `tabularize_vector` can reverse the transformation.

    This class does not handle textual data or datetime variables.
    """

    # Variable types accepted in the `variable_types` mapping. Only
    # 'categorical' and 'numeric' columns are transformed by this class;
    # the remaining types are recorded but left untouched.
    VAR_TYPES = [
        'categorical',
        'numeric',
        'datetime',
        'text',
        'binary',
        'missing_indicator'  # indicator variable for missing values in another column
    ]

    def __init__(self, variable_types):
        """
        Initialize the transformer with the variable types dictionary.

        Args:
            variable_types (dict): Maps a column name to its variable type
                (one of VAR_TYPES). Entries with any other type are ignored.
        """
        self.SEP = '__'
        self.MISSING = 'MISSING__'

        # Group the declared column names by variable type.
        self.var_types = {
            var_type: [var for var, declared in variable_types.items() if declared == var_type]
            for var_type in self.VAR_TYPES
        }

        # Fitted transformers, keyed by the original column name.
        self.one_hot_encoders = {}
        self.min_max_scalers = {}

    @staticmethod
    def _is_nan(value):
        """Return True only for a floating-point NaN (never for the string 'nan')."""
        return isinstance(value, (float, np.floating)) and bool(np.isnan(value))

    def _encoded_column_names(self, column, one_hot_encoder):
        """Return the one-hot column names for `column`, in the encoder's category order."""
        return [
            f"{column}{self.SEP}{cat}"
            for cat in one_hot_encoder.categories_[0]
            if not self._is_nan(cat)
        ]

    def _is_missing_indicator(self, name):
        """Return True if `name` is a string carrying the missing-indicator prefix."""
        return isinstance(name, str) and name.startswith(self.MISSING)

    def vectorize_table(self, original_df, add_missing_indicators=False):
        """
        Transform the dataframe according to the variable types.

        Numeric columns (declared 'numeric' and holding a numeric dtype) are
        min-max scaled over their non-missing values; missing values stay NaN.
        Categorical columns are replaced by one-hot columns named
        '<column>__<category>'; a missing value becomes an all-zero row.
        The fitted transformers are stored in `self.min_max_scalers` and
        `self.one_hot_encoders`.

        Args:
            original_df (pd.DataFrame): The table to vectorize. It is not modified.
            add_missing_indicators (bool): When True, append the columns
                produced by `add_missing_indicators(original_df)`.

        Returns:
            pd.DataFrame: The transformed dataframe, sharing the index of
            `original_df`. Untransformed columns come first, followed by the
            one-hot blocks and then the optional missing indicators.
        """
        vectorized_df = original_df.copy()

        encoded_blocks = []   # one-hot frames, concatenated once after the loop
        encoded_sources = []  # categorical columns replaced by those frames

        for column in original_df.columns:
            # We use a MinMaxScaler for numeric variables
            if column in self.var_types['numeric'] and is_numeric_dtype(vectorized_df[column]):
                # Cast first so scaled floats are never written into an integer column.
                vectorized_df[column] = vectorized_df[column].astype(float)
                non_na_rows = vectorized_df[column].notna()
                if not non_na_rows.any():
                    # Nothing to fit a scaler on; the column stays all-NaN.
                    continue
                min_max_scaler = MinMaxScaler()
                vectorized_df.loc[non_na_rows, column] = min_max_scaler.fit_transform(
                    vectorized_df.loc[non_na_rows, [column]]
                ).ravel()
                self.min_max_scalers[column] = min_max_scaler
            elif column in self.var_types['categorical']:
                # Asking the encoder to drop NaN when the column has no missing
                # value makes `fit` fail, so only request it when needed.
                has_missing = bool(vectorized_df[column].isna().any())
                one_hot_encoder = OneHotEncoder(
                    sparse_output=False,
                    handle_unknown='ignore',
                    drop=(np.nan,) if has_missing else None,
                )
                encoded = one_hot_encoder.fit_transform(vectorized_df[[column]])
                # Reuse the source index so the later column-wise concat lines
                # rows up even when the index is not 0..n-1.
                encoded_blocks.append(pd.DataFrame(
                    encoded,
                    columns=self._encoded_column_names(column, one_hot_encoder),
                    index=vectorized_df.index,
                ))
                encoded_sources.append(column)
                self.one_hot_encoders[column] = one_hot_encoder

        if encoded_blocks:
            vectorized_df = pd.concat(
                [vectorized_df.drop(columns=encoded_sources)] + encoded_blocks,
                axis=1,
            )

        # Add missing indicators
        if add_missing_indicators:
            missing_indicators = self.add_missing_indicators(original_df)
            vectorized_df = pd.concat([vectorized_df, missing_indicators], axis='columns')

        return vectorized_df

    def add_missing_indicators(self, df):
        """
        Build binary columns indicating the presence of missing values.

        For each eligible column of `df`, the result holds a column named
        'MISSING__<column_name>' that is 1 where the value is missing and 0
        otherwise. A column is eligible when
        (a) there is no such missing value indicator already for the column and
        (b) the column is not already a missing value indicator.

        Args:
            df (pd.DataFrame): The input pandas DataFrame. It is not modified.

        Returns:
            pd.DataFrame: Only the indicator columns, indexed like `df`.
            The frame has no columns when no column is eligible.
        """
        cols = [
            c for c in df.columns
            if not self._is_missing_indicator(c) and f'{self.MISSING}{c}' not in df.columns
        ]
        # Selecting with a list keeps the index and also works when `cols` is empty.
        df_missing = df[cols].isnull().astype(int)
        df_missing.columns = [f'{self.MISSING}{c}' for c in cols]

        return df_missing

    @staticmethod
    def proba_to_onehot(proba):
        """
        Convert rows of probabilities into max-likelihood one-hot rows.

        Args:
            proba: 2-D array-like with one row per sample.

        Returns:
            np.ndarray: Array shaped like `proba` with a single 1 per row, at
            the position of that row's maximum. An input with no rows or no
            columns yields an all-zero array of the same shape.
        """
        proba = np.asarray(proba)
        onehot = np.zeros_like(proba)
        if proba.size == 0:
            # argmax is undefined along an empty axis.
            return onehot
        onehot[np.arange(len(proba)), np.argmax(proba, axis=1)] = 1
        return onehot

    def tabularize_vector(self, vectorized_df, restore_missing_values=False):
        """
        Reverse the transformations applied by `vectorize_table`.

        One-hot encoded categorical variables are decoded (the largest value in
        each block wins; an all-zero block decodes to NaN) and min-max scaled
        numeric variables are inverse scaled. Declared columns without a
        fitted transformer are passed through unchanged. Missing-indicator
        columns are removed from the result.

        Args:
            vectorized_df (pd.DataFrame): Data laid out like the output of
                `vectorize_table`. It is not modified.
            restore_missing_values (bool): When True, each
                'MISSING__<column>' indicator is applied before it is removed:
                rows where the indicator is at least 0.5 are set to NaN in
                '<column>'.

        Returns:
            pd.DataFrame: The table in its original layout, indexed like
            `vectorized_df`, with decoded categorical columns placed last.
        """
        df = vectorized_df.copy()

        decoded_frames = []  # decoded categorical columns, appended after the loop
        consumed = []        # one-hot columns replaced by those decoded columns

        for column in self.var_types['categorical']:
            one_hot_encoder = self.one_hot_encoders.get(column)
            if one_hot_encoder is None:
                # Never encoded (e.g. absent from the vectorized table).
                continue
            # Exact names in encoder order: a prefix match could also pick up
            # the one-hot block of another column whose name starts the same way.
            original_cols = self._encoded_column_names(column, one_hot_encoder)
            onehot_encoded = df[original_cols].values
            # Identify rows where all one-hot columns are zero
            all_zero_rows = (onehot_encoded == 0).all(axis=1)

            # Convert probabilities to one-hot encoding and perform inverse transformation
            onehot = self.proba_to_onehot(onehot_encoded)
            df_original = pd.DataFrame(
                one_hot_encoder.inverse_transform(onehot),
                columns=[column],
                index=df.index,
            )

            # Set original value to NaN for rows that were all zeros in the one-hot encoded data
            if all_zero_rows.any():
                df_original.loc[all_zero_rows, column] = np.nan

            decoded_frames.append(df_original)
            consumed.extend(original_cols)

        if decoded_frames:
            df = pd.concat([df.drop(columns=consumed)] + decoded_frames, axis=1)

        for column in self.var_types['numeric']:
            min_max_scaler = self.min_max_scalers.get(column)
            if min_max_scaler is None:
                # Never scaled (e.g. the column did not hold a numeric dtype).
                continue
            df[column] = df[column].astype(float)
            non_na_rows = df[column].notna()
            if not non_na_rows.any():
                continue
            inverse_transformed = min_max_scaler.inverse_transform(df.loc[non_na_rows, [column]])
            df.loc[non_na_rows, column] = inverse_transformed.ravel()

        indicator_cols = [col for col in df.columns if self._is_missing_indicator(col)]

        if restore_missing_values:
            for indicator in indicator_cols:
                target = indicator[len(self.MISSING):]
                if target in df.columns:
                    # The 0.5 threshold accepts exact 0/1 flags as well as
                    # fractional scores.
                    df[target] = df[target].mask(df[indicator] >= 0.5)

        # Remove missing indicators
        df = df.drop(columns=indicator_cols)

        return df


In [None]:
import unittest

class TestDataTransformer(unittest.TestCase):
    """Unit and round-trip tests for Table2Vector on a small table with missing values."""

    def setUp(self):
        """Create a fresh vectorizer and a 4-row table with one NaN per column."""
        self.variable_types = {
            'age': 'numeric',
            'gender': 'categorical',
            'income': 'numeric',
            'gender_at_birth': 'categorical',
        }
        self.vectorizer = Table2Vector(self.variable_types)
        self.SEP = self.vectorizer.SEP
        self.MISSING = self.vectorizer.MISSING

        self.data = pd.DataFrame({
            'age': [25, 30, 35, np.nan],
            'gender': ['male', 'female', np.nan, 'female'],
            'income': [50000.0, np.nan, 70000.0, 80000.0],
            'gender_at_birth': ['female', 'female', np.nan, 'male'],
        })

    def test_add_missing_indicators(self):
        """Missing indicators are appended, are binary, and count the NaNs per column."""
        df = self.data.copy()
        vectorized = self.vectorizer.vectorize_table(df, add_missing_indicators=True)

        # Check that the output DataFrame has the expected columns, in order
        expected_columns = ['age', 'income',
                            'gender__female', 'gender__male',
                            'gender_at_birth__female', 'gender_at_birth__male',
                            self.MISSING + 'age',
                            self.MISSING + 'gender', self.MISSING + 'income',
                            self.MISSING + 'gender_at_birth']
        self.assertListEqual(list(vectorized.columns), expected_columns)

        # Check that one indicator column (prefixed with self.MISSING) was added per original column
        missing_cols = [col for col in vectorized.columns if col.startswith(self.MISSING)]
        self.assertEqual(len(missing_cols), 4)

        # Check that the indicator columns contain only 0s and 1s
        for col in missing_cols:
            self.assertTrue(set(vectorized[col].unique()).issubset({0, 1}))

        # Check that the number of 1s in each indicator column matches the
        # number of NaN values in the corresponding original column
        for original_col in self.data.columns:
            missing_col = self.MISSING + original_col
            self.assertEqual(vectorized[missing_col].sum(), self.data[original_col].isnull().sum())

    def test_transform_dataframe(self):
        """Categoricals are one-hot encoded, numerics scaled, and NaNs flagged."""
        vectorized_df = self.vectorizer.vectorize_table(self.data, add_missing_indicators=True)

        # Check that original DataFrame has been transformed properly
        self.assertNotIn('gender', vectorized_df.columns)
        self.assertIn('gender__male', vectorized_df.columns)
        self.assertIn('gender__female', vectorized_df.columns)

        # Check that missing values have been handled correctly
        self.assertEqual(vectorized_df.loc[3, self.MISSING + 'age'], 1)
        self.assertEqual(vectorized_df.loc[0, self.MISSING + 'age'], 0)

        # Check that numeric columns have been scaled correctly
        # (approximate comparison: the scaled values are floating point)
        self.assertAlmostEqual(vectorized_df.loc[0, 'age'], 0)
        self.assertAlmostEqual(vectorized_df.loc[1, 'age'], 0.5)
        self.assertAlmostEqual(vectorized_df.loc[2, 'age'], 1)
        self.assertTrue(np.isnan(vectorized_df.loc[3, 'age']))

    def test_proba_to_onehot(self):
        """The largest probability in each row becomes the single 1."""
        proba = np.array([[0.1, 0.9], [0.7, 0.3]])
        expected_onehot = np.array([[0, 1], [1, 0]])

        np.testing.assert_array_equal(self.vectorizer.proba_to_onehot(proba), expected_onehot)

    def test_reverse_transform_dataframe(self):
        """Vectorizing and then tabularizing reproduces the original table."""
        vector_df = self.vectorizer.vectorize_table(self.data)
        reversed_df = self.vectorizer.tabularize_vector(vector_df)

        # Check that DataFrame has been reversed correctly
        pd.testing.assert_frame_equal(reversed_df, self.data, check_like=True)

        # Check that the missing data has been reversed correctly
        self.assertTrue(pd.isnull(reversed_df.loc[3, 'age']))

        # Check that the numeric scaling has been reversed correctly
        self.assertIn('age', reversed_df.columns)
        self.assertIn('income', reversed_df.columns)
        np.testing.assert_allclose(
            reversed_df['age'].dropna().to_numpy(dtype=float),
            self.data['age'].dropna().to_numpy(dtype=float),
        )
        self.assertTrue(np.array_equal(np.isnan(self.data['income']), np.isnan(reversed_df['income'])))

        # Check that the categorical encoding has been reversed correctly.
        # NaN never compares equal to itself, so the positions of the missing
        # values and the remaining values are compared separately.
        self.assertIn('gender', reversed_df.columns)
        self.assertListEqual(list(self.data['gender'].isna()), list(reversed_df['gender'].isna()))
        self.assertListEqual(list(self.data['gender'].dropna()), list(reversed_df['gender'].dropna()))

def run_tests(test_class):
    """Collect every test method of `test_class` and run them with a text runner."""
    collected = unittest.TestLoader().loadTestsFromTestCase(test_class)
    unittest.TextTestRunner().run(collected)


In [None]:
if __name__ == "__main__":
    # Run the Table2Vector test suite when this cell/script is executed directly.
    run_tests(TestDataTransformer)