# Generate folktables datasets

In [1]:
import sys
import logging
from pathlib import Path

import pandas as pd
import numpy as np
from folktables import ACSDataSource, ACSIncome

In [2]:
# Important constants!
# Fractions of the dataset assigned to each split; they must add up to 1.
# VALIDATION_SIZE = None means no validation split is produced.
TRAIN_SIZE = 0.7
TEST_SIZE = 0.3
VALIDATION_SIZE = None
# NOTE: the bare string below is a no-op expression statement; it keeps an
# alternative 60/20/20 train/test/validation configuration around for reference.
"""
TRAIN_SIZE = 0.6
TEST_SIZE = 0.2
VALIDATION_SIZE = 0.2
"""

# MAX_SENSITIVE_GROUPS = 4        # discard samples from small sensitive groups, keep only this many
MAX_SENSITIVE_GROUPS = 2
# MAX_SENSITIVE_GROUPS = None

# Random seed forwarded to the train/test(/validation) splits
SEED = 42

In [3]:
# Sanity check: the split fractions must add up to 1.
# Compare with a tolerance instead of `==`, because sums of decimal fractions
# are not always exactly 1.0 in binary floating point.
assert np.isclose(TRAIN_SIZE + TEST_SIZE + (VALIDATION_SIZE or 0.), 1.0), \
    "TRAIN_SIZE + TEST_SIZE + VALIDATION_SIZE must add up to 1"

In [4]:
# Make sure the folder that holds the folktables data exists: ~/data/folktables
root_dir = Path.home()
data_dir = root_dir.joinpath("data", "folktables")
data_dir.mkdir(parents=True, exist_ok=True)

In [5]:
# download 2018 ACS data
from folktables.load_acs import state_list

# Data source for the 2018, 1-Year, person-level survey.
# `root_dir` points folktables at the `data_dir` folder created above.
data_source = ACSDataSource(
    survey_year='2018', horizon='1-Year', survey='person',
    root_dir=str(data_dir),
)

In [6]:
# Load the ACS data for every state in `state_list`.
# data is 3236107 rows x 286 columns
acs_data = data_source.get_data(states=state_list, download=True)  # use download=True if not yet downloaded
# Display the (rows, columns) shape as the cell output
acs_data.shape

(3236107, 286)

According to the dataset's datasheet, train/test splits should be stratified by state
(at least for ACSIncome; for the remaining tasks the datasheet seems ambiguous).

In [7]:
# Name of the column holding the state; used to stratify the data splits.
STATE_COL = "ST"

# ACS columns that are treated as categorical: when present in a task's data
# they are shifted to start at 0 and cast to the "category" dtype.
# NOTE(review): the descriptions marked "presumably" are not established in this
# notebook -- confirm them against the ACS PUMS data dictionary.
ACS_CATEGORICAL_COLS = {
    'COW',  # class of worker
    'MAR',  # marital status
    'OCCP', # occupation code
    'POBP', # place of birth code
    'RELP', # relationship status
    'SEX',
    'RAC1P', # race code
    'DIS',  # disability
    'ESP',  # employment status of parents
    'CIT',  # citizenship status
    'MIG',  # mobility status
    'MIL',  # military service
    'ANC',  # ancestry
    'NATIVITY',
    'DEAR',  # presumably hearing difficulty
    'DEYE',  # presumably vision difficulty
    'DREM',  # presumably cognitive difficulty
    'ESR',  # presumably employment status recode
    'ST',  # state (same column as STATE_COL)
    'FER',  # presumably gave birth within the past 12 months
    'GCL',  # presumably grandparents living with grandchildren
    'JWTR',  # presumably means of transportation to work
#     'PUMA',
#     'POWPUMA',
}

In [8]:
import logging
from copy import deepcopy
from typing import Tuple
from functools import reduce
from operator import or_

import folktables
from folktables import BasicProblem
from sklearn.model_selection import train_test_split

def split_folktables_task(
        acs_data: pd.DataFrame,
        acs_task_name: str,
        train_size: float,
        test_size: float,
        validation_size: float = None,
        max_sensitive_groups: int = None,
        stratify_by_state: bool = True,
        save_to_dir: Path = None,
        seed: int = 42,
    ) -> Tuple[pd.DataFrame, ...]:
    """Train/test split a given folktables task (or train/test/validation).

    According to the dataset's datasheet, (at least) the ACSIncome
    task should be stratified by state.

    Parameters
    ----------
    acs_data : pd.DataFrame
        Raw ACS data, as returned by the folktables data source.
    acs_task_name : str
        Name of the task object to load from the `folktables` module
        (e.g., "ACSIncome").
    train_size : float
        Fraction of samples used for training.
    test_size : float
        Fraction of samples used for testing.
    validation_size : float, optional
        Fraction of samples used for validation; None or 0 means that no
        validation split is produced.
    max_sensitive_groups : int, optional
        If given (and > 0), keep only the samples of this many largest
        sensitive groups.
    stratify_by_state : bool
        Whether to stratify the splits by the state column (`STATE_COL`).
    save_to_dir : Path, optional
        If given, each split is also saved as a CSV file in this folder.
    seed : int
        Random state used for the splits.

    Returns
    -------
    (train_data, test_data) or (train_data, test_data, validation_data)
        The validation data is only returned when `validation_size` is set.
        In every DataFrame the first column is the label, the second is the
        sensitive group, and the remaining columns are the task's features
        in the order declared by the task.
    """
    # Sanity checks. The sum is compared with a tolerance because decimal
    # fractions do not always add up to exactly 1.0 in floating point.
    assert np.isclose(train_size + test_size + (validation_size or 0.0), 1.0)
    assert all(val is None or 0 <= val <= 1 for val in (train_size, test_size, validation_size))
    has_validation = validation_size is not None and validation_size > 0

    # Dynamically import/load task object
    acs_task = getattr(folktables, acs_task_name)

    # Add State to the feature columns so we can do stratified splits.
    # Only remove the state column later if we were the ones adding it.
    remove_state_col_later = False
    if stratify_by_state and STATE_COL not in acs_task.features:
        acs_task = deepcopy(acs_task)  # don't modify the shared task object
        acs_task.features.append(STATE_COL)
        remove_state_col_later = True

    # Pre-process data + select task-specific features
    features, label, group = acs_task.df_to_numpy(acs_data)

    # Make a DataFrame with all processed data
    df = pd.DataFrame(data=features, columns=acs_task.features)
    df[acs_task.target] = label

    # Correct column ordering (1st: label, 2nd: group, 3rd and onwards: features).
    # Features keep the order declared by the task: going through a `set` would
    # make the column order change from run to run (string hash randomization).
    excluded_cols = {acs_task.group}
    if remove_state_col_later:
        excluded_cols.add(STATE_COL)
    cols_order = [acs_task.target, acs_task.group] + [
        col for col in dict.fromkeys(acs_task.features)
        if col not in excluded_cols
    ]

    # Save state_col for stratified split (stays None when not stratifying)
    state_col_data = df[STATE_COL] if stratify_by_state else None

    # Enforce correct ordering in df
    df = df[cols_order]

    # Drop samples from sensitive groups with low relative size
    # (e.g., original paper has only White and Black races)
    if max_sensitive_groups is not None and max_sensitive_groups > 0:
        group_sizes = df.value_counts(acs_task.group, sort=True, ascending=False)
        big_groups = group_sizes.index.to_list()[: max_sensitive_groups]
        big_groups_filter = df[acs_task.group].isin(big_groups).to_numpy()

        # Keep only big groups (copy, so the assignment below targets a real frame)
        df = df[big_groups_filter].copy()
        if state_col_data is not None:
            state_col_data = state_col_data[big_groups_filter]

        # Group values must be sequential, and start at 0.
        # When a remap is needed, groups are numbered by decreasing size
        # (the largest kept group becomes 0).
        if df[acs_task.group].max() > df[acs_task.group].nunique():
            map_to_sequential = {g: idx for idx, g in enumerate(big_groups)}
            df[acs_task.group] = df[acs_task.group].map(map_to_sequential)

            logging.warning(f"Using the following group value mapping: {map_to_sequential}")
            assert df[acs_task.group].max() == df[acs_task.group].nunique() - 1

    ## Try to enforce correct types
    # All columns should be encoded as integers, dtype=int
    types_dict = {
        col: int for col in df.columns
        if df.dtypes[col] != "object"
    }

    df = df.astype(types_dict)
    # ^ set int types right-away so that categories don't have floating points

    # Set categorical columns to start at value=0! (necessary for sensitive attributes)
    categorical_cols = ACS_CATEGORICAL_COLS & set(df.columns)
    for col in categorical_cols:
        df[col] = df[col] - df[col].min()

    # Set categorical columns to the correct dtype "category"
    types_dict.update({col: "category" for col in categorical_cols})

    # Plus the group is definitely categorical
    types_dict[acs_task.group] = "category"

    # And the target is definitely integer
    types_dict[acs_task.target] = int

    # Set df to correct types
    df = df.astype(types_dict)

    # ** Split data in train/test/validation **
    train_idx, other_idx = train_test_split(
        df.index,
        train_size=train_size,
        stratify=state_col_data if stratify_by_state else None,
        random_state=seed,
        shuffle=True)

    train_df, other_df = df.loc[train_idx], df.loc[other_idx]
    assert len(set(train_idx) & set(other_idx)) == 0

    # Split sizes are rounded to whole rows, so the achieved fractions can be
    # off by up to a couple of rows; the default `np.isclose` tolerances are
    # too tight for small datasets.
    size_atol = 2 / len(df)

    # Split validation
    val_df = None
    if has_validation:
        # Fraction of the remaining (non-train) data that goes to the test set
        new_test_size = test_size / (test_size + validation_size)

        val_idx, test_idx = train_test_split(
            other_df.index,
            test_size=new_test_size,
            stratify=state_col_data.loc[other_idx] if stratify_by_state else None,
            random_state=seed,
            shuffle=True)

        val_df, test_df = other_df.loc[val_idx], other_df.loc[test_idx]
        assert len(train_idx) + len(val_idx) + len(test_idx) == len(df)
        assert np.isclose(len(val_df) / len(df), validation_size, atol=size_atol)

    else:
        test_idx = other_idx
        test_df = other_df

    assert np.isclose(len(train_df) / len(df), train_size, atol=size_atol)
    assert np.isclose(len(test_df) / len(df), test_size, atol=size_atol)

    # Optionally, save data to disk
    if save_to_dir:
        print(f"Saving data to folder '{str(save_to_dir)}' with prefix '{acs_task_name}'.")
        train_df.to_csv(save_to_dir / f"{acs_task_name}.train.csv", header=True, index_label="index")
        test_df.to_csv(save_to_dir / f"{acs_task_name}.test.csv", header=True, index_label="index")

        if val_df is not None:
            val_df.to_csv(save_to_dir / f"{acs_task_name}.validation.csv", header=True, index_label="index")

    return (train_df, test_df, val_df) if has_validation else (train_df, test_df)

In [9]:
import cloudpickle

def save_pickle(obj: object, path: str | Path, overwrite: bool = True) -> bool:
    """Pickle `obj` with cloudpickle and write it to `path`.

    Any exception raised while opening the file or pickling is logged
    (not re-raised) and reported through the return value.

    Parameters
    ----------
    obj : object
        The object to pickle
    path : str or Path
        The file path to save the pickle with.
    overwrite : bool
        If True, the file is opened in "wb" mode; otherwise in "xb" mode
        (exclusive creation, which fails when the file already exists).

    Returns
    -------
    success : bool
        Whether pickling was successful.
    """
    file_mode = "wb" if overwrite else "xb"

    try:
        with open(path, file_mode) as pickle_file:
            cloudpickle.dump(obj, pickle_file)

    except Exception as err:
        logging.error(f"Pickling failed with exception '{err}'")
        return False

    return True

In [10]:
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from pandas.api.types import is_categorical_dtype, is_numeric_dtype

def onehot_encode_data(
        acs_task_name: str,
        train_df: pd.DataFrame,
        test_df: pd.DataFrame,
        val_df: pd.DataFrame = None,
        save_to_dir: Path = None,
    ) -> tuple[pd.DataFrame, ...]:
    """Preprocesses the given data for NNs and other 'numeric-only' algorithms,
    including: one-hot encoding categorical data, and scaling numeric data to
    zero mean and unit stddev.

    The encoder and the scaler are fitted on `train_df` only, and then applied
    to every split.

    Parameters
    ----------
    acs_task_name : str
        Name of the task object to load from the `folktables` module; it
        provides the names of the target and group columns.
    train_df, test_df : pd.DataFrame
        Train and test splits; categorical columns must use the "category" dtype.
    val_df : pd.DataFrame, optional
        Validation split, if any.
    save_to_dir : Path, optional
        If given, the fitted encoder and scaler (as pickles) and the processed
        splits (as CSV files) are saved to this folder.

    Returns
    -------
    (train, test) or (train, test, validation) : tuple of pd.DataFrame
        Processed splits. Column order: target, original group column,
        standardized numeric columns, one-hot encoded categorical columns.
    """

    # Dynamically import/load task object
    task_obj = getattr(folktables, acs_task_name)

    # Split DF in categorical data and numeric data
    # (isinstance check replaces the deprecated `is_categorical_dtype`)
    cat_cols = [
        col for col in train_df.columns
        if isinstance(train_df[col].dtype, pd.CategoricalDtype)
    ]
    numeric_cols = [col for col in train_df.columns if col not in cat_cols and col != task_obj.target]

    # Fit 1-hot encoder only to the categorical data
    encoder_kwargs = dict(
        # drop="first",
        drop="if_binary",
        min_frequency=0.005,
        handle_unknown="infrequent_if_exist",
    )
    try:
        # scikit-learn >= 1.2 (`sparse` was renamed to `sparse_output`)
        enc = OneHotEncoder(sparse_output=False, **encoder_kwargs)
    except TypeError:
        # older scikit-learn versions only know the `sparse` keyword
        enc = OneHotEncoder(sparse=False, **encoder_kwargs)
    enc.fit(train_df[cat_cols])

    # Fit standard scaler to numeric data (not the labels though!)
    scaler = StandardScaler()
    scaler.fit(train_df[numeric_cols])

    # Optionally, save encoder and scaler pickles to disk
    # (guarded: `save_to_dir` defaults to None)
    if save_to_dir:
        save_pickle(enc, save_to_dir / f"{acs_task_name}.one-hot-encoder.pkl")
        save_pickle(scaler, save_to_dir / f"{acs_task_name}.standard-scaler.pkl")

    def onehot_helper(df):
        """Helper function to transform and preprocess the given DF."""

        # Encode categorical data
        df_cat_1hot = pd.DataFrame(
            data=enc.transform(df[cat_cols]),
            columns=enc.get_feature_names_out(cat_cols),
            index=df.index,
        )

        # Standardize numerical data
        df_numeric_standardized = pd.DataFrame(
            data=scaler.transform(df[numeric_cols]),
            columns=numeric_cols,
            index=df.index,
        )

        # Concatenate encoded categorical data with standardized numerical data.
        # The original group column is kept alongside its one-hot encoding.
        df_processed = pd.concat(
            (
                df[task_obj.target],
                df[task_obj.group],
                df_numeric_standardized,
                df_cat_1hot
            ), axis=1, ignore_index=False, join="inner")

        assert len(df) == len(df_processed)
        return df_processed

    # For each data split: encode categorical data and do some extra data processing
    train_df_1hot = onehot_helper(train_df)
    test_df_1hot = onehot_helper(test_df)
    val_df_1hot = onehot_helper(val_df) if val_df is not None else None

    # Optionally, save data to disk
    if save_to_dir:
        print(f"Saving ** 1-hot ** data to folder '{str(save_to_dir)}' with prefix '{acs_task_name}'.")
        train_df_1hot.to_csv(save_to_dir / f"{acs_task_name}.train.1-hot.csv", header=True, index_label="index")
        test_df_1hot.to_csv(save_to_dir / f"{acs_task_name}.test.1-hot.csv", header=True, index_label="index")

        if val_df is not None:
            val_df_1hot.to_csv(save_to_dir / f"{acs_task_name}.validation.1-hot.csv", header=True, index_label="index")

    return (train_df_1hot, test_df_1hot, val_df_1hot) if val_df is not None else (train_df_1hot, test_df_1hot)


In [11]:
from collections import defaultdict
# Accuracy of the best constant classifier, indexed as [task name][data split];
# filled in as a side effect of `get_prevalence_details`.
all_tasks_max_const_acc = defaultdict(dict)

def get_prevalence_details(task_name: str, data: pd.Series, data_type: str) -> str:
    """Summarize the accuracy of the best constant predictor for one data split.

    The label column is looked up on the folktables task named `task_name`.
    The accuracy is computed as max(p, 1 - p), with p the mean of the labels
    (i.e., this assumes binary 0/1 labels), and is also recorded in
    `all_tasks_max_const_acc[task_name][data_type]`.

    Parameters
    ----------
    task_name : str
        Name of the task object in the `folktables` module.
    data
        Data indexable by the task's target column name
        (this notebook passes the DataFrame of one split).
    data_type : str
        Name of the split (e.g., "train"); used as the key of the stored result.

    Returns
    -------
    str
        Human-readable line with the accuracy and the most frequent label.
    """
    target_col = getattr(folktables, task_name).target
    labels = data[target_col]

    positive_rate = labels.mean()
    const_acc = max(positive_rate, 1-positive_rate)
    majority_label = labels.mode().values[0]

    # Mutating the module-level dict needs no `global` statement
    all_tasks_max_const_acc[task_name][data_type] = const_acc

    return f"const. accuracy: {const_acc:.2%} \t (prediction={majority_label})"

In [12]:
from tqdm.auto import tqdm

all_acs_tasks = [
    "ACSIncome",
    "ACSPublicCoverage",
    "ACSMobility",
    "ACSEmployment",
    "ACSTravelTime",
]

# Sub-folder to save data to; its name encodes the split configuration.
# The optional parts are only included when the corresponding setting is active.
subfolder_name = "_".join(
    name_part
    for name_part in (
        f"train={TRAIN_SIZE:.2}",
        f"test={TEST_SIZE:.2}",
        f"validation={VALIDATION_SIZE:.2}" if VALIDATION_SIZE else None,
        (
            f"max-groups={MAX_SENSITIVE_GROUPS}"
            if MAX_SENSITIVE_GROUPS is not None and MAX_SENSITIVE_GROUPS > 0
            else None
        ),
    )
    if name_part is not None
)

subfolder_path = data_dir / subfolder_name
subfolder_path.mkdir(parents=True, exist_ok=True)

# Names of the splits, in the order the processing functions return them
data_types = ("train", "test", "validation")

# Generate data and save to disk, for all tasks
for task_name in tqdm(all_acs_tasks):

    # Split the task's data (also saved as CSV files in the sub-folder)
    data = split_folktables_task(
        acs_data,
        acs_task_name=task_name,
        train_size=TRAIN_SIZE,
        test_size=TEST_SIZE,
        validation_size=VALIDATION_SIZE,
        max_sensitive_groups=MAX_SENSITIVE_GROUPS,
        stratify_by_state=True,
        save_to_dir=subfolder_path,
        seed=SEED,
    )

    # One-hot encoded / standardized version of the same splits
    data_1hot = onehot_encode_data(task_name, *data, save_to_dir=subfolder_path)

    # Print prevalence for each dataset (train, validation, and test), first for
    # the categorical version and then for the one-hot version.
    # Sanity check: both versions should report the same prevalence.
    for heading, splits in (
        (f"\n** {task_name} **\n", data),
        ("\none-hot-encoded version:", data_1hot),
    ):
        print(heading)
        print("\n".join(
            f"{data_type}:\t{get_prevalence_details(task_name, split_df, data_type)}"
            for data_type, split_df in zip(data_types, splits)
        ))

  0%|          | 0/5 [00:00<?, ?it/s]

Saving data to folder '/Users/acruz/data/folktables/train=0.7_test=0.3_max-groups=2' with prefix 'ACSIncome'.




Saving ** 1-hot ** data to folder '/Users/acruz/data/folktables/train=0.7_test=0.3_max-groups=2' with prefix 'ACSIncome'.

** ACSIncome **

train:	const. accuracy: 62.41% 	 (prediction=0)
test:	const. accuracy: 62.45% 	 (prediction=0)

one-hot-encoded version:
train:	const. accuracy: 62.41% 	 (prediction=0)
test:	const. accuracy: 62.45% 	 (prediction=0)
Saving data to folder '/Users/acruz/data/folktables/train=0.7_test=0.3_max-groups=2' with prefix 'ACSPublicCoverage'.




Saving ** 1-hot ** data to folder '/Users/acruz/data/folktables/train=0.7_test=0.3_max-groups=2' with prefix 'ACSPublicCoverage'.

** ACSPublicCoverage **

train:	const. accuracy: 70.57% 	 (prediction=0)
test:	const. accuracy: 70.59% 	 (prediction=0)

one-hot-encoded version:
train:	const. accuracy: 70.57% 	 (prediction=0)
test:	const. accuracy: 70.59% 	 (prediction=0)
Saving data to folder '/Users/acruz/data/folktables/train=0.7_test=0.3_max-groups=2' with prefix 'ACSMobility'.




Saving ** 1-hot ** data to folder '/Users/acruz/data/folktables/train=0.7_test=0.3_max-groups=2' with prefix 'ACSMobility'.

** ACSMobility **

train:	const. accuracy: 73.60% 	 (prediction=1)
test:	const. accuracy: 73.41% 	 (prediction=1)

one-hot-encoded version:
train:	const. accuracy: 73.60% 	 (prediction=1)
test:	const. accuracy: 73.41% 	 (prediction=1)
Saving data to folder '/Users/acruz/data/folktables/train=0.7_test=0.3_max-groups=2' with prefix 'ACSEmployment'.




Saving ** 1-hot ** data to folder '/Users/acruz/data/folktables/train=0.7_test=0.3_max-groups=2' with prefix 'ACSEmployment'.

** ACSEmployment **

train:	const. accuracy: 54.28% 	 (prediction=0)
test:	const. accuracy: 54.24% 	 (prediction=0)

one-hot-encoded version:
train:	const. accuracy: 54.28% 	 (prediction=0)
test:	const. accuracy: 54.24% 	 (prediction=0)
Saving data to folder '/Users/acruz/data/folktables/train=0.7_test=0.3_max-groups=2' with prefix 'ACSTravelTime'.




Saving ** 1-hot ** data to folder '/Users/acruz/data/folktables/train=0.7_test=0.3_max-groups=2' with prefix 'ACSTravelTime'.

** ACSTravelTime **

train:	const. accuracy: 56.97% 	 (prediction=0)
test:	const. accuracy: 57.09% 	 (prediction=0)

one-hot-encoded version:
train:	const. accuracy: 56.97% 	 (prediction=0)
test:	const. accuracy: 57.09% 	 (prediction=0)


In [13]:
import json
# Dump the constant-classifier accuracies collected for every task and data split
print(json.dumps(all_tasks_max_const_acc, indent=2))

{
  "ACSIncome": {
    "train": 0.6240710126295839,
    "test": 0.6244771852159277
  },
  "ACSPublicCoverage": {
    "train": 0.7057255736710011,
    "test": 0.7059297470937229
  },
  "ACSMobility": {
    "train": 0.7359866494711265,
    "test": 0.734137080422064
  },
  "ACSEmployment": {
    "train": 0.5428142054730425,
    "test": 0.5423693864853719
  },
  "ACSTravelTime": {
    "train": 0.5696879960067078,
    "test": 0.5708901510194466
  }
}


---