<참조 : https://www.tensorflow.org/tutorials/distribute/custom_training?hl=ko>

## custon training loops

In [2]:
import tensorflow as tf

import numpy as np
import os

print(tf.__version__)

2.7.0-dev20210708


### 01. Dataset Download

In [3]:
fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz


- 배열에 한 차원 추가 : shape=(28, 28, 1)
  - 모델에서 첫 번째 층이 합성곱 층 : 4D 입력을 요구
  - (batch_size, height, width, channels)
  - batch_size 차원은 나중에 추가

In [4]:
train_images = train_images[..., None]
test_images = test_images[..., None]

train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

### 02. 분산 전략 수립
- tf.distribute.MirroredStrategy 동작
  - 모든 변수와 모델 그래프는 장치(replicas)에 복제됩니다
  - 입력은 장치에 고르게 분배되어 들어갑니다
  - 각 장치는 주어지는 입력에 대해서 손실(loss)과 그래디언트를 계산합니다
  - 그래디언트들을 전부 더함으로써 모든 장치들 간에 그래디언트들이 동기화됩니다
  - 동기화된 후에 동일한 업데이트가 각 장치에 있는 변수의 복사본에 동일하게 적용됩니다

In [6]:
strategy = tf.distribute.MirroredStrategy()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


In [7]:
print('장치 수: {}'.format(strategy.num_replicas_in_sync))

장치 수: 1


### 03. 입력 파이프라인 설정

In [9]:
BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA *strategy.num_replicas_in_sync

EPOCHS = 10

In [11]:
with strategy.scope():
    train_dataset = tf.data.Dataset.from_tensor_slices((train_images, 
                    train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
    train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)

    test_dataset = tf.data.Dataset.from_tensor_slices((test_images, 
                    test_labels)).batch(GLOBAL_BATCH_SIZE)
    test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)


### 04. 모델 만들기

In [12]:
def create_model():
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Conv2D(64, 3, activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    
    return model

In [13]:
checkpoint_dir = './cT_training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')

### 05. 손실함수  
  
  - tf.distribute.Strategy 손실 계산 방법
    - GPU 4개가 있고 입력 배치 크기가 64
      - 입력 배치 하나가 GPU 4개에 분배됩니다
      - 각 장치는 크기가 16(64 / 4 = 16)인 입력을 받습니다
    - 각 장치에 있는 모델은 해당 입력에 대해 정방향 계산(forward pass)을 수행하고 손실 계산
      - 손실은 16이 아니라 64로 나누어야 합니다  
          
    - 그래디언트들이 각 장치에서 계산된 다음, 모든 장치를 동기화하기 위해 이 그래디언트 값들을 전부 더하기 때문

##### 텐서플로우
- custom training loops 작성
  - 샘플당 손실을 더하고 GLOBAL_BATCH_SIZE로 나누어야 합니다
  - scale_loss = tf.reduce_sum(loss) * (1./GLOBAL_BATCH_SIZE) 
  - 또는 tf.nn.compute_average_loss 함수 사용
    - 샘플당 손실과 선택적으로 샘플 가중치, GLOBAL_BATCH_SIZE를 매개변수 값으로 받고 스케일이 조정된 손실을 반환  
      
  - 규제 손실을 사용하는 메델 : 장치 개수로 손실 값을 스케일 조정해야 합니다
    - tf.nn.scale_regularization_loss 함수 사용하여 처리할 수 있습니다  
      
  - tf.reduct_main 사용은 추천하지 않습니다. 실제 장치당 배치 크기는 각 단계(step)마다 다를 수 있기 때문  
    
  - 축소(reduction)와 스케일 조정은 케라스의 model.compile과 model.fit에서 자동으로 수행됩니다

In [14]:
with strategy.scope():
    # reduction을 'None'으로 설정합니다.
    # 축소는 나중에 하고, GLOBAL_BATCH_SIZE로 나눌 수 있습니다
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        reduction=tf.keras.losses.Reduction.NONE)
    
    # loss_fn=tf.keras.losses.sparse_categorical_crossentropy를 사용해도 됩니다
    
    def compute_loss(labels, predictions):
        per_example_loss = loss_object(labels, predictions)
        
        return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

### 06. 손실과 정확도를 기록
  - 지표(metrics) : 테스트 손실과 훈련 정확도, 테스트 정확도를 기록
  - .result() 사용 : 누적된 통계값들을 볼 수 있습니다

In [15]:
with strategy.scope():
    test_loss = tf.keras.metrics.Mean(name='test_loss')
    
    train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
        name='train_accuracy')
    test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
        name='test_accuracy')

### 07. 훈련 루프
- 모델과 옵티마이저는 'strategy.scope'에서 만들어져야 합니다

In [17]:
with strategy.scope():
    model = create_model()
    
    optimizer = tf.keras.optimizers.Adam()
    
    checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)

In [23]:
with strategy.scope():
    def train_step(inputs):
        images, labels = inputs
        
        with tf.GradientTape() as tape:
            predictions = model(images, training=True)
            loss = compute_loss(labels, predictions)

        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        train_accuracy.update_state(labels, predictions)
        return loss
    
    def test_step(inputs):
        images, labels = inputs
        
        predictions = model(images, training=False)
        t_loss = loss_object(labels, predictions)
        
        test_loss.update_state(t_loss)
        test_accuracy.update_state(labels, predictions)

In [24]:
with strategy.scope():
    # 'run'은 주어진 계산을 복사하고, 분산된 입력으로 계산을 수행합니다.
    
    @tf.function
    def distributed_train_step(dataset_inputs):
        per_replica_losses = strategy.run(train_step,
                                         args=(dataset_inputs,))
        return strategy.reduce(tf.distribute.ReduceOp.SUM,
                              per_replica_losses, axis=None)
    
    @tf.function
    def distributed_test_step(dataset_inputs):
        return strategy.run(test_step, args=(dataset_inputs,))
    
    for epoch in range(EPOCHS):
        # train loop
        total_loss = 0.0
        num_batches = 0
        
        for x in train_dist_dataset:
            total_loss += distributed_train_step(x)
            num_batches += 1
        train_loss = total_loss / num_batches
        
        # test loop
        for x in test_dist_dataset:
            distributed_test_step(x)
            
        if epoch % 2 == 0:
            checkpoint.save(checkpoint_prefix)
            
        template = ('에포크 {}, 손실: {}, 정확도: {}, 테스트 손실: {}, '
                   '테스트 정확도:{}')
        
        print(template.format(epoch+1, 
                             train_loss,
                             train_accuracy.result()*100,
                             test_loss.result(),
                             test_accuracy.result()*100))
        
        test_loss.reset_states()
        train_accuracy.reset_states()
        test_accuracy.reset_states()

에포크 1, 손실: 0.5145577788352966, 정확도: 81.40833282470703, 테스트 손실: 0.38852205872535706, 테스트 정확도:86.18000030517578
에포크 2, 손실: 0.341645210981369, 정확도: 87.71333312988281, 테스트 손실: 0.33769115805625916, 테스트 정확도:87.73999786376953
에포크 3, 손실: 0.29546600580215454, 정확도: 89.3316650390625, 테스트 손실: 0.31100043654441833, 테스트 정확도:88.73999786376953
에포크 4, 손실: 0.2626648247241974, 정확도: 90.37000274658203, 테스트 손실: 0.2878546118736267, 테스트 정확도:89.58000183105469
에포크 5, 손실: 0.23875999450683594, 정확도: 91.24333190917969, 테스트 손실: 0.27885299921035767, 테스트 정확도:89.75
에포크 6, 손실: 0.21810439229011536, 정확도: 92.02166748046875, 테스트 손실: 0.27133476734161377, 테스트 정확도:90.02999877929688
에포크 7, 손실: 0.2026626169681549, 정확도: 92.39166259765625, 테스트 손실: 0.2953889071941376, 테스트 정확도:89.60000610351562
에포크 8, 손실: 0.18445955216884613, 정확도: 93.15666198730469, 테스트 손실: 0.2716430127620697, 테스트 정확도:90.29000091552734
에포크 9, 손실: 0.16890506446361542, 정확도: 93.7683334350586, 테스트 손실: 0.25887611508369446, 테스트 정확도:90.94999694824219
에포크 10, 손실: 0.155281931

- 이 예제는 train_dist_dataset과 test_dist_dataset을 for x in ... 구조를 통해서 반복합니다
- 스케일이 조정된 손실은 distributed_train_step의 반환값입니다. tf.distribute.Strategy.reduce 호출을 사용해서 장치들 간의 스케일이 조정된 손실 값을 전부 합칩니다. 그리고 나서 tf.distribute.Strategy.reduce 반환 값을 더하는 식으로 배치 간의 손실을 모읍니다
- tf.keras.Metrics는 tf.distribute.Strategy.run에 의해서 실행되는 train_step과 test_step 함수 안에서 업데이트 되어야 합니다
- tf.distribute.Strategy.run는 그 전략안에 포함된 각 지역 복제 모델로부터 결과값을 반환해 줍니다. 그리고 이 결과를 사용하는 몇 가지 방법들이 있습니다. tf.distribute.Strategy.reduce를 이용하여 값들을 합칠 수 있습니다. tf.distribute.Strategy.experimental_local_results를 사용해서 결과값(지역 복제 모델 당 하나의 결과값)에 들어있는 값들의 리스트를 얻을 수도 있습니다

### 08. 최신 체크포인트를 불러와서 테스트하기

In [25]:
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
    name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)

In [26]:
@tf.function
def eval_step(images, labels):
    predictions = new_model(images, training=False)
    eval_accuracy(labels, predictions)

In [27]:
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
    eval_step(images, labels)
    
print('전략을 사용하지 않고, 저장된 모델을 복원한 후의 정확도: {}'.format(
    eval_accuracy.result()*100))

전략을 사용하지 않고, 저장된 모델을 복원한 후의 정확도: 90.94999694824219


### 09. 데이터셋에 대해 반복작업을 하는 다른 방법들

#### 09. 01. 반복자(iterator) 사용하기

In [29]:
with strategy.scope():
    for _ in range(EPOCHS):
        total_loss = 0.0
        num_batches = 0
        train_iter = iter(train_dist_dataset)
        
        for _ in range(10):
            total_loss += distributed_train_step(next(train_iter))
            num_batches += 1
            
        average_train_loss = total_loss / num_batches
        
        template = ('에포크 {}, 손실: {}, 정확도: {}')
        print(template.format(epoch+1,
                             average_train_loss,
                             train_accuracy.result()*100))
        train_accuracy.reset_states()

에포크 10, 손실: 0.14657068252563477, 정확도: 94.375
에포크 10, 손실: 0.15810582041740417, 정확도: 94.6875
에포크 10, 손실: 0.14461565017700195, 정확도: 95.46875
에포크 10, 손실: 0.16386650502681732, 정확도: 94.53125
에포크 10, 손실: 0.13576357066631317, 정확도: 94.21875
에포크 10, 손실: 0.10729018598794937, 정확도: 96.09375
에포크 10, 손실: 0.13755884766578674, 정확도: 95.3125
에포크 10, 손실: 0.15774953365325928, 정확도: 93.59375
에포크 10, 손실: 0.16980639100074768, 정확도: 92.96875
에포크 10, 손실: 0.14132282137870789, 정확도: 94.6875


#### 09. 02. tf.function 내부에서 반복하기

In [30]:
with strategy.scope():
    @tf.function
    def distributed_train_epoch(dataset):
        total_loss = 0.0
        num_batches = 0
        for x in dataset:
            per_replica_losses = strategy.run(train_step,
                                             args=(x,))
            total_loss += strategy.reduce(
                tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
            num_batches += 1
        return total_loss / tf.cast(num_batches, dtype=tf.float32)
    
    for epoch in range(EPOCHS):
        train_loss = distributed_train_epoch(train_dist_dataset)
        
        template = ('Epoch {}, Loss: {}, Accuracy: {}')
        print(template.format(epoch+1, train_loss,
                             train_accuracy.result()*100))
        
        train_accuracy.reset_states()



Epoch 1, Loss: 0.1434594839811325, Accuracy: 94.71833038330078
Epoch 2, Loss: 0.13020296394824982, Accuracy: 95.16666412353516
Epoch 3, Loss: 0.120378278195858, Accuracy: 95.47332763671875
Epoch 4, Loss: 0.1110435351729393, Accuracy: 95.82333374023438
Epoch 5, Loss: 0.10154811292886734, Accuracy: 96.24166870117188
Epoch 6, Loss: 0.09127387404441833, Accuracy: 96.6483383178711
Epoch 7, Loss: 0.08671723306179047, Accuracy: 96.77833557128906
Epoch 8, Loss: 0.07656186819076538, Accuracy: 97.19332885742188
Epoch 9, Loss: 0.07146268337965012, Accuracy: 97.34833526611328
Epoch 10, Loss: 0.06595834344625473, Accuracy: 97.54666900634766


### 10. 장치 간의 훈련 손실 기록하기
- 일반적인 규칙으로, tf.keras.Metrics를 사용하여 샘플당 손실 값을 기록하고 장치 내부에서 값이 합쳐지는 것을 피해야 합니다
- tf.metrics.Meas : 손실의 스케일을 조정하는 계산이 수행되기 때문에 추천하지 않습니다

##### 예
- 장치 두 개
- 샘플들이 각 두 개 장치에 의해 처리됩니다
- 손실값을 산출합니다. 각각의 장치에 대해 [2, 3]과 [4, 5]
- Global batch size = 4  
  
    
    
- 샘플당 손실값을 계산
  - (2 + 3) / 4 = 1.25
  - (4 + 5) / 4 = 2.25  
    
  - tf.metrics.Mean : result()가 메서드 호출
    - total : (2 + 3 + 4 + 5) / 4 = 3.5
    - count : 2
    - total / count = 1.75  
      
        
- 두 개 같은 값인 것 같은데, 왜 추천하지 않는지 아직 이해하지 못하고 있습니다
  - [1, 2, 3]과 [8, 10] 이런 경우는 달라질 듯