In [1]:
# Single seed shared by every random number generator configured in this cell
# (and passed on to the splitting / image-generation helpers further down).
GLOBAL_SEED = 744

import os
# Exported before the numeric libraries are imported.
# NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
# setting it here presumably only affects child processes — confirm.
os.environ['PYTHONHASHSEED'] = str(GLOBAL_SEED)

import numpy as np
import tensorflow as tf
import random


# Seed NumPy's global RNG and Python's built-in `random` module.
np.random.seed(GLOBAL_SEED)
random.seed(GLOBAL_SEED)

from keras import backend as K
# Seed TensorFlow's global random generator.
tf.random.set_seed(GLOBAL_SEED)

# TF1-compat session limited to one intra-op and one inter-op thread, with GPU
# memory growth enabled, registered as the Keras backend session.
# NOTE(review): whether a v1 session influences execution under TF2 eager mode
# depends on the installed TensorFlow/Keras versions — confirm.
config = tf.compat.v1.ConfigProto(intra_op_parallelism_threads=1, inter_op_parallelism_threads=1)
config.gpu_options.allow_growth = True
sess = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(), config=config)
K.set_session(sess)

import torch
# Request deterministic cuDNN kernels, disable the cuDNN auto-tuner and seed
# PyTorch's RNG.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.manual_seed(GLOBAL_SEED)

#### After initializing the seeds for reproducibility, import the BigGAN-based generator

In [2]:
# Copy the two generator scripts from the input dataset into the working directory.
!cp ../input/biggan-generator-v3/biggan_generator.py ./
!cp ../input/biggan-generator-v3/generate_images.py ./

import sys
# Put the working directory first on the module search path so the copied
# scripts can be imported (see `import generate_images` in the next cell).
sys.path.insert(0, './')

In [3]:
import numpy as np
from sklearn.metrics import recall_score, roc_auc_score
from sklearn.model_selection import train_test_split
from PIL import Image
import random
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Model, Sequential
from keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Input, Conv2D, Lambda, Dense, Flatten, MaxPooling2D, Activation, BatchNormalization
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession
from tensorflow.keras.optimizers import Adam
import warnings
import os
import shutil
from PIL import ImageFile
from PIL import Image
import pandas as pd
from pandas import DataFrame
from scipy.stats import entropy
from scipy.special import softmax
from heapq import nlargest, nsmallest
from operator import itemgetter
import generate_images

#### Create directory structures and define constants

In [4]:
# When True, the main cell uses a very short training schedule so the whole
# pipeline can be smoke-tested quickly.
DRY_RUN = True

# Square input resolution fed to the classifier (original: 220, 360).
img_rows = img_cols = 128

# Source dataset (one sub-folder per class) and the working folders that
# receive the current train / validation / test split.
src_dir = '../input/shaver-shell-full-all-classes-v2/shaver-shell-full'
train_path, validation_path, test_path = './train/', './validation/', './test/'

# Class folder names; a class index is its position in `all_classes`
# (0 = double print, 1 = good, 2 = interrupted).
class_double_print = '19-01 dubbeldruk'
class_good = '19-01 goed'
class_interrupted = '19-01 onderbroken'
all_classes = [class_double_print, class_good, class_interrupted]

# Location of the saved / checkpointed classifier.
model_path = './models/custom_cnn.h5'

#### Define the custom CNN model used for defect classification

In [5]:
# Shape of one input image: height, width, 3 colour channels.
input_shape = (img_rows, img_cols, 3)

def conv_net(conv_blocks = 1, filter_size = (3,3), no_filters = 16, is_init = True, is_last = True):
    """Build a small sequential convolutional feature extractor.

    Each block is Conv2D -> BatchNormalization -> ReLU -> Dropout(0.4) ->
    MaxPooling2D.

    Args:
        conv_blocks: number of convolutional blocks to stack.
        filter_size: kernel size used by every Conv2D layer.
        no_filters: number of filters used by every Conv2D layer.
        is_init: when True, prepend a 1/255 rescaling layer (which also fixes
            the input shape to `input_shape`) and a Normalization layer.
        is_last: when True, append a Flatten layer.

    Returns:
        The assembled `Sequential` model. It is returned for both values of
        `is_last`; previously the `return` was nested under `if is_last:`, so
        `is_last=False` silently returned None.
    """
    convnet = Sequential()
    if is_init:
        convnet.add(tf.keras.layers.experimental.preprocessing.Rescaling(1./255, input_shape=input_shape))
        # NOTE(review): no `adapt()` call for this layer is visible in the
        # notebook — confirm that relying on its initial statistics is intended.
        convnet.add(tf.keras.layers.experimental.preprocessing.Normalization())
    for _ in range(conv_blocks):
        convnet.add(Conv2D(no_filters,filter_size,padding='same'))
        convnet.add(BatchNormalization())
        convnet.add(Activation('relu'))
        convnet.add(tf.keras.layers.Dropout(0.4))
        convnet.add(MaxPooling2D())
    if is_last:
        convnet.add(Flatten())
    return convnet

def create_model():
    """Assemble and compile the two-branch defect classifier.

    Two `conv_net` feature extractors (default 3x3 kernels and 1x1 kernels)
    read the same input; their flattened outputs are concatenated along axis 1
    and fed to a softmax layer with one unit per entry of `all_classes`.

    Returns:
        A compiled Keras `Model` (categorical cross-entropy loss, Adam with
        learning rate 0.00001, accuracy reported as 'acc').
    """
    image_input = Input(shape=input_shape)

    # Parallel branches over the same image.
    branch_3x3 = conv_net()(image_input)
    branch_1x1 = conv_net(conv_blocks=1, filter_size=(1, 1), is_last=True)(image_input)
    merged = tf.concat([branch_3x3, branch_1x1], axis=1)

    # Softmax head with a combined L1/L2 penalty on its kernel.
    weight_penalty = tf.keras.regularizers.l1_l2(l1=0.0001, l2=0.0001)
    head = keras.layers.Dense(len(all_classes), activation="softmax",
                              kernel_regularizer=weight_penalty)
    class_probs = head(merged)

    model = Model(inputs=[image_input], outputs=class_probs)
    model.compile(loss='categorical_crossentropy', optimizer=Adam(0.00001), metrics=['acc'])
    return model

#### Calculation of examined metrics

In [6]:
def calc_metrics(y_true, y_pred, y_proba):
    """Compute the evaluation metrics reported by this notebook.

    Args:
        y_true: array of true class indices (positions in `all_classes`).
        y_pred: array of predicted class indices.
        y_proba: 2-D array of class probabilities, one column per class.

    Returns:
        dict with the keys
        'binary_roc_auc'      - ROC AUC of good-vs-rest, scored with the good-class probability,
        'binary_recall'       - recall of the not-good side (label 0) of the good-vs-rest problem,
        'multiclass_roc_auc'  - weighted one-vs-rest ROC AUC over all classes,
        'recall_double_print' - recall of the double-print class,
        'recall_interrupted'  - recall of the interrupted class.
    """
    good_idx = all_classes.index('19-01 goed')
    double_print_idx = all_classes.index('19-01 dubbeldruk')
    interrupted_idx = all_classes.index('19-01 onderbroken')

    def is_class(labels, class_idx):
        # 1 where the label equals class_idx, 0 elsewhere.
        return (labels == class_idx).astype(int)

    y_good = is_class(y_true, good_idx)
    y_double_print = is_class(y_true, double_print_idx)
    y_interrupted = is_class(y_true, interrupted_idx)

    res = {}
    res['binary_roc_auc'] = roc_auc_score(y_good, y_proba[:, good_idx])
    res['binary_recall'] = recall_score(y_good, is_class(y_pred, good_idx), pos_label=0)
    res['multiclass_roc_auc'] = roc_auc_score(y_true, y_proba, multi_class='ovr', average='weighted')
    res['recall_double_print'] = recall_score(y_double_print, is_class(y_pred, double_print_idx), pos_label=1)
    res['recall_interrupted'] = recall_score(y_interrupted, is_class(y_pred, interrupted_idx), pos_label=1)
    return res

#### Utils for copying train, validation and test files from the input dataset to the train, validation and test folders following the current split

In [7]:
def copy_split_images(X, Y, dest, val_path):
    """Split (X, Y) 80/20 and copy each part into its own folder.

    Args:
        X: image file names.
        Y: class indices aligned with X.
        dest: folder receiving the 80% training part.
        val_path: folder receiving the 20% validation part.

    The split is seeded with GLOBAL_SEED.
    """
    split = train_test_split(X, Y, test_size = 0.2, random_state=GLOBAL_SEED)
    train_files, val_files, train_labels, val_labels = split
    copy_images(train_files, train_labels, dest)
    copy_images(val_files, val_labels, val_path)
    

def copy_images(X, Y, dest):
    """Copy every image in X from the source dataset into `dest`.

    Each file is read from `<src_dir>/<class name>/<file>` and written to
    `<dest>/<class name>/<file>`, where the class name is the entry of
    `all_classes` whose position equals the file's label in Y.

    Args:
        X: image file names.
        Y: class indices aligned with X.
        dest: root of the destination folder (class sub-folders must exist).
    """
    for position in range(len(X)):
        file_name = X[position]
        # Class names whose index equals this label; '' is used when none match.
        matching = [name for idx, name in enumerate(all_classes) if Y[position] == idx]
        label = matching[-1] if matching else ''
        source_file = os.path.join(src_dir, label, file_name)
        target_file = os.path.join(dest, label, file_name)
        shutil.copy(source_file, target_file)

#### Code to generate synthetic images calling biggan_generator.py

In [8]:
def generate_aug_images(X, dest, n_aug):
    """Run the image generator for the selected source images.

    Args:
        X: relative image paths of the form '<folder>/<file>'; only the second
            path component of each entry is passed on as the filter set.
        dest: directory handed to the generator as `savedir`.
        n_aug: value handed to the generator as `AUGMENTATION_TARGET`.

    NOTE(review): the exact meaning of the generator's arguments is defined in
    generate_images.py, which is not part of this notebook — verify there.
    """
    # Second '/'-separated component of every path.
    selected_files = []
    for rel_path in X:
        selected_files.append(rel_path.split('/')[1])

    generate_images.generate_images(
        dataset_root=src_dir,
        weights_path='../input/biggan-weights/138k/G_ema.pth',
        AUGMENTATION_TARGET=n_aug,
        savedir=dest,
        no_images_to_generate=len(X),
        GLOBAL_SEED=GLOBAL_SEED,
        start_image=0,
        filter_set=selected_files,
    )

def get_aug_images(X, Y, Y_train, dest):
    """Generate synthetic images for the selected samples.

    The augmentation target is twice the number of good-class labels
    (class index 1) found in `Y_train`; it is printed and then passed to
    `generate_aug_images` together with `X` and `dest`.

    Args:
        X: relative paths of the images to augment.
        Y: labels of those images (accepted but not used here).
        Y_train: class indices of the training data.
        dest: directory that receives the generated images.
    """
    good_count = sum(1 for y in Y_train if y == 1)  # good images have class index 1
    n_aug = 2 * good_count
    print('N_AUG: ', n_aug)
    generate_aug_images(X, dest, n_aug)

#### Confidence calculation based on the distance to boundary (needs to be called inside a tf.GradientTape context)

In [9]:
def get_all_layers(model):
    """Collect the layers whose name contains 'conv' or 'dense'.

    The first entry of `model.layers` is skipped. A layer whose name contains
    'sequential' is treated as a nested model: its own sub-layers are scanned
    (one level deep) instead of the layer itself.

    Args:
        model: object exposing `.layers` and `.get_layer(name)`.

    Returns:
        list of matching layers, in model order.
    """
    def has_kernel_name(layer_name):
        return ('conv' in layer_name) or ('dense' in layer_name)

    collected = []
    for layer in model.layers[1:]:
        if 'sequential' in layer.name:
            nested_layers = model.get_layer(layer.name).layers
            collected.extend(sub for sub in nested_layers if has_kernel_name(sub.name))
        elif has_kernel_name(layer.name):
            collected.append(layer)
    return collected


def distance_to_boundary(y_pred, y_true, model, tape):
    """Estimate how close one prediction is to the good/defect boundary.

    Must be called inside the `tf.GradientTape` context that recorded the
    forward pass producing `y_pred`, because the probability gap computed here
    has to be on the tape for `tape.gradient` to work. The tape must be
    persistent, since one gradient is requested per layer.

    For every layer returned by `get_all_layers(model)` the gap between the
    true-class probability and the good-class probability is divided by
    (inf-norm of its gradient w.r.t. the layer's first trainable weight,
    divided element-wise by that weight, plus a small epsilon). The result is
    averaged per layer and then across layers.

    Args:
        y_pred: 1-D tensor of class probabilities for a single sample.
        y_true: index of the sample's true class.
        model: the Keras model that produced `y_pred`.
        tape: the active persistent `tf.GradientTape`.

    Returns:
        Absolute value of the averaged distance, as a NumPy scalar.

    Raises:
        ValueError: if the model has no conv/dense layer with trainable
            weights (previously this surfaced as an unbound-variable error).
    """
    correct_class_prob = y_pred[y_true]
    good_class_prob = y_pred[1] #good class has index 1

    difference_prob = abs(correct_class_prob - good_class_prob)

    epsilon = 0.00001
    dists_list = []

    for layer in get_all_layers(model):
        weights = layer.trainable_weights
        if len(weights) == 0:
            # Nothing to differentiate against; the old code passed the empty
            # list on to tape.gradient and crashed when indexing the result.
            continue
        wt = weights[0]  # presumably the kernel for Conv2D/Dense layers

        # Largest absolute entry of d(difference_prob)/d(wt). This equals the
        # previous flatten + per-row inf-norm, without building a Flatten
        # layer on every iteration.
        grad_norm = tf.norm(tape.gradient(difference_prob, wt), ord=np.inf)

        difference_prob_gradnorm = grad_norm / wt

        layer_distance = difference_prob / (difference_prob_gradnorm + epsilon)
        dists_list.append(tf.reduce_mean(layer_distance))

    if not dists_list:
        raise ValueError('model has no conv/dense layer with trainable weights')

    # Averaged once, after all layers have been processed.
    final_dist = tf.reduce_mean(dists_list)

    return abs(final_dist.numpy())

#### pick_aug_images utilizes the above distance to boundary calculation to pick the top_k images with the smallest distance (lowest confidence) and their labels

In [10]:
def pick_aug_images(model, train_generator, defect_class_recalls, top_k = 10):
    """Select the minority-class training images closest to the decision boundary.

    Every batch of `train_generator` is run through `model` under a persistent
    gradient tape, and `distance_to_boundary` is evaluated for each sample of
    the two defect classes (class indices 0 and 2). The `top_k` samples with
    the smallest distance are returned.

    Batches are fetched by index and matched to `filenames` / `classes` via
    the generator's `index_array`. The generator may shuffle (the Keras
    default), so a batch position cannot be treated as a directory-order
    index; doing so paired images with the wrong file name and label.

    Args:
        model: trained Keras classifier.
        train_generator: Keras directory iterator over the training folder
            (must expose `classes`, `filenames`, `batch_size`, `index_array`
            and batch indexing).
        defect_class_recalls: metrics of the pre-trained model; currently not
            used, kept so existing callers keep working.
        top_k: number of images to return.

    Returns:
        (top_paths, top_labels): relative file paths and integer class indices
        of the selected images, ordered by increasing distance.
    """
    true_classes = train_generator.classes
    filenames = train_generator.filenames
    batch_size = train_generator.batch_size

    confidence_scores_double_print = []
    confidence_scores_interrupted = []

    # len(generator) counts every batch, including a final partial one.
    for i in range(len(train_generator)):
        img_batch, _ = train_generator[i]
        # Dataset indices of the samples in this batch; read after the batch
        # fetch so that index_array is guaranteed to be initialised.
        batch_sample_ids = train_generator.index_array[i*batch_size:(i + 1)*batch_size]
        img_batch = tf.convert_to_tensor(img_batch)

        with tf.GradientTape(persistent=True) as tape:
            train_predictions = model(img_batch)
            for j, sample_idx in enumerate(batch_sample_ids):
                label = int(true_classes[sample_idx])

                #perform augmentation only for minority classes with indexes 0 and 2
                if label == 0:
                    bucket = confidence_scores_double_print
                elif label == 2:
                    bucket = confidence_scores_interrupted
                else:
                    # Good-class samples are never selected, so skip the
                    # costly gradient computation for them.
                    continue

                pred = train_predictions[j]
                conf = distance_to_boundary(pred, label, model, tape)
                bucket.append((filenames[sample_idx], label, pred, conf))
        del tape

    all_confidence_scores = confidence_scores_double_print + confidence_scores_interrupted
    top_items = nsmallest(top_k, all_confidence_scores, key=itemgetter(3))

    top_paths = [t[0] for t in top_items]
    top_labels = [t[1] for t in top_items]
    return top_paths, top_labels

#### Load image paths in X and corresponding class labels to Y

In [11]:
# X collects image file names, Y the matching class index (position in all_classes).
X=[]
Y=[]

for i in range(len(all_classes)):
    # os.listdir returns entries in arbitrary order; sorting makes the order of
    # X/Y, and therefore the seeded train/validation/test splits, reproducible.
    source_files=sorted(os.listdir(os.path.join(src_dir, all_classes[i])))
    for f in source_files:
        X.append(f)
        Y.append(i)

X=np.asarray(X)
Y=np.asarray(Y)

#### The 'main' part of the code where the classifier is pre-trained for EPOCHS_PRE_TRAIN epochs and then trained for an additional EPOCHS_TRAIN epochs with confidence-based augmentation 

In [None]:
# Training schedule: (pre-training epochs, augmented-training epochs, number of
# boundary images to pick). A dry run uses a minimal schedule so the pipeline
# can be checked quickly.
EPOCHS_PRE_TRAIN, EPOCHS_TRAIN, TOP_K = (2, 2, 3) if DRY_RUN else (20, 30, 15)


# =============== Single hold-out split ===============
# NOTE(review): the original banner read "Stratified K-Fold", but the code
# below performs one train/test split without a `stratify` argument and
# contains no fold loop.

# Loader batch size and split fractions.
batch_size = 4
val_split = 0.2
test_split = 0.2

# Not used anywhere else in the visible notebook.
all_results = []

foldNum=0

# Remove the folders of any previous split
if os.path.exists(train_path):
    shutil.rmtree(train_path)
if os.path.exists(validation_path):
    shutil.rmtree(validation_path)
if os.path.exists(test_path):
    shutil.rmtree(test_path)
    
# Recreate one sub-folder per class under train/, validation/ and test/
for label in all_classes:
    os.makedirs( train_path+label, exist_ok = True)
    os.makedirs( validation_path+label, exist_ok = True)
    os.makedirs( test_path+label, exist_ok = True)
    
# Hold out `test_split` of all images as the test set (seeded).
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=test_split, random_state=GLOBAL_SEED)
    
# Split the remaining images again and copy them into the train and validation folders
copy_split_images(X_train, Y_train, train_path, validation_path)
    
# Copy the held-out test images into the test folder
copy_images(X_test, Y_test, test_path)
        
# Create data loaders: one generator per split, each reading its own folder.
# NOTE(review): `validation_split` is set on the train/validation generators,
# but no `subset=` is passed to flow_from_directory below, so it presumably has
# no effect — confirm.
train_datagen = ImageDataGenerator(validation_split=val_split)
validation_datagen = ImageDataGenerator(validation_split=val_split)
test_datagen = ImageDataGenerator()
        
    
# Training images with one-hot labels.
train_generator = train_datagen.flow_from_directory(
    train_path,
    target_size=(img_rows, img_cols),
    batch_size=batch_size,
    class_mode='categorical',
    seed=GLOBAL_SEED
)
    
# Validation images with one-hot labels.
validation_generator = validation_datagen.flow_from_directory(
    validation_path,
    target_size=(img_rows, img_cols),
    batch_size=batch_size,
    class_mode='categorical',
    seed=GLOBAL_SEED
)   
    
# Test images only (class_mode=None yields no labels), kept in directory order
# so predictions line up with `test_generator.classes`.
test_generator = test_datagen.flow_from_directory(
    test_path,
    target_size=(img_rows, img_cols),
    batch_size=batch_size,
    class_mode=None,
    seed=GLOBAL_SEED,
    shuffle=False
)

# Pre-train model without augmentation

model=create_model()
model.fit(train_generator, epochs=EPOCHS_PRE_TRAIN, validation_data=validation_generator, 
        shuffle=True, verbose = 0)
# Score the pre-trained model on the test set and save it so the augmented
# training stage can reload it from model_path.
predictions_pretrain = model.predict(test_generator, verbose=0)
model.save(model_path)
r_prelim = calc_metrics(test_generator.classes, np.argmax(predictions_pretrain, axis=1), predictions_pretrain)


# Pick the TOP_K training images closest to the classification border of the
# pre-trained model, then generate synthetic images for them into the training
# folder. The Keras session is cleared in between, after the model has been
# used for the last time.
border_images, border_image_labels = pick_aug_images(model, train_generator, r_prelim, top_k = TOP_K)
tf.keras.backend.clear_session()
get_aug_images(border_images, border_image_labels, Y_train, train_path)
    
# New generators, created after augmentation so that the training loader
# re-scans train_path and picks up the generated images written there.
aug_train_datagen = ImageDataGenerator(validation_split=val_split)
aug_val_datagen = ImageDataGenerator(validation_split=val_split)
    
# Training folder, now including the synthetic images.
aug_train_generator = aug_train_datagen.flow_from_directory(
    train_path,
    target_size=(img_rows, img_cols),
    batch_size=batch_size,
    class_mode='categorical',
    seed=GLOBAL_SEED
)
    
# Validation folder (same folder the pre-training stage validated on).
aug_val_generator = aug_val_datagen.flow_from_directory(
    validation_path,
    target_size=(img_rows, img_cols),
    batch_size=batch_size,
    class_mode='categorical',
    seed=GLOBAL_SEED
)
        
# Train model on augmented dataset, starting from the saved pre-trained weights
aug_model = tf.keras.models.load_model(model_path)

# Overwrite model_path whenever the validation loss improves.
mc = tf.keras.callbacks.ModelCheckpoint(model_path, monitor='val_loss', mode='min', save_best_only=True, verbose = 0)

aug_model.fit(aug_train_generator, epochs=EPOCHS_TRAIN, validation_data=aug_val_generator, 
              shuffle=True, callbacks=[mc], verbose = 0)
# Reload the checkpointed model from disk (rather than keeping the last-epoch
# weights) and evaluate it on the held-out test set.
aug_model = tf.keras.models.load_model(model_path)
predictions = aug_model.predict(test_generator, verbose=0)
y_pred = np.argmax(predictions, axis=1)
true_classes = test_generator.classes
res = calc_metrics(true_classes, y_pred, predictions)
print('Result: ' ,res)


print('')