# Adversarial Robustness using AdverTorch BlackBox Estimators

We have previously introduced AdverTorch [link to blog post], a toolbox for adversarial robustness research. As part of our initiative at RBC, we have been building more tools to validate the robustness of various models, along with visuals that make it easy for others to understand the results. In this tutorial, we will go through how you can use AdverTorch to assess the robustness of your model using the newly added black-box estimators. These estimators allow you to use the toolbox with any type of model, regardless of the deep learning framework used to develop it in the first place. To assess the robustness, we will:
- Define what we mean by adversarial robustness
- Use AdverTorch to find adversarial examples
- Visualize the results

## Adversarial Robustness

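**TODO:** Should there be a link to a paper here, or is it safe to provide the definition we normally use?

The definition implemented by the code in this tutorial is the following: given a distance $d$ on inputs (the *neighbourhood*), a radius $\epsilon$ and a tolerance $\delta$, a point $x'$ is a *counterexample* to the robustness of a model $f$ at $x$ if $d(x', x) \le \epsilon$ and the output changes too much, i.e. $|f(x) - f(x')| > \delta$ for regression, or $\arg\max f(x) \ne \arg\max f(x')$ for classification. The model is robust at $x$ if no such counterexample exists.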

## Dataset and Model

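For this tutorial we will be using a simple random forest classifier from scikit-learn, trained on the breast cancer dataset that ships with the library.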

In [9]:
from sklearn.datasets import load_breast_cancer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Load the breast cancer dataset bundled with scikit-learn and split it into
# a feature matrix and a target vector.
dataset = load_breast_cancer()
X, y = dataset.data, dataset.target

# Hold out 20% of the samples for evaluation.
# NOTE: no random_state is passed to the split or to the forest, so the
# reported accuracy varies from run to run.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

model = RandomForestClassifier()
model.fit(X_train, y_train)

# Number of input features per sample.
print(X_train.shape[1])

# accuracy_score is documented as accuracy_score(y_true, y_pred); pass the
# ground truth first so the call stays correct if the metric is ever swapped
# for an asymmetric one (precision, recall, ...).
print("Accuracy: ", accuracy_score(y_test, model.predict(X_test)))

30
Accuracy:  0.9298245614035088


In [11]:
# Use a BlackBox Relaxation
from advertorch.attacks.blackbox import NESWrapper
import torch.nn as nn

class LambdaLayer(nn.Module):
    """Module adapter that applies an arbitrary callable in its forward pass.

    Args:
        lambd: Callable invoked with the layer input; its return value is the
            layer output.
    """

    def __init__(self, lambd):
        super().__init__()
        # Stored as a plain attribute; the callable is applied in forward().
        self.lambd = lambd

    def forward(self, x):
        """Return the result of applying the stored callable to ``x``."""
        fn = self.lambd
        return fn(x)

# TODO: Ask Giuseppe (open question carried over from the original note; the
# subject is not recorded here -- presumably whether the wiring below is the
# intended way to build the estimator; confirm).
# Identity layer: the lambda returns its input unchanged.
activation = LambdaLayer(lambda x: x)
# Wrap the trained model's predict_proba in AdverTorch's NESWrapper, using
# nb_samples=100.
# NOTE(review): presumably the wrapper provides NES-based gradient estimates
# for the non-differentiable model -- confirm against the AdverTorch docs.
pred_fcn = NESWrapper(func=model.predict_proba,  nb_samples=100)
# Chain the wrapped predictor and the identity layer into a single module.
bb_estimator = nn.Sequential(pred_fcn, activation)

In [4]:
class RobustRegression:
    """General statement of robustness for a regression model.

    A point ``Xprime`` is a counterexample for ``X`` when it lies within
    distance ``epsilon`` of ``X`` (according to ``neighbourhood``) while the
    model output moves by more than ``delta``.

    Args:
        neighbourhood: Callable ``neighbourhood(Xprime, X)`` returning the
            distance between the two inputs.
        model: Object exposing a ``predict(x)`` method.
    """

    def __init__(self, neighbourhood, model):
        # The only base class is ``object``, whose __init__ accepts no
        # arguments, so the collaborators are stored here directly.  The
        # argument-free super() call keeps cooperative multiple inheritance
        # (e.g. with a mixin placed before this class) working.
        super().__init__()
        self.neighbourhood = neighbourhood
        self.model = model

    @property
    def task(self):
        """Name of the task handled by this robustness statement."""
        return "regression"

    def pred(self, x):
        """Return ``self.model.predict(x)``, evaluated without gradients."""
        with torch.no_grad():
            y = self.model.predict(x)
        return y

    def eval_input(self, X, Xprime, epsilon):
        """Return whether ``Xprime`` is within ``epsilon`` of ``X``."""
        return self.neighbourhood(Xprime, X) <= epsilon

    def eval_output(self, Y, Yprime, delta):
        """Return whether the outputs differ by more than ``delta``."""
        return abs(Y - Yprime) > delta

    def eval_counterexample(self, X, Xprime, epsilon, delta):
        """Return whether ``Xprime`` is a counterexample for ``X``.

        That is, the input is valid (inside the epsilon neighbourhood) AND
        the output is invalid (prediction changed by more than ``delta``).
        """
        valid_input = self.eval_input(X, Xprime, epsilon)
        Y, Yprime = self.pred(X), self.pred(Xprime)
        invalid_output = self.eval_output(Y, Yprime, delta)

        return valid_input & invalid_output

class MultiLabelMixin:
    """Mixin that turns a robustness statement into a classification one.

    It reads ``self.model`` and replaces the task name, the prediction rule
    (argmax over the last axis) and the output check (label inequality).
    """

    @property
    def task(self):
        """Name of the task handled by this robustness statement."""
        return "classification"

    def pred(self, x):
        """Return the argmax over the last axis of ``self.model.predict(x)``.

        The prediction is evaluated without gradient tracking.
        """
        with torch.no_grad():
            scores = self.model.predict(x)
            labels = scores.argmax(-1)
        return labels

    def eval_output(self, Y, Yprime, delta):
        """Return whether the predicted labels differ; ``delta`` is unused."""
        return Y != Yprime
        
class RobustClassification(MultiLabelMixin, RobustRegression):
    """Robustness statement for classifiers.

    For all inputs in a given neighbourhood, check if the model predicts the
    same class (according to argmax).

    ``MultiLabelMixin`` comes first in the bases, so its ``task``, ``pred``
    and ``eval_output`` take precedence; ``eval_input`` and
    ``eval_counterexample`` come from ``RobustRegression``.

    Args:
        neighbourhood: Forwarded by keyword to the parent constructor.
        model: Forwarded by keyword to the parent constructor.
    """

    def __init__(self, neighbourhood, model):
        super().__init__(neighbourhood=neighbourhood, model=model)

In [8]:
import torch
from typing import Tuple, List, Dict
from advertorch.utils import clamp as batch_clamp

def parse_config(config: List[Dict]) -> Dict[str, torch.Tensor]:
    """Collect per-feature settings from ``config`` into tensors.

    Each entry of ``config`` describes one feature.  The keys read are:

    * ``"ignore"``: truthy when the feature must be left out; its range and
      scale are then not read and stay at the defaults (0, 0 and 1).
    * ``"ranges"``: the first element is a ``(lower, upper)`` pair.
    * ``"scales"``: the first element is the feature scale.
    * ``"Feature_type"``: ``"BIN"`` or ``"INT"`` marks an integer feature.

    Returns:
        Dictionary of torch tensors with keys ``"mins"``, ``"maxs"``,
        ``"scales"`` (float) and ``"intmask"``, ``"ignore"`` (bool), each
        with one entry per feature.
    """
    n_features = len(config)

    lower = torch.zeros(n_features)
    upper = torch.zeros(n_features)
    scale = torch.ones(n_features)
    is_integer = torch.zeros(n_features)
    is_ignored = torch.zeros(n_features)

    for idx, spec in enumerate(config):
        skip = spec["ignore"]
        if not skip:
            # Only the first (lower, upper) pair and first scale are used.
            lo, hi = spec["ranges"][0]
            lower[idx] = lo
            upper[idx] = hi
            scale[idx] = spec["scales"][0]

        is_integer[idx] = spec["Feature_type"] in ("BIN", "INT")
        is_ignored[idx] = skip

    # The masks are accumulated as floats and converted to booleans here.
    return {
        "mins": lower,
        "maxs": upper,
        "scales": scale,
        "intmask": is_integer.bool(),
        "ignore": is_ignored.bool(),
    }

class LinfNeighbourhood(nn.Module):
    """A neighbourhood defined by a Linf-norm constraint.

    Args:
        pert_mode: Set to 'add' for arithmetic perturbations.
            Set to 'ratio' for perturbations relative to the original value.

        buff: stability buffer for pert_mode ratio

    Raises:
        ValueError: If ``pert_mode`` is neither 'add' nor 'ratio'.
    """

    def __init__(self, pert_mode: str = "add", buff: float = 1e-9):
        super().__init__()
        if pert_mode not in {"add", "ratio"}:
            raise ValueError(
                "pert_mode '{}' is not currently supported".format(pert_mode)
            )
        self.pert_mode = pert_mode
        self.buff = buff

    def transform(self, x: torch.FloatTensor) -> torch.FloatTensor:
        """Map raw inputs to the space in which distances are measured.

        The plain Linf neighbourhood measures distances on the raw values, so
        this is the identity.  It exists so that the distance computation
        works on this class directly; subclasses override it to rescale.
        """
        return x

    def inverse_transform(self, x: torch.FloatTensor) -> torch.FloatTensor:
        """Inverse of :meth:`transform`; the identity for this class."""
        return x

    def __call__(
        self, x1: torch.FloatTensor, x2: torch.FloatTensor
    ) -> torch.FloatTensor:
        """The Linf metric, applied according to pert_mode.

        Args:
            x1: Array of inputs.  Shape [n_batch, n_feature]

            x2: Array of inputs.  For pert_mode ratio, this is used to
                normalize the distance.  Shape [n_batch, n_feature]

        Returns:
            Distance between the rows of x1 and x2.  Shape [n_batch]
        """
        z1, z2 = self.transform(x1), self.transform(x2)

        if self.pert_mode == "add":
            # max(-1) returns (values, indices); only the values are needed.
            return (abs(z1 - z2)).max(-1)[0]
        elif self.pert_mode == "ratio":
            # Note: the sign function is used to shift values away from zero,
            # which will depend on sign
            # NOTE(review): `sign` is not defined or imported in the visible
            # part of this file -- confirm it is in scope.  If it is replaced
            # by torch.sign, be aware that torch.sign(0) == 0, which would
            # leave zero denominators unbuffered.
            return abs((z1 - z2) / (z2 + self.buff * sign(z2))).max(-1)[0]

    def bounds(
        self, x: torch.FloatTensor, eps: torch.FloatTensor
    ) -> Tuple[torch.FloatTensor, torch.FloatTensor]:
        """Upper and lower bounds for the epsilon ball (which has the form of a
        square).

        Args:
            x: Array of inputs.  Shape [n_batch, n_feature]

            eps: Value of epsilon.  Shape [n_batch]

        Returns:
            (bmin, bmax): Tuple of lower and upper bounds.
                Shape [n_batch, n_feature]
        """
        if self.pert_mode == "add":
            # The epsilon ball is defined via small perturbations
            # to the original value.  eps[:, None] broadcasts the per-row
            # epsilon across the feature axis.
            bmin, bmax = x - eps[:, None], x + eps[:, None]
        elif self.pert_mode == "ratio":
            # The epsilon ball is defined as a percentage fluctuation of the
            # original value
            rel_eps = abs(x * eps[:, None])
            bmin, bmax = x - rel_eps, x + rel_eps

        return bmin, bmax

    def project(
        self,  # noqa: BLK100
        x: torch.FloatTensor,
        bounds: Tuple[torch.FloatTensor, torch.FloatTensor],
    ) -> torch.FloatTensor:
        """Clamp a given point within the bounds.

        Args:
            x: Array of inputs.  Shape [n_batch, n_feature]

            bounds: Tuple of (bmin, bmax).  These are the upper and lower
                bounds of an epsilon ball.

        Return:
            The points in x clipped to the range defined by bounds.
                Shape [n_batch, n_feature]
        """
        bmin, bmax = bounds
        return batch_clamp(x, bmin, bmax)

class ScaledLinfNeighbourhood(LinfNeighbourhood):
    """Linf neighbourhood whose extent per feature is driven by a config.

    Args:
        config: list of dicts containing all information about the features
            (parsed with ``parse_config``)

        pert_mode: Set to 'add' for arithmetic perturbations.
            Set to 'ratio' for perturbations relative to the original value.

        buff: stability buffer; also used to widen the bounds slightly

    Attributes:
        mins (torch.FloatTensor): Absolute lower bound on the inputs

        maxs (torch.FloatTensor): Absolute upper bound on the inputs

        scales (torch.FloatTensor): Scale used to normalize the inputs

        intmask (torch.BoolTensor): Indicates whether a given feature is an
            integer

        ignore (torch.BoolTensor): Indicates whether a given dimension should
            be kept fixed
    """

    def __init__(  # noqa: BLK100
        self, config: List[Dict], pert_mode: str = "add", buff: float = 1e-9
    ):
        super().__init__(pert_mode=pert_mode, buff=buff)

        self.set_scales(config)

    def set_scales(self, config):
        """Register every tensor produced by ``parse_config`` as a buffer."""
        parsed = parse_config(config)
        for key in parsed:
            self.register_buffer(key, parsed[key])

    def bounds(
        self, x: torch.FloatTensor, eps: torch.FloatTensor
    ):
        """Lower and upper bounds of the epsilon ball (a rectangle here).

        Args:
            x: Array of inputs.  Shape [n_batch, n_feature]

            eps: Value of epsilon.  Shape [n_batch]

        Returns:
            (bmin, bmax): Tuple of lower and upper bounds.
                Shape [n_batch, n_feature]
        """
        # Features are frozen (bounds collapse onto x, up to the buffer) when
        # they already sit outside [mins, maxs] or are flagged as ignored.
        # This keeps the generated counterexamples valid.
        frozen = (x < self.mins) | (x > self.maxs) | self.ignore

        if self.pert_mode == "add":
            # Additive mode: epsilon is expressed in units of each feature's
            # scale.
            radius = eps[:, None] * self.scales
            upper = torch.min(x + radius, self.maxs)
            lower = torch.max(x - radius, self.mins)
        elif self.pert_mode == "ratio":
            # Ratio mode: epsilon is a fraction of the original value.
            radius = abs(x * eps[:, None])

            upper = torch.min(x + radius, self.maxs)
            lower = torch.max(x - radius, self.mins)

        # The box is widened by self.buff (1e-9 by default) on both sides.
        # Per the original note, this buffer is needed to ensure the correct
        # traversals through the trees, and it also ensures the masked
        # features will be normalized correctly when distances are computed.
        free = ~frozen
        bmin = frozen * x + free * lower - self.buff
        bmax = frozen * x + free * upper + self.buff

        # Sanity check
        assert torch.all(bmin <= bmax)

        return bmin, bmax

    def transform(self, x: torch.FloatTensor) -> torch.FloatTensor:
        """Normalize the inputs as ``(x - mins) / scales``.

        The result lies in [0, 1] when ``scales`` equals the feature range
        (presumably the intent -- not enforced here).
        """
        # TODO:
        # This transform will not adjust "ignored" variables
        # Ex if a feature is supposed to be ignored, and has value 100
        # It will have value 100 after transform is called
        # Does this introduce any problems? Ex with distance calculation?
        shifted = x - self.mins
        return shifted / self.scales

    def inverse_transform(self, x: torch.FloatTensor) -> torch.FloatTensor:
        """Map normalized inputs back to raw values: ``x * scales + mins``."""
        rescaled = x * self.scales
        return rescaled + self.mins

    def project(
        self,  # noqa: BLK100
        x: torch.FloatTensor,
        bounds,
    ) -> torch.FloatTensor:
        """Clamp a given point within the bounds, rounding integer features.

        Args:
            x: Array of inputs.  Shape [n_batch, n_feature]

            bounds: Tuple of (bmin, bmax).  These are the upper and lower
                bounds of an epsilon ball.

        Return:
            The points in x clipped to the range defined by bounds.
                Shape [n_batch, n_feature]
        """
        # First clamp to the Linf ball using the parent implementation.
        clamped = super().project(x, bounds)

        # Then replace integer-typed features by their rounded value.
        lower, upper = bounds
        snapped = box_round(clamped, lower, upper)
        return torch.where(self.intmask.bool(), snapped, clamped)