In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.utils import to_categorical
import numpy as np
import matplotlib.pyplot as plt
import os

## Set Hyper Parameters

In [2]:
learning_rate = 0.001
training_epochs = 15
batch_size = 100

tf.random.set_seed(777)

## Checkpoint를 저장할 디렉토리 생성

In [3]:
cur_dir = os.getcwd()
ckpt_dir_name = 'checkpoints'
model_dir_name = 'minst_cnn_seq'

checkpoint_dir = os.path.join(cur_dir, ckpt_dir_name, model_dir_name)
os.makedirs(checkpoint_dir, exist_ok=True)

checkpoint_prefix = os.path.join(checkpoint_dir, model_dir_name)

## MNIST 데이터 로드
* CNN은 입력으로 4차원 tensor가 필요하다.
* (batch, height, width, channel)

In [4]:
mnist = keras.datasets.mnist
class_names = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

In [5]:
(train_images, train_labels), (test_images, test_labels) = mnist.load_data() # 3차원 tensor의 형태임.
# train_images = (600, 28, 28), train_labels = (60000,)
# test_images = (10000, 28, 28), test_labels= (10000,)

train_images = train_images.astype(np.float32) / 255.
test_images = test_images.astype(np.float32) / 255.
train_images = np.expand_dims(train_images, axis=-1) # 4차원 tensor로 만들기 위한 과정. 마지막 차원을 추가하기 때문에 -1, 3도 가능
test_images = np.expand_dims(test_images, axis=-1)

# one-hot encoding    
train_labels = to_categorical(train_labels, 10)
test_labels = to_categorical(test_labels, 10)    

# data를 batch size만큼 잘라서 후에 model에 공급하기 위한 부분.    
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(
                buffer_size=100000).batch(batch_size)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(batch_size)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


## 모델 만들기
* 모델 구조
  * convolution layer, filter=32
  * pooling layer
  * convolution layer, filter=64
  * pooling layer
  * convolution layer, filter=128
  * pooling layer
  * fully connected layer

In [7]:
def create_model():    
    model = keras.Sequential()
    model.add(keras.layers.Conv2D(filters=32, kernel_size=3, activation=tf.nn.relu, padding='SAME', 
                                  input_shape=(28, 28, 1))) # 첫 번째 layer에는 input_shape을 써주는 것이 좋다.
    model.add(keras.layers.MaxPool2D(padding='SAME'))
    model.add(keras.layers.Conv2D(filters=64, kernel_size=3, activation=tf.nn.relu, padding='SAME'))
    model.add(keras.layers.MaxPool2D(padding='SAME'))
    model.add(keras.layers.Conv2D(filters=128, kernel_size=3, activation=tf.nn.relu, padding='SAME'))
    model.add(keras.layers.MaxPool2D(padding='SAME'))
    model.add(keras.layers.Flatten()) # feature map을 vector로 펴주는 과정
    model.add(keras.layers.Dense(256, activation=tf.nn.relu))
    model.add(keras.layers.Dropout(0.4)) # dense layer는 parameter가 굉장히 많기 때문에 적용해줌.
    model.add(keras.layers.Dense(10))
    return model

In [8]:
model = create_model()
model.summary() # 모델이 내 생각대로 만들어졌는지 확인

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_3 (Conv2D)            (None, 28, 28, 32)        320       
_________________________________________________________________
max_pooling2d_3 (MaxPooling2 (None, 14, 14, 32)        0         
_________________________________________________________________
conv2d_4 (Conv2D)            (None, 14, 14, 64)        18496     
_________________________________________________________________
max_pooling2d_4 (MaxPooling2 (None, 7, 7, 64)          0         
_________________________________________________________________
conv2d_5 (Conv2D)            (None, 7, 7, 128)         73856     
_________________________________________________________________
max_pooling2d_5 (MaxPooling2 (None, 4, 4, 128)         0         
_________________________________________________________________
flatten_1 (Flatten)          (None, 2048)             

## Loss function

In [9]:
@tf.function
def loss_fn(model, images, labels):
    logits = model(images, training=True) # trining을 True로 하면 dropout이 적용된다.
    loss = tf.reduce_mean(tf.keras.losses.categorical_crossentropy(
        y_pred=logits, y_true=labels, from_logits=True))    
    return loss

## Gradient 계산

In [10]:
@tf.function
def grad(model, images, labels):
    with tf.GradientTape() as tape:
        loss = loss_fn(model, images, labels)
    return tape.gradient(loss, model.variables) # gradient는 back-propagation 방법으로 계산됨

## 모델 정확도 계산

In [11]:
@tf.function
def evaluate(model, images, labels):
    logits = model(images, training=False) # 정확도 계산하는 거는 training과정이 아니니까 false
    correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(labels, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    return accuracy

## optimizer 선택

In [12]:
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

## Checkpoint 만들기

In [13]:
checkpoint = tf.train.Checkpoint(cnn=model)


## Training

In [14]:
@tf.function
def train(model, images, labels):
    grads = grad(model, images, labels)
    optimizer.apply_gradients(zip(grads, model.trainable_variables)) 

In [None]:
# train my model
print('Learning started. It takes sometime.')
for epoch in range(training_epochs):
    avg_loss = 0.
    avg_train_acc = 0.
    avg_test_acc = 0.
    train_step = 0
    test_step = 0    
    
    for images, labels in train_dataset: # batch size만큼 이미지 가져오기
        train(model, images, labels)
        loss = loss_fn(model, images, labels)
        acc = evaluate(model, images, labels)
        avg_loss = avg_loss + loss
        avg_train_acc = avg_train_acc + acc
        train_step += 1
    avg_loss = avg_loss / train_step
    avg_train_acc = avg_train_acc / train_step
    
    for images, labels in test_dataset:        
        acc = evaluate(model, images, labels)        
        avg_test_acc = avg_test_acc + acc
        test_step += 1    
    avg_test_acc = avg_test_acc / test_step    

    print('Epoch:', '{}'.format(epoch + 1), 'loss =', '{:.8f}'.format(avg_loss), 
          'train accuracy = ', '{:.4f}'.format(avg_train_acc), 
          'test accuracy = ', '{:.4f}'.format(avg_test_acc))
    
    checkpoint.save(file_prefix=checkpoint_prefix)

print('Learning Finished!')

Learning started. It takes sometime.
Epoch: 1 loss = 0.16675605 train accuracy =  0.9572 test accuracy =  0.9872
Epoch: 2 loss = 0.04016740 train accuracy =  0.9901 test accuracy =  0.9894
Epoch: 3 loss = 0.02720951 train accuracy =  0.9929 test accuracy =  0.9915
Epoch: 4 loss = 0.01922376 train accuracy =  0.9954 test accuracy =  0.9926
Epoch: 5 loss = 0.01509581 train accuracy =  0.9963 test accuracy =  0.9911
Epoch: 6 loss = 0.01076387 train accuracy =  0.9974 test accuracy =  0.9923
Epoch: 7 loss = 0.00940884 train accuracy =  0.9978 test accuracy =  0.9926
Epoch: 8 loss = 0.00804093 train accuracy =  0.9983 test accuracy =  0.9929
Epoch: 9 loss = 0.00596689 train accuracy =  0.9987 test accuracy =  0.9906
