# TensorFlow Constrained Optimization Tutorial

Constrains a class tensorflow_constrained_optimization for safety constraint (FNR < alpha)
and an example for a simple dataset and simple fully connected neural network.

Note: This tutorial has been run on google colab

In [None]:
# Install the toolbox
!pip install git+https://github.com/google-research/tensorflow_constrained_optimization

In [16]:
# import libraries:

# tensorflow and keras
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import layers
import tensorflow_constrained_optimization as tfco

# sckit-learn
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.metrics import *

# others
import os
import tempfile
import numpy as np

## Class for Safe Classification

In [17]:
class ConstrainedModel:
    """A class using constrained optimization in Keras Model
    This class works only constrained on false negative rate
    Constraint is: FNR < alpha
    """

    def __init__(self, model, **kwargs):
        self.model = model

    ############# Training ##################
    def fit(self, X_train, y_train, batch_size, epochs, learning_rate, alpha):
        
        # Split the training set to Train and Valdation
        X_train, X_vali, y_train, y_vali = train_test_split(X_train, y_train, test_size=0.2)
        
        # for training purpose
        num_examples = X_train.shape[0]


        # Create features and labels tensors to hold minibatch content.
        batch_shape = (batch_size, X_train.shape[1])
        features_tensor = tf.Variable(np.zeros(batch_shape, dtype="int32"), name="features")

        batch_shape = (batch_size, 1)
        labels_tensor = tf.Variable(np.zeros(batch_shape, dtype="float32"), name="labels")

        # The prediction tensor is used in the constext
        def predictions():
            return self.model(features_tensor)
        
        # Set up separate contexts.
        context = tfco.rate_context(predictions, lambda: labels_tensor)


        # Compute the objective: This is the loss function
        objective = tfco.error_rate(context)
        
        # Set the constraints
        constraints = [tfco.false_negative_rate(context) <= alpha]

        # Create a rate minimization problem.
        problem = tfco.RateMinimizationProblem(objective, constraints)
        
        # Set up a constrained optimizer.
        optimizer = tfco.ProxyLagrangianOptimizerV2(
          optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
          num_constraints=problem.num_constraints)

        # the constrained optimizer.
        var_list = (self.model.trainable_weights + problem.trainable_variables +
                    optimizer.trainable_variables())

        
        # this is the function for computing constraint violation
        def false_negative_rate(labels, predictions):
          if np.sum(labels > 0) == 0:  # Any positives?
            return 0.0
          else:
            return np.mean(predictions[labels > 0] <= 0)

        
        # Create temporary directory to record model snapshots.
        temp_directory = tempfile.mktemp()
        os.mkdir(temp_directory)

        # List of recorded objectives and constrained violations.
        objectives_list = []
        violations_list = []

        # Loop over minibatches.
        for batch_index in range(epochs):
            # Indices for current minibatch in the first stream.
            batch_indices = np.arange(batch_index * batch_size, (batch_index + 1) * batch_size)
            batch_indices = [ind % num_examples for ind in batch_indices]

            # Assign features, labels, groups from the minibatches to the respective tensors.
            features_tensor.assign(X_train[batch_indices, :])

            labels_tensor.assign(y_train[batch_indices])

            # Gradient update.
            optimizer.minimize(problem, var_list=var_list)
            
            # To record the error and contrainted violations
            scores = self.model.predict(X_vali)
            error = self.error_rate(y_vali, scores)

            fnrs = false_negative_rate(
              y_vali, scores)
            violations = [fnrs - alpha]

            objectives_list.append(error)
            violations_list.append(violations)

            # Save model weights to temporary directory.
            self.model.save_weights(
              temp_directory + "constrained_" +
              str(int(batch_index)) + ".h5")
              
        
        
        # Select the best model from the recorded iterates using TFCO's find best
        # candidates heuristic.
        best_index = tfco.find_best_candidate_index(
          np.array(objectives_list), np.array(violations_list), rank_objectives=False)
              
        # Load model weights for the best iterate from the snapshots saved previously.
        self.model.load_weights(
          temp_directory + "constrained_" + str(best_index) + ".h5")

    ################### Evaluation #######################
    def error_rate(self, y_true, y_pred):
        # Returns error rate for given labels and predictions.
        # Recall that the labels are binary (0 or 1).
        signed_labels = (y_true * 2) - 1
        return np.mean(signed_labels * y_pred <= 0.0)
    
    def false_negative_rate(self, y_true, y_pred):
        # Returns false negative rate for given labels and predictions.
        if np.sum(y_true > 0) == 0:  # Any positives?
            return 0.0
        else:
            return np.mean(y_pred[y_true > 0] <= 0)

    def false_positive_rate(self, y_true, y_pred):
        # Returns false positive rate for given labels and predictions.
        if np.sum(y_true <= 0) == 0:  # Any negatives?
            return 0.0
        else:
            return np.mean(y_pred[y_true <= 0] > 0)

    def evaluate(self, X_test, y_test, verbose=0):

        y_true = y_test.reshape(-1, 1)
        y_pred = (self.model.predict(X_test) > 0.5).astype("int32")
        f1_measure = f1_score(y_true, y_pred)
        
        fn_rate = self.false_negative_rate(y_true, y_pred)
        fp_rate = self.false_positive_rate(y_true, y_pred)

        if verbose == 1:
            print("F1 Measure: {}".format(f1_measure))
            print("False Negative error: {}".format(fn_rate))
            print("False Positive error: {}".format(fp_rate))

        
        result = {'F1 Measure': f1_measure, 'False Negative error': fn_rate, 'False Positive error': fp_rate}
        self.result = result

# Dataset

In [18]:
X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5, random_state=2)
y_train = y_train.reshape(-1, 1)

## Set the hyper-parameters

In [19]:
params = {
    'alpha': 0.1,
    'nb_epoch': 100,
    'batch_size': 20,
    'learning_rate': 0.001}

## Train the Constrained Model

In [20]:
def create_model():
    model = keras.Sequential()
    model.add(layers.Dense(26, input_shape=(X_train.shape[1],),
                           activation='relu'))

    model.add(layers.Dense(1))
    return model

In [21]:
# Set random seed.
np.random.seed(232312)
tf.random.set_seed(323221)
model = create_model()
constrained_model = ConstrainedModel(model)

# fit the constrained model
constrained_model.fit(X_train, y_train,
                      batch_size=params['batch_size'], epochs=params['nb_epoch'],
                      learning_rate=params['learning_rate'], alpha=params['alpha'])
constrained_model.evaluate(X_test, y_test, verbose=1)

F1 Measure: 0.7389380530973452
False Negative error: 0.045714285714285714
False Positive error: 1.0
