In [None]:
!pip install mtcnn

In [None]:
% matplotlib inline

import matplotlib.image as img
import numpy as np
import pandas as pd 
import tensorflow as tf
import matplotlib.pyplot as plt

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten, Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications.xception import Xception
from tensorflow.keras.applications import InceptionResNetV2

from sklearn.neighbors import KNeighborsRegressor
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from scipy import stats
from PIL import Image 
from mtcnn import MTCNN

print('ready')

In [None]:
config={ 'batch_size' : 20, 'buffer_size' : 40 }


train_tfrecord_path = '/content/drive/My Drive/MachineLearning/data/project_deepfake/all_train.tfrecords'
val_tfrecord_path = '/content/drive/My Drive/MachineLearning/data/project_deepfake/all_val.tfrecords'

image_feature_description = { 'image_raw': tf.io.FixedLenFeature([], tf.string), '_label': tf.io.FixedLenFeature([], tf.int64) }

def _parse_image_function(example_proto):
    return tf.io.parse_single_example(example_proto, image_feature_description)
    
def map_func(target_record):
    img = target_record['image_raw']
    label = target_record['_label']
    img = tf.image.decode_jpeg(img, channels=3)
    img = tf.dtypes.cast(img, tf.float32)
    return img, label
    
def resize_and_crop_func(image, label):
    result_image = tf.image.resize(image, (600,600))
    return result_image, label
        
def post_process_func(image, label):
    image = image / 255
#     result_image = my_model_base.preprocess_input(image)
#     onehot_label = tf.one_hot(label, depth=config['num_class'])
    return image, label
    
dataset = tf.data.TFRecordDataset(train_tfrecord_path, compression_type='GZIP')
dataset = dataset.map(_parse_image_function, num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset = dataset.map(map_func, num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset = dataset.map(resize_and_crop_func, num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset = dataset.shuffle(config['buffer_size'])
dataset = dataset.batch(config['batch_size'])
dataset = dataset.map(post_process_func, num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

val_dataset = tf.data.TFRecordDataset(val_tfrecord_path, compression_type='GZIP')
val_dataset = val_dataset.map(_parse_image_function, num_parallel_calls=tf.data.experimental.AUTOTUNE)
val_dataset = val_dataset.map(map_func, num_parallel_calls=tf.data.experimental.AUTOTUNE)
val_dataset = val_dataset.map(resize_and_crop_func, num_parallel_calls=tf.data.experimental.AUTOTUNE)
# val_dataset = val_dataset.map(image_aug_func, num_parallel_calls=tf.data.experimental.AUTOTUNE)
val_dataset = val_dataset.shuffle(config['buffer_size'])
val_dataset = val_dataset.batch(config['batch_size'])
val_dataset = val_dataset.map(post_process_func, num_parallel_calls=tf.data.experimental.AUTOTUNE)
# val_dataset = val_dataset.cache()
val_dataset = val_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [None]:
dataset

In [None]:

callbacks_list = [tf.keras.callbacks.EarlyStopping(monitor='val_acc',  # 검증 정확도 모니터링
                                                   patience=5),             # 1 epoch보다 더 길게 정확도 향상되지 않으면 훈련 중지
                  
                  tf.keras.callbacks.ModelCheckpoint(filepath='/content/drive/My Drive/MachineLearning/deepfake_recoard_model_1112.h5',
                                                     monitor='val_loss',    # 검증 loss 모니터링
                                                     save_best_only=True),  # 가장 좋은 모델을 저장
                  
                  tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss',  # 검증 loss 모니터링
                                                      factor=0.1,           # 콜백 호출시 학습률 10배로 줄임
                                                      patience=5)]          # 검증손실이 해당 epoch만큼 좋아지지 않으면 콜백 호출

print('callback ready')

In [None]:

model = Sequential()
model.add(Conv2D(filters=32, kernel_size=(3, 3), activation='relu', input_shape=(600, 600, 3)))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Conv2D(filters=64, kernel_size=(3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Conv2D(filters=128, kernel_size=(3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Conv2D(filters=128, kernel_size=(3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(rate=0.5))
model.add(Conv2D(filters=256, kernel_size=(3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Conv2D(filters=256, kernel_size=(3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))

model.add(Flatten())    
model.add(Dropout(rate=0.5))

model.add(Dense(units=512, activation='relu'))
model.add(Dropout(rate=0.5))
model.add(Dense(units=256, activation='relu'))
model.add(Dropout(rate=0.5))
model.add(Dense(units=1, activation='sigmoid'))

print(model.summary())

model.compile(optimizer=Adam(learning_rate=1e-4),
              loss='binary_crossentropy',
              metrics=['acc', 'Precision', 'Recall', 'AUC'])


In [None]:
history = model.fit(dataset,
                    # steps_per_epoch=2000,
                    epochs=30,
                    validation_data=val_dataset,
                    callbacks=callbacks_list,
                    validation_steps=50)
        

In [None]:
model.save('/content/drive/My Drive/MachineLearning/deepfake_recoard_model_1112.h5')

In [None]:
% matplotlib inline
import matplotlib.pyplot as plt

train_acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
train_loss = history.history['loss']
val_loss = history.history['val_loss']

print(history.history.keys())
plt.plot(history.history['accuracy'], 'bo', color='r', label='train_acc')
plt.plot(history.history['val_accuracy'], 'b', color='b', label='val_acc')
plt.legend()
plt.show()
plt.plot(history.history['loss'], 'bo', color='r', label='train_loss')
plt.plot(history.history['val_loss'], 'b', color='b', label='val_loss')
plt.legend()
plt.show()

In [None]:
# 기학습 된 모델 불러오기
model = tf.keras.models.load_model('/content/drive/My Drive/MachineLearning/deepfake_recoard_model_1112.h5')
# function ClickConnect() {var buttons = document.querySelectorAll("colab-dialog.yes-no-dialog paper-button#cancel"); buttons.forEach(function(btn) { btn.click(); }); console.log("1분마다 자동 재연결"); document.querySelector("colab-toolbar-button#connect").click(); } setInterval(ClickConnect,1000*60);
