<a href="https://colab.research.google.com/github/whiteibescu/AI/blob/main/%EC%8B%A4%EC%8A%B53_resnet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
from tensorflow.keras import Model, layers
import numpy as np
from matplotlib import pyplot as plt
import random

random.seed(1)
np.random.seed(1)
tf.random.set_seed(1)

In [2]:
(x_trainval, y_trainval), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

num_classes = 10
num_features = 784
# Preprocess the data (these are NumPy arrays)
x_trainval = x_trainval.reshape(-1,28,28,1)
x_test = x_test.reshape(-1,28,28,1)

x_trainval = x_trainval.astype("float32") / 255
x_test = x_test.astype("float32") / 255

y_trainval = y_trainval.astype("float32")
y_test = y_test.astype("float32")

# sklearn을 활용하여 간편하게 validation_set 만들기 
from sklearn.model_selection import train_test_split
x_train, x_valid, y_train, y_valid = train_test_split(x_trainval, y_trainval, test_size= 1/6, shuffle=True, stratify = y_trainval, random_state=34)



Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [3]:

# block_layer를 class로 묶습니다. ResNet에서 skip_connection을 수행하기 위한 class 입니다. 반드시 resnet18 구조와 함께 확인하면서 보시길 바랍니다. 
class BasicBlock(layers.Layer):

    def __init__(self, filter_num, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = layers.Conv2D(filter_num, kernel_size=(3, 3), strides=stride, padding="same",
                                   kernel_initializer = tf.keras.initializers.he_normal())
        self.bn1 = layers.BatchNormalization()
        self.conv2 = layers.Conv2D(filter_num, kernel_size=(3, 3), strides=1, padding="same",
                                   kernel_initializer = tf.keras.initializers.he_normal())
        self.bn2 = layers.BatchNormalization()
        if stride != 1: # Resnet에서 x+F(x)를 수행할때 두개의 차원이 맞을때와 맞지 않을때가 있습니다. 이 부분은 stride != 1이므로 맞지 않은 경우이며 차원을 맞추는 방법이 들어가 있습니다. 
            self.downsample = tf.keras.Sequential()  #새롭게 self.downsample 을 정의 한 뒤, 
            self.downsample.add(layers.Conv2D(filter_num  ,kernel_size=(1, 1), strides=stride)) #kernel은 1로 한채 위의 convolution layer에서 수행한 것과 같은 stride를 통해 x의 차원을 F(x)에 맞게 줄여줍니다
            self.downsample.add(layers.BatchNormalization()) #논문에서 batch_normalization을 했기에 그대로 적어넣었습니다.
        else:
            self.downsample = lambda x: x # 이때는 위의 stride(if문의 stride값이 아닙니다.)값이 1인 경우이기 때문에 x와 F(x)의 차원이 같아서 그대로 x값을 내려서 더해줍니다. 


    def call(self, x, is_training= False):
        residual = self.downsample(x) #block을 통과해 나오는 값(X)를 residual 이라고 정의합니다

        x = self.conv1(x)
        x = self.bn1(x, training=is_training) # training=is_training을 통해 테스트 중에는 이 layer를 수행하지 않습니다. 
        x = tf.nn.relu(x)
        x = self.conv2(x)
        x = self.bn2(x, training=is_training)
        x = tf.nn.relu(layers.add([residual, x])) # 이 작없을 통해 x+F(X)의 값이 나옵니다. 이때 [residual, x] 에서 residual은 두단계 위의 x 이고, x는 layer를 지나쳐서 나온 F(x)를 의미합니다.  

        return x
    
def make_basic_block_layer(filter_num, blocks, stride=1): #위에서 만든 basic block를 하나로 묶어줍니다. 
    res_block = tf.keras.Sequential()
    res_block.add(BasicBlock(filter_num, stride=stride))

    for _ in range(1, blocks): #아래의 코드를 통해 위에서 정의한 BasicBlock을 쌓습니다. 사실 resnet18에서는 항상 (2개의 layer로 묶인) 하나의 블럭만 쌓기 때문에 필요없지만, 이를 이용해 다른 수의 block을 쌓는 다른 resnet도 쉽게 구현할 수 있습니다. 
        res_block.add(BasicBlock(filter_num, stride=1))

    return res_block

#아래에서 block을 쌓을때 stride에 주는 값은 제공한 실습자료 또는 논문에 나와있는 architecture를 확인하셔서 비교하셔야 이해가 가능합니다. 
class ResNet18(Model):
    def __init__(self):
        super(ResNet18, self).__init__()
        self.conv1 = layers.Conv2D(64, kernel_size=(7, 7), strides=2, padding="same",
                                   kernel_initializer = tf.keras.initializers.he_normal())
        self.bn1 = layers.BatchNormalization()
        self.ac1 = layers.Activation(tf.nn.relu)
        self.pool1 = layers.MaxPool2D((3, 3), strides=2, padding="same")
        self.layer1 = make_basic_block_layer(filter_num=64, blocks=2) #이곳에서 위에서 만든 block이 하나 쌓입니다. 논문을 보시면 이 과정에서는 Input과 output의 크기가 같습니다.
        self.layer2 = make_basic_block_layer(filter_num=128, blocks=2, stride=2) # stride=2로 했기 때문에 위에서 if strid != 1 문이 실행됩니다. 즉 x의 차원을 줄여주어 맞추는 역할을 합니다. 
        self.layer3 = make_basic_block_layer(filter_num=256, blocks=2, stride=2) #위와 같음
        self.layer4 = make_basic_block_layer(filter_num=512, blocks=2, stride=2) #위와 같음 
        self.avgpool = layers.GlobalAveragePooling2D()
        self.out = layers.Dense(num_classes)

    def call(self, x, is_training= False):
        x = self.conv1(x)
        x = self.bn1(x, training=is_training)
        x = self.ac1(x)
        x = self.pool1(x)
        x = self.layer1(x, training=is_training)
        x = self.layer2(x, training=is_training)
        x = self.layer3(x, training=is_training)
        x = self.layer4(x, training=is_training)
        x = self.avgpool(x)
        x = self.out(x)
        if not is_training:
            x = tf.nn.softmax(x)

        return x

In [4]:
# resnet18 = ResNet18()

# optimizer = tf.optimizers.SGD(0.0001, momentum=0.9)
# print("Train ResNet")
# resnet18 = Train_model(resnet18, optimizer, 5)

In [5]:
resnet18 = ResNet18()

resnet18.compile(loss = 'sparse_categorical_crossentropy', optimizer = tf.keras.optimizers.Adam(), metrics = ['accuracy'])
es = tf.keras.callbacks.EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=4) 



In [6]:
resnet18.fit(x_train, y_train,
             epochs = 1,
             batch_size = 200,
             callbacks = es,
             validation_data = (x_valid, y_valid)
            )



<tensorflow.python.keras.callbacks.History at 0x7fa71006cb50>

In [7]:
resnet18.evaluate(x_test, y_test)



[0.08303102105855942, 0.9740999937057495]