<div style="text-align:center;">
    <img src="http://www.cs.wm.edu/~rml/images/wm_horizontal_single_line_full_color.png">
    <h1>CSCI 416-01/516-01, Fundamentals of AI/ML</h1>
    <h1>Fall 2025</h1>
    <h1>An introduction to Keras via CNNs</h1>
</div>

# Contents
* [Keras](#Keras)
* [Getting started](#Getting-started)
* [Prepare the data](#Prepare-the-data)
    * [Add a fake color channel](#Add-a-fake-color-channel)
* Specify the CNN
    * Examine the layers in the CNN
* Training the CNN

# Keras

TensorFlow is Google's low-level framework for neural networks.  Keras is a high-level interface to TensorFlow.

# Getting started

There is a version of Keras in TensorFlow as well as a standalone Keras module.  The one in TensorFlow lags the standalone version by a few versions.

TensorFlow is **big**.  Note how long it takes to import it.

In [None]:
import tensorflow as tf
from tensorflow import keras  # We'll use the Keras in Tensorflow.
import numpy as np

In [None]:
print(f"{tf.__version__ = }")

There is a lot of randomness in these tools.  We set seeds for the pseudo-random number generators (PRNGs) so that there is some hope we will get the same results if we execute the notebook again.

In [None]:
# Try to make the randomness repeatable.
np.random.seed(42)
tf.random.set_seed(54)

# Prepare the data

We will use the Fashion MNIST image dataset.  This is an image dataset of 10 types of clothing (e.g., t-shirts, sandals, shoes).

In [None]:
fmnist = keras.datasets.fashion_mnist
(X_train_all, y_train_all), (X_test, y_test) = fmnist.load_data()

In [None]:
# Split the training set into a training set and a validation set.
X_valid, X_train = X_train_all[:5000], X_train_all[5000:]
y_valid, y_train = y_train_all[:5000], y_train_all[5000:]

## Add a fake color channel

Keras expects images to have one or more color channels.  The greyscale images do not have one, so we need to add one.  This changes the tensors to have rank 4.  For instance, X_train changes from (55000,28,28) to (55000,28,28,1).

We also make an explicit conversion from NumPy arrays to TensorFlow tensors.

In [None]:
X_train.shape

In [None]:
X_train = tf.expand_dims(tf.convert_to_tensor(X_train), axis=-1)
y_train = tf.convert_to_tensor(y_train)

X_valid = tf.expand_dims(tf.convert_to_tensor(X_valid), axis=-1)
y_valid = tf.convert_to_tensor(y_valid)

X_test = tf.expand_dims(tf.convert_to_tensor(X_test), axis=-1)
y_test = tf.convert_to_tensor(y_test)

In [None]:
X_train.shape

In [None]:
class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress',
               'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag',
               'Ankle boot']

In [None]:
# The number of classes.
num_classes = len(class_names)

# Specify the CNN

Neural networks do not like large input values, which can cause problems during training, so our model begins by rescaling the data to the interval $[0,1]$.

The layers of the CNN are as follows:
1. the input layer,
2. a rescaling layer to map the data to the interval $[0,1]$,
3. a convolutional layer consisting of 32 filters, each 3 x 3,
4. a convolutional layer consisting of 64 filters, each 3 x 3,
5. a max pooling layer to downsample the output of the previous layer by a factor of 2 in each dimension,
6. a dropout layer which randomly selects nodes to turn off for dropout regularization,
7. a flattening layer to turn the 2d image into a 1d vector,
8. a dense layer with all-to-all connections,
9. a second dropout layer, and finally
10. a standard layer with all-to-all connections that produces as output softmax estimates of the class probabilities.

In addition, in the call to the model's [<code>compile</code>](https://keras.io/models/sequential) method we must specify the [optimization algorithm](https://keras.io/optimizers) to be used in training the model.
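To make the max pooling layer in the list above concrete, here is a minimal NumPy sketch of 2 x 2 max pooling on a single-channel array. The helper name `max_pool_2x2` is made up for illustration; this is not how Keras implements `MaxPooling2D`, only the same arithmetic on a small example.

```python
import numpy as np

def max_pool_2x2(x):
    """Downsample a 2-D array by taking the max over non-overlapping 2 x 2 blocks."""
    h, w = x.shape
    # Drop a trailing odd row/column, then group the pixels into 2 x 2 blocks.
    blocks = x[:h - h % 2, :w - w % 2].reshape(h // 2, 2, w // 2, 2)
    return blocks.max(axis=(1, 3))

x = np.arange(16).reshape(4, 4)
pooled = max_pool_2x2(x)
print(pooled)        # each entry is the largest value in one 2 x 2 block of x
print(pooled.shape)  # half the height and half the width of x
```

Applied to a 24 x 24 feature map, the same operation yields a 12 x 12 map, which is the factor-of-2 downsampling described in the list.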

In [None]:
print('constructing the model...', end='')

model = keras.models.Sequential()
model.add(keras.layers.InputLayer(shape=(28,28,1)))
model.add(keras.layers.Rescaling(scale=1.0/255.0))
model.add(keras.layers.Conv2D(32, (3, 3), activation='relu'))
model.add(keras.layers.Conv2D(64, (3, 3), activation='relu'))
model.add(keras.layers.MaxPooling2D(pool_size=(2, 2)))
model.add(keras.layers.Dropout(0.25))
model.add(keras.layers.Flatten())
model.add(keras.layers.Dense(128, activation='relu'))
model.add(keras.layers.Dropout(0.5))
model.add(keras.layers.Dense(num_classes, activation='softmax'))

print('done!')

In [None]:
model.compile(loss=keras.losses.sparse_categorical_crossentropy,
              optimizer='adam',
              metrics=['accuracy'])
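The loss chosen above works on integer class labels (like `y_train`) rather than one-hot vectors: for each example it is the negative log of the probability the model assigns to the true class, averaged over the batch. The following NumPy sketch computes that quantity by hand; the function below and the toy probabilities are illustrative assumptions, not Keras code.

```python
import numpy as np

def sparse_categorical_crossentropy(y_true, probs):
    """Mean of -log(probability assigned to the true class)."""
    y_true = np.asarray(y_true)
    probs = np.asarray(probs, dtype=float)
    # Pick out, for each row, the probability of that row's true class.
    picked = probs[np.arange(len(y_true)), y_true]
    return float(np.mean(-np.log(picked)))

# Two examples, three classes (made-up probabilities).
probs = np.array([[0.7, 0.2, 0.1],
                  [0.1, 0.8, 0.1]])
labels = np.array([0, 1])
loss = sparse_categorical_crossentropy(labels, probs)
print(loss)
```

A model that guesses uniformly over the 10 classes would score $\log 10 \approx 2.30$, which is a handy reference point when reading the training log.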

## Examine the layers in the CNN

In [None]:
for layer in model.layers:
    print(layer)

In [None]:
model.summary()
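The output shapes and parameter counts that `summary` reports can be checked by hand. The sketch below redoes the arithmetic in plain Python, assuming the Keras defaults used above (no padding and stride 1 for the convolutions, non-overlapping 2 x 2 pooling); the helper names are made up for illustration.

```python
def conv2d_valid(shape, filters, k=3):
    """Output shape and parameter count of a k x k convolution, no padding, stride 1."""
    h, w, c = shape
    params = k * k * c * filters + filters   # kernel weights plus one bias per filter
    return (h - k + 1, w - k + 1, filters), params

def dense_params(n_in, n_out):
    """Parameter count of a fully connected layer: weights plus biases."""
    return n_in * n_out + n_out

shape = (28, 28, 1)
shape, p_conv1 = conv2d_valid(shape, 32)            # -> (26, 26, 32)
shape, p_conv2 = conv2d_valid(shape, 64)            # -> (24, 24, 64)
shape = (shape[0] // 2, shape[1] // 2, shape[2])    # max pooling -> (12, 12, 64)
flat = shape[0] * shape[1] * shape[2]               # flatten -> 9216
p_dense1 = dense_params(flat, 128)
p_dense2 = dense_params(128, 10)

# Rescaling, pooling, dropout, and flattening have no trainable parameters.
total = p_conv1 + p_conv2 + p_dense1 + p_dense2
print(shape, flat, total)
```

The first dense layer accounts for the vast majority of the parameters, because it connects all 9216 flattened values to each of its 128 units.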

# Training the CNN

With large training sets one frequently takes optimization steps based on an approximate direction of steepest descent computed using only a subset of the data.  We refer to the size of the subset used as the **batch size**.

An **epoch** is a pass through the entire data set in the process of training/optimization.

In each epoch, the batches are chosen randomly, whence the name **stochastic gradient descent** (SGD).

SGD allows us to try steps more quickly, and avoid the time and space requirements of processing the entire training set.  If the training data are reasonably uniform in their distribution, a subset of the data should give results similar to the entire training set.
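The ideas of batch, epoch, and random batch selection can be seen in a few lines of NumPy. The sketch below runs minibatch SGD on a small synthetic least-squares problem; the data, the function `minibatch_sgd`, and its step size are assumptions chosen for illustration, and Keras's optimizers are considerably more elaborate.

```python
import math
import numpy as np

rng = np.random.default_rng(0)

# Synthetic linear-regression data: y = X @ w_true + a little noise.
n, d = 1000, 3
X = rng.normal(size=(n, d))
w_true = np.array([2.0, -1.0, 0.5])
y = X @ w_true + 0.01 * rng.normal(size=n)

def minibatch_sgd(X, y, rng, batch_size=50, epochs=20, lr=0.1):
    w = np.zeros(X.shape[1])
    n = len(X)
    for epoch in range(epochs):
        order = rng.permutation(n)                  # new random batches each epoch
        for start in range(0, n, batch_size):
            idx = order[start:start + batch_size]
            residual = X[idx] @ w - y[idx]
            # Gradient of the mean squared error over this batch only.
            grad = 2.0 * X[idx].T @ residual / len(idx)
            w -= lr * grad
    return w

w_hat = minibatch_sgd(X, y, rng)
steps_per_epoch = math.ceil(n / 50)
print(w_hat, steps_per_epoch)

# The same count for 55,000 training images with a batch size of 256:
print(math.ceil(55000 / 256))
```

Each epoch visits every example once, so the number of optimization steps per epoch is the training-set size divided by the batch size, rounded up.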

In [None]:
# Check for GPUs.
print("GPUs detected:", tf.config.list_physical_devices("GPU"))
if not tf.config.list_physical_devices("GPU"):
    print('\a')
    print(90*'*')
    print("*** ¡Cuidado, llamas! 🦙🦙🦙  ¡No GPU detected!  ¡Training will be much slower on a CPU! ***")
    print(90*'*')

## A few epochs of training

Training is done with the method [<code>fit</code>](https://keras.io/models/sequential) in the <code>Sequential</code> model class.

We also pass the validation set we held out earlier as validation data.  Monitoring the validation loss and accuracy during training is useful for detecting overfitting.

In [None]:
history = model.fit(X_train, y_train,
                    batch_size=256,
                    epochs=4,
                    verbose=1,
                    validation_data=(X_valid, y_valid))

Let's look at the training history:

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

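# history.history maps each metric name to a list with one value per epoch,
# so the DataFrame has one row per epoch.  Number the rows from 1.
df = pd.DataFrame(history.history)
df.index = range(1, len(df) + 1)
df.plot(figsize=(10, 6))
plt.grid(True)
plt.gca().set_ylim(0, 1)
plt.show()
print(len(df))  # number of epochs recorded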

Let's evaluate the model:

In [None]:
score = model.evaluate(X_test, y_test)
print('Test loss:    ', score[0])
print('Test accuracy:', score[1])

## Why is the training loss higher than the testing loss?

Sometimes the accuracy reported for the training set is **lower** than that reported for the test set, and the loss reported for the training set is **higher** than that reported for the test set.  This seems backwards.

Per the [Keras FAQ](https://keras.io/getting-started/faq/#why-is-the-training-loss-much-higher-than-the-testing-loss):
<blockquote>
<p>
A Keras model has two modes: training and testing. Regularization mechanisms, such as Dropout and L1/L2 weight regularization, are turned off at testing time.
</p>
<p>
Besides, the training loss is the average of the losses over each batch of training data. Because your model is changing over time, the loss over the first batches of an epoch is generally higher than over the last batches. On the other hand, the testing loss for an epoch is computed using the model as it is at the end of the epoch, resulting in a lower loss.
</p>
</blockquote>
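The first point in the quotation can be illustrated with dropout. During training, Keras's `Dropout` layer zeroes a random fraction `rate` of its inputs and scales the survivors by `1/(1 - rate)`; at test time it passes its inputs through unchanged. The NumPy sketch below mimics that behavior; the function `dropout` is a hypothetical stand-in, not the Keras implementation.

```python
import numpy as np

def dropout(x, rate, training, rng):
    """Inverted dropout: active only in training mode."""
    if not training:
        return x                                  # test mode: identity
    keep = rng.random(x.shape) >= rate            # keep each unit with probability 1 - rate
    return np.where(keep, x / (1.0 - rate), 0.0)  # rescale the survivors

rng = np.random.default_rng(0)
x = np.ones(10_000)
train_out = dropout(x, 0.5, training=True, rng=rng)
test_out = dropout(x, 0.5, training=False, rng=rng)

print((train_out == 0).mean())  # fraction of units dropped, close to the rate
print(train_out.mean())         # close to 1 because of the rescaling
print(test_out.mean())          # exactly 1: nothing is dropped at test time
```

During training the network therefore makes its predictions with part of itself switched off, which is one reason the training metrics can look worse than the test metrics.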

In [None]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay

y_pred = model.predict(X_test)

cm = confusion_matrix(y_test, y_pred.argmax(axis=1))

print(cm)
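In scikit-learn's convention, row `i` of the confusion matrix counts the examples whose true class is `i` and column `j` counts those predicted as class `j`, so the diagonal holds the correct predictions. A short sketch of reading per-class recall and overall accuracy off such a matrix follows; the 3 x 3 matrix is made-up data for illustration, not output from the model above.

```python
import numpy as np

# Hypothetical confusion matrix for a 3-class problem.
cm_toy = np.array([[8, 1, 1],
                   [2, 7, 1],
                   [0, 0, 10]])

# Recall for class i: correct predictions for i divided by all true examples of i.
recall = np.diag(cm_toy) / cm_toy.sum(axis=1)

# Overall accuracy: all correct predictions divided by all examples.
accuracy = np.trace(cm_toy) / cm_toy.sum()

print(recall)
print(accuracy)
```

The same two lines applied to `cm` would show which clothing classes the network confuses most often.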

In [None]:
print(class_names)

In [None]:
# Create a wrapper for our neural network.  It returns the 
# estimated class (rather than probabilities of class membership)
# and pretends to be a scikit-learn classifier.

class Wrapper:
    def __init__(self, model):
        self.model = model
        self._estimator_type = 'classifier'
    
    def predict(self, X_test):
        return self.model.predict(X_test).argmax(axis=1)
    
clf = Wrapper(model)

In [None]:
from sklearn.metrics import ConfusionMatrixDisplay

fig, ax = plt.subplots(figsize=(10,10))

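# from_predictions needs only the predicted labels, so it does not depend on
# the wrapper passing scikit-learn's internal estimator checks.
ConfusionMatrixDisplay.from_predictions(y_test, clf.predict(X_test), display_labels=class_names, ax=ax)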

## More epochs 

Keras makes it easy to continue the training where we left off earlier.  It saves the information needed to continue training the neural network.

When we specify <code>initial_epoch</code>, the value of <code>epochs</code> is the epoch at which training ends, rather than the number of epochs we train over.

That is, if we set <code>initial_epoch=3</code> and <code>epochs=4</code>, then we will only optimize over a single epoch.

This convention is useful if we want to keep track of how many epochs of training have been performed.
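The arithmetic behind this convention is simple enough to write down. The helper below is a hypothetical illustration, not part of Keras: it returns how many epochs a call to `fit` trains for and the epoch numbers that appear in the training log.

```python
def epochs_to_run(initial_epoch, epochs):
    """Epochs actually trained and the 1-based labels shown in the log."""
    count = max(0, epochs - initial_epoch)
    labels = [e + 1 for e in range(initial_epoch, epochs)]
    return count, labels

print(epochs_to_run(3, 4))   # one epoch, logged as epoch 4
print(epochs_to_run(4, 8))   # four epochs, logged as epochs 5 through 8
```

If `epochs` is not larger than `initial_epoch`, no training takes place.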

In [None]:
model.fit(X_train, y_train,
          batch_size=256,
          epochs=8,
          verbose=1,
          validation_data=(X_valid, y_valid),
          initial_epoch=4)

score = model.evaluate(X_test, y_test)
print('Test loss:    ', score[0])
print('Test accuracy:', score[1])

# Keras callbacks 

Keras has a number of [callback](https://keras.io/callbacks) functions that can be used to specify actions to occur during training.

Callbacks include
* <code>ModelCheckpoint</code>, which saves the model after every epoch;
* <code>EarlyStopping</code>, which stops the training when a specified metric of quality has stopped improving;
* <code>ReduceLROnPlateau</code>, which reduces the learning rate when a specified metric of quality has stopped improving; and
* <code>TerminateOnNaN</code>, which terminates the training when a NaN is computed as the loss value.
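Callbacks are passed to `fit` as a list. The sketch below wires up the four callbacks listed above on a tiny random data set so that it runs quickly on its own; the toy model, the data, the file name `best.keras`, and the particular `patience` and `factor` values are all assumptions for illustration rather than recommended settings.

```python
import os
import tempfile

import numpy as np
from tensorflow import keras

# Tiny random data set, only so that the example is self-contained.
rng = np.random.default_rng(0)
X_toy = rng.random((64, 8)).astype("float32")
y_toy = rng.integers(0, 3, size=64)

toy_model = keras.Sequential([
    keras.Input(shape=(8,)),
    keras.layers.Dense(16, activation="relu"),
    keras.layers.Dense(3, activation="softmax"),
])
toy_model.compile(loss="sparse_categorical_crossentropy",
                  optimizer="adam", metrics=["accuracy"])

checkpoint_path = os.path.join(tempfile.mkdtemp(), "best.keras")
callbacks = [
    # With save_best_only=True the file is rewritten only when val_loss improves.
    keras.callbacks.ModelCheckpoint(checkpoint_path, monitor="val_loss",
                                    save_best_only=True),
    # Stop if val_loss has not improved for 2 consecutive epochs.
    keras.callbacks.EarlyStopping(monitor="val_loss", patience=2),
    # Halve the learning rate if val_loss has not improved for 1 epoch.
    keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=1),
    keras.callbacks.TerminateOnNaN(),
]

toy_history = toy_model.fit(X_toy, y_toy, epochs=5, batch_size=16,
                            validation_split=0.25, callbacks=callbacks, verbose=0)
print(len(toy_history.history["loss"]), "epochs run")
```

The same `callbacks` list could be passed to the `fit` calls on the Fashion MNIST model, where `val_loss` would be computed on `(X_valid, y_valid)`.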

# Looking at the neural network

We can look at the weights for layer <code>k</code> using <code>model.layers[k].get_weights()</code>.

In [None]:
print(model.layers[1].get_weights())

# Saving the model

We can use the model's <code>save</code> method to [save the model](https://keras.io/api/models/model_saving_apis/model_saving_and_loading/).

Saving the model saves the layout and the weights in the neural network, and also the optimizer's state so that you can continue training the model.

In [None]:
print('saving the model...', end='')
model.save('fashion_mnist.keras')
print('done!')

We can load the saved model using <code>keras.models.load_model</code>.

In [None]:
# Use the Keras bundled with TensorFlow, as in the rest of the notebook.
model2 = keras.models.load_model('fashion_mnist.keras')

In [None]:
del model2

# Data augmentation

**Data augmentation** refers to creating new training examples from our training data.  This is especially useful when dealing with image data.

We can create new images by applying random horizontal and vertical shifts to existing images.  This is useful for the clothing images since the items might not be perfectly centered.

Other possible transmogrifications include flipping the image horizontally or vertically, rotating the images, and applying various types of scaling.  We do not use these augmentations here.

We will reconstruct the model and train it using data augmentation to increase the effective size of the training set.
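To see what a random shift does to a single image, here is a simplified NumPy sketch. It moves the image by a whole number of pixels and fills the exposed border with zeros; both choices are simplifying assumptions, and the helper names are made up. Keras's `ImageDataGenerator` handles interpolation and border filling in its own way.

```python
import numpy as np

def shift_image(img, dy, dx):
    """Shift a 2-D image down by dy and right by dx pixels, filling with zeros."""
    h, w = img.shape
    out = np.zeros_like(img)
    # Matching source and destination windows for the part that stays in frame.
    src_y, dst_y = slice(max(0, -dy), min(h, h - dy)), slice(max(0, dy), min(h, h + dy))
    src_x, dst_x = slice(max(0, -dx), min(w, w - dx)), slice(max(0, dx), min(w, w + dx))
    out[dst_y, dst_x] = img[src_y, src_x]
    return out

def random_shift(img, rng, max_frac=0.1):
    """Shift by a random integer offset of at most max_frac of each dimension."""
    h, w = img.shape
    max_dy, max_dx = int(max_frac * h), int(max_frac * w)
    dy = int(rng.integers(-max_dy, max_dy + 1))
    dx = int(rng.integers(-max_dx, max_dx + 1))
    return shift_image(img, dy, dx)

rng = np.random.default_rng(0)
img = np.zeros((28, 28))
img[5, 5] = 1.0                       # a single bright pixel

moved = shift_image(img, dy=2, dx=-1)
print(np.argwhere(moved == 1.0))      # the pixel has moved down 2 and left 1

augmented = random_shift(img, rng)    # a new, randomly shifted training example
print(augmented.shape)
```

For a 28 x 28 image, a shift range of 0.1 corresponds to moving the image by at most about 2 to 3 pixels in each direction.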

In [None]:
import tensorflow as tf

print('reconstructing the model...', end='')

model2 = keras.models.Sequential()
model2.add(keras.layers.InputLayer(shape=(28,28,1)))
model2.add(keras.layers.Rescaling(scale=1.0/255.0))
model2.add(keras.layers.Conv2D(32, kernel_size=(3, 3), activation='relu'))
model2.add(keras.layers.Conv2D(64, (3, 3), activation='relu'))
model2.add(keras.layers.MaxPooling2D(pool_size=(2, 2)))
model2.add(keras.layers.Dropout(0.25))
model2.add(keras.layers.Flatten())
model2.add(keras.layers.Dense(128, activation='relu'))
#model2.add(keras.layers.Dropout(0.5))
model2.add(keras.layers.Dense(num_classes, activation='softmax'))

model2.compile(loss=keras.losses.sparse_categorical_crossentropy,
              optimizer=keras.optimizers.Adadelta(),
              metrics=['accuracy'])

print('done!')

In [None]:
# Try to control the randomness.
np.random.seed(42)
tf.random.set_seed(54)

datagen = keras.preprocessing.image.ImageDataGenerator(
  width_shift_range=0.1,               # randomly shift images horizontally (fraction of total width)
  height_shift_range=0.1,              # randomly shift images vertically (fraction of total height)
                                       # The remaining options are here for illustrative purposes only.
  featurewise_center=False,            # set input mean to 0 over the dataset
  samplewise_center=False,             # set each sample mean to 0
  featurewise_std_normalization=False, # divide inputs by std of the dataset
  samplewise_std_normalization=False,  # divide each input by its std
  zca_whitening=False,                 # apply ZCA whitening 
  rotation_range=0,                    # randomly rotate images in the range (in degrees, 0 to 180)
  horizontal_flip=False,               # randomly flip images
  vertical_flip=False)                 # randomly flip images

# Compute quantities required for feature-wise normalization
# (std, mean, and principal components if ZCA whitening is applied).
datagen.fit(X_train)

In [None]:
# Fit the model on the batches generated by datagen.flow().
model2.fit(datagen.flow(X_train, y_train, batch_size=128),
           epochs=4,
           verbose=1,
           validation_data=(X_valid, y_valid))

score = model2.evaluate(X_test, y_test)
print('Test loss:    ', score[0])
print('Test accuracy:', score[1])