#Import and Initial Mount Disk


In [1]:
# install library
!pip install -U tensorflow-addons
!pip install facenet-pytorch

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import os
# Mount drive
from google.colab import drive
drive.mount("/content/drive")
path = "/content/drive/My Drive/Colab Notebooks/FaceMaskRecognize"
os.chdir(path)

import time
import tensorflow_addons as tfa
import tensorflow as tf
from tensorflow.keras import models, layers, metrics, optimizers, Model
from functools import partial
import matplotlib.pyplot as plt
import numpy as np
import cv2
import math
import io
import pickle
import tensorflow_datasets as tfds
import random
from train.Net import InceptionResNetV1
from train.FaceNet import FaceNetModel,call_instance_FaceNet_with_custom, call_instance_FaceNet_without_custom,call_instance_FaceNet_with_last_isDense, convert_train_model_to_embedding
from train.Classify import Classify
from tool.FormatFunction import FormatFunction
from tool.FileFunction import FileFunction
from tool.GlobalValue import GlobalValue

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Train

## Init value

In [3]:
READ_RAW_DATA_THEN_SAVE = False
MODEL_NAME = "Inception-Restnet-Entropy-ASIAN"
path_save_model = os.path.join(os.getcwd(), "save_model", MODEL_NAME)
global_value = GlobalValue(image_size=[160,160], batch_size = 96, shuffle_size = 1000, ratio_train = 0.9, ratio_test = 0.1, ratio_valid = 0.0, epochs = 40, small_epochs = 50,
                           image_each_class = 15)
format_function = FormatFunction(global_value)
file_function = FileFunction()

##Prepare folder and other thing

In [4]:
# Create folder to save model
if not os.path.exists(path_save_model):
  os.makedirs(path_save_model)

#Read label dictionary(name of people not the path of image)
if READ_RAW_DATA_THEN_SAVE: 
  label_dict = dict()
  label_dict.update(format_function.get_label_dict(os.path.join(os.getcwd(),"align_image")))
  label_dict.update(format_function.get_label_dict(os.path.join(os.getcwd(),"dataset","AFDB")))
  path = os.path.join(os.getcwd(),"src","data","label_dict.pkl")
  with open(path, 'wb') as file:
    pickle.dump(label_dict, file)
path = os.path.join(os.getcwd(),"src","data","label_dict.pkl")
with open(path, 'rb') as f:
  label_dict = pickle.load(f)

#Save data path to file to read faster
if READ_RAW_DATA_THEN_SAVE:
  path_image_no_mask = list()
  path_image_no_mask.extend(file_function.get_data_path_by_dictionary(os.path.join(os.getcwd(),"align_image")))
  path_image_no_mask.extend(file_function.get_data_path_by_dictionary(os.path.join(os.getcwd(),"dataset", "AFDB")))
  saved_path = os.path.join(os.getcwd(),"src","data","path_image_no_mask.pkl")
  with open(saved_path, 'wb') as file:
      pickle.dump(path_image_no_mask, file)

  path_image_mask = list()
  path_image_mask.extend(file_function.get_data_path_by_dictionary(os.path.join(os.getcwd(),"face+mask_image")))
  path_image_mask.extend(file_function.get_data_path_by_dictionary(os.path.join(os.getcwd(),"dataset", "AFDB_mask")))
  saved_path = os.path.join(os.getcwd(),"src","data","path_image_mask.pkl")
  with open(saved_path, 'wb') as file:
      pickle.dump(path_image_mask, file)


## Start train

# Train version 2


In [None]:
# Create embedding model
input_shape = [global_value.IMAGE_SIZE[0], global_value.IMAGE_SIZE[1], 3]
face_net_model = call_instance_FaceNet_with_last_isDense(input_shape, len(label_dict))
face_net_model.compile(
    optimizer=tf.keras.optimizers.Adam(0.001),
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits = True),
    metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]
    )
#----Choose path to save per epoch
actual_epochs = 1
for i in range(100):
  last_save_path = path_save_model+"/epoch{}".format(actual_epochs)
  if not os.path.exists(last_save_path):
    break
  actual_epochs += 1

# Load saved model
if (actual_epochs != 1):
  load_path = path_save_model+"/epoch{}".format(actual_epochs-1)
  print("load ",load_path)
  face_net_model = tf.keras.models.load_model(load_path)



# Normal train network
for i in range(global_value.EPOCHS):

  # Measure time
  now = time.time()
  # Read data path from file
  path = os.path.join(os.getcwd(),"src","data","path_image_no_mask.pkl")
  with open(path, 'rb') as f:
      path_image_no_mask = pickle.load(f)
      path_image_no_mask = file_function.get_data_path_with_limit(path_image_no_mask,global_value.IMAGE_EACH_CLASS)
  path = os.path.join(os.getcwd(),"src","data","path_image_mask.pkl")
  with open(path, 'rb') as f:
      path_image_mask = pickle.load(f)
      path_image_mask = file_function.get_data_path_with_limit(path_image_mask,global_value.IMAGE_EACH_CLASS)

  # Combine data path
  path_image_no_mask.extend(path_image_mask)
  random.shuffle(path_image_no_mask)
  # Index label (change label of data from string to number)
  label_index =list()
  for path in path_image_no_mask:
    label = path.split("/")[-2]
    label = label_dict[label]
    label_index.append(label)
  path_dataset = tf.data.Dataset.from_tensor_slices(path_image_no_mask)
  label_dataset = tf.data.Dataset.from_tensor_slices(label_index)
  origin_dataset = tf.data.Dataset.zip((path_dataset, label_dataset))
  # Repeat data
  origin_dataset  = origin_dataset.shuffle(global_value.SHUFFLE_SIZE).repeat(2)
  # Split train, test datase
  train_dataset, test_dataset,_ = format_function.get_dataset_partition(origin_dataset, 0.9,0.1,0)
  # read data from path
  train_dataset = train_dataset.map(format_function.process_image, num_parallel_calls=tf.data.AUTOTUNE)
  test_dataset = test_dataset.map(format_function.process_image, num_parallel_calls=tf.data.AUTOTUNE)
  # augmentation data(flip, rotate,...)
  train_dataset = train_dataset.map(format_function.augment_data, num_parallel_calls=tf.data.AUTOTUNE)
  test_dataset = test_dataset.map(format_function.augment_data, num_parallel_calls=tf.data.AUTOTUNE)

  # batch data
  train_dataset = train_dataset.batch(global_value.BATCH_SIZE)
  test_dataset = test_dataset.batch(global_value.BATCH_SIZE)
  # Set cache and prefetch to improve performance
  train_dataset = train_dataset.prefetch(buffer_size = tf.data.experimental.AUTOTUNE)
  test_dataset = test_dataset.prefetch(buffer_size = tf.data.experimental.AUTOTUNE)
  print("--------------------------big epoch {}--------------------------".format(actual_epochs))
  history = face_net_model.fit(
      train_dataset,
      epochs = 1,
      validation_data = test_dataset
  )
  face_net_model.save(path_save_model+"/epoch{}".format(actual_epochs))
  with open("src/log/log_{}.txt".format(MODEL_NAME), "a") as file_object:
    file_object.write("\n")
    file_object.write("epoch {}, loss {}, accuracy {}, loss_valid {}, accuracy_valid {}, time {}".format(actual_epochs, history.history['loss'], history.history['sparse_categorical_accuracy'], history.history['val_loss'],history.history['val_sparse_categorical_accuracy'], time.time() - now))
  actual_epochs += 1


load  /content/drive/MyDrive/Colab Notebooks/FaceMaskRecognize/save_model/Inception-Restnet-Entropy-ASIAN/epoch1
--------------------------big epoch 2--------------------------

## test embedding

In [None]:
# face_net_model = tf.keras.models.load_model("save_model/align_image_origin36")
# classify = Classify(face_net_model, format_function)
# database_embedding = classify.embedding_all_data_by_directory(os.path.join(os.getcwd(),"dataset","lfw"))
# classify.save_embedding_to_file(database_embedding, os.path.join(os.getcwd(),"encodings","encode_lfw_epoch36.pkl"))
# #Preprocess data
# test_dataset = tf.data.Dataset.list_files("dataset/lfw/*/*",shuffle=False)
# test_dataset = test_dataset.map(format_function.get_label_as_number, num_parallel_calls=tf.data.AUTOTUNE)
# test_dataset = test_dataset.map(format_function.process_image, num_parallel_calls=tf.data.AUTOTUNE)
# test_dataset = test_dataset.filter(lambda image, label: tf.math.not_equal(tf.size(image), 0))
# test_dataset = test_dataset.batch(20)

# # Accuracy
# print(classify.evaluate(test_dataset, database_embedding))

In [None]:
# # Load network
# face_net_model = tf.keras.models.load_model("save_model/face_recognize_entropy47")
# face_net_model =  convert_train_model_to_embedding(face_net_model)
# #Preprocess data
# test_dataset = tf.data.Dataset.list_files("dataset/10_person/*/*",shuffle=False)
# test_dataset = test_dataset.map(format_function.get_label_as_number, num_parallel_calls=tf.data.AUTOTUNE)
# test_dataset = test_dataset.map(format_function.process_image, num_parallel_calls=tf.data.AUTOTUNE)
# test_dataset = test_dataset.filter(lambda image, label: tf.math.not_equal(tf.size(image), 0))
# test_dataset = test_dataset.batch(20)
# # Evaluate the network
# results = face_net_model.predict(test_dataset)

# # Save test embeddings for visualization in projector
# np.savetxt("vecs.tsv", results, delimiter='\t')

# out_m = io.open('meta.tsv', 'w', encoding='utf-8')
# for img, labels in tfds.as_numpy(test_dataset):
#     [out_m.write(str(x) + "\n") for x in labels]
# out_m.close()


# try:
#   from google.colab import files
#   files.download('vecs.tsv')
#   files.download('meta.tsv')
# except:
#   pass