# Setup

In [1]:
%autoreload 2
%load_ext autoreload
%matplotlib widget

import sys, os, pickle, pdb, shutil, re, math
from copy import deepcopy, copy
from pathlib import Path
from pprint import pprint

from tqdm.notebook import tqdm
import pandas as pd, numpy as np, torch
import torchvision.transforms as T
from torch.utils.tensorboard import SummaryWriter

from optimizing_for_explainability.utils import lime_fit, sample_around, CatLayer, finite_diff

# Put the two local SHAP checkouts ("shap" and "shap_original", resolved
# against the current working directory) at the front of the import path so
# that they can be imported by name.
paths = [Path("").parent.absolute() / folder for folder in ("shap", "shap_original")]
for path in paths:
    if str(path) in sys.path:
        continue  # already registered, e.g. when the cell is re-run
    sys.path.insert(0, str(path))
import shap, shap_original

# Default tensor options used throughout the notebook.  CUDA is used when it
# is available; otherwise the notebook falls back to the CPU instead of failing
# on the first `.to(DEVICE)` call.
DTYPE, DEVICE = torch.float32, torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Keyword arguments for tensor constructors, e.g. torch.as_tensor(x, **TOPTS).
TOPTS = dict(dtype=DTYPE, device=DEVICE)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [2]:
# Load the raw table.
df = pd.read_csv(Path("") / "data" / "compas" / "cox-violent-parsed_filt.csv")

# Preprocess the data

# Filter out entries with no indication of recidivism or no COMPAS score
# (both are encoded as -1).  The explicit .copy() makes the result an
# independent frame, so the column assignments below do not raise
# SettingWithCopyWarning.
df = df[(df["is_recid"] != -1) & (df["decile_score"] != -1)].copy()

# Expose the recidivism flag under the name used for evaluation.
df["recidivism_within_2_years"] = df["is_recid"]

# Make the COMPAS label column numeric for use in our model:
# 0 when score_text is "Low", 1 for every other value.
df["COMPASS_determination"] = np.where(df["score_text"] == "Low", 0, 1)

# One-hot encode the categorical columns.  The dtype is pinned to uint8
# because pandas >= 2.0 defaults to bool dummies, and mixing bool with integer
# columns turns `.values` into an object array that cannot be normalised.
df = pd.get_dummies(df, columns=["sex", "race"], dtype=np.uint8)

# Columns fed to the model as inputs.
input_features = [
    "sex_Female",
    "sex_Male",
    "age",
    "race_African-American",
    "race_Caucasian",
    "race_Hispanic",
    "race_Native American",
    "race_Other",
    "priors_count",
    "juv_fel_count",
    "juv_misd_count",
    "juv_other_count",
]

# Inputs plus the two target columns; every other column is discarded.
to_keep = [*input_features, "recidivism_within_2_years", "COMPASS_determination"]
to_remove = [name for name in df.columns if name not in to_keep]
df = df.drop(columns=to_remove)

# Remaining column names (in frame order) and the label the model is trained on.
input_columns = list(df.columns)
labels = df["COMPASS_determination"]

In [3]:
# Build the training and test sets.
# The model inputs exclude both the column we predict
# ('COMPASS_determination') and the column kept only for evaluating the
# trained model ('recidivism_within_2_years').
target_columns = ["COMPASS_determination", "recidivism_within_2_years"]
df_for_training = df.drop(columns=target_columns)

# The first 80% of the rows (in frame order) are used for training.
train_size = int(0.8 * len(df_for_training))

train_data, test_data = df_for_training.iloc[:train_size], df_for_training.iloc[train_size:]
train_labels, test_labels = labels.iloc[:train_size], labels.iloc[train_size:]

# Test rows with the target columns still attached.
test_data_with_labels = df.iloc[train_size:]

In [4]:
# Raw arrays for the two splits.
Xtr, Ytr = train_data.values, train_labels.values
Xts, Yts = test_data.values, test_labels.values

# Per-feature statistics of the training inputs (reduced over the row axis).
MU = np.mean(Xtr, axis=-2)
STD = np.std(Xtr, axis=-2)


def normalize_fn(x):
    """Standardise `x` feature-wise with the training mean and standard deviation."""
    return (x - MU[None, ...]) / STD[None, ...]


Xtr, Xts = normalize_fn(Xtr), normalize_fn(Xts)

# Both loaders iterate over (features, label) pairs with the same settings.
loader_opts = dict(batch_size=1024, num_workers=8)
train_loader = torch.utils.data.DataLoader(list(zip(Xtr, Ytr)), **loader_opts)
test_loader = torch.utils.data.DataLoader(list(zip(Xts, Yts)), **loader_opts)

In [5]:
# Settings shared by every model built in this notebook.

# Width of the input layer: the number of entries in one training example.
input_size = len(train_data.iloc[0])

# Positions of the one-hot race columns among the input features.
RACE_IDX = [pos for (pos, name) in enumerate(train_data.columns) if re.match(r"race_.*", name)]

# Smooth activation (Softplus with beta = 10); torch.nn.ReLU() is the
# non-smooth alternative that was tried here.
activation = torch.nn.Softplus(beta=1e1)


def generate_model():
    """Build a fresh classifier with its loss, optimizer, scheduler and LIME sample set.

    Returns:
        A tuple `(model, loss_fn, optimizer, scheduler, Xs)`:

        * `model` - a three-layer network with a sigmoid output, moved to
          DTYPE / DEVICE.
        * `loss_fn(Yp, Y, penalize=True, penalty="exact", gam=1e1)` - binary
          cross-entropy plus an L2 term on the parameters; when `penalize` is
          true it adds `gam` times a penalty on the race entries of the
          weights returned by `lime_fit`.  `penalty` selects "exact" (mean
          norm), "mse" (mean squared norm) or "super-exact" (mean square root
          of the norm); any other value raises ValueError.
        * `optimizer` - Adam with learning rate 1e-3.
        * `scheduler` - StepLR multiplying the learning rate by 0.3 every 10 steps.
        * `Xs` - the samples produced by `sample_around` (first two axes
          swapped) on which the penalty is evaluated.
    """
    net = torch.nn.Sequential(
        torch.nn.Linear(input_size, 128),
        copy(activation),
        torch.nn.Linear(128, 128),
        copy(activation),
        torch.nn.Linear(128, 1),
        torch.nn.Sigmoid(),
    )
    net = net.to(DTYPE).to(DEVICE)

    bce = torch.nn.BCELoss()
    lam = 1e-3  # weight of the L2 term on the parameters

    def base_loss(Yp, Y):
        # Binary cross-entropy plus lam / 2 times the squared norm of every parameter.
        l2_terms = (lam * torch.sum(param**2) / 2 for param in net.parameters())
        return bce(Yp, Y) + sum(l2_terms)

    # Perturbations around 1000 training rows drawn at random (with replacement).
    # NOTE(review): the layout returned by sample_around is not visible here;
    # presumably the transpose puts the anchor points on the leading axis - confirm.
    rows = torch.randint(0, Xtr.shape[0], size=(1000,))
    x0 = torch.as_tensor(Xtr[rows, :], device=DEVICE, dtype=DTYPE)
    spread = torch.tensor(STD, device=DEVICE, dtype=DTYPE)
    Xs = sample_around(x0, spread, N=int(1e2), alf=1e-2).transpose(0, 1)

    def loss_fn(Yp, Y, penalize=True, penalty="exact", gam=1e1):
        total = base_loss(Yp, Y)
        if not penalize:
            return total

        # Fit on the model's own outputs at the perturbed samples and measure
        # the norm of the fitted weights restricted to the race features.
        W, _ = lime_fit(Xs, net(Xs))
        race_norm = torch.norm(W[..., RACE_IDX, 0], dim=-1)
        if penalty == "exact":
            term = torch.mean(race_norm)
        elif penalty == "mse":
            term = torch.mean(race_norm**2)
        elif penalty == "super-exact":
            term = torch.mean(torch.sqrt(race_norm))
        else:
            raise ValueError(f"penalty [{penalty}] is not supported")
        return total + gam * term

    optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.3)
    return net, loss_fn, optimizer, scheduler, Xs


def accuracy(model, loader):
    """Return the fraction of examples in `loader` that `model` classifies correctly.

    An output greater than 0.5 counts as a positive prediction; predictions are
    reshaped to the shape of the labels before the element-wise comparison.

    Args:
        model: callable mapping a batch of inputs to outputs thresholded at 0.5.
        loader: iterable of `(X, Y)` batches that exposes `loader.dataset`.

    Returns:
        The number of correct predictions divided by `len(loader.dataset)`
        (a tensor whenever the loader yields at least one batch).
    """
    correct = 0
    # Pure evaluation: disabling autograd avoids building a graph for every batch.
    with torch.no_grad():
        for X, Y in loader:
            X, Y = X.to(DTYPE).to(DEVICE), Y.to(DEVICE)
            Yp = model(X)
            correct += torch.sum((Yp > 0.5).reshape(Y.shape) == Y)
    return correct / len(loader.dataset)

# LIME Experiments

In [17]:
# Test accuracy (in percent) and race-weight metric of every configuration.
accs, metrics = [], []
# Each configuration is (penalize, penalty, gam): an unpenalized baseline
# followed by the three penalty variants, each with its own strength.
for (penalize, penalty, gam) in [
    (False, "exact", 1e1),
    (True, "mse", 1e2),
    (True, "exact", 1e1),
    (True, "super-exact", 1.5e0),
]:
    model, loss_fn, optimizer, scheduler, Xs = generate_model()

    # Start from an empty "runs" log directory.  It does not exist on a fresh
    # checkout, so only remove it when it is there (a bare rmtree would raise).
    if (Path("") / "runs").exists():
        shutil.rmtree(Path("") / "runs")
    writer = SummaryWriter()

    rng = tqdm(range(10))
    for epoch in rng:
        for (i, (X, Y)) in enumerate(train_loader):
            X, Y = X.to(DTYPE).to(DEVICE), Y.to(DTYPE).to(DEVICE)
            loss = loss_fn(model(X).reshape(Y.shape), Y, penalize=penalize, penalty=penalty, gam=gam)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            writer.add_scalar("loss/train", float(loss), i + epoch * len(train_loader))
            writer.add_scalar("step_size", float(optimizer.param_groups[0]["lr"]), i + epoch * len(train_loader))
            writer.flush()
        scheduler.step()
        rng.set_description(
            f"Accuracy = (test = {1e2 * accuracy(model, test_loader):.3f}%,"
            + f"train = {1e2 * accuracy(model, train_loader):.3f}%)"
        )
    # Release the event file once this configuration has finished training.
    writer.close()

    # Report the mean norm of the fitted race weights for the trained model.
    W, b = lime_fit(Xs, model(Xs))
    print(f"Penalize = {penalize}")
    if penalize:
        print(f"Penalty = {penalty}")
    metric = torch.mean(torch.norm(W[..., RACE_IDX, 0], dim=-1))
    print(float(metric))
    accs.append(float(1e2 * accuracy(model, test_loader)))
    metrics.append(float(metric))
    print("#" * 80)

  0%|          | 0/10 [00:00<?, ?it/s]

Penalize = False
0.22673706710338593
################################################################################


  0%|          | 0/10 [00:00<?, ?it/s]

Penalize = True
Penalty = exact
0.0029133434873074293
################################################################################


  0%|          | 0/10 [00:00<?, ?it/s]

Penalize = True
Penalty = mse
0.007907169871032238
################################################################################


  0%|          | 0/10 [00:00<?, ?it/s]

Penalize = True
Penalty = super-exact
0.0021191039122641087
################################################################################


In [19]:
def check_gam(gam, penalty="exact"):
    """Train a fresh, penalized model and return its race-weight metric.

    The model is trained for 10 epochs on `train_loader` with the penalty
    enabled.  The settings that are printed are the ones actually used for
    training; they are no longer read from notebook globals left behind by
    another cell.

    Args:
        gam: strength of the penalty, forwarded to the loss as `gam`.
        penalty: penalty variant forwarded to the loss ("exact" by default).

    Returns:
        The mean norm of the race entries of the weights fitted by `lime_fit`
        on the trained model, as a detached tensor so that the autograd graph
        of each sweep point can be freed.
    """
    model, loss_fn, optimizer, scheduler, Xs = generate_model()

    # Start from an empty "runs" log directory; it may not exist yet.
    log_dir = Path("") / "runs"
    if log_dir.exists():
        shutil.rmtree(log_dir)
    writer = SummaryWriter()

    rng = tqdm(range(10))
    for epoch in rng:
        for (i, (X, Y)) in enumerate(train_loader):
            X, Y = X.to(DTYPE).to(DEVICE), Y.to(DTYPE).to(DEVICE)
            loss = loss_fn(model(X).reshape(Y.shape), Y, penalize=True, penalty=penalty, gam=gam)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            writer.add_scalar("loss/train", float(loss), i + epoch * len(train_loader))
            writer.add_scalar("step_size", float(optimizer.param_groups[0]["lr"]), i + epoch * len(train_loader))
            writer.flush()
        scheduler.step()
        rng.set_description(
            f"Accuracy = (test = {1e2 * accuracy(model, test_loader):.3f}%,"
            + f"train = {1e2 * accuracy(model, train_loader):.3f}%)"
        )
    # Release the event file once training has finished.
    writer.close()

    W, b = lime_fit(Xs, model(Xs))
    print("Penalize = True")
    print(f"Penalty = {penalty}")
    print(f"Gam = {gam}")
    metric = torch.mean(torch.norm(W[..., RACE_IDX, 0], dim=-1))
    print(metric)
    print("#" * 80)
    return metric.detach()


# Ten penalty strengths spaced evenly in log10 between 1e-2 and 1e2,
# each evaluated with a freshly trained model.
gams = np.power(10.0, np.linspace(-2, 2, 10))
vals = list(map(check_gam, gams))

--------------------------------------------------------------------------------

# Testing differentiation through Shapley values

### SHAP differentiability test 

In [13]:
# Build a fresh model and a SHAP explainer for it.  The explainer receives a
# random subset of 100 training rows as background; 10 further randomly drawn
# training rows serve as test points.
model, loss_fn, optimizer, scheduler, Xs = generate_model()
n_train = Xtr.shape[0]
background_rows = torch.randperm(n_train)[:100]
X_background = torch.as_tensor(Xtr[background_rows, ...], **TOPTS)
test_rows = torch.randperm(n_train)[:10]
X_test = torch.as_tensor(Xtr[test_rows, ...], **TOPTS)
explainer = shap.DeepExplainer(model, X_background)

In [30]:
# SHAP values from the local `shap` checkout ...
vals1 = explainer.shap_values(X_test)

# ... and from `shap_original`, computed on a copy of the same model and
# converted to a tensor so that the two results can be compared.
explainer_original = shap_original.DeepExplainer(deepcopy(model), X_background)
vals2 = torch.as_tensor(explainer_original.shap_values(X_test), **TOPTS)

# Relative difference between the two results.
difference, scale = torch.norm(vals1 - vals2), torch.norm(vals1)
print(difference / scale)

tensor(0.0073, device='cuda:0', grad_fn=<DivBackward0>)


In [26]:
# Differentiate the first SHAP value (in flattened order) with respect to the
# first parameter tensor of the model.
first_shap_value = explainer.shap_values(X_test).reshape(-1)[0]
first_parameter = next(model.parameters())
J2 = torch.autograd.grad(first_shap_value, first_parameter)[0]
print(J2)
# Disabled finite-difference check with respect to the inputs:
# f = lambda x: explainer.shap_values(x).detach().reshape(-1)[0]
# J = finite_diff(f, X_test, 1e-5)
# X_test.requires_grad = True

Using a non-full backward hook when the forward contains multiple autograd Nodes is deprecated and will be removed in future versions. This hook will be missing some grad_input. Please use register_full_backward_hook to get the documented behavior.


tensor([[-3.5033e-06,  7.4467e-08, -3.2063e-07,  ...,  7.6408e-08,
          1.4895e-08,  5.8486e-08],
        [-9.7314e-04,  4.6529e-08,  1.4900e-07,  ..., -6.3004e-09,
         -1.2282e-09,  1.9573e-09],
        [ 6.3739e-05,  2.4759e-07, -1.7470e-06,  ...,  2.9007e-07,
          5.6548e-08,  2.2286e-07],
        ...,
        [ 1.4102e-04,  1.7241e-08, -8.3069e-08,  ...,  2.4468e-08,
          4.7699e-09,  2.1345e-08],
        [-5.7081e-06, -1.2926e-08,  2.7734e-07,  ..., -4.1700e-08,
         -8.1292e-09, -3.5948e-08],
        [-5.5522e-05,  2.3477e-08, -2.0407e-07,  ...,  3.3517e-08,
          6.5339e-09,  2.6414e-08]], device='cuda:0')


### Multi-input test (we probably won't need this)

In [None]:
# Compare the two SHAP implementations on a model that takes two inputs.
model, loss_fn, optimizer, scheduler, Xs = generate_model()
X_background = torch.as_tensor(Xtr[torch.randperm(Xtr.shape[0])[:100], ...], **TOPTS)
X_test = torch.as_tensor(Xtr[torch.randperm(Xtr.shape[0])[:10], ...], **TOPTS)
# Split the features into two groups: the first three columns and the rest.
Xb1, Xb2 = X_background[..., :3], X_background[..., 3:]
Xt1, Xt2 = X_test[..., :3], X_test[..., 3:]

# NOTE(review): `model2` is not defined anywhere in the visible notebook, so
# this cell raises NameError on a fresh kernel.  Presumably it is a two-input
# wrapper around `model` (perhaps built with the imported `CatLayer`) - confirm
# and define it before running this cell.
explainer = shap.DeepExplainer(deepcopy(model2), [Xb1, Xb2])
vals1 = explainer.shap_values([Xt1, Xt2])

explainer_original = shap_original.DeepExplainer(deepcopy(model2), [Xb1, Xb2])
vals2 = explainer_original.shap_values([Xt1, Xt2])

# Comparison left disabled in the original cell.
#print(torch.norm(vals1 - vals2) / torch.norm(vals1))

--------------------------------------------------------------------------------