In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

In [2]:

# Functional-API multilayer perceptron:
# 784 input features -> two 64-unit ReLU layers -> 10 output units.
inputs = keras.Input(shape=(784,), name="digits")
x1 = layers.Dense(units=64, activation="relu")(inputs)
x2 = layers.Dense(units=64, activation="relu")(x1)
# No activation on the last layer, so the model outputs unnormalized scores.
outputs = layers.Dense(units=10, name="predictions")(x2)
model = keras.Model(inputs=inputs, outputs=outputs)

In [3]:

# Optimizer and loss used by the custom training loop.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# from_logits=True: the loss applies the softmax itself.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

batch_size = 64

# Load MNIST and flatten every image into a 784-element vector.
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = x_train.reshape(-1, 784)
x_test = x_test.reshape(-1, 784)

# Hold out the last 10,000 training samples for validation.
x_train, x_val = x_train[:-10000], x_train[-10000:]
y_train, y_val = y_train[:-10000], y_train[-10000:]

# Training pipeline: shuffle with a 1024-element buffer, then batch.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(buffer_size=1024)
    .batch(batch_size)
)

# Validation pipeline: batched only, no shuffling.
val_dataset = (
    tf.data.Dataset.from_tensor_slices((x_val, y_val))
    .batch(batch_size)
)


In [18]:
def _compute_calibration_bin_statistics(
    num_bins, logits=None, labels_true=None, labels_predicted=None):
    """Compute binning statistics required for calibration measures.

    Each instance is assigned to one of `num_bins` equal-width bins over
    [0, 1] according to the softmax probability of its predicted label.

    Args:
      num_bins: int, number of probability bins, e.g. 10.
      logits: Tensor, (n,nlabels), with logits for n instances and nlabels.
      labels_true: Tensor, (n,), with tf.int32 or tf.int64 elements containing
        ground truth class labels in the range [0,nlabels).
      labels_predicted: Tensor, (n,), with tf.int32 or tf.int64 elements
        containing decisions of the predictive system.  If `None`, we will use
        the argmax decision using the `logits`.

    Returns:
      event_bin_counts: Tensor, shape (2,num_bins), tf.int32, counts of
        incorrect (row 0) and correct (row 1) predictions in each of the
        `num_bins` probability bins.
      pmean_observed: Tensor, shape (num_bins,), same dtype as `logits`, the
        mean predictive probability in each probability bin (0 for empty bins).
    """
    if labels_predicted is None:
        # If no labels are provided, we take the label with the maximum
        # probability decision.  This corresponds to the optimal expected
        # minimum loss decision under 0/1 loss.
        pred_y = tf.argmax(logits, axis=1, output_type=labels_true.dtype)
    else:
        pred_y = labels_predicted

    # Everything below must run for BOTH branches above; it previously lived
    # inside the `else:` branch, which left the return values undefined on the
    # default (`labels_predicted is None`) path.
    correct = tf.cast(tf.equal(pred_y, labels_true), tf.int32)

    # Collect predicted probabilities of decisions.
    pred = tf.nn.softmax(logits, axis=1)
    prob_y = tf.gather(
        pred, pred_y[:, tf.newaxis], batch_dims=1)  # p(pred_y | x)
    prob_y = tf.reshape(prob_y, [-1])  # flatten (n,1) -> (n,)

    # Compute b/z histogram statistics:
    # event_bin_counts[0,bin] counts incorrect predictions in the bin.
    # event_bin_counts[1,bin] counts correct predictions in the bin.
    bins = tf.histogram_fixed_width_bins(prob_y, [0.0, 1.0], nbins=num_bins)
    # Encode (correct, bin) as a single index so that one bincount yields both
    # rows; the reshape below splits them apart again.
    event_bin_counts = tf.math.bincount(
        correct * num_bins + bins,
        minlength=2 * num_bins,
        maxlength=2 * num_bins)
    event_bin_counts = tf.reshape(event_bin_counts, (2, num_bins))

    # Compute mean predicted probability value in each of the `num_bins` bins.
    pmean_observed = tf.math.unsorted_segment_sum(prob_y, bins, num_bins)
    # `tiny` keeps the division finite for empty bins (result is 0 there).
    tiny = np.finfo(tf.as_dtype(logits.dtype).as_numpy_dtype).tiny
    pmean_observed = pmean_observed / (
        tf.cast(tf.reduce_sum(event_bin_counts, axis=0), logits.dtype) + tiny)

    return event_bin_counts, pmean_observed

In [11]:
def computeECE(num_bins, logits=None, labels_true=None,
                               labels_predicted=None): 
    """Compute the expected calibration error (ECE) of a classifier.

    The ECE is the bin-probability-weighted average of the absolute gap between
    the fraction of correct decisions in a bin and the mean predicted
    probability in that bin:
        ECE = sum_b P(bin b) * |P(correct | bin b) - mean_prob(bin b)|

    Args:
        num_bins: int, number of probability bins, e.g. 10.
        logits: Tensor, (n,nlabels), with logits for n instances and nlabels.
        labels_true: Tensor, (n,), with tf.int32 or tf.int64 elements containing
          ground truth class labels in the range [0,nlabels).
        labels_predicted: Tensor, (n,), with tf.int32 or tf.int64 elements
          containing decisions of the predictive system.  If `None`, we will use
          the argmax decision using the `logits`.
    Returns:
        ece: Tensor, scalar, tf.float32.
    """
    with tf.name_scope('expected_calibration_error'):
        logits = tf.convert_to_tensor(logits)
        labels_true = tf.convert_to_tensor(labels_true)
        if labels_predicted is not None:
            labels_predicted = tf.convert_to_tensor(labels_predicted)

        # Compute empirical counts over the events defined by the sets
        # {incorrect,correct}x{0,1,..,num_bins-1}, as well as the empirical averages
        # of predicted probabilities in each probability bin.
        event_bin_counts, pmean_observed = _compute_calibration_bin_statistics(
            num_bins, logits=logits, labels_true=labels_true,
            labels_predicted=labels_predicted)

        # All arithmetic below is done in float32.  The bin means come back in
        # the dtype of `logits`, so cast them too; otherwise non-float32 logits
        # (e.g. float64) would mix dtypes in the subtraction below.  For
        # float32 logits this cast is a no-op.
        event_bin_counts = tf.cast(event_bin_counts, tf.float32)
        pmean_observed = tf.cast(pmean_observed, tf.float32)

        # Compute the marginal probability of observing a probability bin.
        bin_n = tf.reduce_sum(event_bin_counts, axis=0)
        pbins = bin_n / tf.reduce_sum(bin_n)  # Compute the marginal bin probability

        # Compute the marginal probability of making a correct decision given an
        # observed probability bin.  `tiny` avoids 0/0 for empty bins.
        tiny = np.finfo(np.float32).tiny
        pcorrect = event_bin_counts[1, :] / (bin_n + tiny)

        # ECE: bin-probability-weighted mean absolute calibration gap.
        ece = tf.reduce_sum(pbins * tf.abs(pcorrect - pmean_observed))
    return ece

In [4]:

epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):

        # Open a fresh GradientTape for each batch and record only the forward
        # pass and the loss.  A single persistent tape wrapped around the whole
        # epoch would keep recording every batch (plus the gradient and
        # optimizer ops) and never release it, so memory would grow with
        # every step.
        with tf.GradientTape() as tape:
            # Run the forward pass of the layer.
            # The operations that the layer applies
            # to its inputs are going to be recorded
            # on the GradientTape.
            logits = model(x_batch_train, training=True)  # Logits for this minibatch

            # Compute the loss value for this minibatch.
            loss_value = loss_fn(y_batch_train, logits)

        # Use the gradient tape to automatically retrieve
        # the gradients of the trainable variables with respect to the loss.
        # This is done outside the `with` block so the gradient computation
        # itself is not recorded.
        grads = tape.gradient(loss_value, model.trainable_weights)

        # Run one step of gradient descent by updating
        # the value of the variables to minimize the loss.
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %s samples" % ((step + 1) * batch_size))


Start of epoch 0
Training loss (for one batch) at step 0: 81.7544
Seen so far: 64 samples
Training loss (for one batch) at step 200: 1.4238
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.8191
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 1.0635
Seen so far: 38464 samples

Start of epoch 1
Training loss (for one batch) at step 0: 0.4841
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.8723
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.6052
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.9201
Seen so far: 38464 samples
