# Notebook for testing the Keras-ImageDataGenerator

The test was done with the Open Cities dataset from https://www.kaggle.com/datasets/johnowhitaker/opencitiestilesmasked. Things to consider:
* IMAGE_SIZE of this dataset is 512x512.
* Labels have value 255 in mask images.

In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive; the dataset folder used further down
# in this notebook is addressed through this mount point.
drive.mount("/content/drive")

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import Sequential
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from tensorflow.keras.losses import *
from tensorflow.keras.optimizers import *
from tensorflow.keras.metrics import *
import tensorflow.keras.backend as K
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.applications.efficientnet import EfficientNetB4
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.callbacks import LearningRateScheduler, EarlyStopping
import numpy as np
from sklearn.utils import shuffle

import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
import cv2
import os

In [None]:
# Abort right away when the runtime does not expose the first GPU device.
device_name = tf.test.gpu_device_name()
if device_name == "/device:GPU:0":
    print("Found GPU at: {}".format(device_name))
else:
    raise SystemError("GPU device not found")

Images and labels have to be in subfolders (e.g. 'data') so that they can be imported with the flow_from_directory function.

In [None]:
# Root folder of the dataset on the mounted drive (keeps its trailing slash,
# because later cells append sub-paths by plain string concatenation).
folder = "/content/drive/My Drive/Dokumente/omdena/hotosm/02_data/kaggle_opencities_masked_256/"
# Parent directories of the image tiles and of their mask tiles.
image_path = f"{folder}images-256/"
label_path = f"{folder}masks-256/"

Defining variables.

In [None]:
# Number of samples per generated batch.
BATCH_SIZE = 16
# Edge length in pixels of the square tiles; used for the generators'
# target_size and for the model's input shape.
IMAGE_SIZE = 512

Opening some example datasets for fitting the ImageDataGenerator (see below).

In [None]:
# Collect every file of the image subfolder. os.listdir() returns entries in
# arbitrary order, so sort them to make "the first image" reproducible.
images_paths = sorted(
    image_path + "data/" + file for file in os.listdir(image_path + "data/")
)
if not images_paths:
    raise FileNotFoundError(f"No image files found in {image_path}data/")

# Load the tile at the same target size the generators use and prepend a
# batch axis. A blind reshape(-1, IMAGE_SIZE, IMAGE_SIZE, 3) would fail, or
# silently cut the tile into scrambled pieces, whenever the file on disk is
# not exactly IMAGE_SIZE x IMAGE_SIZE.
example_image = img_to_array(
    load_img(
        images_paths[0],
        color_mode="rgb",
        target_size=(IMAGE_SIZE, IMAGE_SIZE),
    )
)[np.newaxis, ...]
print(f"Shape of example_image: {example_image.shape}")

In [None]:
# Collect every file of the mask subfolder. os.listdir() returns entries in
# arbitrary order, so sort them to make "the first mask" reproducible.
label_paths = sorted(
    label_path + "data/" + file for file in os.listdir(label_path + "data/")
)
if not label_paths:
    raise FileNotFoundError(f"No mask files found in {label_path}data/")

# Load the mask as a single-channel image at the same target size the
# generators use and prepend a batch axis. A blind
# reshape(-1, IMAGE_SIZE, IMAGE_SIZE, 1) would fail, or silently cut the mask
# into scrambled pieces, whenever the file on disk is not exactly
# IMAGE_SIZE x IMAGE_SIZE.
example_mask = img_to_array(
    load_img(
        label_paths[0],
        color_mode="grayscale",
        target_size=(IMAGE_SIZE, IMAGE_SIZE),
    )
)[np.newaxis, ...]
print(f"Shape of example_mask: {example_mask.shape}")

### Defining ImageDataGenerator

Definition of ImageDataGenerators.

In [None]:
# Settings for the TRAINING generators: random flips and rotations as
# augmentation, pixel values rescaled from [0, 255] to [0, 1] (mask label 255
# becomes 1.0), and 20 % of the files reserved for validation.
data_gen_args = dict(
    horizontal_flip=True,
    vertical_flip=True,
    rotation_range=90,
    rescale=1.0 / 255,
    validation_split=0.2,
)
# Settings for the VALIDATION generators: same rescaling and same split, but
# no random augmentation, so validation metrics are measured on the unmodified
# tiles. validation_split is copied from the training settings because the
# "training" and "validation" subsets are only complementary when both
# generators use the same fraction.
val_gen_args = dict(
    rescale=data_gen_args["rescale"],
    validation_split=data_gen_args["validation_split"],
)

# Images and masks get separate generator objects with identical settings.
# NOTE(review): rotation interpolates pixel values, which may produce
# non-binary mask values along object borders - confirm this is acceptable.
train_image_datagen = ImageDataGenerator(**data_gen_args)
train_mask_datagen = ImageDataGenerator(**data_gen_args)
val_image_datagen = ImageDataGenerator(**val_gen_args)
val_mask_datagen = ImageDataGenerator(**val_gen_args)

Fitting the ImageDataGenerator with example images (see above).

In [None]:
# One seed shared by all generators of this notebook.
seed = 101

# Fit each generator on the matching example array (RGB image for the image
# generators, single-channel mask for the mask generators).
# NOTE(review): per the Keras docs, fit() is only required for the
# featurewise_* / zca_whitening options, none of which appear in
# data_gen_args - this step is presumably optional; confirm before removing.
for datagen, sample in (
    (train_image_datagen, example_image),
    (train_mask_datagen, example_mask),
    (val_image_datagen, example_image),
    (val_mask_datagen, example_mask),
):
    datagen.fit(sample, augment=True, seed=seed)

Call flow_from_directory-Function for training data.

In [None]:
# Options shared by the image stream and the mask stream of the training
# subset. Both use the same seed, batch size and target size - presumably so
# that the n-th image batch and the n-th mask batch belong together.
train_flow_options = {
    "class_mode": None,
    "batch_size": BATCH_SIZE,
    "target_size": (IMAGE_SIZE, IMAGE_SIZE),
    "subset": "training",
    "seed": seed,
}

train_image_generator = train_image_datagen.flow_from_directory(
    image_path, color_mode="rgb", **train_flow_options
)
train_mask_generator = train_mask_datagen.flow_from_directory(
    label_path, color_mode="grayscale", **train_flow_options
)

# Pair the two streams into (image_batch, mask_batch) tuples.
train_generator = zip(train_image_generator, train_mask_generator)

Call flow_from_directory-Function for validation data.

In [None]:
# Options shared by the image stream and the mask stream of the validation
# subset. Both use the same seed, batch size and target size - presumably so
# that the n-th image batch and the n-th mask batch belong together.
val_flow_options = {
    "class_mode": None,
    "batch_size": BATCH_SIZE,
    "target_size": (IMAGE_SIZE, IMAGE_SIZE),
    "subset": "validation",
    "seed": seed,
}

val_image_generator = val_image_datagen.flow_from_directory(
    image_path, color_mode="rgb", **val_flow_options
)
val_mask_generator = val_mask_datagen.flow_from_directory(
    label_path, color_mode="grayscale", **val_flow_options
)

# Pair the two streams into (image_batch, mask_batch) tuples.
val_generator = zip(val_image_generator, val_mask_generator)

### Defining CNN architecture

Simplified Sequential-Model for testing the ImageDataGenerator.

In [None]:
def _conv3x3(filters, **extra):
    """Return a 3x3 ReLU convolution with 'same' padding and `filters` kernels."""
    return Conv2D(filters, (3, 3), activation="relu", padding="same", **extra)


cnn = Sequential()

# Encoder: five stages of two 3x3 convolutions followed by 2x2 max pooling.
# The first convolution additionally declares the model's input shape.
cnn.add(_conv3x3(8, input_shape=(IMAGE_SIZE, IMAGE_SIZE, 3)))
cnn.add(_conv3x3(8))
cnn.add(MaxPooling2D((2, 2)))
for filters in (16, 32, 64, 64):
    cnn.add(_conv3x3(filters))
    cnn.add(_conv3x3(filters))
    cnn.add(MaxPooling2D((2, 2)))

# Bottleneck at the lowest resolution.
cnn.add(_conv3x3(128))
cnn.add(_conv3x3(128))

# Decoder: five stages of a stride-2 transposed convolution followed by two
# 3x3 convolutions, mirroring the five pooling steps above.
for filters in (64, 32, 32, 16, 8):
    cnn.add(Conv2DTranspose(filters, (2, 2), strides=(2, 2), padding="same"))
    cnn.add(_conv3x3(filters))
    cnn.add(_conv3x3(filters))

# One sigmoid output channel per pixel.
cnn.add(Conv2D(1, (1, 1), activation="sigmoid"))

In [None]:
# Print the layer overview of the model defined above.
cnn.summary()

### Compiling CNN Model

In [None]:
# Adam optimizer with the starting learning rate; the rate is adjusted
# per epoch by the scheduler callback defined further down.
adam_optimizer = Adam(learning_rate=0.001)

# Binary cross-entropy matches the single sigmoid output channel of the model.
cnn.compile(
    loss="binary_crossentropy",
    optimizer=adam_optimizer,
    metrics=["accuracy"],
)

### Defining Callback functions

In [None]:
def scheduler(epoch: int, lr: float) -> float:
    """Return the learning rate to use for the given epoch.

    The rate is passed through unchanged while ``epoch`` is below 5 and is
    multiplied by ``exp(-0.1)`` for every later epoch.

    Args:
        epoch: Epoch index supplied by the LearningRateScheduler callback.
        lr: Learning rate currently in use.

    Returns:
        ``lr`` itself for ``epoch < 5``; otherwise the decayed rate as a plain
        Python float.
    """
    # Local import keeps this cell self-contained; the decay is a scalar
    # computation, so no tensor op is needed. Returning a float (instead of a
    # TensorFlow tensor) is what the Keras callback expects from a schedule.
    import math

    if epoch < 5:
        return lr
    return float(lr * math.exp(-0.1))

In [None]:
# Stop training when the monitored quantity "loss" stops improving, with a
# patience of 5 epochs.
# NOTE(review): "loss" is the training loss; if the intent is to stop on
# generalization, "val_loss" would be the usual choice - confirm.
early_stop = EarlyStopping(patience=5, monitor="loss")
# Let `scheduler` decide the learning rate of each epoch.
lr_scheduler = LearningRateScheduler(schedule=scheduler)

### Training of CNN

In [None]:
# Train on the paired (image, mask) generator.
# * batch_size is intentionally NOT passed: for generator input the batch size
#   is defined by the generator itself, and Keras documents that the argument
#   must not be specified in that case.
# * workers / use_multiprocessing are left at their defaults: a plain Python
#   generator (the zip object) is not safe to share across processes; with
#   multiprocessing every worker would iterate its own copy and feed
#   duplicated batches.
# * steps_per_epoch / validation_steps bound each epoch, because the
#   generators never run out of batches on their own.
training = cnn.fit(
    train_generator,
    epochs=10,
    steps_per_epoch=32,
    validation_data=val_generator,
    validation_steps=32,
    callbacks=[early_stop, lr_scheduler],
    verbose=1,
)

### Plotting Training history

In [None]:
def plot_loss_history(training_history: dict, logscale: bool = False) -> None:
    """Plot the training loss, and the validation loss if recorded, per epoch.

    Args:
        training_history: Mapping from metric name to a sequence of per-epoch
            values (e.g. the ``history`` attribute of the object returned by
            ``fit``). Must contain ``"loss"``; ``"val_loss"`` is optional.
        logscale: If true, the y-axis is drawn on a logarithmic scale.

    Raises:
        KeyError: If ``training_history`` has no ``"loss"`` entry.
    """
    loss = training_history["loss"]
    # "val_loss" is only recorded when training ran with validation data, so
    # the validation curve is skipped instead of failing with a KeyError.
    val_loss = training_history.get("val_loss")
    # Epochs are numbered from 1 on the x-axis.
    epochs = range(1, len(loss) + 1)

    plt.plot(epochs, loss, color="red", label="Training loss")
    if val_loss is not None:
        plt.plot(epochs, val_loss, color="green", label="Validation loss")
    plt.title("Training and validation loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()
    if logscale:
        plt.yscale("log")
    plt.show()


def plot_accuracy_history(training_history: dict) -> None:
    """Plot the training accuracy, and validation accuracy if recorded, per epoch.

    Args:
        training_history: Mapping from metric name to a sequence of per-epoch
            values (e.g. the ``history`` attribute of the object returned by
            ``fit``). Must contain ``"accuracy"``; ``"val_accuracy"`` is
            optional.

    Raises:
        KeyError: If ``training_history`` has no ``"accuracy"`` entry.
    """
    acc = training_history["accuracy"]
    # "val_accuracy" is only recorded when training ran with validation data,
    # so the validation curve is skipped instead of failing with a KeyError.
    val_acc = training_history.get("val_accuracy")
    # Epochs are numbered from 1 on the x-axis.
    epochs = range(1, len(acc) + 1)

    plt.plot(epochs, acc, color="red", label="Training acc")
    if val_acc is not None:
        plt.plot(epochs, val_acc, color="green", label="Validation acc")
    plt.title("Training and validation accuracy")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.show()

In [None]:
# Show the accuracy curves first, then the loss curves on a linear and on a
# logarithmic y-axis.
history = training.history
plot_accuracy_history(history)
for use_logscale in (False, True):
    plot_loss_history(history, logscale=use_logscale)