In [1]:
import tensorflow as tf
import numpy as np
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.datasets import mnist
import os
from time import time
print(tf.__version__)

2.3.1


In [2]:
# check point function

def load(model, checkpoint_dir):
    print(" [*] Reading checkpoints...")

    ckpt = tf.train.get_checkpoint_state(checkpoint_dir)
    if ckpt :
        ckpt_name = os.path.basename(ckpt.model_checkpoint_path)
        checkpoint = tf.train.Checkpoint(dnn=model)
        checkpoint.restore(save_path=os.path.join(checkpoint_dir, ckpt_name))
        counter = int(ckpt_name.split('-')[1])
        print(" [*] Success to read {}".format(ckpt_name))
        return True, counter
    else:
        print(" [*] Failed to find a checkpoint")
        return False, 0

def check_folder(dir):
    if not os.path.exists(dir):
        os.makedirs(dir)
    return dir

# Data load & pre-processing function

In [3]:
# mnist 데이터 로드
def load_mnist():
    (x_train, y_train),(x_test, y_test) = mnist.load_data()
    
    # channel을 하나 추기해서 tensorflow input shape양식으로 설정해준다
    # tensorflow input shape : [batch_size, height, width, channel]
    # axis : 몇번째 위치에 확장할 것인가
    x_train = np.expand_dims(x_train, axis = -1) # [N, 28, 28] -> [N, 28, 28, 1]
    x_test = np.expand_dims(x_test, axis = -1) # [N, 28, 28] -> [N, 28, 28, 1]
    
    # 정규화를 하면 학습을 더 빨리하고 Local optimum 에 빠지는 가능성을 줄일 수 있다
    x_train, x_test = normalize(x_train, x_test) # [0 ~ 255] -> [0 ~ 1]
    # One-hot 인코딩 ex) 3 = [0,0,0,3,0,0,0,0,0,0]
    # to_categorical의 숫자는 데이터의 label의 개수
    y_train = to_categorical(y_train, 10) # [N,] -> [N, 10]
    y_test = to_categorical(y_test, 10) # [N,] -> [N, 10]
    
    return x_train, y_train, x_test, y_test
    
def normalize(x_train, x_test):
    x_train = x_train.astype(np.float32) / 255.0
    x_test = x_test.astype(np.float32) / 255.0
    
    return x_train, x_test

# Performance function

In [4]:
# loss
def loss_fn(model, images, labels):
    # 모델에 이미지를 넣어서 그 모델이 이미지의 숫자 값이 뭔지 출력을 하고
    logits = model(images, training=True)
    # 출력값과 라벨과 softmax_cross_entropy를 이용해서 로스 구한다
    loss = tf.reduce_mean(tf.keras.losses.categorical_crossentropy(y_pred=logits, y_true=labels, from_logits=True))
    return loss

# accuracy / 정확도
def accuracy_fn(model, images, labels):
    # 모델에 이미지를 넣어서 예의 이미지가 숫무자가 뭔지 구한다
    logits = model(images, training=False)
    # -1은 인덱스를 뜻한다 : [batch size, label_dim]
    # tf.argmax() : logits과 labels에서 숫자가 큰값의 위치를 알려달라 = [label_dim](10개의 라벨 중에서 예측하는 값)
    # tf.equal() : logits과 labels값이 같다면 True 다르면 False를 return
    prediction = tf.equal(tf.argmax(logits, -1), tf.argmax(labels, -1))
    # tf.cast(prediction, tf.float32) 
    # : prediction값을 숫자값으로 변환 -> accuracy를 구할 때 bollean값보다는 숫자값으로 해야 계산이 가능하기 때문 
    # True = 1 / False = 0
    accuracy = tf.reduce_mean(tf.cast(prediction, tf.float32))
    return accuracy

# gradient
# loss에 해당하는 모델의 gradient값을 구하는 함수
def grad(model, images, labels):
    with tf.GradientTape() as tape:
        loss = loss_fn(model, images, labels)
    return tape.gradient(loss, model.variables)

# Model function

In [5]:
def flatten():
    return tf.keras.layers.Flatten()

def dense(channel, weight_init):
    # units = out channel, bias = 사용여부, kernel_initializer = weight_initialize
    return tf.keras.layers.Dense(units=channel, use_bias=True, kernel_initializer=weight_init)

def relu():
    return tf.keras.layers.Activation(tf.keras.activations.relu)

# Create model (class version)

In [6]:
# create models <class>

# * 클래스로 모델을 만드려면 'tf.keras_model' 상속 필수
class create_model(tf.keras.Model):
    # label_dim = 네트워크의 로짓을 구할 때 최종적으로 몇개의 아웃풋을 내보낼 것인지
    def __init__(self,label_dim):
        super(create_model, self).__init__()
    
        # initializer.RandomNormal() : 평균이 0 분산이 1인 가우시안 분포로 랜덤한 웨이트 설정
#         weight_init = tf.keras.initializers.RandomNormal()        
          # xavier를 이용할 때
       # weight_init = tf.keras.initializers.glorot_uniform()
        # ReLu를 이용할 때
        weight_init = tf.keras.initializers.he_uniform()
        
        
        # 모델정의
        self.model = tf.keras.Sequential()

        self.model.add(flatten()) # [N, 28, 28, 1] -> [N, 784]

        for i in range(2):
            # [N, 784] -> [N, 256] -> [N, 256]
            self.model.add(dense(256, weight_init))
            self.model.add(relu())

        # 모델의 로짓구하기
        self.model.add(dense(label_dim, weight_init)) # [N, 256] -> [N, 10]

    # 모델을 불렀을 때 아웃풋을 어떻게 낼 것 인가
    def call(self, x, training=None, mask=None):
        x = self.model(x)

        return x

# Create model (function version)

In [7]:
# create model <function>

def create_model_function(label_dim) :
    weight_init = tf.keras.initializers.he_uniform()

    model = tf.keras.Sequential()
    model.add(flatten())

    for i in range(2) :
        model.add(dense(256, weight_init))
        model.add(relu())

    model.add(dense(label_dim, weight_init))

    return model

# Define data & hyper-parameter

In [8]:
# parameters

""" dataset """
train_x, train_y, test_x, test_y = load_mnist()

""" parameters """
learning_rate = 0.001
batch_size = 128

training_epochs = 1
training_iterations = len(train_x) // batch_size

label_dim = 10

train_flag = True

""" Graph Input using Dataset API """
# 데이터셋 API를 이용해서 각각의 데이터를 네트워크에 넣는 작업을 하기전에
# train data = 60000장 / test = 10000장의 이미지를 가지고 있는데 한꺼번에 네트워크에 집어넣으면 네트워크에게 부담을 주게됨으로
# 데이터들을 batch_size만큼 잘라서 넣어준다

# shuffle(buffer_size = 100000) :데이터셋 섞기, buffer_size의 값은 데이터값보다 크게 설정한다

# prefetch(buffer_size = batch_size)
# : 네트워크가 데이터를 batch_size만큼 잘라서 학습을 하고 있을때 미리 메모리에 데이터를 batch_size만큼 잘라서 올려놓아라
# -> 속도가 더 빨라진다

# batch(batch_size) : batch를 batch_size 몇개만큼 던져줄 것인지

# repeat() : 한번으로 끝내는 것이 아니라 계속해서 진행한다
train_dataset = tf.data.Dataset.from_tensor_slices((train_x, train_y)).\
shuffle(buffer_size = 100000).\
prefetch(buffer_size = batch_size).\
batch(batch_size, drop_remainder=True)


test_dataset = tf.data.Dataset.from_tensor_slices((test_x, test_y)).\
shuffle(buffer_size = 100000).\
prefetch(buffer_size = len(test_x)).\
batch(len(test_x))

In [9]:
# model
""" Model """
# class
# network = create_model(label_dim)
# function
network = create_model_function(label_dim)

""" Training """
# loss minimizer
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

""" Writer """
checkpoint_dir = 'checkpoints'
logs_dir = 'logs'

model_dir = 'nn_relu'

checkpoint_dir = os.path.join(checkpoint_dir, model_dir)
check_folder(checkpoint_dir)
checkpoint_prefix = os.path.join(checkpoint_dir, model_dir)
logs_dir = os.path.join(logs_dir, model_dir)

In [10]:
if train_flag :
    # 네트워크가 학습을 하다가 끊겼을 때 다시 재학습을 이뤄낸다
    # 학습종료 후 테스트 이미지에 대해 테스트를 가능하게 함
    # 학습되어서 저장된 웨이트를 불러오는 데 도움
    checkpoint = tf.train.Checkpoint(dnn=network)

    # create writer for tensorboard
    summary_writer = tf.summary.create_file_writer(logdir=logs_dir)
    start_time = time()

    # restore check-point if it exits
    could_load, checkpoint_counter = load(network, checkpoint_dir)    

    if could_load:
        start_epoch = (int)(checkpoint_counter / training_iterations)        
        counter = checkpoint_counter        
        print(" [*] Load SUCCESS")
    else:
        start_epoch = 0
        start_iteration = 0
        counter = 0
        print(" [!] Load failed...")
    
    # train phase
    with summary_writer.as_default():  # for tensorboard
        for epoch in range(start_epoch, training_epochs):
            for idx, (train_input, train_label) in enumerate(train_dataset):                
                grads = grad(network, train_input, train_label)
                # gradient 적용후 학습을 시킴
                optimizer.apply_gradients(grads_and_vars=zip(grads, network.variables))

                train_loss = loss_fn(network, train_input, train_label)
                train_accuracy = accuracy_fn(network, train_input, train_label)

                for test_input, test_label in test_dataset:                
                    test_accuracy = accuracy_fn(network, test_input, test_label)

                tf.summary.scalar(name='train_loss', data=train_loss, step=counter)
                tf.summary.scalar(name='train_accuracy', data=train_accuracy, step=counter)
                tf.summary.scalar(name='test_accuracy', data=test_accuracy, step=counter)

                print(
                    "Epoch: [%2d] [%5d/%5d] time: %4.4f, train_loss: %.8f, train_accuracy: %.4f, test_Accuracy: %.4f" \
                    % (epoch, idx, training_iterations, time() - start_time, train_loss, train_accuracy,
                       test_accuracy))
                counter += 1  
        # weight 파일저장
        checkpoint.save(file_prefix=checkpoint_prefix + '-{}'.format(counter))

# test phase      
else :
    _, _ = load(network, checkpoint_dir)
    for test_input, test_label in test_dataset:    
        test_accuracy = accuracy_fn(network, test_input, test_label)

    print("test_Accuracy: %.4f" % (test_accuracy))

 [*] Reading checkpoints...
 [*] Success to read nn_relu-468-1
 [*] Load SUCCESS
