# Imports

In [None]:
import cv2
import os
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import imutils
from tqdm import tqdm
from pylab import rcParams
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import load_model

# Methods

In [None]:
#-- Dataset loader --#

def fetch_files(dir:str, filetype:list, arr:list = None):
    """Recursively collect the files under ``dir`` whose extension is wanted.

    ### Parameters
    dir : str
        Directory to scan. Sub-directories are scanned recursively.
    filetype : list
        Extensions to keep, without the leading dot (e.g. ``['jpg']``).
    arr : list, optional
        List the matches are appended to. A fresh list is created when it is
        omitted, so independent calls never share their results.

    ### Returns
    arr : list
        The ``os.DirEntry`` objects of every matching file.
    """
    # A mutable default ([]) would be created once and reused by every call,
    # silently accumulating the files of earlier scans.
    if arr is None:
        arr = []
    with os.scandir(dir) as content:
        for item in content:
            # The child path is joined with '/' on purpose: load_dataset
            # derives the label from the '/'-separated directory part.
            child = dir + '/' + item.name
            if os.path.isdir(child):
                fetch_files(child, filetype, arr)
            elif item.name.split('.')[-1] in filetype:
                arr.append(item)
    return arr

def load_dataset(arr:list):
    """Load the images behind a list of directory entries.

    ### Parameters
    arr : list
        ``os.DirEntry`` objects, e.g. the result of ``fetch_files``.

    ### Returns
    data : list
        The images, converted from OpenCV's BGR channel order to RGB.
    label : list
        For every image, the name of the directory that contains it.

    ### Raises
    TypeError
        If an element of ``arr`` is not an ``os.DirEntry``.
    ValueError
        If an image file cannot be decoded by OpenCV.
    """
    # Validate everything up front so nothing is loaded for a bad argument.
    # An explicit raise is used because assert statements vanish under -O.
    for entry in arr:
        if not isinstance(entry, os.DirEntry):
            raise TypeError(f'expected os.DirEntry, got {type(entry).__name__}')

    data, label = [], []
    for entry in arr:
        image = cv2.imread(entry.path)
        # imread signals failure by returning None instead of raising
        if image is None:
            raise ValueError(f'Could not read image: {entry.path}')
        data.append(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        # Take the parent directory name via os.path rather than
        # str.replace, which would also strip the file name out of any
        # directory component that happens to contain it.
        label.append(os.path.basename(os.path.dirname(entry.path)))
    return data, label

#-- Data Processor --#

def center_crop_image(img):
    """Cut the largest centered square out of an image.

    ### Parameters
    img : ndarray
        Image whose first two axes are height and width.

    ### Returns
    ndarray
        A square view of ``img`` whose side equals the shorter edge.
    """
    height, width = img.shape[:2]
    taller_than_wide = height / width > 1
    if not taller_than_wide:
        # Landscape or square: keep all rows, trim columns on both sides
        margin = int(np.round((width / 2) - (height / 2)))
        return img[0:height, margin:margin + height]
    # Portrait: keep all columns, trim rows on both sides
    margin = int(np.round((height / 2) - (width / 2)))
    return img[margin:margin + width, 0:width]

def process_images(images, image_size):
    """Center-crop every image to a square and resize it.

    ### Parameters
    images : iterable of ndarray
        Images to process.
    image_size : int
        Side length in pixels of the square output images.

    ### Returns
    list
        The cropped and resized images, in input order.
    """
    target = (image_size, image_size)
    # The interpolation flag has to be passed by keyword: the third
    # positional parameter of cv2.resize is ``dst``, so a positional
    # cv2.INTER_AREA never reaches the interpolation argument.
    return [
        cv2.resize(center_crop_image(img), target, interpolation=cv2.INTER_AREA)
        for img in images
    ]

def data_augmenter(data, label, rot=0, step=1, flip=True):
    """Expand a dataset with rotated and, optionally, mirrored copies.

    Every sample is rotated by each angle in ``range(-rot, rot + 1, step)``.
    When ``flip`` is truthy, each rotated image is followed by a copy
    produced with ``cv2.flip(image, 1)``.

    ### Parameters
    data : array_like
        Images to augment.
    label : array_like
        Labels matching ``data`` element by element.
    rot : integer
        Largest rotation angle in degrees, applied in both directions.
    step : integer
        Increment between consecutive rotation angles.
    flip : bool
        Whether to add a flipped copy of every rotated image.

    ### Returns
    data : list
        The augmented images.
    label : list
        The label of each augmented image, in the same order.
    """
    angles = list(range(-rot, rot + 1, step))
    augmented, tags = [], []
    for image, tag in zip(data, label):
        for angle in angles:
            rotated = imutils.rotate(image, angle)
            variants = [rotated, cv2.flip(rotated, 1)] if flip else [rotated]
            augmented.extend(variants)
            tags.extend([tag] * len(variants))
    return augmented, tags

#-- Data visualizer --#

def visualizer(x, y=None, grid=None, font=None):
    """Show a single image, or a grid of randomly sampled images.

    ### Parameters
    x : array_like
        One image (single mode) or a sequence of images (grid mode).
    y : optional
        Title of the image (single mode) or a sequence of titles indexed
        like ``x`` (grid mode).
    grid : tuple, optional
        ``(rows, columns)`` of the grid. When it is omitted, or when either
        dimension is 1, ``x`` is drawn as one single image instead.
    font : dict, optional
        Matplotlib rcParams overrides; defaults to Arial, size 12.

    In grid mode the cells are filled by drawing random indices into ``x``
    (with replacement). An empty ``x`` prints a notice and draws nothing.
    """
    single = not grid or 1 in grid
    if not single and len(x) == 0:
        # np.random.randint(0) raises ValueError; an empty selection (for
        # example "no misclassified samples") is not an error.
        print('Nothing to visualize')
        return
    if not font:
        font = {'font.family': 'Arial', 'font.size' : 12}
    rcParams.update(font)
    if single:
        plt.figure(figsize=(6,6))
        plt.tick_params(axis='both', bottom=False, left=False, labelbottom=False, labelleft=False)
        plt.imshow(x, cmap="Greys")
        if y: plt.title(y)
    else:
        _, axes = plt.subplots(grid[0], grid[1],figsize=(10,10))
        # Explicit length check so numpy arrays of titles work as well
        # (their truth value is ambiguous).
        has_titles = y is not None and len(y) > 0
        # axes.flat walks the 2-D axes array row by row
        for axe in axes.flat:
            axe.tick_params(axis='both', bottom=False, left=False, labelbottom=False, labelleft=False)
            r = np.random.randint(len(x))
            axe.imshow(x[r], cmap="Greys")
            if has_titles: axe.set_title(y[r])
    plt.show()

# Config

In [None]:
# Root folder of the dataset; load_dataset labels every image with the name
# of the folder that directly contains it.
dataset = 'B:/dataset/size224_seed1_conf0.7_limit_onlymask'

# Side length in pixels of the square images fed to the network
image_size = 224

# Augmentation settings handed to data_augmenter:
# rotation angles are range(-rot, rot + 1, step), i.e. -16, -8, 0, 8, 16
rot = 16
step = 8
flip = True

# Load dataset

In [None]:
# Collect every .jpg below the dataset folder and load images plus folder-name labels
data, label = load_dataset(fetch_files(dataset, filetype=['jpg']))

In [None]:
# Center-crop each image to a square and resize it to image_size x image_size
data = process_images(data, image_size)

In [None]:
# Preview a 4x5 grid of randomly picked samples with their labels
visualizer(data, label, grid=(4,5))

In [None]:
# 80 % training; the remaining 20 % is halved into validation and test (10 % each).
# NOTE(review): no random_state is passed, so the split differs on every run.
X_train, X_rem, y_train, y_rem = train_test_split(data, label, train_size=0.8)
X_valid, X_test, y_valid, y_test = train_test_split(X_rem,y_rem, test_size=0.5)

In [None]:
# Augment the training split only; validation and test data stay untouched
X_train, y_train = data_augmenter(X_train, y_train, rot, step, flip)

In [None]:
# Stack every split into one array of shape (N, image_size, image_size, 3)
X_train = np.array(X_train).reshape(-1, image_size, image_size, 3)
X_test = np.array(X_test).reshape(-1, image_size, image_size, 3)
X_valid = np.array(X_valid).reshape(-1, image_size, image_size, 3)

# Distinct label names in order of first appearance (dicts keep insertion order)
unique_labels = list(dict.fromkeys(label))

# Replace each label name by its position in unique_labels
label_index = {name: idx for idx, name in enumerate(unique_labels)}
y_train = np.array([label_index[x] for x in y_train])
y_test = np.array([label_index[x] for x in y_test])
y_valid = np.array([label_index[x] for x in y_valid])

# One-hot encode the integer labels
categories = len(unique_labels)
y_train = to_categorical(y_train, categories)
y_test = to_categorical(y_test, categories)
y_valid = to_categorical(y_valid, categories)

# CNN

### MobileNet

In [None]:
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten, Dense, Dropout, AveragePooling2D

def init_model():
    """Build and compile the MobileNetV2-based classifier.

    An ImageNet-pretrained MobileNetV2 without its top layers is followed by
    7x7 average pooling, a 128-unit ReLU layer with 50 % dropout and a
    softmax layer with one unit per category. Reads the module-level
    ``image_size`` and ``categories``.

    ### Returns
    model : Sequential
        The model, compiled with categorical cross-entropy and Adam.
    """
    backbone = MobileNetV2(
        weights='imagenet',
        input_shape=(image_size, image_size, 3),
        include_top=False,
    )
    # NOTE(review): the fixed 7x7 pool presumably matches the backbone's
    # output for image_size = 224 - confirm before changing image_size.
    model = Sequential([
        backbone,
        AveragePooling2D(pool_size=(7,7)),
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(categories, activation='softmax'),
    ])
    model.compile(loss='categorical_crossentropy', optimizer='Adam', metrics=['accuracy'])
    return model

# Instantiate the network and print its layer overview
model = init_model()
model.summary()

### AlexNet (no transfer-learning)

In [None]:
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPooling2D, Flatten, Dense, Dropout
# from keras.optimizers import Adam

# # AlexNet
# def init_model():
#     model = Sequential()
    
#     # 1st Convolutional Layer
#     model.add(Conv2D(filters=96, kernel_size=(11,11), strides=(4,4), padding='same',
#     input_shape=(image_size,image_size,3)))
#     model.add(BatchNormalization())
#     model.add(Activation('relu'))
#     model.add(MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same'))

#     # 2nd Convolutional Layer
#     model.add(Conv2D(filters=256, kernel_size=(5, 5), strides=(1,1), padding='same'))
#     model.add(BatchNormalization())
#     model.add(Activation('relu'))
#     model.add(MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same'))

#     # 3rd Convolutional Layer
#     model.add(Conv2D(filters=384, kernel_size=(3,3), strides=(1,1), padding='same'))
#     model.add(BatchNormalization())
#     model.add(Activation('relu'))

#     # 4th Convolutional Layer
#     model.add(Conv2D(filters=384, kernel_size=(3,3), strides=(1,1), padding='same'))
#     model.add(BatchNormalization())
#     model.add(Activation('relu'))

#     # 5th Convolutional Layer
#     model.add(Conv2D(filters=256, kernel_size=(3,3), strides=(1,1), padding='same'))
#     model.add(BatchNormalization())
#     model.add(Activation('relu'))
#     model.add(MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same'))

#     # Fully connected layer
#     model.add(Flatten())
#     model.add(Dense(4096))
#     model.add(BatchNormalization())
#     model.add(Activation('relu'))
#     model.add(Dropout(0.4))

#     # 2nd Connected Layer
#     model.add(Dense(4096))
#     model.add(BatchNormalization())
#     model.add(Activation('relu'))
#     model.add(Dropout(0.4))

#     # 3rd Connected Layer
#     model.add(Dense(1000))
#     model.add(BatchNormalization())
#     model.add(Activation('relu'))
#     model.add(Dropout(0.4))

#     # Output
#     model.add(Dense(categories))
#     model.add(BatchNormalization())
#     model.add(Activation('softmax'))
#     model.compile(loss='categorical_crossentropy', optimizer=Adam(), metrics=['accuracy'])
#     return model
    
# model = init_model()
# model.summary()

### ResNet101V2

In [None]:
# from tensorflow.keras.applications import ResNet101V2
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Flatten, Dense, BatchNormalization, Dropout, AveragePooling2D

# def init_model():
#     model = Sequential()
#     model.add(ResNet101V2(weights='imagenet', input_shape=(image_size, image_size, 3), include_top=False))
#     # model.add(Flatten())

#     model.add(AveragePooling2D(pool_size=(7,7)))
#     model.add(Flatten())
#     model.add(Dense(128, 'relu'))
#     model.add(Dropout(0.5))

#     model.add(Dense(categories, 'softmax'))
#     model.compile(loss='categorical_crossentropy', optimizer='Adam', metrics=['accuracy'])
    
#     return model

# model = init_model()
# model.summary()

# Training

In [None]:
# Train for 20 epochs, validating after each epoch; the per-epoch metrics
# are kept in `history` for the plots in the results section
history = model.fit(X_train, y_train, validation_data=(X_valid, y_valid), epochs=20, verbose=1)

In [None]:
# result = []
# for _ in tqdm(range(10), desc='Traning'):
#     model = init_model()
#     model.fit(X_train, y_train, epochs=30, verbose=0)
#     _, acc = model.evaluate(X_test, y_test, verbose=0)
#     result.append(acc)
# print(np.mean(result))

# Results

In [None]:
# Evaluate on the held-out test split; the first returned value (the loss) is discarded
_, eval_accuracy = model.evaluate(X_test, y_test, verbose=2)

In [None]:
# Plot the training and validation accuracy/loss recorded per epoch
rcParams.update({'font.family': 'Arial', 'font.size' : 12})
curves = ['accuracy', 'loss', 'val_accuracy', 'val_loss']
for curve in curves:
    plt.plot(history.history[curve])
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.grid()
# Legend entries follow the order in which the curves were drawn
plt.legend(curves)
plt.show()

In [None]:
# Convert one-hot targets and predicted probabilities back to class indices
# (argmax returns the first index of the row maximum)
y_test_true = np.argmax(y_test, axis=1)
y_test_pred = np.argmax(model.predict(X_test), axis=1)

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
# Pin the label set so the matrix always has one row/column per entry of
# unique_labels, even when a class is missing from the test split;
# otherwise display_labels would not match the matrix size.
cm = confusion_matrix(y_test_true, y_test_pred, labels=list(range(len(unique_labels))))
cmd = ConfusionMatrixDisplay(cm, display_labels=unique_labels)
cmd.plot()

In [None]:
# Show a random sample of the test images the model classified wrongly
wrong = [idx for idx, (guess, truth) in enumerate(zip(y_test_pred, y_test_true)) if guess != truth]
wrong_images = [X_test[idx] for idx in wrong]
wrong_titles = [
    f'Predicted: {unique_labels[y_test_pred[idx]]}\nActual: {unique_labels[y_test_true[idx]]}'
    for idx in wrong
]
visualizer(wrong_images, wrong_titles, grid=(2,3))

# System Test

In [None]:
import cv2
import os
import numpy as np
import time

def center_box(box):
    startX, startY, endX, endY = box.astype("int")
    h = endY - startY
    w = endX - startX
    aspect = h/w
    if aspect > 1:
        offset = int(np.round((h / 2) - (w / 2)))
        return startX - offset, startY, endX + offset, endY
    else:
        offset = int(np.round((w / 2) - (h / 2)))
        return startX, startY - offset, endX, endY + offset

def classify_image(interpreter, image):
    """Run one inference on an interpreter and return its first output.

    ### Parameters
    interpreter
        Object offering ``get_input_details``, ``get_output_details``,
        ``set_tensor``, ``invoke`` and ``get_tensor``
        (presumably a ``tf.lite.Interpreter`` - confirm against the caller).
    image
        Data written to the interpreter's first input tensor.

    ### Returns
    The content of the interpreter's first output tensor.
    """
    inputs = interpreter.get_input_details()
    outputs = interpreter.get_output_details()
    # Feed the first input, run the graph, then read the first output
    interpreter.set_tensor(inputs[0]['index'], image)
    interpreter.invoke()
    return interpreter.get_tensor(outputs[0]['index'])

# Face detect model
# Network definition and weights loaded through OpenCV's DNN module
# (judging by the file names, a Caffe res10 300x300 SSD face detector).
prototxt_path = 'deploy.prototxt'
weights_path = 'res10_300x300_ssd_iter_140000_fp16.caffemodel'
face_detect = cv2.dnn.readNet(prototxt_path, weights_path)
# Box/text colour per predicted class index, handed to OpenCV drawing calls.
# NOTE(review): originally annotated "Red, Green, Yellow"; read in OpenCV's
# BGR order (128,128,0) is teal, yellow would be (0,255,255) - confirm intent.
# NOTE(review): only three entries, so color[pred_index] raises IndexError
# for a class index above 2.
color = [(0,0,255),(0,255,0),(128,128,0)]

### Single Classifier Test

In [None]:
# Live webcam demo: detect faces, classify each one with `model`, draw the result.

# Rebuild the label list (first-seen order) in case an earlier cell changed it
unique_labels = list(dict.fromkeys(label))

# Camera init
video_capture = cv2.VideoCapture(0)

# Used for fps calculation (time2 holds the timestamp of the previous frame)
time1 = 0
time2 = 0

# try/finally so the camera and the window are released even when a frame fails
try:
    # Main loop
    while video_capture.isOpened():
        # Start timer for fps calculation
        time1 = time.time()

        # Read frame from camera
        ret, frame = video_capture.read()
        if not ret:
            print('Camera unavailable')
            break
        h, w = frame.shape[:2]

        # OpenCV DNN pre-processing
        blob = cv2.dnn.blobFromImage(frame, 1.0, (image_size, image_size))
        # Process the pre-processed frame & find faces
        face_detect.setInput(blob)
        faces = face_detect.forward()

        # For all faces detected in frame
        for i in range(0, faces.shape[2]):
            # Skip detections that are unlikely to be a face
            confidence = faces[0, 0, i, 2]
            if confidence <= 0.7:
                continue

            # Get square-ish coordinates for the face
            box = faces[0, 0, i, 3:7] * np.array([w, h, w, h])
            startX, startY, endX, endY = center_box(box)

            # Ensure the bounding box falls within the dimensions of the frame
            startX, startY = max(0, startX), max(0, startY)
            endX, endY = min(w - 1, endX), min(h - 1, endY)

            # A box clipped down to nothing would make cv2.resize raise
            if endX <= startX or endY <= startY:
                continue

            # Pre-process the crop the same way as the training data:
            # the camera delivers BGR, the model was trained on RGB images.
            frame_crop = cv2.cvtColor(frame[startY:endY, startX:endX], cv2.COLOR_BGR2RGB)
            # interpolation must be a keyword; the 3rd positional parameter is `dst`
            frame_crop_resize = cv2.resize(frame_crop, (image_size, image_size), interpolation=cv2.INTER_AREA)
            frame_crop_resize_reshape = np.array(frame_crop_resize, dtype=np.float32).reshape(-1, image_size, image_size, 3)

            # Make a prediction
            pred = model.predict(frame_crop_resize_reshape)

            # Convert prediction to label index
            pred_index = np.argmax(pred[0])

            # Draw a rectangle around the face with its classification
            text = unique_labels[pred_index] + ' (' +  str(round(pred[0][pred_index] * 100)) + '%)'
            cv2.rectangle(frame, (startX, startY), (endX, endY), color[pred_index], 2)
            cv2.putText(frame, text, (startX, startY - 2), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color[pred_index], 1, cv2.LINE_AA)

        # Calculate fps (guard against two frames sharing one timestamp)
        elapsed = time1 - time2
        fps = 1 / elapsed if elapsed > 0 else 0
        time2 = time1
        # Draw fps on frame
        cv2.putText(frame, str(int(fps)) + 'fps', (0, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,255), 1, cv2.LINE_AA)

        # Display the resulting frame
        cv2.imshow('window', frame)

        # Press ESC to quit
        if cv2.waitKey(1) & 0xFF == 27:
            break
finally:
    video_capture.release()
    cv2.destroyAllWindows()

### Two-Stage Classifier Test 

In [None]:
# Load the previously saved first-stage classifier; the two-stage test below
# reads its two outputs as 'no mask' / 'mask'
model_folder = 'model_3'
model_hasmask = load_model(model_folder)

In [None]:
# Live webcam demo with two stages: `model_hasmask` decides mask / no mask,
# and only faces with a mask are passed on to `model`.

# Rebuild the label list (first-seen order) in case an earlier cell changed it
unique_labels = list(dict.fromkeys(label))

# Kept for compatibility with cells that still refer to the backup
unique_labels_backup = unique_labels.copy()

# Labels of the first-stage classifier.
# NOTE: Make sure the labels actually match the category order of model_hasmask
stage1_labels = ['no mask', 'mask']

# Camera init
video_capture = cv2.VideoCapture(0)

# Used for fps calculation (time2 holds the timestamp of the previous frame)
time1 = 0
time2 = 0

# try/finally so the camera and the window are released even when a frame fails
try:
    # Main loop
    while video_capture.isOpened():
        # Start timer for fps calculation
        time1 = time.time()

        # Read frame from camera
        ret, frame = video_capture.read()
        if not ret:
            break
        h, w = frame.shape[:2]

        # OpenCV DNN pre-processing
        blob = cv2.dnn.blobFromImage(frame, 1.0, (image_size, image_size))
        # Process the pre-processed frame & find faces
        face_detect.setInput(blob)
        faces = face_detect.forward()

        # For all faces detected in frame
        for i in range(0, faces.shape[2]):
            # Skip detections that are unlikely to be a face
            confidence = faces[0, 0, i, 2]
            if confidence <= 0.7:
                continue

            # Get square-ish coordinates for the face
            box = faces[0, 0, i, 3:7] * np.array([w, h, w, h])
            startX, startY, endX, endY = center_box(box)

            # Ensure the bounding box falls within the dimensions of the frame
            startX, startY = max(0, startX), max(0, startY)
            endX, endY = min(w - 1, endX), min(h - 1, endY)

            # A box clipped down to nothing would make cv2.resize raise
            if endX <= startX or endY <= startY:
                continue

            # Pre-process the crop the same way as the training data:
            # the camera delivers BGR, the models were trained on RGB images.
            frame_crop = cv2.cvtColor(frame[startY:endY, startX:endX], cv2.COLOR_BGR2RGB)
            # interpolation must be a keyword; the 3rd positional parameter is `dst`
            frame_crop_resize = cv2.resize(frame_crop, (image_size, image_size), interpolation=cv2.INTER_AREA)
            frame_crop_resize_reshape = np.array(frame_crop_resize, dtype=np.float32).reshape(-1, image_size, image_size, 3)

            # Stage 1: mask or no mask. The label list in use is tracked in a
            # local name so the global unique_labels is never overwritten.
            active_labels = stage1_labels
            pred = model_hasmask.predict(frame_crop_resize_reshape)
            pred_index = np.argmax(pred[0])

            if active_labels[pred_index] == 'mask':
                # Stage 2: classify the masked face with the main model
                active_labels = unique_labels
                pred = model.predict(frame_crop_resize_reshape)
                pred_index = np.argmax(pred[0])

            # Draw a rectangle around the face with its classification
            text = active_labels[pred_index] + ' (' +  str(round(pred[0][pred_index] * 100)) + '%)'
            cv2.rectangle(frame, (startX, startY), (endX, endY), color[pred_index], 2)
            cv2.putText(frame, text, (startX, startY - 2), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color[pred_index], 1, cv2.LINE_AA)

        # Calculate fps (guard against two frames sharing one timestamp)
        elapsed = time1 - time2
        fps = 1 / elapsed if elapsed > 0 else 0
        time2 = time1
        # Draw fps on frame
        cv2.putText(frame, str(int(fps)) + 'fps', (0, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,255), 1, cv2.LINE_AA)

        # Display the resulting frame
        cv2.imshow('window', frame)

        # Press ESC to quit
        if cv2.waitKey(1) & 0xFF == 27:
            break
finally:
    video_capture.release()
    cv2.destroyAllWindows()

# Save Model

In [None]:
# Name used for the saved model and as the stem of the .tflite file
model_name = 'model_8'

In [None]:
# Save the trained model under model_name; the TFLite conversion cell reads it back from there
model.save(model_name)

In [None]:
# Build a TensorFlow Lite model from the SavedModel directory
converter = tf.lite.TFLiteConverter.from_saved_model(model_name)
tflite_model = converter.convert()

# Write the converted model to <model_name>.tflite
tflite_path = f'{model_name}.tflite'
with open(tflite_path, 'wb') as tflite_file:
    tflite_file.write(tflite_model)