In [1]:
import glob
import os
import cv2
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
from matplotlib.ticker import MaxNLocator
import albumentations as A

In [2]:
gpu = len(tf.config.list_physical_devices('GPU'))>0
print("GPU is", "available" if gpu else "NOT AVAILABLE")

GPU is available


In [3]:
from tensorflow.keras.layers import (GlobalAveragePooling2D, Dense, Conv2D,
    LeakyReLU, Activation, UpSampling2D, BatchNormalization, AveragePooling2D, concatenate)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [4]:
# Helpers to deal with files!

def get_file_name_with_ext(path: str):
    file_name_with_ext = os.path.basename(path)
    return file_name_with_ext


def get_file_name(path: str):
    file_name_with_ext = os.path.basename(path)
    file_name, file_extension = os.path.splitext(file_name_with_ext)
    return file_name


def make_directory(output_dir: str):
    if not os.path.exists(output_dir):
        print('Making output directory: ' + output_dir)
        os.makedirs(output_dir)
        
# get all image paths from directory
def gather_image_from_dir(input_dir):
    image_extensions = ['*.bmp', '*.jpg', '*.png']
    image_list = []
    for image_extension in image_extensions:
        image_list.extend(glob.glob(input_dir + image_extension))
    image_list.sort()
    return image_list

In [5]:
# Training history plotter

def draw_training_history(history, number_of_epochs, saver, xbins=10):
    # Plot
    # training
    epochs_ints = range(0, number_of_epoch)
    epochs = [str(x) for x in epochs_ints]
    
    fig = plt.figure(figsize=(6.6, 4.8))
    plt.plot(epochs, history.history['binary_accuracy'], 'b--')
    plt.plot(epochs, history.history['val_binary_accuracy'], 'b')
    plt.plot(epochs, history.history['precision'], 'g--')
    plt.plot(epochs, history.history['val_precision'], 'g')
    plt.plot(epochs, history.history['recall'], '--', color='orange')
    plt.plot(epochs, history.history['val_recall'], color='orange')
    plt.axvline(x=saver.best_epoch, color='r')
    ax = plt.gca()
    ax.set_xticks(ax.get_xticks()[::2])
    plt.title('Model accuracy, recall and precision', fontsize=17)
    plt.ylabel('accuracy / recall / precision')
    plt.xlabel('epoch')
    plt.legend(['train_acc', 'val_acc', 'train_pre', 'val_pre', 'train_rec', 'val_rec', 'best_w'], loc='lower right')
    plt.grid(True)
    plt.ylim(0, 1.05)
    plt.show()
    
    # print values
    print('binary_accuracy')
    print(history.history['binary_accuracy'])
    print('val_binary_accuracy')
    print(history.history['val_binary_accuracy'])
    print('recall')
    print(history.history['recall'])
    print('val_recall')
    print(history.history['val_recall'])
    print('precision')
    print(history.history['precision'])
    print('val_precision')
    print(history.history['val_precision'])

    # testing
    fig = plt.figure(figsize=(6.6, 4.8))
    plt.plot(epochs, history.history['loss'], 'b--')
    plt.plot(epochs, history.history['val_loss'], 'g')
    plt.axvline(x=saver.best_epoch, color='r')
    ax = plt.gca()
    ax.set_xticks(ax.get_xticks()[::2])
    plt.title('Model loss', fontsize=17)
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['loss', 'val_loss', 'best_w'], loc='upper right')
    plt.grid(True)
    plt.ylim(0, 1.05)
    plt.show()
    
    # print values
    print('loss')
    print(history.history['loss'])
    print('val_loss')
    print(history.history['val_loss'])
    
    print(50 * '-')
    print(f'Best Accuracy = {saver.best_acc_score}')
    print(f'Best Recall = {saver.best_rec}')
    print(f'Best Precision = {saver.best_precision}')

In [6]:
# Function for figure rendering plot

def add_subplot(fig, rows, cols, pos, name, image, colorspace, min, max):
    image_plot = fig.add_subplot(rows, cols, pos)
    image_plot.title.set_text(name)
    image_plot.title.set_fontsize(17)

    if colorspace:
        im = plt.imshow(image, cmap=colorspace, vmin=min, vmax=max)
        divider = make_axes_locatable(image_plot)
        cax = divider.append_axes("right", size="5%", pad=0.05)
        cb = plt.colorbar(im, cax=cax)
    else:
        im = plt.imshow(image)
        
def make_rgb(name, image, save_path):
    fig = plt.figure(figsize=(6.6, 4.8))
    norm_image = image / 255.
    vmin = 0.0
    vmax = 1.0
    add_subplot(fig, 1, 1, 1, name, norm_image, None, vmin, vmax)
    plt.tight_layout()
    plt.savefig(save_path, bbox_inches='tight')
    plt.close(fig)
    
def make_single_graph(name, image, save_path):
    fig = plt.figure(figsize=(6.6, 4.8))
    norm_image = image / 255.
    colormap = 'jet'
    vmin = 0.0
    vmax = 1.0
    add_subplot(fig, 1, 1, 1, name, norm_image, colormap, vmin, vmax)
    plt.tight_layout()
    plt.savefig(save_path, bbox_inches='tight')
    plt.close(fig)
    
def draw_fig(image, heatmap, name, output_path):
    fig, ax = plt.subplots(figsize=(6.6, 4.8))
    ax.imshow(image)
    ax.imshow(heatmap, cmap='jet', alpha=0.32)
    ax.set_title(name, fontsize=17)
    plt.tight_layout()
    plt.savefig(output_path, bbox_inches='tight')
    plt.close(fig)

In [7]:
# Learning rate scheduler
# this code shared in https://stackabuse.com/learning-rate-warmup-with-cosine-decay-in-keras-and-tensorflow/
def lr_warmup_cosine_decay(global_step,
                           warmup_steps,
                           hold = 0,
                           total_steps=0,
                           start_lr=0.0,
                           target_lr=1e-3):
    # Cosine decay
    learning_rate = 0.5 * target_lr * (1 + np.cos(np.pi * (global_step - warmup_steps - hold) / float(total_steps - warmup_steps - hold)))

    # Target LR * progress of warmup (=1 at the final warmup step)
    warmup_lr = target_lr * (global_step / warmup_steps)

    # Choose between `warmup_lr`, `target_lr` and `learning_rate` based on whether `global_step < warmup_steps` and we're still holding.
    # i.e. warm up if we're still warming up and use cosine decayed lr otherwise
    if hold > 0:
        learning_rate = np.where(global_step > warmup_steps + hold,
                                 learning_rate, target_lr)
    
    learning_rate = np.where(global_step < warmup_steps, warmup_lr, learning_rate)
    return learning_rate

class WarmupCosineDecay(tf.keras.callbacks.Callback):
    def __init__(self, total_steps=0, warmup_steps=0, start_lr=0.0, target_lr=1e-3, hold=0):

        super(WarmupCosineDecay, self).__init__()
        self.start_lr = start_lr
        self.hold = hold
        self.total_steps = total_steps
        self.global_step = 0
        self.target_lr = target_lr
        self.warmup_steps = warmup_steps
        self.lrs = []

    def on_batch_end(self, batch, logs=None):
        self.global_step = self.global_step + 1
        lr = model.optimizer.lr.numpy()
        self.lrs.append(lr)

    def on_batch_begin(self, batch, logs=None):
        lr = lr_warmup_cosine_decay(global_step=self.global_step,
                                    total_steps=self.total_steps,
                                    warmup_steps=self.warmup_steps,
                                    start_lr=self.start_lr,
                                    target_lr=self.target_lr,
                                    hold=self.hold)
        tf.keras.backend.set_value(self.model.optimizer.lr, lr)

In [8]:
# Custom weights saver (according to the best accuracy)

class CustomSaver(tf.keras.callbacks.Callback):
    def __init__(self, output_dir='', weights_name='best'):
        self.best_acc_score = 0.0
        self.best_rec = 0.0
        self.best_precision = 0.0
        self.best_epoch = 0
        self.__output_dir = output_dir
        # make output directory
        if not os.path.exists(self.__output_dir):
            os.makedirs(self.__output_dir)
        self.__weights_name = weights_name

    def on_epoch_end(self, epoch, logs=None):
        # also save if validation error is smallest
        if 'val_binary_accuracy' in logs.keys():
            val_acc = logs['val_binary_accuracy']
            if val_acc > self.best_acc_score:
                self.best_acc_score = val_acc
                self.best_rec = logs['val_recall']
                self.best_precision = logs['val_precision']
                self.best_epoch = epoch
                output_path = self.__output_dir + '/' + self.__weights_name + '.hdf5'
                print(f'New best weights found! Saving to {output_path}')
                self.model.save(output_path)
        else:
            print('Key val_binary_accuracy does not exist!')

In [9]:
# The following function makes the models mentioned in the article
# 'last_stage_out' makes shortened version of the model with last convolutional stage out.
# By shortening the model, we are getting bigger (wider) output with higher resolution for CAM!

def effiecientNetV2B0Classifier(input_size=(128, 128, 3), last_stage_out=False, weights_path=None):
    inputs = tf.keras.Input(shape=input_size)
    x = tf.keras.applications.efficientnet_v2.EfficientNetV2B0(include_top=False,
                                                    weights='imagenet',
                                                    input_tensor=inputs)
    if last_stage_out:
        x = x.get_layer('block6a_expand_activation').output
    else:
        x = x.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1, activation='sigmoid')(x)
    model = tf.keras.Model(inputs, x)
    model.compile(loss=tf.keras.losses.BinaryFocalCrossentropy(),
                  optimizer=Adam(learning_rate=1e-3),
                  metrics=[tf.keras.metrics.BinaryAccuracy(threshold=0.5),
                           tf.keras.metrics.Precision(thresholds=0.5),
                           tf.keras.metrics.Recall(thresholds=0.5)])
    if weights_path:
        model.load_weights(weights_path)
    return model

def convnextTinyClassifier(input_size=(128, 128, 3), last_stage_out=False, weights_path=None):
    inputs = tf.keras.Input(shape=input_size)
    x = tf.keras.applications.convnext.ConvNeXtTiny(include_top=False,
                                                    weights='imagenet',
                                                    input_tensor=inputs)
    
    if last_stage_out:
        x = x.get_layer('convnext_tiny_stage_2_block_8_identity').output
    else:
        x = x.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1, activation='sigmoid')(x)
    model = tf.keras.Model(inputs, x)
    model.compile(loss=tf.keras.losses.BinaryFocalCrossentropy(),
                  optimizer=Adam(learning_rate=1e-3),
                  metrics=[tf.keras.metrics.BinaryAccuracy(threshold=0.5),
                           tf.keras.metrics.Precision(thresholds=0.5),
                           tf.keras.metrics.Recall(thresholds=0.5)])
    if weights_path:
        model.load_weights(weights_path)
    return model

def mobileNetV3Classifier(input_size=(128, 128, 3), last_stage_out=False, weights_path=None):
    inputs = tf.keras.Input(shape=input_size)
    x = tf.keras.applications.MobileNetV3Large(include_top=False,
                                                weights='imagenet',
                                                input_tensor=inputs)
    if last_stage_out:
        x = x.layers[193].output
    else:
        x = x.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1, activation='sigmoid')(x)
    model = tf.keras.Model(inputs, x)
    model.compile(loss=tf.keras.losses.BinaryFocalCrossentropy(),
                  optimizer=Adam(learning_rate=1e-3),
                  metrics=[tf.keras.metrics.BinaryAccuracy(threshold=0.5),
                           tf.keras.metrics.Precision(thresholds=0.5),
                           tf.keras.metrics.Recall(thresholds=0.5)])
    if weights_path:
        model.load_weights(weights_path)
    return model

In [10]:
# Training sample

# Input image information
input_width = 752
input_height = 480
input_channels = 3
# batch size. How many samples you want to feed in one iteration?
batch_size = 2
# number_of_epoch. How many epochs you want to train?
number_of_epoch = 100
# learning rate
learning_rate=0.001
# training percentage until full learning rate [0.0;1.0]
warmup_percentage=0.15

# weights output directory
output_dir = 'Oliena_weights/efficientNet_4down/'
# output weights name
output_weights_name = 'efficientNet_4down'

# data directories [in this sample we will just show how to use the code on few sample of the data]
training_data_dir = 'data samples/'
testing_data_dir = 'data samples/'

In [11]:
# DATA AUGMENTATION
# Augmentation pipeline
def transform(image):
    aug = A.Compose([
        A.VerticalFlip(p=0.5),
        A.HorizontalFlip(p=0.5),
        A.RandomBrightnessContrast(p=0.2),
        A.RandomGamma(p=0.2),
        A.ShiftScaleRotate(p=0.2,
                           shift_limit=[-0.1, 0.1],
                           scale_limit=[-0.05,0.05],
                           rotate_limit=[-5,5])
    ])
    return aug(image=image)['image']

In [12]:
# data generators
train_image_generator = ImageDataGenerator(preprocessing_function=transform)

train_generator = train_image_generator.flow_from_directory(
    training_data_dir,
    target_size=(input_height, input_width),
    batch_size=batch_size,
    shuffle=True,
    color_mode='rgb',
    class_mode='binary')

test_image_generator = ImageDataGenerator()

test_generator = test_image_generator.flow_from_directory(
    testing_data_dir, 
    target_size=(input_height, input_width),
    batch_size=batch_size,
    color_mode='rgb',
    class_mode='binary') # set as validation data

Found 8 images belonging to 2 classes.
Found 8 images belonging to 2 classes.


In [13]:
# MODEL, chose from the provideded models
model = effiecientNetV2B0Classifier(input_size=(input_height, input_width, input_channels), last_stage_out=True)
model.summary()

Metal device set to: Apple M1 Pro




ValueError: Unknown optimizer: 'adamw'. Please ensure you are using a `keras.utils.custom_object_scope` and that this object is included in the scope. See https://www.tensorflow.org/guide/keras/save_and_serialize#registering_the_custom_object for details.

In [None]:
# warmup and cosine schedule
total_steps = train_generator.samples/batch_size*number_of_epoch
# defined number of the steps
warmup_steps = int(warmup_percentage*total_steps)

print(f'Warmup steps: {warmup_steps}')

schedule = WarmupCosineDecay(start_lr=0.0,
                             target_lr=learning_rate,
                             warmup_steps=warmup_steps,
                             total_steps=total_steps,
                             hold=warmup_steps)

# custom saver serves as function to save the best performing weights
saver = CustomSaver(output_dir=output_dir, weights_name=output_weights_name)

In [None]:
history = model.fit(
    train_generator,
    steps_per_epoch = train_generator.samples // batch_size,
    validation_data = test_generator, 
    validation_steps = test_generator.samples // batch_size,
    epochs = number_of_epoch,
    callbacks=[schedule, saver],
    shuffle=True)

In [None]:
# load best weights
model.load_weights(output_dir + output_weights_name + '.hdf5')

In [None]:
x = model.layers[-3].output
print(f'Output shape before average pooling: {x.shape}')
weights = tf.expand_dims(model.layers[-1].get_weights()[0][:,0], 0)
x = tf.keras.layers.dot([x, weights], axes=(3, 1))
print(f'Output shape of expanded weights: {weights.shape}')
print(f'Output after multiplication: {x.shape}')
# make multihead output
multi_head_model = tf.keras.Model(inputs=model.input, 
        outputs=(x, model.layers[-1].output))

In [None]:
# Lets plot some DEFECT CAMS!

import matplotlib
matplotlib.use('agg') # let matplotlib save more (>370) images

image_output = output_dir + 'images/'
make_directory(image_output)

cam_output = output_dir + 'cam/'
make_directory(cam_output)

# take the defect directory in the test set
image_paths = gather_image_from_dir(testing_data_dir + '1/')
print(f'Found {len(image_paths)} images')
# lets predict
for i, image_path in enumerate(image_paths):
    image = cv2.imread(image_path, cv2.IMREAD_COLOR) # no need to switch channels, it is actually greyscale
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (model.input.shape[2], model.input.shape[1])) # no need if cropped
    image_name = get_file_name(image_path)
    
    # prepocess image
    # preprocess
    image_norm = image
    image_norm = np.reshape(image_norm, image_norm.shape + (1,))
    image_norm = np.reshape(image_norm, (1,) + image_norm.shape)
    
    # Get the output of last convolutional layer and prediction
    last_conv_output, prediction = multi_head_model.predict(image_norm)
    last_conv_output = last_conv_output[0,:,:]# we have only one image, so take the first one
    last_conv_output_scaled = cv2.resize(last_conv_output, (model.input.shape[2], model.input.shape[1]))
    
    draw_fig(image, last_conv_output_scaled, 'image+defectCAM', cam_output + image_name + '_defectCAM_superpos.png')
    
    print(f'{image_name} {last_conv_output_scaled.shape} {prediction}')
    make_rgb('image', image, image_output + image_name + '_rgb.png')
print(f'Finish! Check {image_output} and {cam_output}')