In [None]:
import os
import glob
import shutil

import cv2
import pandas as pd 
import matplotlib.pyplot as plt
import matplotlib.image as mimg
import tensorflow as tf
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc

#tensorflow
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.applications import mobilenet_v2
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, Dropout, Input, Flatten, SeparableConv2D
from tensorflow.keras.layers import GlobalAveragePooling2D, BatchNormalization, Concatenate
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard, ReduceLROnPlateau
from tensorflow.keras.utils import to_categorical

# Fix the global RNG seeds (NumPy and TensorFlow) so repeated runs draw the
# same random numbers.
seed_number = 24
np.random.seed(seed_number)
tf.random.set_seed(seed_number)

In [None]:
# Project layout: <root>/data/{train,val,test}
root = "../spoof/"
input_dir = os.path.join(root, "data")
train_dir, val_dir, test_dir = (
    os.path.join(input_dir, split_name) for split_name in ('train', 'val', 'test')
)

In [None]:
# Sub-directories of the data folder (the dataset splits), sorted by name.
dataset_dir = sorted(
    entry for entry in os.listdir(input_dir)
    if os.path.isdir(os.path.join(input_dir, entry))
)
# Sub-directories of the train folder; each one is treated as a class label.
label_name = sorted(
    entry for entry in os.listdir(train_dir)
    if os.path.isdir(os.path.join(train_dir, entry))
)

# Folder overview
print(f"Main directories\t: {os.listdir(root)}")
print(f"Dataset sub-directories\t: {dataset_dir}")
print(f"Train set directory\t: {label_name}")

In [None]:
# Build the validation set
def val_make(source, dest, fraction=0.3):
    """Move a random subset of the entries in ``source`` into ``dest``.

    Every entry directly inside ``source`` is moved independently with
    probability ``fraction``, using the global NumPy random state (one draw
    per entry).

    Args:
        source: Directory to take entries from.
        dest: Directory to move the selected entries to; created if missing.
        fraction: Per-entry probability of being moved (default 0.3).

    Returns:
        None. The function only has file-system side effects.

    Note:
        Not idempotent: every call moves a further ``fraction`` of whatever
        is still left in ``source``.
    """
    # shutil.move fails when the destination folder does not exist yet.
    os.makedirs(dest, exist_ok=True)
    # os.listdir returns names in arbitrary order; sorting makes the split
    # reproducible for a fixed NumPy seed.
    for name in sorted(os.listdir(source)):
        if np.random.rand() < fraction:
            shutil.move(os.path.join(source, name), os.path.join(dest, name))
    return

In [None]:
# Move a random share of the "spoof" training images into the validation set.
# WARNING: run once only - each re-run moves more files out of train.
val_make(f"{train_dir}/spoof", f"{val_dir}/spoof")

In [None]:
# Move a random share of the "real" training images into the validation set.
# WARNING: run once only - each re-run moves more files out of train.
val_make(f"{train_dir}/real", f"{val_dir}/real")

In [None]:
# Split name -> directory holding that split's class sub-folders.
dir_dict = dict(train=train_dir, val=val_dir, test=test_dir)

In [None]:
# Per-split statistics:
#   case_count[split][label] -> number of PNG images of that class
#   img_disp[split][label]   -> path of one randomly chosen sample image
#   set_length[split]        -> total number of PNG images in the split
# NOTE(review): only "*.png" files are counted; other image formats, if any
# exist in the dataset, are ignored here - confirm this is intended.
case_count, img_disp, set_length = {}, {}, {}

for key, val in dir_dict.items():
    case_count[key] = {}
    img_disp[key] = {}
    set_count = 0

    for label in label_name:
        label_list = sorted(glob.glob(os.path.join(val, label, "*.png")))
        if not label_list:
            # No PNG images for this class in this split; skip it.
            continue

        case_count[key][label] = len(label_list)
        set_count += len(label_list)

        # randint's upper bound is exclusive, so len(label_list) covers every
        # index. The former "len - 1" could never pick the last image and
        # raised ValueError when a class contained exactly one image.
        select_img_id = np.random.randint(len(label_list))
        img_disp[key][label] = label_list[select_img_id]

    set_length[key] = set_count

case_count_df = pd.DataFrame(case_count)
img_disp_df = pd.DataFrame(img_disp)
print(f"Dataset summary:\n\n{case_count_df}")

In [None]:
# Data generator for training: rescales pixels to [0, 1] and applies random
# geometric augmentation (rotation, shifts, shear, zoom, horizontal flip).
# No validation_split is configured: no flow call in this notebook passes
# `subset=`, so that setting had no effect and only suggested a hold-out
# that never happened. Validation data comes from the separate val directory.
train_datagen = ImageDataGenerator(rescale = 1./255,
                                   rotation_range = 20,
                                   width_shift_range = 0.2,
                                   height_shift_range = 0.2,
                                   shear_range = 0.15,
                                   zoom_range = 0.15,
                                   horizontal_flip = True,
                                   fill_mode = "nearest")

# Validation images are only rescaled, never augmented.
val_datagen = ImageDataGenerator(rescale = 1./255)


In [None]:
# Test images get the same pixel rescaling as train/val and no augmentation.
test_datagen = ImageDataGenerator(rescale=1.0 / 255)

In [None]:
# Dataset characteristics
train_batch_size = 32
val_batch_size = 32
# NOTE(review): this variable is not used anywhere in the visible notebook and
# its name mirrors the Keras method; it looks like a leftover - confirm and delete.
flow_from_directory = 224
img_height = 224
img_width = 224

# Keras expects target_size as (height, width). The order was swapped before;
# it is harmless while both are 224 but wrong for non-square inputs.
train_gen = train_datagen.flow_from_directory(train_dir,
                                              batch_size = train_batch_size,
                                              class_mode = 'binary',
                                              target_size = (img_height, img_width),
                                              seed = seed_number)

val_gen = val_datagen.flow_from_directory(val_dir,
                                          batch_size = val_batch_size,
                                          class_mode = 'binary',
                                          target_size = (img_height, img_width),
                                          seed = seed_number)

# The test folder is unlabelled: point the generator at the parent directory
# and expose "test" as its only class folder. class_mode=None yields image
# batches only; shuffle=False keeps them in the order of test_gen.filenames.
test_gen = test_datagen.flow_from_directory(input_dir,
                                            batch_size = 1,
                                            class_mode = None,
                                            classes = ['test'],
                                            target_size = (img_height, img_width),
                                            seed = seed_number,
                                            shuffle = False)

In [None]:
# Sanity-check the generators by pulling one batch from each.
# train/val generators yield (images, labels) tuples, hence the [0].
print(f'Train set batch shape\t: {next(train_gen)[0].shape}')
print(f'Val set batch shape\t: {next(val_gen)[0].shape}')
# test_gen is built with class_mode=None, so it yields the image batch itself
# (no labels tuple). Indexing it with [0] printed one image's shape instead of
# the batch shape.
print(f'Test set batch shape\t: {next(test_gen).shape}')

In [None]:
# Load MobileNetV2 as the backbone (it is lighter than VGG16), without its
# ImageNet classification head. Keras expects input_shape as (height, width, channels).
pretrain_net = mobilenet_v2.MobileNetV2(input_shape = (img_height, img_width, 3),
                                        include_top = False,
                                        weights = 'imagenet')

# Optionally freeze every backbone layer that comes before the layer named
# `freeze_before`. With None (the current setting) nothing is frozen and the
# whole backbone is fine-tuned.
# NOTE: if the name matches no layer, every layer ends up frozen.
freeze_before = None
if freeze_before:
    for layer in pretrain_net.layers:
        if layer.name == freeze_before:
            break
        layer.trainable = False

# summary() prints the table itself and returns None, so it must not be
# wrapped in print() (that appended a stray "None" to the output).
pretrain_net.summary()

In [None]:
# Add the classification head on top of the backbone:
# 3x3 conv -> dropout -> global average pooling -> single sigmoid unit.
x = pretrain_net.output
x = Conv2D(32, (3, 3), activation='relu')(x)
x = Dropout(rate=0.2, name='extra_dropout1')(x)
x = GlobalAveragePooling2D()(x)
x = Dense(1, activation='sigmoid', name='classifier')(x)

model = Model(inputs=pretrain_net.input, outputs=x, name='mobilenetv2_spoof')
# summary() prints the table itself and returns None, so it must not be
# wrapped in print() (that appended a stray "None" to the output).
model.summary()

In [None]:
# Training hyper-parameters.
# Usually nothing changes after epoch 12.
num_epochs = 15
# Initial learning rate handed to the optimizer.
learning_rate = 0.00005

In [None]:
# Compile the model: binary cross-entropy matches the single sigmoid output.
# `learning_rate` is the supported keyword; the old `lr` alias is deprecated
# and rejected by newer Keras releases.
model.compile(optimizer = Adam(learning_rate=learning_rate),
              loss = 'binary_crossentropy',
              metrics = ['acc'])

In [None]:
# Compute class weights: weight_i = n_samples / (n_classes * n_samples_i).
# Per-class counts come from the generator's own label array, so numerator and
# denominator describe the same set of images. The dataset summary table
# counts only "*.png" files and may omit a class entirely.
train_length = len(train_gen.classes)
class_counts = np.bincount(train_gen.classes, minlength=len(label_name))
if (class_counts[:2] == 0).any():
    raise ValueError(f"Every class needs at least one training image, got counts {class_counts.tolist()}")

weight0 = float(train_length / class_counts[0] * (1 / len(label_name)))
weight1 = float(train_length / class_counts[1] * (1 / len(label_name)))
# Keys are the integer class indices used by train_gen.
class_weight = {0: weight0, 1: weight1}

print(f"Class weight\t: {class_weight}")

In [None]:
# Learning-rate schedule: multiply the LR by 0.2 after 3 epochs without an
# improvement of at least 0.005, never dropping below 5e-7. No `monitor` is
# passed, so the callback's default metric is used.
plateau_scheduler = ReduceLROnPlateau(
    factor=0.2,
    patience=3,
    min_delta=0.005,
    min_lr=5e-7,
    verbose=1,
)

In [None]:
# Train the model.
# - steps_per_epoch uses the generator's own batch count rather than the
#   PNG-only count from the dataset summary.
# - validation_steps covers the whole validation set. It used to be 1, i.e. a
#   single batch per epoch, which makes val_loss/val_acc very noisy and also
#   misleads the ReduceLROnPlateau callback.
history = model.fit(train_gen,
                    epochs = num_epochs,
                    steps_per_epoch = len(train_gen),
                    validation_data = val_gen,
                    validation_steps = len(val_gen),
                    callbacks = [plateau_scheduler],
                    class_weight = class_weight)

# Persist the per-epoch metrics next to the data for later inspection.
history_df = pd.DataFrame.from_dict(history.history)
history_df.to_csv(os.path.join(root, "history_14_10.csv"), index=False)

In [None]:
# Pull the per-epoch curves out of the training history.
metrics_log = history.history
train_accuracy, val_accuracy = metrics_log['acc'], metrics_log['val_acc']
train_loss, val_loss = metrics_log['loss'], metrics_log['val_loss']

epochs = range(len(train_accuracy))
fig, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize=(12, 4))

# Accuracy
acc_ax.plot(epochs, train_accuracy, 'bo', label='Training accuracy')
acc_ax.plot(epochs, val_accuracy, 'r', label='Validation accuracy')
acc_ax.set_title('Training and validation accuracy')
acc_ax.set_xlabel('Epoch')
acc_ax.set_ylabel('Accuracy')
acc_ax.legend(['train', 'val'], loc='lower right')

# Loss
loss_ax.plot(epochs, train_loss, 'bo', label='Training loss')
loss_ax.plot(epochs, val_loss, 'r', label='Validation loss')
loss_ax.set_title('Training and validation loss')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.legend(['train', 'val'], loc='upper right')

plt.show()

In [None]:
# Persist the trained model under the relative path 'saved_model_spoof'.
# NOTE(review): the path has no file extension; newer Keras releases may
# require '.keras' or '.h5' - confirm against the installed version.
model.save('saved_model_spoof')

In [None]:
# Score every test image. test_gen is created with batch_size=1 and
# shuffle=False, so len(test_gen) steps cover the test set once, in the order
# of test_gen.filenames.
prediction = model.predict(
    test_gen,
    steps=len(test_gen),
    verbose=1,
)

In [None]:
# The model ends in a single sigmoid unit, so `prediction` has one column
# holding the probability of class index 1. argmax over that one-element axis
# is always 0; the class label must come from thresholding the probability.
predicted_class = (prediction[:, 0] > 0.5).astype(int)

In [None]:
# File names of the test images as recorded by the test generator.
filenames=test_gen.filenames

In [None]:
# One row per test image: file name and the model's sigmoid score
# (column 0 of the prediction array).
results = pd.DataFrame(
    {
        "file": filenames,
        "pred": prediction[:, 0],
    }
)

In [None]:
# Export the per-file scores to an Excel workbook in the current working directory.
results.to_excel("output.xlsx")