<a href="https://colab.research.google.com/github/tombackert/CS411-ml-for-ds/blob/main/MNIST_NN_tensorflow_datasets.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Training a neural network on MNIST with Keras

This simple example demonstrates how to plug TensorFlow Datasets (TFDS) into a Keras model.


Copyright 2020 The TensorFlow Datasets Authors, Licensed under the Apache License, Version 2.0

<table class="tfo-notebook-buttons" align="left">
  <td>
    <a target="_blank" href="https://www.tensorflow.org/datasets/keras_example"><img src="https://www.tensorflow.org/images/tf_logo_32px.png" />View on TensorFlow.org</a>
  </td>
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/tensorflow/datasets/blob/master/docs/keras_example.ipynb"><img src="https://www.tensorflow.org/images/colab_logo_32px.png" />Run in Google Colab</a>
  </td>
  <td>
    <a target="_blank" href="https://github.com/tensorflow/datasets/blob/master/docs/keras_example.ipynb"><img src="https://www.tensorflow.org/images/GitHub-Mark-32px.png" />View source on GitHub</a>
  </td>
  <td>
    <a href="https://storage.googleapis.com/tensorflow_docs/datasets/docs/keras_example.ipynb"><img src="https://www.tensorflow.org/images/download_logo_32px.png" />Download notebook</a>
  </td>
</table>

In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds

## Step 1: Create your input pipeline

Start by building an efficient input pipeline using advice from:
* The [Performance tips](https://www.tensorflow.org/datasets/performances) guide
* The [Better performance with the `tf.data` API](https://www.tensorflow.org/guide/data_performance#optimize_performance) guide


### Load a dataset

Load the MNIST dataset with the following arguments:

* `shuffle_files=True`: The MNIST data is only stored in a single file, but for larger datasets with multiple files on disk, it's good practice to shuffle them when training.
* `as_supervised=True`: Returns a tuple `(img, label)` instead of a dictionary `{'image': img, 'label': label}`.

In [None]:
(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)

### Build a training pipeline

Apply the following transformations:

* `tf.data.Dataset.map`: TFDS provides images of type `tf.uint8`, while the model expects `tf.float32`. Therefore, you need to normalize images.
* `tf.data.Dataset.cache`: As the dataset fits in memory, cache it before shuffling for better performance.<br/>
__Note:__ Random transformations should be applied after caching.
* `tf.data.Dataset.shuffle`: For true randomness, set the shuffle buffer to the full dataset size.<br/>
__Note:__ For large datasets that can't fit in memory, use `buffer_size=1000` if your system allows it.
* `tf.data.Dataset.batch`: Batch elements of the dataset after shuffling to get unique batches at each epoch.
* `tf.data.Dataset.prefetch`: It is good practice to end the pipeline by prefetching [for performance](https://www.tensorflow.org/guide/data_performance#prefetching).

In [None]:
def normalize_img(image, label):
    """Convert an image to `float32` and scale it by 1/255.

    The label is passed through untouched, so the function can be mapped
    over a dataset of `(image, label)` pairs.
    """
    scaled = tf.cast(image, tf.float32) / 255.
    return scaled, label

# Training pipeline: normalize -> cache -> shuffle -> batch -> prefetch.
# The cache sits before the shuffle so the shuffle is re-applied on every
# pass; the shuffle buffer covers the whole training split.
ds_train = (
    ds_train
    .map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(ds_info.splits['train'].num_examples)
    .batch(128)
    .prefetch(tf.data.AUTOTUNE)
)

### Build an evaluation pipeline

Your testing pipeline is similar to the training pipeline with small differences:

 * You don't need to call `tf.data.Dataset.shuffle`.
 * Caching is done after batching because batches can be the same between epochs.

In [None]:
# Evaluation pipeline: normalize -> batch -> cache -> prefetch.
# No shuffling; the cache comes after batching, so whole batches are cached.
ds_test = (
    ds_test
    .map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(128)
    .cache()
    .prefetch(tf.data.AUTOTUNE)
)

## Step 2: Create and train the model

Plug the TFDS input pipeline into a simple Keras model, compile the model, and train it.

In [None]:
# Fully connected classifier: 784 flattened pixels -> 128 -> 64 -> 10 outputs.
model = tf.keras.models.Sequential()
model.add(tf.keras.layers.Flatten(input_shape=(28, 28)))
model.add(tf.keras.layers.Dense(128, activation='relu'))
model.add(tf.keras.layers.Dense(64, activation='relu'))
# The output layer has no activation; the loss below is configured with
# from_logits=True to match.
model.add(tf.keras.layers.Dense(10))

model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer=tf.keras.optimizers.Adam(0.001),
    metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
)

# ds_test is passed as validation data, so the `val_*` entries recorded in
# `history` are computed on the test split.
history = model.fit(ds_train, validation_data=ds_test, epochs=20)

In [None]:
import matplotlib.pyplot as plt
def plot_loss(history):
    """Draw the training and validation loss curves on the current figure.

    Args:
        history: Object whose `history` mapping contains the 'loss' and
            'val_loss' series.
    """
    # Each curve is labelled with the name of its history key.
    for series in ('loss', 'val_loss'):
        plt.plot(history.history[series], label=series)
    plt.xlabel('Epoch')
    plt.ylabel('Error')
    plt.ylim([0, 1])
    plt.grid(True)
    plt.legend()

# Loss curves of the baseline model fitted above.
plot_loss(history)

# Q2: Different Model Testing

In [None]:
### Automate testing of different models

def create_model(hidden_layers, neurons_per_layer):
    """Build an uncompiled dense network for 28x28 inputs.

    The layer stack is Flatten -> one ReLU `Dense` layer per entry of
    `neurons_per_layer` -> `Dense(10)` without activation.

    Args:
        hidden_layers: Not read by the body; the number of hidden layers is
            determined solely by `neurons_per_layer`.
        neurons_per_layer: Iterable giving the unit count of each hidden
            layer, in order.

    Returns:
        The assembled `tf.keras.models.Sequential` model.
    """
    hidden = [
        tf.keras.layers.Dense(units, activation='relu')
        for units in neurons_per_layer
    ]
    return tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        *hidden,
        tf.keras.layers.Dense(10),
    ])

def compile_model(model):
    """Compile `model` in place with the settings shared by all experiments.

    Uses Adam (learning rate 0.001), sparse categorical cross-entropy with
    `from_logits=True`, and sparse categorical accuracy as the only metric.
    Nothing is returned.
    """
    adam = tf.keras.optimizers.Adam(0.001)
    logits_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
    model.compile(optimizer=adam, loss=logits_loss, metrics=[accuracy])

def plot_metrics(history, model_number, *, show_accuracy=False):
    """Plot the training and validation curves of one model.

    Args:
        history: Object whose `history` mapping holds the 'loss' and
            'val_loss' series (and, when `show_accuracy` is set, the
            'sparse_categorical_accuracy' / 'val_sparse_categorical_accuracy'
            series).
        model_number: Identifier interpolated into the figure titles.
        show_accuracy: When true, an accuracy figure is drawn before the
            loss figure. Defaults to False, which draws the loss figure only.
    """
    if show_accuracy:
        # Accuracy
        plt.figure(figsize=(8, 4))
        plt.plot(history.history['sparse_categorical_accuracy'], label='Training Accuracy')
        plt.plot(history.history['val_sparse_categorical_accuracy'], label='Validation Accuracy')
        plt.title(f'Modell {model_number}: Genauigkeit')
        plt.xlabel('Epoche')
        plt.ylabel('Genauigkeit')
        plt.legend()
        plt.grid(True)
        plt.show()

    # Loss
    plt.figure(figsize=(8, 4))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title(f'Modell {model_number}: Verlust')
    plt.xlabel('Epoche')
    plt.ylabel('Verlust')
    plt.legend()
    plt.grid(True)
    plt.show()

In [None]:
# Creating all three models

# Three architectures of increasing depth. `hidden_layers` mirrors the length
# of `neurons_per_layer`; create_model builds one hidden layer per list entry.
model1 = create_model(hidden_layers=1, neurons_per_layer=[128])
model2 = create_model(hidden_layers=2, neurons_per_layer=[128, 64])
model3 = create_model(hidden_layers=3, neurons_per_layer=[256, 128, 64])

In [None]:
# Compiling all three models

# compile_model configures each model in place with identical optimizer,
# loss and metric settings, so only the architecture differs between runs.
compile_model(model1)
compile_model(model2)
compile_model(model3)

In [None]:
# Training all three models
#
# `time` is imported in this cell because the only other `import time` in the
# notebook sits in a later cell; without it the first `time.time()` call below
# raises NameError when the notebook is run top to bottom.
import time

# Each model is fitted for 20 epochs on ds_train with ds_test as validation
# data; the wall-clock duration of every fit is kept in `training_timeN`.

# Model 1
start_time1 = time.time()
history1 = model1.fit(
    ds_train,
    epochs=20,
    validation_data=ds_test,
)
end_time1 = time.time()
training_time1 = end_time1 - start_time1
print(training_time1)

# Model 2
start_time2 = time.time()
history2 = model2.fit(
    ds_train,
    epochs=20,
    validation_data=ds_test,
)
end_time2 = time.time()
training_time2 = end_time2 - start_time2
print(training_time2)

# Model 3
start_time3 = time.time()
history3 = model3.fit(
    ds_train,
    epochs=20,
    validation_data=ds_test,
)
end_time3 = time.time()
training_time3 = end_time3 - start_time3
print(training_time3)

In [None]:
# Plot the training/validation loss curves of each of the three models.
plot_metrics(history1, 1)
plot_metrics(history2, 2)
plot_metrics(history3, 3)

In [None]:
import time

def evaluate_model(model, ds_test, time):
    """Evaluate `model` on `ds_test` and bundle the result with a duration.

    Args:
        model: Object whose `evaluate(ds_test, verbose=0)` returns a
            two-element (loss, accuracy) result.
        ds_test: Dataset handed to `model.evaluate`.
        time: Value returned unchanged as the third tuple element. Inside
            this function the name refers to this argument, not to the
            `time` module.

    Returns:
        A `(test_loss, test_accuracy, time)` tuple.
    """
    metrics = model.evaluate(ds_test, verbose=0)
    test_loss, test_accuracy = metrics
    return test_loss, test_accuracy, time

In [None]:
# Evaluate every model on the test pipeline; the previously measured training
# durations are passed through unchanged as `time1`..`time3`.
loss1, acc1, time1 = evaluate_model(model1, ds_test, training_time1)
loss2, acc2, time2 = evaluate_model(model2, ds_test, training_time2)
loss3, acc3, time3 = evaluate_model(model3, ds_test, training_time3)

In [None]:
# Wall-clock training duration of model 1 (difference of two time.time() readings).
print(training_time1)

In [None]:
import pandas as pd

# Summary table: one row per architecture with its test accuracy, test loss
# and training duration.
results = pd.DataFrame({
    'Modell': ['Modell 1', 'Modell 2', 'Modell 3'],
    'Accuracy': [acc1, acc2, acc3],
    'Loss': [loss1, loss2, loss3],
    'Training Time (s)': [time1, time2, time3]
})

print(results)

# Q3: Hyperparameter Testing

In [None]:
def train_model(learning_rate, batch_size, epochs):
    """Train and evaluate a fresh 128-64 dense model with the given settings.

    Reads the module-level `ds_train` and `ds_test` pipelines and re-batches
    them to `batch_size` for this run.

    Args:
        learning_rate: Learning rate passed to the Adam optimizer.
        batch_size: Batch size used for training, validation and evaluation.
        epochs: Number of epochs passed to `model.fit`.

    Returns:
        A dict with the keys 'model', 'history', 'learning_rate',
        'batch_size', 'epochs', 'training_time' (duration of the `fit` call
        measured with `time.time()`), 'test_loss' and 'test_accuracy'.
    """
    # Create a new instance of the model
    model = create_model(hidden_layers=2, neurons_per_layer=[128, 64])

    # create_model ends in Dense(10) without an activation, so the network
    # emits logits. The string alias 'sparse_categorical_crossentropy' uses
    # from_logits=False and would treat those logits as probabilities, so the
    # loss object is built explicitly with from_logits=True.
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(
        optimizer=optimizer,
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['sparse_categorical_accuracy'],
    )

    # Unbatch and re-batch the data to change the batch size
    ds_train_batched = ds_train.unbatch().batch(batch_size)
    ds_test_batched = ds_test.unbatch().batch(batch_size)

    # Record the training time
    start_time = time.time()
    history = model.fit(
        ds_train_batched,
        epochs=epochs,
        validation_data=ds_test_batched,
        verbose=1
    )
    end_time = time.time()
    training_time = end_time - start_time

    # Evaluate the model
    test_loss, test_accuracy = model.evaluate(ds_test_batched, verbose=0)

    return {
        'model': model,
        'history': history,
        'learning_rate': learning_rate,
        'batch_size': batch_size,
        'epochs': epochs,
        'training_time': training_time,
        'test_loss': test_loss,
        'test_accuracy': test_accuracy
    }

In [None]:
# Learning-rate sweep, run 1 of 3: lr=0.1 (batch size 128, 10 epochs).
result_lr_0_1 = train_model(learning_rate=0.1, batch_size=128, epochs=10)

In [None]:
# Learning-rate sweep, run 2 of 3: lr=0.01 (batch size 128, 10 epochs).
result_lr_0_01 = train_model(learning_rate=0.01, batch_size=128, epochs=10)

In [None]:
# Learning-rate sweep, run 3 of 3: lr=0.001 (batch size 128, 10 epochs).
result_lr_0_001 = train_model(learning_rate=0.001, batch_size=128, epochs=10)

In [None]:
# Summarize the learning-rate sweep: one row per run, one column per field
# pulled out of the result dictionaries.
lr_results = pd.DataFrame({
    column: [
        run[field]
        for run in (result_lr_0_1, result_lr_0_01, result_lr_0_001)
    ]
    for column, field in (
        ('Learning Rate', 'learning_rate'),
        ('Test Accuracy', 'test_accuracy'),
        ('Test Loss', 'test_loss'),
        ('Training Time (s)', 'training_time'),
    )
})

print(lr_results)

In [None]:
def plot_learning_curve(history, title):
    """Show accuracy and loss curves side by side for one training run.

    Args:
        history: Object whose `history` mapping holds the series
            'sparse_categorical_accuracy', 'loss' and their 'val_' prefixed
            counterparts.
        title: Prefix placed in front of each panel title.
    """
    # (history key, display name) for the left and right panel respectively.
    panels = (
        ('sparse_categorical_accuracy', 'Accuracy'),
        ('loss', 'Loss'),
    )

    plt.figure(figsize=(12, 4))
    for position, (metric, name) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(history.history[metric], label=f'Training {name}')
        plt.plot(history.history[f'val_{metric}'], label=f'Validation {name}')
        plt.title(f'{title} - {name}')
        plt.xlabel('Epoch')
        plt.ylabel(name)
        plt.legend()
        plt.grid(True)

    plt.show()

# Plot accuracy and loss curves for each run of the learning-rate sweep.
plot_learning_curve(result_lr_0_1['history'], 'Learning Rate = 0.1')
plot_learning_curve(result_lr_0_01['history'], 'Learning Rate = 0.01')
plot_learning_curve(result_lr_0_001['history'], 'Learning Rate = 0.001')

In [None]:
# Epoch sweep at lr=0.01 and batch size 128.
# Epochs = 10 (same hyperparameters as result_lr_0_01, but trained again
# from a freshly created model)
result_epochs_10 = train_model(learning_rate=0.01, batch_size=128, epochs=10)

# Epochs = 20
result_epochs_20 = train_model(learning_rate=0.01, batch_size=128, epochs=20)

In [None]:
# Batch-size sweep at lr=0.01 and 10 epochs.
# Batch Size = 64
result_batch_64 = train_model(learning_rate=0.01, batch_size=64, epochs=10)

# Batch Size = 128 (same hyperparameters as result_lr_0_01 and
# result_epochs_10, trained again from a freshly created model)
result_batch_128 = train_model(learning_rate=0.01, batch_size=128, epochs=10)

In [None]:
# Summarize the epoch sweep: one row per run, one column per result field.
epoch_results = pd.DataFrame({
    column: [run[field] for run in (result_epochs_10, result_epochs_20)]
    for column, field in (
        ('Epochs', 'epochs'),
        ('Test Accuracy', 'test_accuracy'),
        ('Test Loss', 'test_loss'),
        ('Training Time (s)', 'training_time'),
    )
})

print(epoch_results)

In [None]:
# Summarize the batch-size sweep: one row per run, one column per result field.
batch_results = pd.DataFrame({
    column: [run[field] for run in (result_batch_64, result_batch_128)]
    for column, field in (
        ('Batch Size', 'batch_size'),
        ('Test Accuracy', 'test_accuracy'),
        ('Test Loss', 'test_loss'),
        ('Training Time (s)', 'training_time'),
    )
})

print(batch_results)

In [None]:
# Accuracy and loss curves for the two runs of the epoch sweep.
plot_learning_curve(result_epochs_10['history'], 'Epochs = 10')
plot_learning_curve(result_epochs_20['history'], 'Epochs = 20')

In [None]:
# Accuracy and loss curves for the two runs of the batch-size sweep.
plot_learning_curve(result_batch_64['history'], 'Batch Size = 64')
plot_learning_curve(result_batch_128['history'], 'Batch Size = 128')

# Q4: Building a NN for a different Dataset


In [None]:
from tensorflow.keras.datasets import cifar10

In [None]:
# Load CIFAR-10 dataset as (images, labels) pairs for the train and test splits
(X_train, y_train), (X_test, y_test) = cifar10.load_data()

In [None]:
# Check the shapes of the image and label arrays before building the model
print("Training data shape:", X_train.shape)
print("Training labels shape:", y_train.shape)
print("Test data shape:", X_test.shape)
print("Test labels shape:", y_test.shape)

In [None]:
# Normalize pixel values: convert both image arrays to float32 and divide by 255
X_train, X_test = (
    pixels.astype('float32') / 255.0 for pixels in (X_train, X_test)
)

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten, Dense, Dropout

def create_cifar_model():
    """Build an uncompiled dense classifier for 32x32x3 inputs.

    Layer stack: Flatten -> Dense(512, relu) -> Dropout(0.5) ->
    Dense(256, relu) -> Dropout(0.5) -> Dense(128, relu) ->
    Dense(10, softmax).

    Returns:
        The assembled `Sequential` model.
    """
    return Sequential([
        Flatten(input_shape=(32, 32, 3)),
        Dense(512, activation='relu'),
        Dropout(0.5),
        Dense(256, activation='relu'),
        Dropout(0.5),
        Dense(128, activation='relu'),
        Dense(10, activation='softmax'),
    ])

In [None]:
# NOTE: this rebinds the notebook-level name `model` to the CIFAR-10 network.
# create_cifar_model ends in a softmax layer, so the string loss alias is used
# here without a from_logits setting.
model = create_cifar_model()
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss='sparse_categorical_crossentropy',
    metrics=['sparse_categorical_accuracy']
)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Stop once `val_loss` has not improved for 5 consecutive epochs and roll the
# model back to the weights of the best epoch seen.
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

In [None]:
# Record training time
start_time = time.time()

# The early-stopping callback picks the epoch with the lowest validation loss
# and restores its weights. Validating on (X_test, y_test) would therefore
# select the model on the test set and bias the test metrics reported
# afterwards. A slice of the training arrays is held out for validation
# instead, so the test set is only touched by the final evaluation.
history = model.fit(
    X_train, y_train,
    epochs=30,
    batch_size=128,
    validation_split=0.1,
    callbacks=[early_stopping],
    verbose=1
)

end_time = time.time()
training_time = end_time - start_time

In [None]:
# Final evaluation on the CIFAR-10 test arrays, reported together with the
# duration of the fit call measured above.
test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Training Time: {training_time:.2f} seconds")

In [None]:
def plot_learning_curves(history):
    """Show accuracy and loss curves side by side for one training run.

    Args:
        history: Object whose `history` mapping holds the series
            'sparse_categorical_accuracy', 'loss' and their 'val_' prefixed
            counterparts.
    """
    # (history key, display name) for the left and right panel respectively.
    panels = (
        ('sparse_categorical_accuracy', 'Accuracy'),
        ('loss', 'Loss'),
    )

    plt.figure(figsize=(12, 4))
    for position, (metric, name) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(history.history[metric], label=f'Training {name}')
        plt.plot(history.history[f'val_{metric}'], label=f'Validation {name}')
        plt.title(f'Model {name}')
        plt.xlabel('Epoch')
        plt.ylabel(name)
        plt.legend()
        plt.grid(True)

    plt.show()

# Learning curves of the CIFAR-10 run; `history` is the object returned by the
# most recent `model.fit` call.
plot_learning_curves(history)