In [3]:
!cp "/content/drive/MyDrive/2021_2_class/Cat_and_dog.tar" "/content/data/Cat_and_dog.tar"

In [1]:
# Mount Google Drive at /content/drive so files under /content/drive/MyDrive
# can be read from this runtime.
# NOTE(review): this cell has to run before any cell that reads from /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
!tar -xvf "/content/data/Cat_and_dog.tar"

test.txt
train.txt
val.txt
Cat_Dog_100_val/
Cat_Dog_100_val/Cat_10447.jpg
Cat_Dog_100_val/Cat_10448.jpg
Cat_Dog_100_val/Cat_10449.jpg
Cat_Dog_100_val/Cat_1045.jpg
Cat_Dog_100_val/Cat_10450.jpg
Cat_Dog_100_val/Cat_10451.jpg
Cat_Dog_100_val/Cat_10452.jpg
Cat_Dog_100_val/Cat_10453.jpg
Cat_Dog_100_val/Cat_10454.jpg
Cat_Dog_100_val/Cat_10455.jpg
Cat_Dog_100_val/Cat_10456.jpg
Cat_Dog_100_val/Cat_10457.jpg
Cat_Dog_100_val/Cat_10458.jpg
Cat_Dog_100_val/Cat_10459.jpg
Cat_Dog_100_val/Cat_1046.jpg
Cat_Dog_100_val/Cat_10460.jpg
Cat_Dog_100_val/Cat_10461.jpg
Cat_Dog_100_val/Cat_10462.jpg
Cat_Dog_100_val/Cat_10463.jpg
Cat_Dog_100_val/Cat_10464.jpg
Cat_Dog_100_val/Cat_10465.jpg
Cat_Dog_100_val/Cat_10466.jpg
Cat_Dog_100_val/Cat_10467.jpg
Cat_Dog_100_val/Cat_10468.jpg
Cat_Dog_100_val/Cat_10469.jpg
Cat_Dog_100_val/Cat_1047.jpg
Cat_Dog_100_val/Cat_10470.jpg
Cat_Dog_100_val/Cat_10471.jpg
Cat_Dog_100_val/Cat_10472.jpg
Cat_Dog_100_val/Cat_10473.jpg
Cat_Dog_100_val/Cat_10474.jpg
Cat_Dog_100_val/Cat_10475.jpg

In [None]:
# -*- coding:utf-8 -*-
from random import shuffle, random

import tensorflow as tf
import numpy as np
import easydict # 저는 이 기능을 전역변수로서 사용하려고 합니다.
import matplotlib.pyplot as plt
import os

# Global run configuration. EasyDict allows attribute-style access (FLAGS.img_size).
FLAGS = easydict.EasyDict({
    # Side length (in pixels) every image is resized to before entering the network.
    "img_size": 256,

    # Annotation file (image name + label per row) and image folder of each split.
    "tr_txt_path": "/content/train.txt",
    "tr_img_path": "/content/Cat_Dog_500_train/",
    "val_txt_path": "/content/val.txt",
    "val_img_path": "/content/Cat_Dog_100_val/",
    "te_txt_path": "/content/test.txt",
    "te_img_path": "/content/Cat_Dog_200_test/",

    "batch_size": 10,

    # Set to True when interrupted training has to be resumed, or when testing.
    "pre_checkpoint": False,
    # Path where the previously trained weights are stored.
    "pre_checkpoint_path": "",
    # Root folder under which new checkpoints are written.
    "save_checkpoint": "/content/drive/MyDrive/2021_2_class/checkpoint",

    "epochs": 50,
    "lr": 0.001,
})

# Single Adam optimizer shared by the training step and by checkpointing.
optim = tf.keras.optimizers.Adam(learning_rate=FLAGS.lr)

def tr_input(img_list, lab_list):
    """Load one training image from disk and apply online augmentation.

    Meant to be passed to ``tf.data.Dataset.map``, so it receives a single
    (path, label) element at a time.

    Args:
        img_list: string tensor holding the path of one JPEG file.
        lab_list: label of that image; returned unchanged.

    Returns:
        Tuple ``(img, lab)`` where ``img`` is a float32 tensor of shape
        ``[FLAGS.img_size, FLAGS.img_size, 3]`` scaled to ``[0, 1]``.
    """
    img = tf.io.read_file(img_list)
    img = tf.image.decode_jpeg(img, 3)
    img = tf.image.resize(img, [FLAGS.img_size, FLAGS.img_size]) / 255.

    # Online data augmentation: horizontal flip with probability 0.5.
    # This must be a TensorFlow random op. A Python-level `random()` call is
    # evaluated only once, when Dataset.map traces this function, which would
    # freeze the same flip decision for every image in the dataset.
    img = tf.image.random_flip_left_right(img)

    lab = lab_list

    return img, lab

def te_input(img_list, lab_list):
    """Load one validation/test image from disk without any augmentation.

    Args:
        img_list: string tensor holding the path of one JPEG file.
        lab_list: label of that image; returned unchanged.

    Returns:
        Tuple ``(img, lab)`` where ``img`` is the decoded 3-channel image
        resized to ``FLAGS.img_size`` x ``FLAGS.img_size`` and divided by 255.
    """
    raw_bytes = tf.io.read_file(img_list)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    target_size = [FLAGS.img_size, FLAGS.img_size]
    img = tf.image.resize(decoded, target_size) / 255.
    return img, lab_list


def cal_loss(model, images, labels):
    """Run one optimisation step on a batch and return its loss.

    Computes the binary cross-entropy between ``labels`` and the raw model
    outputs (logits), back-propagates it and lets the module-level ``optim``
    optimizer update every trainable variable of ``model``.

    Args:
        model: Keras model, called in training mode.
        images: batch of input images.
        labels: binary labels of the batch.

    Returns:
        The loss tensor computed for this batch (before the weight update).
    """
    bce = tf.keras.losses.BinaryCrossentropy(from_logits=True)

    with tf.GradientTape() as tape:
        logits = model(images, training=True)
        loss = bce(labels, logits)

    # Back-propagation: one gradient per trainable variable, each with the same
    # shape as its variable (e.g. a convolution kernel gradient is 4-D).
    weights = model.trainable_variables
    grads = tape.gradient(loss, weights)

    # zip pairs each gradient with its variable:
    # a = [1, 2, 3], b = [4, 5, 6]  ==>  zip(a, b) yields (1, 4), (2, 5), (3, 6).
    # The optimizer then applies its update rule to every pair.
    optim.apply_gradients(zip(grads, weights))

    return loss

def cal_acc(model, val_images, val_labels):
    """Return per-sample correctness of the model on one batch.

    The model output is treated as a logit: it is passed through a sigmoid and
    thresholded at 0.5 to obtain the predicted class (1.0 or 0.0).

    Every sample of the batch is scored. Indexing only the first logit row
    would be correct for batch size 1 only; with larger batches it would
    compare a single prediction against all labels through broadcasting.

    Args:
        model: Keras model, called in inference mode.
        val_images: batch of input images.
        val_labels: binary labels of the batch.

    Returns:
        float32 tensor with one entry per sample: 1.0 where the prediction
        equals the label, 0.0 otherwise. For a batch of one image this is a
        shape-[1] tensor.
    """
    logits = model(val_images, False)                    # one logit per sample
    probs = tf.reshape(tf.nn.sigmoid(logits), [-1])      # flatten to [B]
    predict = tf.cast(probs > 0.5, tf.float32)

    ground_truth = tf.reshape(tf.cast(val_labels, tf.float32), [-1])  # [B]

    # True where prediction and label agree, converted to 1.0 / 0.0.
    acc = tf.cast(tf.equal(predict, ground_truth), tf.float32)

    return acc
def _load_split(txt_path, img_dir):
    """Read an annotation file and return (full image paths, int32 labels)."""
    # Column 0 holds the image file name, column 1 the integer label.
    # ndmin=1 keeps the result iterable even when the file has a single row.
    names = np.loadtxt(txt_path, dtype="<U100", skiprows=0, usecols=0, ndmin=1)
    labels = np.loadtxt(txt_path, dtype=np.int32, skiprows=0, usecols=1, ndmin=1)
    paths = np.array([img_dir + name for name in names])
    return paths, labels


def _make_eval_dataset(img_paths, labels):
    """Build an unshuffled, one-image-per-batch dataset for validation/testing."""
    dataset = tf.data.Dataset.from_tensor_slices((img_paths, labels))
    dataset = dataset.map(te_input, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.batch(1)
    return dataset.prefetch(tf.data.experimental.AUTOTUNE)


def _evaluate(model, dataset, num_samples):
    """Return the accuracy of `model` over `dataset` in percent, as a Python float."""
    if num_samples == 0:
        return 0.0
    correct = 0.
    for images, labels in dataset:
        # cal_acc yields 1.0 / 0.0 per sample; sum them into a plain float.
        correct += float(tf.reduce_sum(cal_acc(model, images, labels)))
    return correct / num_samples * 100.


def main():
    """Build the VGG16-based classifier and train it on the cat/dog data.

    Steps:
      1. Create a VGG16 backbone without its top and add a dense head that
         ends in a single logit.
      2. Optionally restore model/optimizer state from FLAGS.pre_checkpoint_path.
      3. Load the train/val/test annotation files and build the input pipelines.
      4. Train for FLAGS.epochs epochs. The loss is printed every 10 steps,
         validation and test accuracy every 100 steps, and a checkpoint is
         written every 200 steps under FLAGS.save_checkpoint.
    """
    # 1. Model definition (fine tuning / transfer learning on a VGG16 backbone).
    backbone = tf.keras.applications.VGG16(
        include_top=False, input_shape=(FLAGS.img_size, FLAGS.img_size, 3))
    x = backbone.output    # [B, img_size/32, img_size/32, 512] -> [B, 8, 8, 512] for 256x256 input
    h = tf.keras.layers.Flatten()(x)
    h = tf.keras.layers.Dense(1024)(h)
    h = tf.keras.layers.ReLU()(h)
    # NOTE(review): there is no activation between the next two Dense layers,
    # so together they form a single linear map - confirm this is intended.
    h = tf.keras.layers.Dense(1024)(h)
    h = tf.keras.layers.Dense(1)(h)    # single logit per image
    model = tf.keras.Model(inputs=backbone.input, outputs=h)

    model.summary()

    if FLAGS.pre_checkpoint:
        ckpt = tf.train.Checkpoint(model=model, optim=optim)
        ckpt_manager = tf.train.CheckpointManager(ckpt, FLAGS.pre_checkpoint_path, 5)

        if ckpt_manager.latest_checkpoint:
            ckpt.restore(ckpt_manager.latest_checkpoint)
            print("Restored!!!!!")

    # 2. Input definition (training, validation, test).
    tr_img_data, tr_lab_data = _load_split(FLAGS.tr_txt_path, FLAGS.tr_img_path)
    val_img_data, val_lab_data = _load_split(FLAGS.val_txt_path, FLAGS.val_img_path)
    te_img_data, te_lab_data = _load_split(FLAGS.te_txt_path, FLAGS.te_img_path)

    # Evaluation data is never shuffled and is fed one image at a time.
    val_generator = _make_eval_dataset(val_img_data, val_lab_data)
    te_generator = _make_eval_dataset(te_img_data, te_lab_data)

    # 3. Training loop.
    # Number of full batches per epoch; a trailing partial batch is not used.
    tr_idx = len(tr_img_data) // FLAGS.batch_size
    count = 0    # global step counter across all epochs
    for epoch in range(FLAGS.epochs):

        # The training pipeline is rebuilt (and tr_input re-traced) every epoch.
        tr_generator = tf.data.Dataset.from_tensor_slices((tr_img_data, tr_lab_data))
        tr_generator = tr_generator.shuffle(len(tr_img_data))
        tr_generator = tr_generator.map(tr_input)
        tr_generator = tr_generator.batch(FLAGS.batch_size)
        tr_generator = tr_generator.prefetch(tf.data.experimental.AUTOTUNE)

        tr_iter = iter(tr_generator)
        for step in range(tr_idx):
            batch_images, batch_labels = next(tr_iter)

            loss = cal_loss(model, batch_images, batch_labels)

            if count % 10 == 0:
                print("Epoch: {} [{}/{}] loss = {}".format(epoch, step + 1, tr_idx, loss))

            if count % 100 == 0 and count != 0:
                # Accuracies are plain floats, so they print as e.g. 85.0
                # rather than as a one-element tensor.
                val_acc = _evaluate(model, val_generator, len(val_img_data))
                te_acc = _evaluate(model, te_generator, len(te_img_data))

                print(f'Epochs = {epoch}, total_steps = {count}, Val accuracy = {val_acc} %')
                print(f'Epochs = {epoch}, total_steps = {count}, Test accuracy = {te_acc} %')

            if count % 200 == 0:
                # Each checkpoint goes into its own numbered sub-folder.
                num = str(int(count) // 200)
                model_dir = "%s/%s" % (FLAGS.save_checkpoint, num)

                ckpt = tf.train.Checkpoint(model=model, optim=optim)
                if not os.path.isdir(model_dir):
                    os.makedirs(model_dir, exist_ok=True)
                    print("Made {} files to save weights".format(num))

                ckpt_dir = model_dir + "/" + "fix_vgg_16_model_{}.ckpt".format(count)
                ckpt.save(ckpt_dir)

            count += 1





    # Train, Test, Validation
 
# Entry point: start training when this cell/script is executed directly.
if __name__ == "__main__":
    main()


Model: "model_7"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_8 (InputLayer)        [(None, 256, 256, 3)]     0         
                                                                 
 block1_conv1 (Conv2D)       (None, 256, 256, 64)      1792      
                                                                 
 block1_conv2 (Conv2D)       (None, 256, 256, 64)      36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, 128, 128, 64)      0         
                                                                 
 block2_conv1 (Conv2D)       (None, 128, 128, 128)     73856     
                                                                 
 block2_conv2 (Conv2D)       (None, 128, 128, 128)     147584    
                                                                 
 block2_pool (MaxPooling2D)  (None, 64, 64, 128)       0   