In [None]:
import os
import cv2
# data_path = '/home/workspace/data/.train/.task146/data/train'
# os.chdir(data_path)
# os.getcwd()
# os.listdir()
import pandas as pd
import numpy as np
from tqdm import tqdm
import tensorflow as tf
from tensorflow import keras
import cv2
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from keras.optimizers import Adam
from PIL import ImageFile

In [None]:
# Make the training-data folder the working directory so the relative file
# names read below resolve against it, then list its contents.
data_path = '/home/workspace/data/.train/.task146/data/train'
os.chdir(data_path)
os.getcwd()
os.listdir()

In [None]:
import pandas as pd
# Read the space-separated, header-less label file and name its two columns.
# Label encoding per the original note: (real:0, fake:1)
df = pd.read_csv('open_train_label.txt', sep=' ', header=None)
df.columns = ['filename','label']
df

In [None]:
import matplotlib.pyplot as plt
import matplotlib.image as Image

# Preview 20 consecutive training images in a 4x5 grid, each titled with the
# label stored on the same row of `df`.
preview_start = 3000  # row index of the first image to preview
plt.figure(figsize=(10,10))
for i in range(20):
    row = preview_start + i
    plt.subplot(4,5,i+1)
    img = Image.imread(df.filename[row])
    # Use the label of the row being displayed; the title previously read
    # df.label[i], i.e. the label of a different image.
    plt.title(df.label[row])
    plt.imshow(img)


In [None]:
# Convert the label column to strings.
# NOTE(review): presumably needed because flow_from_dataframe with
# class_mode='binary' expects string class values — confirm.
df.label = df.label.astype(str)
df

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

image_size = (331,331)
batch_size = 32
directory = data_path

# A single generator serves both subsets: pixel values are multiplied by
# 1/255 and 20% of the rows are held out as the 'validation' subset.
# NOTE(review): NASNet's own preprocess_input uses a different scaling; this
# notebook applies 1/255 at training and inference time alike — confirm intended.
train_datagen = ImageDataGenerator(rescale=1./255., validation_split=0.2)

# Options shared by the training and validation iterators.
_flow_options = dict(
    x_col = 'filename',
    y_col = 'label',
    target_size = image_size,
    color_mode = 'rgb',
    class_mode = 'binary',
    batch_size = batch_size,
    shuffle = True,
    seed = 33,
)

train_generator = train_datagen.flow_from_dataframe(
    df, directory, subset = 'training', **_flow_options)
valid_generator = train_datagen.flow_from_dataframe(
    df, directory, subset = 'validation', **_flow_options)

# Report how many samples ended up in each subset.
for _subset_iterator in (train_generator, valid_generator):
    print(_subset_iterator.n)

In [None]:
# NASNetLarge backbone with ImageNet weights and without its classification
# top, used below as a feature extractor for 331x331 RGB images.
feature_model = tf.keras.applications.NASNetLarge(
                                                    input_shape=(331,331,3),
                                                    include_top=False,
                                                    weights="imagenet"
)

In [None]:
def build_feature_model(feature_model):
    """Wrap a backbone so that it emits one pooled feature vector per image.

    Args:
        feature_model: Callable Keras model/layer applied to a 3-channel
            image tensor.

    Returns:
        A ``Model`` whose input has shape (None, None, 3) and whose output is
        the global-average-pooled output of ``feature_model``.
    """
    image_input = Input(shape=(None,None,3))
    backbone_maps = feature_model(image_input)
    pooled = GlobalAveragePooling2D()(backbone_maps)
    return Model(inputs=image_input, outputs=pooled)

# Build the pooled feature extractor on top of the backbone and show its layers.
feature_model_GAP = build_feature_model(feature_model)
feature_model_GAP.summary()

In [None]:
# Extract pooled backbone features for the training subset, one generator
# batch at a time, keeping the labels that come with each batch.
# NOTE(review): the original comment stated that this approach is NOT used for
# the training data because it is too large, yet this cell runs it on the
# training subset — confirm that all features fit in memory.
train_labels = []
train_features = []

# Restart the iterator so that re-running this cell starts at an epoch boundary.
train_generator.reset()

# len(generator) also counts the final partial batch; the previous
# n // batch_size bound silently dropped up to batch_size - 1 samples.
for batch_idx in tqdm(range(len(train_generator))):

    x, y = next(train_generator)
    train_labels.extend(y)
    # verbose=0 keeps Keras' per-call progress output from flooding the tqdm bar.
    feature = feature_model_GAP.predict(x, verbose=0)
    train_features.extend(feature)

train_features = np.array(train_features)
train_labels = np.array(train_labels)

print(train_features.shape)
print(train_labels.shape)

In [None]:
# chdir to user-workspace - DenseNet/ HSV
# save path
current_path = '/home/workspace/user-workspace/'
os.chdir(current_path)

# Relative (to current_path) locations of the cached feature/label arrays.
train_path = './DeepFake/train_features.npy'
tlabel_path = './DeepFake/train_labels.npy'

valid_path = './DeepFake/valid_features.npy'
vlabel_path = './DeepFake/valid_labels.npy'

In [None]:
# Persist the extracted training features and labels.
# np.save takes (file, arr); the arguments were previously swapped, which
# passed the array as the file name.
os.makedirs(os.path.dirname(train_path), exist_ok=True)  # ensure the target folder exists
np.save(train_path, train_features)
np.save(tlabel_path, train_labels)

In [None]:
# Extract pooled backbone features for the validation subset, one generator
# batch at a time, keeping the labels that come with each batch.
valid_labels = []
valid_features = []

# Restart the iterator so that re-running this cell starts at an epoch boundary.
valid_generator.reset()

# len(generator) also counts the final partial batch; the previous
# n // batch_size bound silently dropped up to batch_size - 1 samples.
for batch_idx in tqdm(range(len(valid_generator))):

    x, y = next(valid_generator)
    valid_labels.extend(y)
    # verbose=0 keeps Keras' per-call progress output from flooding the tqdm bar.
    feature = feature_model_GAP.predict(x, verbose=0)
    valid_features.extend(feature)

valid_features = np.array(valid_features)
valid_labels = np.array(valid_labels)

print(valid_features.shape)
print(valid_labels.shape)

In [None]:
# Persist the extracted validation features and labels.
# np.save takes (file, arr); the arguments were previously swapped, which
# passed the array as the file name.
os.makedirs(os.path.dirname(valid_path), exist_ok=True)  # ensure the target folder exists
np.save(valid_path, valid_features)
np.save(vlabel_path, valid_labels)

In [None]:
def build_model(num_outputs=1):
    """Build and compile the dense classifier head trained on pooled features.

    The input shape is taken from the output of the module-level
    ``feature_model_GAP``. The network is four Dense -> BatchNormalization ->
    Dropout(0.5) stages (1024, 512, 256, 128 units) followed by a sigmoid
    output layer.

    Args:
        num_outputs: Number of sigmoid output units. The default of 1 gives
            the single-probability binary classifier used in this notebook.

    Returns:
        A compiled ``Model`` (Adam with learning rate 4e-4, binary
        cross-entropy loss, accuracy metric).
    """
    inputs = Input(shape=feature_model_GAP.output.shape[1:])

    # Layers are created in the same order as before, so default layer names
    # and previously saved weights still line up.
    x = inputs
    for units in (1024, 512, 256, 128):
        x = Dense(units, activation='relu')(x)
        x = BatchNormalization()(x)
        x = Dropout(0.5)(x)

    # Honour num_outputs (it was previously accepted but ignored).
    outputs = Dense(num_outputs, activation='sigmoid')(x)

    model = Model(inputs=inputs,
                  outputs=outputs)

    # `learning_rate` replaces the deprecated `lr` alias, and metrics is
    # passed as a list, which is the documented form.
    model.compile(optimizer=Adam(learning_rate=4e-4),
                  loss='binary_crossentropy',
                  metrics=['accuracy'])

    return model

In [None]:
# Build the model
model = build_model()

model.summary()

In [None]:
# Where the best weights are written (relative to the current working directory).
weight_path="./DeepFake/best_model.h5"

# Save weights only, and only when val_accuracy improves.
cp = ModelCheckpoint(weight_path, monitor='val_accuracy', verbose=1,
                        save_best_only=True, save_weights_only=True)

# Halve the learning rate after 3 epochs without val_loss improvement (floor 1e-6).
reduceLROnPlat = ReduceLROnPlateau(monitor='val_loss', factor=0.5, 
                                   patience=3, 
                                   min_lr=1e-6)
# Stop training after 10 epochs without val_loss improvement.
# NOTE(review): the checkpoint monitors val_accuracy while the other two
# callbacks monitor val_loss — confirm this is intended.
es = EarlyStopping(monitor="val_loss", 
                      mode="min", 
                      patience=10)
callbacks_list = [cp, es, reduceLROnPlat]

In [None]:
initial_epochs = 100

# Train the classifier head on the cached in-memory feature arrays.
# steps_per_epoch / validation_steps are intentionally not passed: for array
# inputs Keras derives the number of steps from the array length and
# batch_size, so training no longer depends on the image generators' sizes
# matching the cached arrays.
history = model.fit(x = train_features,
                    y = train_labels,
                    validation_data = (valid_features, valid_labels),
                    epochs = initial_epochs,
                    batch_size = batch_size,
                    callbacks = callbacks_list
                    )

model.summary()

In [None]:
# Learning curves: accuracy (top panel) and loss (bottom panel) per epoch.
training_log = history.history
acc, val_acc = training_log['accuracy'], training_log['val_accuracy']
loss, val_loss = training_log['loss'], training_log['val_loss']

fig, (acc_ax, loss_ax) = plt.subplots(2, 1, figsize=(8, 8))

acc_ax.plot(acc, label='Training Accuracy')
acc_ax.plot(val_acc, label='Validation Accuracy')
acc_ax.legend(loc='lower right')
acc_ax.set_ylabel('Accuracy')
# Keep the automatic lower bound, pin the upper bound at 1.
acc_ax.set_ylim([min(acc_ax.get_ylim()), 1])
acc_ax.set_title('Training and Validation Accuracy')

loss_ax.plot(loss, label='Training Loss')
loss_ax.plot(val_loss, label='Validation Loss')
loss_ax.legend(loc='upper right')
loss_ax.set_ylabel('Cross Entropy')
loss_ax.set_ylim([0, 1.0])
loss_ax.set_title('Training and Validation Loss')
loss_ax.set_xlabel('epoch')

plt.show()

In [None]:
# Fresh copy of the classifier head that receives the checkpointed weights.
model2 = build_model()

# Load the weights
model2.load_weights(weight_path)

# Evaluate the model
# model2.evaluate(valid_features,valid_labels)

In [None]:
# Switch the working directory to the public data folder (data_path is
# reassigned here) and list its contents.
data_path = '/home/workspace/data/.train/.task146/data/public/'
os.chdir(data_path)
os.getcwd()
os.listdir()

In [None]:
# Read the header-less public file list into a single 'filename' column.
public_df = pd.read_csv('open_public_label.txt', header=None)
public_df.columns = ['filename']
public_df

In [None]:
# Prefix every file name with the public data folder to get absolute paths.
# Not idempotent: re-running this cell prepends the folder a second time.
public_df.filename = '/home/workspace/data/.train/.task146/data/public/' + public_df.filename
public_df

In [None]:
## prediction
# Classifier scores are rounded with np.round: scores up to 0.5 give label 0,
# scores above 0.5 give label 1.
public_labels = []
model = model2
image_size = (331,331)


for i in tqdm(range(len(public_df))):
    image_path = public_df.filename[i]

    # Load the image and resize it to the backbone's input size
    img = keras.preprocessing.image.load_img(image_path, target_size=image_size)
    # Convert the image to an array
    img = keras.preprocessing.image.img_to_array(img)
    # Scale every pixel value into the range 0..1
    img = img / 255.0
    # Add a leading batch dimension to match the model's input
    img = np.expand_dims(img, axis=0)
    # Turn the image into a feature vector with the pooled feature extractor
    # (feature_model_GAP, built on the NASNetLarge backbone above).
    feature = feature_model_GAP.predict(img, verbose=0)
    prediction = model.predict(feature, verbose=0)[0]

    # prediction holds the single sigmoid score of this image; index it
    # explicitly instead of converting a 1-element array to int.
    label = int(np.round(prediction[0]))

    public_labels.append(label)
public_labels = np.array(public_labels)
# np.save takes (file, arr); the array argument was previously missing,
# which raised a TypeError after the whole loop had finished.
np.save('/home/workspace/user-workspace/DeepFake/public_labels.npy', public_labels)

In [None]:
# Reload the saved public predictions from disk.
public_labels = np.load('/home/workspace/user-workspace/DeepFake/public_labels.npy')
public_labels

In [None]:
# Attach the predicted labels as a new 'label' column.
public_df['label'] = public_labels
public_df

In [None]:
# Output file for the public predictions.
public_prediction_path ='/home/workspace/user-workspace/prediction/public_prediction.txt'

In [None]:
# Write the rows space-separated, without index or header.
public_df.to_csv(public_prediction_path, index=False, header=None, sep=' ')

In [None]:
# Switch the working directory to the private data folder (data_path is
# reassigned here) and list its contents.
data_path = '/home/workspace/data/.train/.task146/data/private/'
os.chdir(data_path)
os.getcwd()
os.listdir()

In [None]:
# Read the header-less private file list into a single 'filename' column.
private_df = pd.read_csv('open_private_label.txt', header=None)
private_df.columns = ['filename']
private_df

In [None]:
# Prefix every file name with the private data folder to get absolute paths.
# Not idempotent: re-running this cell prepends the folder a second time.
private_df.filename = '/home/workspace/data/.train/.task146/data/private/' + private_df.filename
private_df

In [None]:
def pre_processing(image_path,image_size):
    """Load one image file as a single-item batch with pixels divided by 255.

    Args:
        image_path: Path of the image file to load.
        image_size: Target size passed to ``load_img`` as ``target_size``.

    Returns:
        The image array with an extra leading axis of length 1.
    """
    # Read the file and resize it while loading.
    loaded = keras.preprocessing.image.load_img(image_path, target_size=image_size)
    # Convert to an array and scale pixel values into the range 0..1.
    pixels = keras.preprocessing.image.img_to_array(loaded) / 255.0
    # Prepend the batch axis expected by the model.
    return pixels[np.newaxis, ...]

In [None]:
## prediction
# Stores the raw sigmoid score of every private image; the scores are meant
# to be rounded to 0/1 labels afterwards.
private_labels = []
model = model2
image_size = (331,331)
batch_size = 32

for start in tqdm(range(0, len(private_df), batch_size)):
    # Slice by position so the final, shorter batch stops at the end of the
    # frame; the previous fixed-size inner loop indexed past the last row
    # whenever the row count was not a multiple of batch_size.
    batch_paths = private_df.filename.iloc[start:start + batch_size]

    # Each pre_processing call returns a (1, H, W, 3) array; stack them into
    # one (batch, H, W, 3) array.
    images = np.concatenate(
        [pre_processing(image_path, image_size) for image_path in batch_paths],
        axis=0)

    # Turn the images into feature vectors with the pooled feature extractor
    # (feature_model_GAP, built on the NASNetLarge backbone above).
    feature = feature_model_GAP.predict(images, verbose=0)
    prediction = model.predict(feature, verbose=0)

    # Flatten the (batch, 1) scores so private_labels ends up 1-D, with one
    # score per row of private_df.
    private_labels.extend(prediction.ravel())

private_labels = np.array(private_labels)
np.save('/home/workspace/user-workspace/DeepFake/private_labels.npy',private_labels)

In [None]:
# Display the private file list before predictions are attached.
private_df

In [None]:
# Attach the predicted scores as a new 'label' column.
private_df['label'] = private_labels
private_df

In [None]:
# Round the scores and store them as integers in the same column.
private_df.label = private_df.label.round().astype('int')
private_df

In [None]:
# Output file for the private predictions.
private_prediction_path ='/home/workspace/user-workspace/prediction/private_prediction.txt'

In [None]:
# Write the rows space-separated, without index or header.
private_df.to_csv(private_prediction_path, index=False, header=None, sep=' ')