In [None]:
"""
Scikit-like estimators for the attack model and shadow models.
"""

import sklearn
import numpy as np

from tqdm import tqdm


class ShadowModelBundle(sklearn.base.BaseEstimator):
    """
    A bundle of shadow models.

    Every shadow model gets its own random sample of ``shadow_dataset_size``
    training examples, plus a disjoint sample of the same size that is kept
    aside as "not used in training" examples for that model.

    :param model_fn: Function that builds a new shadow model
    :param shadow_dataset_size: Size of the training data for each shadow model
    :param num_models: Number of shadow models
    :param seed: Random seed
    :param ModelSerializer serializer: Serializer for the models. If None,
            the shadow models will be stored in memory. Otherwise, loaded
            and saved when needed (through its ``load(model_id)`` and
            ``save(model_id, model)`` methods).
    """

    MODEL_ID_FMT = "shadow_%d"

    def __init__(
        self, model_fn, shadow_dataset_size, num_models=20, seed=42, serializer=None
    ):
        super().__init__()
        self.model_fn = model_fn
        self.shadow_dataset_size = shadow_dataset_size
        self.num_models = num_models
        self.seed = seed
        self.serializer = serializer
        self._reset_random_state()

    def fit_transform(self, X, y, verbose=False, fit_kwargs=None):
        """Train the shadow models and get a dataset for training the attack.

        :param X: Data coming from the same distribution as the target
                  training data
        :param y: Data labels
        :param bool verbose: Whether to display the progressbar
        :param dict fit_kwargs: Arguments that will be passed to the fit call for
                each shadow model.

        :returns: ``(X_transformed, y_transformed)`` as float32 arrays: the
                attack features produced by :py:func:`prepare_attack_data`
                for every shadow model, and the matching membership labels.

        .. note::
            Be careful when holding out some of the passed data for validation
            (e.g., if using Keras, passing `fit_kwargs=dict(validation_split=0.7)`).
            Such data will be marked as "used in training", whereas it was used for
            validation. Doing so may decrease the success of the attack.
        """
        self._fit(X, y, verbose=verbose, fit_kwargs=fit_kwargs)
        return self._transform(verbose=verbose)

    def _reset_random_state(self):
        """Re-create the internal PRNG from the current ``seed``."""
        self._prng = np.random.RandomState(self.seed)

    def _get_model_iterator(self, indices=None, verbose=False):
        """Return an iterable over shadow model indices.

        :param indices: Indices to iterate over; all models if None
        :param verbose: Whether to wrap the iterable in a tqdm progressbar
        """
        if indices is None:
            indices = range(self.num_models)
        if verbose:
            indices = tqdm(indices)
        return indices

    def _get_model(self, model_index):
        """Fetch a trained shadow model from the serializer or from memory."""
        if self.serializer is not None:
            model_id = ShadowModelBundle.MODEL_ID_FMT % model_index
            model = self.serializer.load(model_id)
        else:
            model = self.shadow_models_[model_index]
        return model

    def _fit(self, X, y, verbose=False, pseudo=False, fit_kwargs=None):
        """Train the shadow models.

        .. note::
            Be careful not to hold out some of the passed data for validation
            (e.g., if using Keras, passing `fit_kwargs=dict(validation_split=0.7)`).
            Such data will be incorrectly marked as "used in training", whereas
            it was not.

        :param X: Data coming from the same distribution as the target
                  training data
        :param y: Data labels
        :param bool verbose: Whether to display the progressbar
        :param bool pseudo: If True, does not fit the models
        :param dict fit_kwargs: Arguments that will be passed to the fit call for
                each shadow model.

        :raises ValueError: If the data holds fewer than
                ``2 * shadow_dataset_size`` examples.
        :returns: self
        """
        num_examples = X.shape[0]
        num_required = 2 * self.shadow_dataset_size
        if num_required > num_examples:
            raise ValueError(
                f"Each shadow model needs {num_required} examples "
                f"(2 * shadow_dataset_size), but only {num_examples} were given."
            )

        # Start from a fresh PRNG so the sampled indices depend only on the
        # current seed, even if it was changed after construction.
        self._reset_random_state()

        self.shadow_train_indices_ = []
        self.shadow_test_indices_ = []

        if self.serializer is None:
            self.shadow_models_ = []

        fit_kwargs = fit_kwargs or {}
        indices = np.arange(num_examples)

        for i in self._get_model_iterator(verbose=verbose):
            # Pick indices for this shadow model: the first half is its
            # training set, the second half its held-out set.
            shadow_indices = self._prng.choice(
                indices, num_required, replace=False
            )
            train_indices = shadow_indices[: self.shadow_dataset_size]
            test_indices = shadow_indices[self.shadow_dataset_size :]
            self.shadow_train_indices_.append(train_indices)
            self.shadow_test_indices_.append(test_indices)

            if pseudo:
                # Only the index bookkeeping is wanted; skip slicing and training.
                continue

            # Train the shadow model.
            shadow_model = self.model_fn()
            shadow_model.fit(X[train_indices], y[train_indices], **fit_kwargs)
            if self.serializer is not None:
                self.serializer.save(ShadowModelBundle.MODEL_ID_FMT % i, shadow_model)
            else:
                self.shadow_models_.append(shadow_model)

        self.X_fit_ = X
        self.y_fit_ = y
        self._reset_random_state()
        return self

    def _pseudo_fit(self, X, y, verbose=False, fit_kwargs=None):
        """Sample the per-model train/test indices without training any model."""
        self._fit(X, y, verbose=verbose, fit_kwargs=fit_kwargs, pseudo=True)

    def _transform(self, shadow_indices=None, verbose=False):
        """Produce in/out data for training the attack model.

        :param shadow_indices: Indices of the shadow models to use
                for generating output data.
        :param verbose: Whether to show progress

        :returns: ``(X_transformed, y_transformed)`` as float32 arrays,
                stacked over the selected shadow models.
        """
        shadow_data_array = []
        shadow_label_array = []

        model_index_iter = self._get_model_iterator(
            indices=shadow_indices, verbose=verbose
        )

        for i in model_index_iter:
            shadow_model = self._get_model(i)
            train_indices = self.shadow_train_indices_[i]
            test_indices = self.shadow_test_indices_[i]

            train_data = self.X_fit_[train_indices], self.y_fit_[train_indices]
            test_data = self.X_fit_[test_indices], self.y_fit_[test_indices]
            shadow_data, shadow_labels = prepare_attack_data(
                shadow_model, train_data, test_data
            )

            shadow_data_array.append(shadow_data)
            shadow_label_array.append(shadow_labels)

        X_transformed = np.vstack(shadow_data_array).astype("float32")
        y_transformed = np.hstack(shadow_label_array).astype("float32")
        return X_transformed, y_transformed


def prepare_attack_data(model, data_in, data_out):
    """
    Build the attack model's dataset from a classifier's predictions.

    Each feature row is the classifier's prediction for one example followed
    by that example's label. Rows for ``data_in`` come first and are labelled
    1; rows for ``data_out`` follow and are labelled 0.

    :param model: Classifier (must expose ``predict``)
    :param (X, y) data_in: Data used for training
    :param (X, y) data_out: Data not used for training

    :returns: (X, y) for the attack classifier
    """
    member_inputs, member_targets = data_in
    nonmember_inputs, nonmember_targets = data_out

    member_preds = model.predict(member_inputs)
    nonmember_preds = model.predict(nonmember_inputs)

    # np.c_ joins column-wise and promotes 1-D inputs to single columns.
    member_rows = np.c_[member_preds, member_targets]
    nonmember_rows = np.c_[nonmember_preds, nonmember_targets]
    features = np.vstack([member_rows, nonmember_rows])

    membership = np.hstack(
        [np.ones(member_targets.shape[0]), np.zeros(nonmember_targets.shape[0])]
    )
    return features, membership


class AttackModelBundle(sklearn.base.BaseEstimator):
    """
    A bundle of attack models, one for each target model class.

    :param model_fn: Function that builds a new attack model. For backward
            compatibility an already-built estimator (any object exposing
            ``fit``) is also accepted; it is then reused as-is for every
            class instead of being rebuilt.
    :param num_classes: Number of classes
    :param ModelSerializer serializer: Serializer for the models. If not None,
            the models will not be stored in memory, but rather loaded
            and saved when needed.
    :param class_one_hot_coded: Whether the shadow data uses one-hot encoded
            class labels.
    """

    MODEL_ID_FMT = "attack_%d"

    def __init__(
        self, model_fn, num_classes, serializer=None, class_one_hot_coded=True
    ):
        self.model_fn = model_fn
        self.num_classes = num_classes
        self.serializer = serializer
        self.class_one_hot_coded = class_one_hot_coded

    def _new_model(self):
        """Return the model to train for one class.

        A factory is called to get a fresh model. An object that already has a
        ``fit`` method is treated as a pre-built estimator and returned
        unchanged (factories cannot be detected with ``callable`` because
        estimators such as Keras models are callable themselves).
        """
        if hasattr(self.model_fn, "fit"):
            return self.model_fn
        return self.model_fn()

    def _class_ids(self, classes):
        """Convert the label columns of the attack data to 1-D class ids."""
        if self.class_one_hot_coded:
            return np.argmax(classes, axis=1)
        # Squeeze only the column axis so a single-row input stays 1-D.
        return np.squeeze(classes, axis=1)

    def fit(self, X, y, verbose=False, fit_kwargs=None):
        """Train the attack models.

        :param X: Shadow predictions coming from
                  :py:func:`ShadowModelBundle.fit_transform`. The first
                  ``num_classes`` columns are predictions, the remaining
                  columns are the class labels.
        :param y: Membership labels coming from the same call
        :param verbose: Whether to display the progressbar
        :param fit_kwargs: Arguments that will be passed to the fit call for
                each attack model.

        :returns: self
        """
        X_total = X[:, : self.num_classes]
        class_ids = self._class_ids(X[:, self.num_classes :])
        fit_kwargs = fit_kwargs or {}

        if self.serializer is None:
            self.attack_models_ = []

        class_iter = range(self.num_classes)
        if verbose:
            class_iter = tqdm(class_iter)

        for i in class_iter:
            in_class = class_ids == i
            model = self._new_model()
            model.fit(X_total[in_class], y[in_class], **fit_kwargs)

            if self.serializer is not None:
                model_id = AttackModelBundle.MODEL_ID_FMT % i
                self.serializer.save(model_id, model)
            else:
                self.attack_models_.append(model)

        return self

    def _get_model(self, model_index):
        """Fetch a trained attack model from the serializer or from memory."""
        if self.serializer is not None:
            model_id = AttackModelBundle.MODEL_ID_FMT % model_index
            model = self.serializer.load(model_id)
        else:
            model = self.attack_models_[model_index]
        return model

    def predict_proba(self, X):
        """Predict membership scores with the per-class attack models.

        :param X: Attack data laid out like the ``X`` passed to :py:meth:`fit`

        :returns: Array of shape ``(n_examples, 2)``; column 1 holds the
                output of the attack model's ``predict`` and column 0 holds
                one minus that value.
        """
        result = np.zeros((X.shape[0], 2))
        shadow_preds = X[:, : self.num_classes]
        class_ids = self._class_ids(X[:, self.num_classes :])

        for i in range(self.num_classes):
            in_class = class_ids == i
            if not in_class.any():
                # No example of this class: nothing to fill in, and some
                # models reject empty batches.
                continue

            model = self._get_model(i)
            membership_preds = model.predict(shadow_preds[in_class])
            # One score per example; flatten an (n, 1) output to (n,).
            probs = np.reshape(membership_preds, -1)
            result[in_class, 1] = probs
            result[in_class, 0] = 1 - probs

        return result

    def predict(self, X):
        """Return a boolean array: True where the membership score exceeds 0.5."""
        probs = self.predict_proba(X)[:, 1]
        return probs > 0.5

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression

import tensorflow as tf
from tensorflow import keras
from keras.datasets import cifar10
from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import Flatten, Dense, Dropout, Conv2D, MaxPooling2D
from keras.losses import binary_crossentropy, categorical_crossentropy
from keras.optimizers import Adam
from keras.activations import relu, sigmoid
from sklearn.model_selection import train_test_split

In [None]:
# Load the CIFAR-10 train and test splits through keras.datasets.cifar10.
(X_train, y_train), (X_test, y_test) = cifar10.load_data()

Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz


# Preprocess data

In [None]:
# One-hot encode the class labels.
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)

# Use float32 for both images and labels.
X_train = X_train.astype('float32')
X_test = X_test.astype('float32')

y_train = y_train.astype('float32')
y_test = y_test.astype('float32')

# Rescale the images by 255 (assumes 8-bit pixel intensities — TODO confirm).
X_train /= 255.0
X_test /= 255.0

In [None]:
def target_model_fn():
    """Build and compile the convolutional classifier.

    The network takes 32x32x3 inputs, applies two convolutional stages
    (32 then 64 filters, each followed by max-pooling and dropout), and ends
    with a 512-unit dense layer and a 10-way softmax output.

    :returns: A compiled, untrained Keras ``Sequential`` model
    """
    first_conv_stage = [
        Conv2D(32, (3, 3), activation='relu', padding='same', input_shape=(32, 32, 3)),
        Conv2D(32, (3, 3), activation='relu'),
        MaxPooling2D((2, 2)),
        Dropout(0.25),
    ]
    second_conv_stage = [
        Conv2D(64, (3, 3), activation='relu', padding='same'),
        Conv2D(64, (3, 3), activation='relu'),
        MaxPooling2D((2, 2)),
        Dropout(0.25),
    ]
    classifier_head = [
        Flatten(),
        Dense(512, activation='relu'),
        Dropout(0.5),
        Dense(10, activation='softmax'),
    ]

    network = Sequential(first_conv_stage + second_conv_stage + classifier_head)
    network.compile(
        optimizer=Adam(), loss=categorical_crossentropy, metrics=['accuracy']
    )
    return network

In [None]:
def attack_models(num_classes=10):
    """Build the candidate attack models.

    :param num_classes: Length of the prediction vector fed to the attack
            model (one entry per target class).

    :returns: List of compiled attack models. It currently holds a single
            Keras network: a binary classifier with a sigmoid output.
    """
    nn_model = Sequential([
        Dense(128, activation=relu, input_shape=(num_classes,)),
        Dropout(0.3),
        Dense(64, activation=relu),
        Dropout(0.2),
        Dense(64, activation=relu),
        Dense(1, activation=sigmoid),
    ])

    nn_model.compile(optimizer=Adam(), loss=binary_crossentropy, metrics=['accuracy'])

    return [nn_model]

In [None]:
# Build the (still untrained) target model that the attack is later evaluated against.
target_model = target_model_fn()

In [None]:
# Train the target model for 20 epochs, holding out 30% of X_train for validation.
target_model.fit(X_train, y_train, epochs=20, validation_split=0.3, verbose=2)

Epoch 1/20
1094/1094 - 172s - loss: 1.3105 - accuracy: 0.5303 - val_loss: 1.1302 - val_accuracy: 0.6024 - 172s/epoch - 157ms/step
Epoch 2/20
1094/1094 - 170s - loss: 1.1051 - accuracy: 0.6053 - val_loss: 0.9781 - val_accuracy: 0.6526 - 170s/epoch - 155ms/step
Epoch 3/20
1094/1094 - 170s - loss: 0.9848 - accuracy: 0.6509 - val_loss: 0.8874 - val_accuracy: 0.6849 - 170s/epoch - 155ms/step
Epoch 4/20
1094/1094 - 172s - loss: 0.9020 - accuracy: 0.6840 - val_loss: 0.8041 - val_accuracy: 0.7197 - 172s/epoch - 157ms/step
Epoch 5/20
1094/1094 - 168s - loss: 0.8340 - accuracy: 0.7061 - val_loss: 0.7678 - val_accuracy: 0.7340 - 168s/epoch - 154ms/step
Epoch 6/20
1094/1094 - 174s - loss: 0.7771 - accuracy: 0.7249 - val_loss: 0.7482 - val_accuracy: 0.7393 - 174s/epoch - 159ms/step
Epoch 7/20
1094/1094 - 175s - loss: 0.7307 - accuracy: 0.7439 - val_loss: 0.7479 - val_accuracy: 0.7443 - 175s/epoch - 160ms/step
Epoch 8/20
1094/1094 - 169s - loss: 0.7017 - accuracy: 0.7550 - val_loss: 0.7148 - val_acc

<keras.src.callbacks.History at 0x7ffa936b1b40>

In [None]:
# A single shadow model built by the same factory as the target model.
# It trains on 3000 sampled examples; ShadowModelBundle samples another
# 3000 as its "not used in training" examples.
smb = ShadowModelBundle(
    target_model_fn,
    shadow_dataset_size=3000,
    num_models=1,
  )

In [None]:
# Carve the shadow data out of the CIFAR-10 test split. The split is seeded
# so that repeated runs use the same shadow data.
X_train_shadow, X_test_shadow, y_train_shadow, y_test_shadow = train_test_split(
    X_test,
    y_test,
    test_size=0.33,
    random_state=42,
)

In [None]:
# Train the shadow model(s) and build the attack training set from them.
# Validation uses the separate X_test_shadow split rather than a
# validation_split, so no sampled training example is held out from training.
X_shadow, y_shadow = smb.fit_transform(
    X_train_shadow,
    y_train_shadow,
    fit_kwargs=dict(
        epochs=20,
        verbose=True,
        validation_data=(X_test_shadow, y_test_shadow),
    ),
)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [None]:
# Evaluation data for the attack. It does not depend on the attack model, so
# it is built (and the target model queried) once, outside the loop.
# NOTE(review): data_in assumes the first 3000 rows of X_train were really
# used to train target_model despite validation_split=0.3 — Keras documents
# the validation split as taken from the end of the data; confirm.
# NOTE(review): the shadow data was also drawn from X_test, so data_out may
# overlap with examples the shadow model saw — confirm this is acceptable.
data_in = X_train[:3000], y_train[:3000]
data_out = X_test[:3000], y_test[:3000]

attack_test_data, real_membership_labels = prepare_attack_data(
    target_model, data_in, data_out
)

for attack_model in attack_models():
    amb = AttackModelBundle(attack_model, num_classes=10)
    amb.fit(X_shadow, y_shadow, fit_kwargs=dict(epochs=20, verbose=True))

    # Fraction of examples whose membership the attack guessed correctly.
    attack_guesses = amb.predict(attack_test_data)
    attack_accuracy = np.mean(attack_guesses == real_membership_labels)

    print(f'Attack accuracy with {attack_model.__class__.__name__} is \n{attack_accuracy}')

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20