In [None]:
#!/usr/bin/env python

import numpy as np
import pandas as pd
import os
os.environ["KERAS_BACKEND"] = "plaidml.keras.backend"
os.environ["PLAIDML_EXPERIMENTAL"] = "1"
os.environ["PLAIDML_DEVICE_IDS"] = "opencl_nvidia_nvidia_geforce_rtx_3060_ti.0" #Edit for correct GPU
from keras import backend as K
import random

import keras

import tensorflow as tf
from keras.applications.resnet50 import ResNet50

import matplotlib.pyplot as plt
from keras.preprocessing.image import load_img
from keras.preprocessing.image import ImageDataGenerator
from keras.layers import *
from keras.models import *
from keras.losses import *
from keras.optimizers import *
from keras.optimizers import Adam
from keras.utils.data_utils import Sequence
from keras.models import Sequential
from keras.utils import to_categorical

from keras.wrappers.scikit_learn import KerasClassifier
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split

In [None]:
# Setting paths for training and testing directories
train_dir = r'D:\Datasets FAces\Dataset_Final\Training/' #Set own path
train_paths = []
for label in os.listdir(train_dir):
    for file in os.listdir(train_dir+label):
        train_paths.append(train_dir+label+'/'+file)
random.shuffle(train_paths)

test_dir = r'D:\Datasets FAces\Dataset_Final\test_test/' #Set own path
test_paths = []
for label in os.listdir(test_dir):
    for file in os.listdir(test_dir+label):
        test_paths.append(test_dir+label+'/'+file)
random.shuffle(test_paths)

# Predict on new data:

predict_dir = r'D:/Datasets FAces/Dataset_Final/test_final/' #Set own path
predict_paths = []
for label in os.listdir(predict_dir):
    for file in os.listdir(predict_dir+label):
        predict_paths.append(predict_dir+label+'/'+file)


In [None]:
# Generate and list labels
labels = os.listdir(train_dir)
labels

In [None]:
# Open images in directory
def open_images(paths):
    images = []
    for path in paths:
        image = load_img(path, target_size=(128,128))
        input_arr = keras.preprocessing.image.img_to_array(image)
        input_arr = np.array(input_arr)/255.0 # Convert single image to a batch
        images.append(input_arr)
    return np.array(images)

In [None]:
# Extract labels from paths:
def get_labels(paths):
    label = []
    for path in paths:
        path = path.split('/')[-2]
        label.append(labels.index(path))
    return label

In [None]:
test_images = open_images(test_paths)
test_labels = get_labels(test_paths)
test_labels = to_categorical(test_labels)

In [None]:
train_images = open_images(train_paths)
train_labels = get_labels(train_paths)
train_labels = to_categorical(train_labels)
print('loaded')

In [None]:
plt.figure(figsize=(10,10)) # specifying the overall grid size

for i in range(4):
    plt.subplot(2,2,i+1)
    plt.imshow(train_images[i])
    plt.title(train_paths[i].split('/')[-2]) #Grabs the label from path

plt.show()

In [None]:
#batch_size=32
batch_size=128
#epochs = 30

In [None]:
inputs = train_images
targets = train_labels

In [None]:
# Create the Validation Dataset
Xtrain, Xval, ytrain, yval = train_test_split(inputs, targets, train_size=0.9, test_size=0.1, random_state=42)
Xtrain, Xtest, ytrain, ytest = train_test_split(Xtrain, ytrain, train_size=0.78, random_state=42)

In [None]:
# Increase number of training samples

train_datagen = ImageDataGenerator(horizontal_flip=True, 
                                   vertical_flip=True)
it = train_datagen.flow(Xtrain, ytrain)

In [None]:
# Augment validation set
val_datagen = ImageDataGenerator(horizontal_flip=True, 
                                   vertical_flip=True)
val_it = val_datagen.flow(Xval, yval)

In [None]:
# Augment test set
test_datagen = ImageDataGenerator(horizontal_flip=True, 
                                   vertical_flip=True)
test_it = val_datagen.flow(Xtest, ytest)

In [None]:
import math
steps_per_epoch = math.ceil(len(Xtrain)/32) #Recommended size. 32 is the inital batch size
#steps_per_epoch = 300

In [None]:
# Apply early stopping to find optimal number of epochs
from keras import callbacks
earlystopping = callbacks.EarlyStopping(monitor ="val_loss",
                                        mode ="min", patience = 10,
                                        restore_best_weights = True)


In [None]:
# Add feature extractors from ResNet50 pretrained network

#restnet = ResNet50(include_top=False, weights='imagenet', input_shape=(128,128,3))
#output = restnet.layers[-1].output
#output = keras.layers.Flatten()(output)
#restnet = Model(restnet.input, output=output)
#for layer in restnet.layers:
#    layer.trainable = False
#restnet.summary()

In [None]:
# Add feature extractors from VGG16/VGG19 pretrained network
#from keras.applications.vgg19 import VGG19
#from keras.models import Model

# load model
#VGG = VGG19(include_top=False, input_shape=(128, 128, 3))

In [None]:
# Define the K-fold Cross Validator

num_folds = 5
kfold = KFold(n_splits=num_folds, shuffle=True)

acc_per_fold = []
loss_per_fold = []

# K-fold Cross Validation model evaluation
fold_no = 1

for train, test in kfold.split(it):

    # Define the model architecture
    model = Sequential()
    # ResNet Model
    #model.add(restnet)
    #model.add(Dense(512, activation='relu', input_dim=(128,128,3)))
    #model.add(Dropout(0.3))
    #model.add(Dense(128, activation='relu'))
    #model.add(Dropout(0.3))
    #################################################################
    # VGG Model
    #model.add(VGG)
    #model.add(Dense(512, activation='relu', input_dim=(128,128,3)))
    #model.add(Dropout(0.3))
    #model.add(Dense(128, activation='relu'))
    #model.add(Dropout(0.3))
    #################################################################
    # Plain CNN Model as reference
    model.add(Conv2D(32, (3, 3),
                 activation='relu',
                 input_shape=(128,128,3)))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))

    model.add(Conv2D(64, (3, 3),activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))

    model.add(Conv2D(128, (3, 3),activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.4))

    model.add(Flatten()) # Uncomment for VGG16, comment for ResNet50
    model.add(Dense(64))
    model.add(Activation('relu'))
    model.add(Dropout(0.5))
    model.add(Dense(3))
    model.add(Dense(3, activation='softmax'))
    #model.add(Activation('softmax'))

    model.compile(loss=keras.losses.categorical_crossentropy,
              #optimizer=optimizers.RMSprop(lr=2e-5), # Alternative optimizer
              optimizer='adam',
              metrics=['accuracy'])


    # Generate a print
    print('------------------------------------------------------------------------')
    print(f'Training for fold {fold_no} ...')

    # Fit data to model

    history = model.fit_generator(
    it, epochs=300, verbose=2,
    callbacks=[earlystopping], validation_data=(val_it), shuffle=True,
    class_weight=None,steps_per_epoch=steps_per_epoch,
    validation_steps=None,
    max_queue_size=10, workers=1, use_multiprocessing=False)

    # Generate generalization metrics
    scores = model.evaluate_generator(test_it, verbose=0)
    print(f'Score for fold {fold_no}: {model.metrics_names[0]} of {scores[0]}; {model.metrics_names[1]} of {scores[1]*100}%')
    acc_per_fold.append(scores[1] * 100)
    loss_per_fold.append(scores[0])

    # Increase fold number
    fold_no = fold_no + 1
# == Provide average scores ==
print('------------------------------------------------------------------------')
print('Score per fold')
for i in range(0, len(acc_per_fold)):
    print('------------------------------------------------------------------------')
    print(f'> Fold {i+1} - Loss: {loss_per_fold[i]} - Accuracy: {acc_per_fold[i]}%')
print('------------------------------------------------------------------------')
print('Average scores for all folds:')
print(f'> Accuracy: {np.mean(acc_per_fold)} (+- {np.std(acc_per_fold)})')
print(f'> Loss: {np.mean(loss_per_fold)}')
print('------------------------------------------------------------------------')

In [None]:
model.summary()

In [None]:
# Save model to directory
model.save('Keras CNN Model') # Edit to own liking

In [None]:
# Print accuracy and loss
score = model.evaluate_generator(test_it, verbose=0)
print("Test loss:", score[0])
print("Test accuracy:", score[1])

In [None]:
y_predicted = []
y_labels = []
for i in range(0,len(test_paths)):
    images = open_images([test_paths[i]])
    predict = model.predict(images)[0]
    predict = np.argmax(predict)
    predict = labels[predict]
    y_predicted.append(predict)
    label = test_paths[i].split('/')[-2]
    y_labels.append(label)

In [None]:
from sklearn.metrics import classification_report

print(classification_report(y_labels, y_predicted, target_names=labels))

In [None]:
plt.plot(history.history['acc'])
plt.plot(history.history['val_acc'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

In [None]:
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

In [None]:
# Predict Facemask
for i in range(0,len(predict_paths)):
    images = open_images([predict_paths[i]])
    predicted = model.predict(images)[0]
    predicted = np.argmax(predicted)
    predicted = labels[predicted]
    label = predict_paths[i].split('/')[-2]
    print(predict_paths[i],'Predicted:', predicted)

In [None]:
# Predict Facemask
predict_dir = r'D:/Datasets FAces/test/' #Set own path
predict_paths = []
for label in os.listdir(predict_dir):
    for file in os.listdir(predict_dir+label):
        predict_paths.append(predict_dir+label+'/'+file)

for i in range(0,len(predict_paths)):
    images = open_images([predict_paths[i]])
    predicted = model.predict(images)[0]
    predicted = np.argmax(predicted)
    predicted = labels[predicted]
    label = predict_paths[i].split('/')[-2]
    print(predict_paths[i],'Predicted:', predicted)