In [None]:
## dependencies

import keras
import tensorflow as tf
from matplotlib import pyplot as plt
import numpy as np
from keras.layers import  Input, Dense, Flatten, Dropout, Conv2D, Activation, Add, BatchNormalization, AveragePooling2D, Concatenate
from keras.models import Model
from tensorflow.keras.applications.resnet50 import ResNet50
from keras.applications.vgg16 import VGG16
from keras.applications.xception import Xception
from keras.applications.inception_v3 import InceptionV3
from keras.applications.densenet import DenseNet121
from keras.preprocessing import image
from keras.preprocessing.image import ImageDataGenerator, load_img
from tensorflow.keras.optimizers import Adam
from keras.models import Sequential
from keras import activations
from keras.losses import BinaryCrossentropy
import keras.backend as K
import numpy as np
from glob import glob
from keras.models import load_model
import pandas as pd
pd.options.mode.chained_assignment = None
from keras.preprocessing.image import img_to_array, array_to_img
from keras.models import Model, load_model
from keras.losses import binary_crossentropy, mean_squared_error
from keras.initializers import glorot_uniform
import cv2
from imutils import paths
import os
from sklearn.utils import shuffle
import sklearn.metrics as metrics
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder , StandardScaler , PolynomialFeatures , MinMaxScaler
import seaborn as sns
from matplotlib.patches import Rectangle
from sklearn.metrics import f1_score
import random

import tensorflow_addons as tfa


In [None]:
## Initializations

# Side length (in pixels) of the square images produced by load_images and fed to the model.
img_size = 64
# NOTE(review): num_channels is not referenced by any cell visible in this notebook.
num_channels = 1

equalize_classes = 0        # 1 -> oversample the smaller class until both classes have the same size
aug_data_flag = 1           # 1 -> run every training batch through augment_imgs
# Backbone of the classifier. The model cell handles 'resnet', 'vgg', 'densenet' and 'simple_cnn';
# other names from the original note (inception, encoder, ccf_model, load_custom_model) have no active branch there.
clf_model = 'vgg'
last_layers_are_needed = 1  # 1 -> append the Flatten/Dense/Dropout head that ends in a single sigmoid output
retrain_model = 1           # 0 -> freeze every backbone layer, 1 -> keep every backbone layer trainable
seed_val = 3                # seed used when oversampling the minority class
margin = 15                 # number of pixels cropped from the image borders in load_images
data_path = '/home/odedrot' # user input: root directory that contains the dataset folders

In [None]:
## functions used for displaying outputs

def plot_imgs_withLabels_3D(imgs, labels, n_examples, fig_size):
    """Show 3-channel images with their labels in a square grid.

    The grid side is ``int(sqrt(n_examples))``, so the first ``side * side``
    entries of ``imgs`` / ``labels`` are drawn, row by row.

    Args:
        imgs: indexable collection of images; each entry is reshaped to
            ``(img_size, img_size, 3)`` using the module-level ``img_size``.
        labels: indexable collection of titles aligned with ``imgs``.
        n_examples: requested number of examples (rounded down to a square).
        fig_size: width and height of the figure, in inches.
    """
    n = int(np.sqrt(n_examples))
    # squeeze=False keeps `ax` a 2-D array even for a 1x1 grid; without it
    # plt.subplots returns a bare Axes object and indexing it raises.
    fig, ax = plt.subplots(n, n, figsize=(fig_size, fig_size), squeeze=False)
    # ax.flat walks the grid in row-major order, i.e. the same order as a
    # nested row/column loop with a running counter.
    for k, cell in enumerate(ax.flat):
        cell.imshow(imgs[k].reshape(img_size, img_size, 3), cmap='gray')
        cell.axis('off')
        cell.set_title(labels[k])
    fig.tight_layout()
    plt.show()

def plot_imgs_withLabelsandpred_3D(imgs, labels, n_examples, fig_size):
    """Show 3-channel images with their titles in a square grid.

    The rendering is exactly that of ``plot_imgs_withLabels_3D``; the caller
    is expected to pass whatever text should appear above each image (for
    example label and prediction combined) in ``labels``.

    Args:
        imgs: indexable collection of images reshapeable to
            ``(img_size, img_size, 3)``.
        labels: indexable collection of titles aligned with ``imgs``.
        n_examples: requested number of examples (rounded down to a square).
        fig_size: width and height of the figure, in inches.
    """
    # Delegate instead of keeping a second copy of the same grid loop, so the
    # two entry points cannot drift apart.
    plot_imgs_withLabels_3D(imgs, labels, n_examples, fig_size)

def plot_imgs_withLabels_1D(imgs, labels, n_examples, fig_size):
    """Show single-channel images with their labels in a square grid.

    The grid side is ``int(sqrt(n_examples))``, so the first ``side * side``
    entries of ``imgs`` / ``labels`` are drawn, row by row, with a gray
    colormap and a large (30 pt) title.

    Args:
        imgs: indexable collection of images; each entry must hold
            ``img_size * img_size`` values (module-level ``img_size``).
        labels: indexable collection of titles aligned with ``imgs``.
        n_examples: requested number of examples (rounded down to a square).
        fig_size: width and height of the figure, in inches.
    """
    n = int(np.sqrt(n_examples))
    # squeeze=False keeps `ax` a 2-D array even for a 1x1 grid; without it
    # plt.subplots returns a bare Axes object and indexing it raises.
    fig, ax = plt.subplots(n, n, figsize=(fig_size, fig_size), squeeze=False)
    for k, cell in enumerate(ax.flat):
        # A plain 2-D array is the canonical imshow input for scalar data;
        # it holds the same pixels as the (img_size, img_size, 1) view.
        cell.imshow(imgs[k].reshape(img_size, img_size), cmap='gray')
        cell.axis('off')
        cell.set_title(labels[k], fontsize=30)
    fig.tight_layout()
    plt.show()
    
# plot_imgs_withLabels(images, images_id, 144, 15)

def sample_images(imgs, r, c):
    """Show the first ``r * c`` images of a batch in an ``r`` x ``c`` grid.

    Only channel 0 of every image is drawn, using a gray colormap.

    Args:
        imgs: array indexed as ``imgs[i, :, :, 0]``, i.e. a 4-D batch whose
            last axis is the channel axis.
        r: number of grid rows.
        c: number of grid columns.
    """
    # squeeze=False keeps `axs` two-dimensional when r == 1 or c == 1;
    # otherwise plt.subplots collapses it and axs[i, j] fails.
    fig, axs = plt.subplots(r, c, figsize=(20, 2 * r), squeeze=False)
    # axs.flat is row-major, matching a nested row/column loop with a counter.
    for cnt, cell in enumerate(axs.flat):
        cell.imshow(imgs[cnt, :, :, 0], cmap='gray')
        cell.axis('off')
    plt.show()
##############################################################################################################################################################

In [None]:
## load images 

def load_images(directory):
    """Load every image under ``directory`` as a cropped, resized grayscale array.

    Each readable image is converted to grayscale, has ``margin`` pixels
    removed from every border and is resized to ``(img_size, img_size)``
    (``margin`` and ``img_size`` are module-level settings).

    Args:
        directory: folder that is searched for image files.

    Returns:
        A tuple ``(data_arr, images_name, labels_arr)``:
          * ``data_arr``   - array stacking the processed images,
          * ``images_name`` - list with the file name of every loaded image,
          * ``labels_arr`` - array of zeros, one per image. It is only a
            placeholder: this function has no class information.

    Raises:
        ValueError: if an image is too small for ``margin`` to be cropped
            from both sides.
    """
    # Sorted so the sample order does not depend on the directory listing order.
    imagePaths = sorted(paths.list_images(directory))
    data = []
    images_name = []
    for imagePath in imagePaths:
        image = cv2.imread(imagePath)
        if image is None:
            # cv2.imread reports an unreadable/corrupt file by returning None;
            # skip it (loudly) instead of failing later inside cvtColor.
            print('skipping unreadable image:', imagePath)
            continue
        # cv2.imread yields channels in BGR order, so BGR2GRAY is the matching
        # conversion (RGB2GRAY would swap the red and blue weights).
        image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        height, width = image.shape[:2]
        if 2 * margin >= min(height, width):
            raise ValueError(
                'image %s (%dx%d) is too small to crop a margin of %d pixels'
                % (imagePath, width, height, margin))
        # Crop relative to the actual image size so the borders are removed
        # for any input resolution (identical to the img_size-based slice
        # when the raw image is img_size x img_size).
        image = image[margin:height - margin, margin:width - margin]
        image = cv2.resize(image, (img_size, img_size), interpolation=cv2.INTER_AREA)
        data.append(image)
        images_name.append(os.path.basename(imagePath))

    data_arr = np.array(data)
    labels_arr = np.zeros(len(data), dtype=int)
    if len(data) > 0:
        print(data_arr.shape, np.min(data_arr), np.max(data_arr), data_arr.dtype)
    else:
        # np.min / np.max raise on an empty array, so report the situation instead.
        print('no images loaded from', directory)
    return data_arr, images_name, labels_arr

In [None]:
## load train/test datasets
# Folders ending in _1 hold class 1 (announced as 'female'), folders ending in _0 hold class 0 ('male').
# The labels_* arrays returned by load_images are all zeros for every folder; the labels that are
# actually used for training and evaluation are built later, when the two classes are merged.

print('female train')
images_1, names_1, labels_1                = load_images(data_path + '/DISCOVERY-master/GENDER/IMAGES/TRAIN_1') 
print('male train')
images_0, names_0, labels_0                = load_images(data_path + '/DISCOVERY-master/GENDER/IMAGES/TRAIN_0') 

print('female test')
images_1_test, names_1_test, labels_1_test = load_images(data_path + '/DISCOVERY-master/GENDER/IMAGES/TEST_1') 
print('male test')
images_0_test, names_0_test, labels_0_test  = load_images(data_path + '/DISCOVERY-master/GENDER/IMAGES/TEST_0') 

print('Done loading images')
###### 

In [None]:
## verify no leakage: a file name must never appear in both the train and the test split of a class
# Comparing the two sets for equality is not a leakage test (it is False for partial overlap as well),
# so look at the intersection and report how many names are shared.
leaked_1 = set(names_1) & set(names_1_test)
leaked_0 = set(names_0) & set(names_0_test)
print('class 1 file names shared by train and test:', len(leaked_1), sorted(leaked_1)[:10])
print('class 0 file names shared by train and test:', len(leaked_0), sorted(leaked_0)[:10])

In [None]:
## Equalize data from both classes for training and merge both classes into one training dataset

train_images_0 = images_0
train_images_1 = images_1

### upsample class 0
# n_upsample_0 = 400
# rand_ind=np.random.randint(0,len(train_images_0),n_upsample_0)
# train_images_0 =  np.concatenate(( train_images_0, train_images_0[rand_ind]))
### upsample class 1
# n_upsample_1 = 400
# rand_ind=np.random.randint(0,len(train_images_1),n_upsample_1)
# train_images_1 =  np.concatenate(( train_images_1, train_images_1[rand_ind]))


if equalize_classes == 1:
    def euql_cls(imgs_maj, imgs_min):
        """Oversample ``imgs_min`` with replacement until it is as long as ``imgs_maj``.

        Returns ``(imgs_maj, imgs_min_extended)``; ``imgs_maj`` is passed through untouched.
        """
        delta = len(imgs_maj) - len(imgs_min)
        np.random.seed(seed_val)
        rand_ind = np.random.randint(0, len(imgs_min), delta)
        imgs_min = np.concatenate((imgs_min, imgs_min[rand_ind]))
        return imgs_maj, imgs_min

    # At most one class can be the minority, so the two cases are exclusive.
    if len(train_images_0) > len(train_images_1):
        train_images_0, train_images_1 = euql_cls(train_images_0, train_images_1)
    elif len(train_images_1) > len(train_images_0):
        train_images_1, train_images_0 = euql_cls(train_images_1, train_images_0)

p_label = 1  # label of class 1
n_label = 0  # label of class 0

train_images = np.concatenate((train_images_0, train_images_1))
train_labels = np.concatenate((np.full(len(train_images_0), n_label, dtype=float),
                               np.full(len(train_images_1), p_label, dtype=float)))

## test
test_images_0 = images_0_test
test_images_1 = images_1_test
test_images = np.concatenate((test_images_0, test_images_1))
test_labels = np.concatenate((np.full(len(test_images_0), n_label, dtype=float),
                              np.full(len(test_images_1), p_label, dtype=float)))

test_names_0 = names_0_test
test_names_1 = names_1_test
test_names = np.concatenate((test_names_0, test_names_1))

# Shuffle images, labels (and names) with the same permutation. Seeding with seed_val
# makes the resulting order reproducible across kernel restarts.
(X_train, y_train) = shuffle(train_images, train_labels, random_state=seed_val)
(X_test, y_test, X_test_names) = shuffle(test_images, test_labels, test_names, random_state=seed_val)

print('')
print('train_images_0', train_images_0.shape)
print('train_images_1', train_images_1.shape)
print('')
print('X_train', X_train.shape)
print('y_train', y_train.shape)
print('X_test', X_test.shape)
print('y_test', y_test.shape)

In [None]:
# AUGMENTATIONS

@tf.function
def brightness(x):
    """Apply ``tf.image.random_brightness`` with max_delta 0.15, then clamp the result to [0, 1]."""
    shifted = tf.image.random_brightness(x, 0.15)
    return tf.clip_by_value(shifted, 0, 1)
@tf.function
def contrast(x):
    """Apply ``tf.image.random_contrast`` with bounds 0.85 and 1.15, then clamp the result to [0, 1]."""
    rescaled = tf.image.random_contrast(x, 0.85, 1.15)
    return tf.clip_by_value(rescaled, 0, 1)
@tf.function
def rotate(x):
    """Rotate the image by a random multiple of 90 degrees (90, 180 or 270).

    Args:
        x: image tensor accepted by ``tf.image.rot90``.

    Returns:
        The rotated image.
    """
    # The number of quarter turns must be drawn with a TensorFlow op: a numpy
    # draw inside a tf.function runs only while the function is traced, which
    # would freeze one orientation into the graph for every later call.
    quarter_turns = tf.random.uniform(shape=[], minval=1, maxval=4, dtype=tf.int32)  # 1, 2 or 3
    return tf.image.rot90(x, k=quarter_turns)
@tf.function
def flip_ver(x):
    """Return ``x`` passed through ``tf.image.random_flip_up_down``."""
    return tf.image.random_flip_up_down(x)
@tf.function
def flip_hor(x):
    """Return ``x`` passed through ``tf.image.random_flip_left_right``."""
    return tf.image.random_flip_left_right(x)
@tf.function
def add_noise(x):
    """Add zero-mean Gaussian noise (stddev 0.03) to ``x`` and clamp the sum to [0, 1].

    The noise tensor has the fixed shape ``(img_size, img_size, 1)``.
    """
    gaussian = tf.random.normal(shape=(img_size, img_size, 1), mean=0.0, stddev=0.03, dtype=tf.float32)
    return tf.clip_by_value(tf.add(x, gaussian), 0, 1)
@tf.function
def resize_crop(x):
    """Cut a random square patch out of ``x`` and resize it to ``(img_size, img_size)``.

    The patch side is an integer drawn by ``tf.random.uniform`` between
    ``int(0.75 * img_size)`` and ``img_size``; all channels are kept.
    """
    side = tf.random.uniform(shape=[], minval=int(0.75 * img_size), maxval=1 * img_size, dtype=tf.int32)
    patch = tf.image.random_crop(x, (side, side, x.shape[-1]))
    return tf.image.resize(patch, (img_size, img_size))
@tf.function
def blur(x):
    """Smooth ``x`` with a 3x3 Gaussian filter of fixed sigma 0.3 (tensorflow_addons)."""
    # A random sigma was tried before and is currently disabled:
    # s = np.random.random()
    # x = tfa.image.gaussian_filter2d(image=x, sigma=s)
    return tfa.image.gaussian_filter2d(image=x, filter_shape=(3, 3), sigma=0.3)

def augment_img(img):
    """Return a randomly augmented copy of one image.

    The image is copied, then each step of the pipeline below is applied
    independently when a fresh ``np.random.uniform(0, 1)`` draw exceeds 0.5.
    The input array itself is never modified.

    Args:
        img: a single image array.

    Returns:
        The augmented image: the untouched numpy copy when no step fired,
        otherwise whatever the last applied step returned.
    """
    img_aug = np.copy(img)
    # Active steps, in application order. flip_ver and rotate exist in this
    # notebook but are intentionally not part of the pipeline.
    pipeline = (flip_hor, resize_crop, blur, brightness, contrast, add_noise)
    for transform in pipeline:
        if np.random.uniform(0, 1) > 0.5:
            img_aug = transform(img_aug)
    return img_aug


def augment_imgs(imgs):
    """Run ``augment_img`` on every image of ``imgs`` and stack the results.

    Args:
        imgs: sequence (or batch array) of images.

    Returns:
        A numpy array built from the augmented images, in input order.
    """
    return np.array([augment_img(single) for single in imgs])

In [None]:
## Construct the model.
## Define the backbone and whether it starts from scratch or from pretrained weights.

# if clf_model == 'load_custom_model': 
#     CNN_saved_name = 'saved_model_name.h5' # new
#     CNN_clf_model_path = '/saved_dir/' + CNN_saved_name
#     model = load_model(CNN_clf_model_path , compile=True)

# retrain_model=None asks for randomly initialised weights; 0 and 1 both start from ImageNet.
backbone_weights = None if retrain_model is None else "imagenet"
backbone_input_shape = (img_size, img_size, 3)

if clf_model == 'resnet':
    model = ResNet50(weights=backbone_weights, include_top=False, input_shape=backbone_input_shape)
elif clf_model == 'vgg':
    model = VGG16(weights=backbone_weights, include_top=False, input_shape=backbone_input_shape)
elif clf_model == 'densenet':
    model = DenseNet121(weights=backbone_weights, include_top=False, input_shape=backbone_input_shape)
elif clf_model == 'simple_cnn':
    # Small convolutional stack defined here, so there are no pretrained weights to load.
    input_D = Input(shape=backbone_input_shape)
    X = Conv2D(filters=128, kernel_size=5, strides=2, activation='relu', padding='same')(input_D) #### HP #####
    X = Conv2D(filters=128, kernel_size=7, strides=1, activation='relu', padding='same')(X)
    X = Dropout(0.15)(X)
    X = Conv2D(filters=256, kernel_size=5, strides=2, activation='relu', padding='same')(X)
    X = Conv2D(filters=256, kernel_size=7, strides=1, activation='relu', padding='same')(X)
    X = Dropout(0.15)(X)
    X = Conv2D(filters=512, kernel_size=5, strides=2, activation='relu', padding='same')(X)
    X = Conv2D(filters=512, kernel_size=7, strides=1, activation='relu', padding='same')(X)
    X = Dropout(0.15)(X)
#     X = BatchNormalization()(X)
#     X = Dropout(0.1)(X)
    model = Model(input_D, X)
else:
    # Fail here with a clear message instead of a NameError on `model` further down.
    raise ValueError(
        "unsupported clf_model %r; expected 'resnet', 'vgg', 'densenet' or 'simple_cnn'" % (clf_model,))
#####################################################################################################
if retrain_model == 0:
    for layer in model.layers:  # freeze the whole backbone
        layer.trainable = False
elif retrain_model == 1:
    for layer in model.layers:  # train (fine-tune) every backbone layer
        layer.trainable = True
######################################################################################################
# Flatten the backbone output and add the dense head that produces one probability per image.
if last_layers_are_needed == 1:
    X = model.output
    X = Flatten()(X)
    X = Dense(16, activation='relu')(X)  #### HP #####
    X = Dropout(0.5)(X)
    output_linear = Dense(1)(X)
    output_sigmoid = activations.sigmoid(output_linear)
    final_model = Model(inputs=model.inputs, outputs=output_sigmoid)
elif last_layers_are_needed == 0:
    # Use the backbone as-is (no classification head is attached).
    final_model = model

In [None]:
## hyperparameters and optimizer

batch_size = 64   # images per gradient step
epochs = 300      # passes of the outer training loop
# NOTE(review): 1e-8 is an unusually small Adam step size - confirm it is intentional.
LEARNING_RATE = 1e-8
optimizer = Adam(learning_rate=LEARNING_RATE)


In [None]:
## losses and metrics

# Shared binary cross-entropy object: it is the training loss and also the loss reported by calc_metrices.
# from_logits=False because the model output already went through a sigmoid.
# label_smoothing=0.3: per the Keras documentation the targets are squeezed towards 0.5 before the
# loss is computed, so the reported loss is not the plain cross-entropy of the hard 0/1 labels.
bce = tf.keras.losses.BinaryCrossentropy(from_logits=False, label_smoothing=0.3)

def bce_loss(y_real, y_pred):
    """Mean binary cross-entropy between targets and predicted probabilities.

    Args:
        y_real: target values (0 or 1), broadcastable against ``y_pred``.
        y_pred: predicted probabilities.

    Returns:
        A scalar tensor holding the mean of the element-wise losses
        (a non-negative value).
    """
    # Keep probabilities strictly inside (0, 1): log(0) is -inf and would turn
    # the mean into inf/NaN as soon as a single prediction saturates.
    eps = 1e-7
    y_pred = tf.clip_by_value(y_pred, eps, 1.0 - eps)
    bce_losses = -1 * (y_real * tf.math.log(y_pred) + (1 - y_real) * tf.math.log(1 - y_pred))
    return tf.math.reduce_mean(bce_losses)

def calc_metrices(y_real, y_pred):
    """Compute the loss and threshold-based classification metrics.

    The decision threshold is the ROC operating point with the largest
    geometric mean of sensitivity and specificity, ``sqrt(tpr * (1 - fpr))``.

    Args:
        y_real: 1-D array of true labels (0 or 1).
        y_pred: 1-D array of predicted scores.

    Returns:
        ``(clf_loss, tnr, tpr, gmax, th, auc_score, acc, f1)`` where
        ``clf_loss`` comes from the module-level ``bce`` loss, ``tnr`` /
        ``tpr`` / ``gmax`` / ``th`` describe the chosen operating point
        (rounded to 2 decimals), ``auc_score`` is the ROC AUC (rounded to
        2 decimals) and ``acc`` / ``f1`` are computed on the scores
        binarised at ``th``.
    """
    clf_loss = bce(y_real, y_pred).numpy()
    fpr, tpr, thresholds = metrics.roc_curve(y_real, y_pred)
    auc_score = np.round(metrics.roc_auc_score(y_real, y_pred), 2)
    gmeans = np.sqrt(tpr * (1 - fpr))
    ix = np.argmax(gmeans)  # index of the largest g-mean
    gmax = np.round(gmeans[ix], 2)
    th = np.round(thresholds[ix], 2)
    tpr = np.round(tpr[ix], 2)
    tnr = np.round(1 - fpr[ix], 2)

    # Binarise every score. Using >= (the ROC convention) also classifies
    # scores that are exactly equal to the threshold; leaving them as raw
    # scores would hand a mix of binary and continuous values to the metrics.
    y_pred_TH = (np.asarray(y_pred) >= th).astype(float)
    acc = metrics.accuracy_score(y_real, y_pred_TH)
    f1 = f1_score(y_real, y_pred_TH)
    return clf_loss, tnr, tpr, gmax, th, auc_score, acc, f1
    
def plot_hist(y_real, y_pred):
    """Overlay the score histograms of the two classes and show the figure.

    Scores of samples with ``y_real == 0`` are drawn in red (legend entry
    "low"), scores of samples with ``y_real == 1`` in blue (legend entry
    "high"); both histograms use 100 bins and are half transparent.

    Args:
        y_real: array of true labels (0 or 1).
        y_pred: array of classifier scores aligned with ``y_real``.
    """
    for class_value, colour in ((0, 'r'), (1, 'b')):
        plt.hist(y_pred[y_real == class_value], 100, alpha=0.5, color=colour)
    # plt.title('Quality scores based on real images', fontsize=20)
    # Hand-made legend patches, one per class colour.
    legend_patches = [Rectangle((0, 0), 1, 1, color=c, ec="k") for c in ("red", "blue")]
    plt.legend(legend_patches, ["low", "high"])
    plt.xlabel("classifier scores", fontsize=15)
    plt.ylabel("# of samples", fontsize=15)
    plt.show()

In [None]:
## Training model and save

def _as_unit_float(images):
    """Convert images to float32, divide by 255 and append a trailing channel axis."""
    return np.expand_dims(images.astype("float32") / 255, -1)


# Where the classifier is written after every epoch. os.path.join supplies the
# path separator between data_path and the relative part.
# NOTE(review): the images are read from 'DISCOVERY-master' while the model goes to 'DISCOVERY' - confirm.
model_save_path = os.path.join(data_path, 'DISCOVERY/GENDER/GENDER_SAVED_MODELS/classifier.h5')
os.makedirs(os.path.dirname(model_save_path), exist_ok=True)

# Fixed evaluation inputs: grayscale replicated to 3 channels, as the backbone expects.
test_imgs = _as_unit_float(X_test).repeat(3, -1)

# Random subset (2000 draws with replacement) of the training set used for train-side metrics.
idx = np.random.randint(0, X_train.shape[0], 2000)
train_imgs = _as_unit_float(X_train[idx]).repeat(3, -1)
train_y = y_train[idx]

for epoch in range(epochs):
    print("epoch", epoch)
    for n_batch in range(len(X_train) // batch_size):
        # Every batch is an independent random draw (with replacement) from the training set.
        idx = np.random.randint(0, X_train.shape[0], batch_size)
        image_batch = _as_unit_float(X_train[idx])
        y_batch = tf.convert_to_tensor(y_train[idx].astype('float32'))
        if aug_data_flag == 1:
            # Augment while the images are still single-channel.
            image_batch = augment_imgs(image_batch)
        image_batch = image_batch.repeat(3, -1)
        with tf.GradientTape() as tape:
            # Take column 0 so predictions have shape (batch,) like y_batch
            # (same slicing as used on the predict() outputs below).
            clf_pred = final_model(image_batch, training=True)[:, 0]
            clf_loss = bce(y_batch, clf_pred)
        trainable_variables = final_model.trainable_variables
        clf_gradients = tape.gradient(clf_loss, trainable_variables)
        optimizer.apply_gradients(zip(clf_gradients, trainable_variables))

    # end of epoch - evaluate every third epoch
    if epoch % 3 == 0:

        print('')
        train_clf_pred = final_model.predict(train_imgs)[:, 0]
        test_clf_pred = final_model.predict(test_imgs)[:, 0]

        train_clf_loss, train_tnr, train_tpr, train_gmax, train_th, train_auc, train_acc, train_f1 = calc_metrices(train_y, train_clf_pred)
        test_clf_loss, test_tnr, test_tpr, test_gmax, test_th, test_auc, test_acc, test_f1 = calc_metrices(y_test, test_clf_pred)

        print('train', '                 ', 'test')
        print('loss', train_clf_loss, '    ', test_clf_loss)
        print('acc', train_acc, '    ', test_acc)
        print('gmax', train_gmax, '    ', test_gmax)
        print('th', train_th, '    ', test_th)
        print('tnr', train_tnr, '    ', test_tnr)
        print('tpr', train_tpr, '    ', test_tpr)
        print('auc', train_auc, '    ', test_auc)
        print('f1', train_f1, '    ', test_f1)
        print('')

        plot_hist(train_y, train_clf_pred)
        plot_hist(y_test, test_clf_pred)

    # Checkpoint after every epoch (the file is overwritten each time).
    final_model.save(model_save_path, include_optimizer=True)
                     
