## Residual Unit

bn - relu - weight(conv) - bn - relu - weight(conv) - addition

In [1]:
import tensorflow as tf
import numpy as np

In [36]:
class ResidualUnit(tf.keras.Model):
    def __init__(self, filter_in, filter_out, kernel_size):
        super(ResidualUnit, self).__init__()
        
        self.bn1 = tf.keras.layers.BatchNormalization()
        # 생성자에서 relu는 안 만들 것이다.
        # 왜냐하면 학습 가능한 파라미터가 없기 때문에
        # 사용(call)할 때만 구성해 주"면 된다
        self.conv1 = tf.keras.layers.Conv2D(filter_out, kernel_size, padding='same')
        
        self.bn2 = tf.keras.layers.BatchNormalization()
        self.conv2 = tf.keras.layers.Conv2D(filter_out, kernel_size, padding='same')
        
        # channel 개수가 서로 같은 경우
        if filter_in == filter_out:
            self.identity = lambda x: x # identity mapping
        
        # channel 개수가 서로 다른 경우
        else:
            # 입력 데이터의 ch을 filter_out 채널 개수로 변경
            self.identity = tf.keras.layers.Conv2D(filter_out, (1, 1), padding='same')
            
    def call(self, x, training=False):
        # training 정말 중요!
        # 왜? BN은 훈련때와 추론할 때가 다르기 때문에
        h = self.bn1(x, training=training) # 추론과정에서는 False, 훈련과정에서는 True
        h = tf.nn.relu(h) #nn = neural net
        h = self.conv1(h)

        h = self.bn2(h, training=training)
        h = tf.nn.relu(h)
        h = self.conv2(h)

        return self.identity(x) + h

## Residual Layer
residual unit을 이용해서 feature extraction 하는 레이어

In [37]:
# Residual Layer?? residual unit 여러개를 묶은 것
class ResnetLayer(tf.keras.Model):
    # Residual Unit을 여러개 사용할 것이기 때문에
    # Filters는 리스트 형태로 한 번에 저장해둔다
    def __init__(self, filter_in, filters, kernel_size):
        super(ResnetLayer, self).__init__()
        self.sequence = list()
        
        # [16] + [32, 32, 32] 이면?
        # res unit은 3개, skip connection은 3단계
        for f_in, f_out in zip([filter_in] + list(filters), filters):
            # zip 안에서 list길이가 서로 다르면 짧은 쪽의 len만큼 loop가 돈다
            
            # for i, j in zip([1,2,3,4], [1,2,3]):
            #     print(i, j)
            self.sequence.append(ResidualUnit(f_in, f_out, kernel_size))  
            
    
    def call(self, x, training=False):
        for unit in self.sequence:
            # ResidualUnit클래스 객체 만든 것 아래서 호출하기
            x = unit(x, training=False) # unit의 call호출됨
        return x

In [22]:
# 더하면 어케 될까?
filter_in = 16
filters = [32, 32, 32]

[filter_in] + list(filters)

[16, 32, 32, 32]

In [23]:
# 몇 번 돌까?
for i, j in zip([1,2,3,4], [1,2,3]):
    print(i, j)

1 1
2 2
3 3


ResnetLayer(filter_in, (after_res_unit_in, after_res_unit_in), kernel_size)

## ResNet 종합

In [38]:
class ResNet(tf.keras.Model):
    def __init__(self):
        super(ResNet, self).__init__()
        # feature extraction = Conv으로 초기 feature 뽑기
        # output은 28 x 28 x 8 유지됨 ㅇㅇ (mnist 기준)
        self.conv1 = tf.keras.layers.Conv2D(8, (3, 3), padding='same', activation='relu') 
        
        # filters, resnet layer 두 개 잖아 그러니 filter도 2개! , kernel_size
        self.res1 = ResnetLayer(8, (16, 16), (3, 3)) # output(28, 28, 16)
        self.pool1 = tf.keras.layers.MaxPool2D((2, 2)) # output(14, 14, 16)
        
        self.res2 = ResnetLayer(16, (32, 32), (3, 3)) # output(14, 14, 32)
        self.pool2 = tf.keras.layers.MaxPool2D((2, 2)) # output(7, 7, 32)
        
        self.res3 = ResnetLayer(32, (64, 64), (3, 3)) # output(7, 7, 64)
        
        self.flatten = tf.keras.layers.Flatten()
        self.dense1 = tf.keras.layers.Dense(128, activation='relu')
        self.dense2 = tf.keras.layers.Dense(10, activation='softmax')
    
    # training 변수 반드시...!
    def call(self, x, training=False):
        x = self.conv1(x)
       
        x = self.res1(x, training=training)
        x = self.pool1(x)
        
        x = self.res2(x, training=training)
        x = self.pool2(x)
        
        x = self.res3(x, training=training)
        
        x = self.flatten(x)
        x = self.dense1(x)
        x = self.dense2(x)
        
        return x

In [31]:
mnist = tf.keras.datasets.mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train, X_test = X_train/255., X_test/255.

X_train = X_train[..., tf.newaxis]
X_test = X_test[..., tf.newaxis]

In [32]:
train_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train)).shuffle(1024).batch(32)
test_ds = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(32)

## 자동미분

In [39]:
@tf.function
def train_step(model, images, labels, loss_object, optimizer, train_loss, train_accuracy):
    with tf.GradientTape() as tape:
        # 훈련 과정이므로 BN True로 ㄱㄱ
        prediction = model(images, training=True)
        loss = loss_object(labels, prediction)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    
    train_loss(loss)
    train_accuracy(labels, prediction)

# inference 단계    
@tf.function
def test_step(model, images, labels, loss_object, optimizer, test_loss, test_accuracy):
    # BN 학습 ㄴㄴ
    prediction = model(images, training=False)
    t_loss = loss_object(labels, prediction)
    test_loss(t_loss)
    test_accuracy(labels, prediction)

In [40]:
model = ResNet()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.RMSprop(lr=1e-2)

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

## 연습

In [67]:
import tensorflow as tf
import numpy as np

class ResidualUnit(tf.keras.Model):
    def __init__(self, filter_in, filter_out, kernel_size):
        super(ResidualUnit, self).__init__()
        self.bn1 = tf.keras.layers.BatchNormalization()
        self.conv1 = tf.keras.layers.Conv2D(filter_out, kernel_size, padding='same')
        self.bn2 = tf.keras.layers.BatchNormalization()
        self.conv2 = tf.keras.layers.Conv2D(filter_out, kernel_size, padding='same')
        
        if filter_in == filter_out:
            self.identity = lambda x: x
        else:
            self.identity = tf.keras.layers.Conv2D(filter_out, (1, 1), padding='same')
    
    def call(self, x, training=False):
        h = self.bn1(x, training=training)
        h = tf.nn.relu(h)
        h = self.conv1(h)
        h = self.bn2(h, training=training)
        h = tf.nn.relu(h)
        h = self.conv2(h)
        return self.identity(x) + h
    
class ResnetLayer(tf.keras.Model):
    def __init__(self, filter_in, filters, kernel_size):
        super(ResnetLayer, self).__init__()
        self.sequences = list()
        for f_in, f_out in zip([filter_in]+list(filters), filters):
            self.sequences.append(ResidualUnit(f_in, f_out, kernel_size))
    def call(self, x, training=False):
        for unit in self.sequences:
            x = unit(x, training=training)
        return x

class ResNet(tf.keras.Model):
    def __init__(self):
        super(ResNet, self).__init__()
        # feature extraction 용
        self.conv1 = tf.keras.layers.Conv2D(6, (3, 3), padding='same', activation='relu')
        self.res1 = ResnetLayer(6, (12, 12, 12), (3, 3))
        self.pool1 = tf.keras.layers.MaxPool2D((2,2))
        self.res2 = ResnetLayer(12, (24, 24, 24), (3, 3))
        self.pool2 = tf.keras.layers.MaxPool2D((2, 2))
        self.res3 = ResnetLayer(24, (48, 48, 48), (3, 3))
        self.flatten = tf.keras.layers.Flatten()
        self.dense1 = tf.keras.layers.Dense(100, activation='relu')
        self.dense2 = tf.keras.layers.Dense(10, activation='softmax')
    def call(self, x, training=False):
        x = self.conv1(x)
        x = self.res1(x, training=training)
        x = self.pool1(x)
        x = self.res2(x, training=training)
        x = self.pool2(x)
        x = self.res3(x, training=training)
        x = self.flatten(x)
        x = self.dense1(x)
        x = self.dense2(x)
        return x

@tf.function
def train_step(model, x, y, loss_obj, optm, train_loss, train_acc):
    with tf.GradientTape() as tape:
        prediction = model(x, training=True)
        loss = loss_obj(y, prediction)
    grads = tape.gradient(loss, model.trainable_variables)
    optm.apply_gradients(zip(grads, model.trainable_variables))
    train_loss(loss)
    train_acc(y, prediction)

@tf.function
def test_step(model, x, y, loss_obj, optm, test_loss, test_acc):
    prediction = model(x, training=False)
    t_loss = loss_obj(y, prediction)
    test_loss(t_loss)
    test_acc(y, prediction)

In [55]:
mnist = tf.keras.datasets.mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train[..., tf.newaxis]/255.
X_test = X_test[..., tf.newaxis]/255.
train_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train)).shuffle(112).batch(32)
test_ds = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(32)

In [68]:
model = ResNet()
loss_obj = tf.keras.losses.SparseCategoricalCrossentropy()
optm = tf.keras.optimizers.Nadam(learning_rate=0.01)
train_acc = tf.keras.metrics.SparseCategoricalAccuracy(name='train_acc')
train_loss = tf.keras.metrics.Mean(name='train_loss')
test_acc = tf.keras.metrics.SparseCategoricalAccuracy(name='test_acc')
test_loss = tf.keras.metrics.Mean(name='test_loss')

In [None]:
for epoch in range(10):
    for x, y in train_ds:
        train_step(model, x, y, loss_obj, optm, train_loss, train_acc)
    for test_x, test_y in test_ds:
        test_step(model, test_x, test_y, loss_obj, optm, test_loss, test_acc)
    print(f'epoch : {epoch + 1}, train loss : {train_loss:.3f}, train acc : {train_acc:.3f}, test loss : {test_loss:.3f}, test acc : {test_acc:.3f}')



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

