<a href="https://colab.research.google.com/github/vuongvmu/GCL_DemoCode/blob/main/face_train.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

###Server/face_model

In [None]:
#config.py
import tensorflow as tf
import os

TRAIN_DATASET = "d:/AI_Server/dataset/cropped_train_dataset"
TEST_DATASET = "d:/AI_Server/dataset/cropped_test_dataset"

IMAGE_SIZE = (224, 224)

BATCH_SIZE = 256
BUFFER_SIZE = BATCH_SIZE * 2

AUTO = tf.data.AUTOTUNE

LEARNING_RATE = 0.0001
STEPS_PER_EPOCH = 50
VALIDATION_STEPS = 10
EPOCHS = 10

OUTPUT_PATH = "output"
MODEL_PATH = os.path.join(OUTPUT_PATH, "siamese_network")
OUTPUT_IMAGE_PATH = os.path.join(OUTPUT_PATH, "output_image.png")

In [None]:
#dataset.py
import tensorflow as tf
import numpy as np
import random
import os


class MapFunction():
    def __init__(self, imagesize):
        self.imagesize = imagesize

    def decode_and_resize(self, imagepath):
        image = tf.io.read_file(imagepath)
        image = tf.io.decode_jpeg(image, channels=3)
        image = tf.image.convert_image_dtype(image, dtype=tf.float32)
        image = tf.image.resize(image, self.imagesize)
        return image

    def __call__(self, anchor, positive, negative):
        img_anchor = self.decode_and_resize(anchor)
        img_positive = self.decode_and_resize(positive)
        img_negative = self.decode_and_resize(negative)
        return (img_anchor, img_positive, img_negative)


class TripletGenerator:
    def __init__(self, datasetPath):
        self.peopleNames = list()
        for folderName in os.listdir(datasetPath):
            absoluteFolderName = os.path.join(datasetPath, folderName)
            numImages = len(os.listdir(absoluteFolderName))
            if numImages > 1:
                self.peopleNames.append(absoluteFolderName)
        self.allPeople = self.generate_all_people_dict()

    def generate_all_people_dict(self):
        allPeople = dict()
        for personName in self.peopleNames:
            imageNames = os.listdir(personName)
            personPhotos = [os.path.join(personName, imageName) for imageName in imageNames]
            allPeople[personName] = personPhotos
        return allPeople

    def get_next_element(self):
        while True:
            anchorName = random.choice(self.peopleNames)
            temporaryNames = self.peopleNames.copy()
            temporaryNames.remove(anchorName)
            negativeName = random.choice(temporaryNames)
            (anchorPhoto, positivePhone) = random.choices(
                self.allPeople[anchorName],
                k=2,
                # replace=False
            )
            negativePhoto = random.choice(self.allPeople[negativeName])
            yield (anchorPhoto, positivePhone, negativePhoto)


In [None]:
#model.py
from tensorflow.keras.applications import resnet
from tensorflow.keras import layers
from tensorflow import keras
import tensorflow as tf


def get_embedding_module(image_size):
    inputs = keras.Input(image_size + (3,))
    x = resnet.preprocess_input(inputs)
    baseCnn = resnet.ResNet50(weights="imagenet", include_top=False)
    baseCnn.trainable = False

    extractedFeatures = baseCnn(x)
    x = layers.GlobalAvgPool2D()(extractedFeatures)
    x = layers.Dense(units=1024, activation='relu')(x)
    x = layers.Dropout(0.2)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dense(units=512, activation='relu')(x)
    x = layers.Dropout(0.2)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dense(units=256, activation='relu')(x)
    x = layers.Dropout(0.2)(x)
    outputs = layers.Dense(units=128)(x)
    embedding = keras.Model(inputs, outputs, name="embedding")
    return embedding


def get_siamese_network(imageSize, embeddingModel):
    anchorInput = keras.Input(name="anchor", shape=imageSize + (3,))
    positiveInput = keras.Input(name="positive", shape=imageSize + (3,))
    negativeInput = keras.Input(name="negative", shape=imageSize + (3,))

    anchorEmbedding = embeddingModel(anchorInput)
    positiveEmbedding = embeddingModel(positiveInput)
    negativeEmbedding = embeddingModel(negativeInput)

    siamese_network = keras.Model(
        inputs=[anchorInput, positiveInput, negativeInput],
        outputs=[anchorEmbedding, positiveEmbedding, negativeEmbedding],
        name="siameseNetwork"
    )
    return siamese_network


class SiameseModel(keras.Model):

    def __init__(self, siameseNetwork, margin, lossTracker):
        super().__init__()
        self.siameseNetwork = siameseNetwork
        self.margin = margin
        self.lossTracker = lossTracker

    def _compute_distance(self, inputs):
        (anchor, positive, negative) = inputs
        embeddings = self.siameseNetwork((anchor, positive, negative))
        anchorEmbedding = embeddings[0]
        positiveEmbedding = embeddings[1]
        negativeEmbedding = embeddings[2]
        apDistance = tf.reduce_sum(tf.square(anchorEmbedding - positiveEmbedding), axis=-1)
        anDistance = tf.reduce_sum(tf.square(anchorEmbedding - negativeEmbedding), axis=-1)
        return (apDistance, anDistance)

    def _compute_loss (self, apDistance, anDistance):
        loss = apDistance - anDistance
        loss = tf.maximum(loss+self.margin,0.0)
        return loss

    def call(self, inputs):
        (apDistance, anDistance) = self._compute_distance(inputs)
        return (apDistance, anDistance)

    def train_step(self, inputs):
        with tf.GradientTape() as tape:
            (apDistance, anDistance) = self._compute_distance(inputs)
            loss = self._compute_loss(apDistance, anDistance)
        gradients = tape.gradient(
                loss,
                self.siameseNetwork.trainable_variables)
        self.optimizer.apply_gradients(
                zip(gradients, self.siameseNetwork.trainable_variables)
            )
        self.lossTracker.update_state(loss)
        return {"loss": self.lossTracker.result()}
    def test_step(self, inputs):
        (apDistance, anDistance) = self._compute_distance(inputs)
        loss = self._compute_loss(apDistance, anDistance)
        self.lossTracker.update_state(loss)
        return {"loss": self.lossTracker.result()}

    @property
    def metrics(self):
        return [self.lossTracker]







# image_size = (224, 224)
# embedding_model = get_embedding_module(image_size=image_size)
# siamese_network = get_siamese_network(image_size, embedding_model)
# siamese_network.summary()


###Server

In [None]:
#crop_faces.py

from imutils.paths import list_images
from tqdm import tqdm
import numpy as np
import argparse
import cv2
import os

ap = argparse.ArgumentParser()
ap.add_argument("-d", "--dataset",
                required=True, help="path to input dataset")
ap.add_argument("-o", "--output",
                required=True, help="put to output dataset")
ap.add_argument("-p", "--prototxt",
                required=True, help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m","--model",
                required=True, help="path to Caffe pre-trained model")
ap.add_argument("-c", "--confidence",
                type=float, default=0.5, help="minimum probability to filter weak detection")
args = vars(ap.parse_args())


net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])
if not os.path.exists(args["output"]):
    os.makedirs(args["output"])


names = os.listdir(args["dataset"])


for name in tqdm(names):
    dirPath = os.path.join(args["dataset"], name)
    if os.path.isdir(dirPath):
        imagePaths = list(list_images(dirPath))
        outputDir = os.path.join(args["output"], name)
        if not os.path.exists(outputDir):
            os.makedirs(outputDir)
        for imagePath in imagePaths:
            imageID = imagePath.split(os.path.sep)[-1]
            image = cv2.imread(imagePath)
            (h,w) = image.shape[:2]
            blob = cv2.dnn.blobFromImage(cv2.resize(image,
                                                    (300,300)), 1.0,
                                         (300,300),(104,117.0,123.0))
            net.setInput(blob)
            detections = net.forward()
            i = np.argmax(detections[0,0,:,2])
            confidence = detections[0,0,1,2]
            if confidence> args["confidence"]:
                maxDim = np.max(detections[0,0,i,3:7])
                if maxDim >1.0:
                    continue
                box = np.clip(detections[0,0,i,3:7],0.0,1.0)
                box = box * np.array([w,h,w,h])
                (startX, startY, endX, endY) = box.astype("int")
                face = image[startY:endY, startX:endX]
                facePath = os.path.join(outputDir, imageID)
                cv2.imwrite(facePath, face)

print("[INFO] finished cropping faces and saving them to disk...")



In [None]:
#train.py
from face_model.dataset import TripletGenerator
from face_model.model import get_embedding_module
from face_model.model import get_siamese_network
from face_model.model import SiameseModel
from face_model.dataset import MapFunction
from face_model import config
from tensorflow import keras
import tensorflow as tf
import os

trainTripletGenerator = TripletGenerator(
    datasetPath=config.TRAIN_DATASET)

valTripletGenerator = TripletGenerator(
    datasetPath=config.TRAIN_DATASET)

trainTfDataset = tf.data.Dataset.from_generator(
    generator=trainTripletGenerator.get_next_element,
    output_signature=(
        tf.TensorSpec(shape=(), dtype=tf.string),
        tf.TensorSpec(shape=(), dtype=tf.string),
        tf.TensorSpec(shape=(), dtype=tf.string)
    )
)

valTfDataset = tf.data.Dataset.from_generator(
    generator=valTripletGenerator.get_next_element,
    output_signature=(
        tf.TensorSpec(shape=(), dtype=tf.string),
        tf.TensorSpec(shape=(), dtype=tf.string),
        tf.TensorSpec(shape=(), dtype=tf.string)
    )
)

mapFunction = MapFunction(imagesize=config.IMAGE_SIZE)
print("[INFO] building the train and validation `tf.data` pipeline...")

trainDs = (trainTfDataset
           .map(mapFunction)
           .shuffle(config.BUFFER_SIZE)
           .batch(config.BATCH_SIZE)
           .prefetch(config.AUTO)
           )
valDs = (valTfDataset
         .map(mapFunction)
         .batch(config.BATCH_SIZE)
         .prefetch(config.AUTO)
         )
print("[INFO] build the siamese model...")

embedingModule = get_embedding_module(image_size=config.IMAGE_SIZE)
siameseNetwork = get_siamese_network(
    imageSize=config.IMAGE_SIZE,
    embeddingModel=embedingModule)
siameseModel = SiameseModel(
    siameseNetwork=siameseNetwork,
    margin=0.5,
    lossTracker=keras.metrics.Mean(name='loss'))
siameseModel.compile(
    optimizer=keras.optimizers.Adam(config.LEARNING_RATE))

print("[INFO] training the siamese model...")
siameseModel.fit(
    trainDs,
    steps_per_epoch=config.STEPS_PER_EPOCH,
    validation_data=valDs,
    validation_steps=config.VALIDATION_STEPS,
    epochs=config.EPOCHS
    )



if os.path.exists(config.MODEL_PATH):
    os.makedirs(config.OUTPUT_PATH)

modelPath = config.MODEL_PATH
keras.models.save_model(
    model=siameseModel.siameseNetwork,
    filepath=config.modelPath,
    include_optimizer=False)

In [None]:
#test.py
from face_model.dataset import TripletGenerator
from face_model.dataset import MapFunction
from face_model.model import SiameseModel
from matplotlib import pyplot as plt
from face_model import config
from tensorflow import keras
import tensorflow as tf
import os

testTripletGenerator = TripletGenerator(datasetPath=config.TEST_DATASET)
testTfDataset = tf.data.Dataset.from_generator(
    generator=testTripletGenerator.get_next_element,
    output_signature=(
        tf.TensorSpec(shape=(), dtype=tf.string),
        tf.TensorSpec(shape=(), dtype=tf.string),
        tf.TensorSpec(shape=(), dtype=tf.string)
    ))

mapFunction = MapFunction(imagesize=config.IMAGE_SIZE)

testDs = (testTfDataset
          .map(mapFunction)
          .batch(4)
          .prefetch(config.AUTO)
          )
modelPath = config.MODEL_PATH
print(f"[INFO] loading the siamese network from {modelPath}...")
siameseNetwork = keras.models.load_model(filepath=modelPath)
siameseModel = SiameseModel(
    siameseNetwork=siameseNetwork,
    margin=0.5,
    lossTracker=keras.metrics.Mean(name="loss"), )
# load the test data
(anchor, positive, negative) = next(iter(testDs))
(apDistance, anDistance) = siameseModel((anchor, positive, negative))
plt.figure(figsize=(10, 10))
rows = 4
for row in range(rows):
    plt.subplot(rows, 3, row * 3 + 1)
    plt.imshow(anchor[row])
    plt.axis("off")
    plt.title("Anchor image")
    plt.subplot(rows, 3, row * 3 + 2)
    plt.imshow(positive[row])
    plt.axis("off")
    plt.title(f"Positive distance: {apDistance[row]:0.2f}")
    plt.subplot(rows, 3, row * 3 + 3)
    plt.imshow(negative[row])
    plt.axis("off")
    plt.title(f"Negative distance: {anDistance[row]:0.2f}")
# check if the output directory exists, if it doesn't, then
# create it
if not os.path.exists(config.OUTPUT_PATH):
    os.makedirs(config.OUTPUT_PATH)
# save the inference image to disk
outputImagePath = config.OUTPUT_IMAGE_PATH
print(f"[INFO] saving the inference image to {outputImagePath}...")
plt.savefig(fname=outputImagePath)
